diff --git a/PySONIC/multicomp/benchmarks.py b/PySONIC/multicomp/benchmarks.py index b92d2cd..4e11a39 100644 --- a/PySONIC/multicomp/benchmarks.py +++ b/PySONIC/multicomp/benchmarks.py @@ -1,207 +1,269 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2021-05-14 19:42:00 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-05-25 13:52:46 +# @Last Modified time: 2021-05-25 20:23:43 import os import numpy as np import matplotlib.pyplot as plt -from ..core import NeuronalBilayerSonophore, PulsedProtocol, Batch -from ..utils import si_format, rmse +from ..core import NeuronalBilayerSonophore, PulsedProtocol, Batch, AcousticDrive, AcousticDriveArray +from ..utils import si_format, rmse, rescale from ..neurons import passiveNeuron from ..postpro import gamma from ..plt import harmonizeAxesLimits, hideSpines, hideTicks, addYscale, XYMap from .coupled_nbls import CoupledSonophores class Benchmark: def __init__(self, a, nnodes, outdir=None): self.a = a self.nnodes = nnodes self.outdir = outdir if not os.path.isdir(self.outdir): os.mkdir(self.outdir) def pdict(self): return { 'a': f'{self.a * 1e9:.0f} nm', 'nnodes': f'{self.nnodes} nodes', } def pstr(self): l = [] for k, v in self.pdict().items(): if k == 'nnodes': l.append(v) else: l.append(f'{k} = {v}') return ', '.join(l) def __repr__(self): return f'{self.__class__.__name__}({self.pstr()})' def code(self): s = self.__repr__() for k in ['/', '(', ',']: s = s.replace(k, '_') for k in ['=', ' ', ')']: s = s.replace(k, '') return s def runSims(self, model, drives, tstim, covs): ''' Run full and sonic simulations for a specific combination drives, pulsed protocol and coverage fractions, harmonize outputs and compute normalized charge density profiles. ''' Fdrive = drives[0].f assert all(x.f == Fdrive for x in drives), 'frequencies do not match' assert len(covs) == model.nnodes, 'coverages do not match model dimensions' assert len(drives) == model.nnodes, 'drives do not match model dimensions' # If not provided, compute stimulus duration from model passive properties min_ncycles = 10 ntaumax_conv = 5 if tstim is None: tstim = max(ntaumax_conv * model.taumax, min_ncycles / Fdrive) # Recast stimulus duration as finite multiple of acoustic period tstim = int(np.ceil(tstim * Fdrive)) / Fdrive # s # Pulsed protocol pp = PulsedProtocol(tstim, 0) # Simulate/Load with full and sonic methods data, meta = {}, {} for method in ['full', 'sonic']: data[method], meta[method] = model.simAndSave( drives, pp, covs, method, outdir=self.outdir, overwrite=False, minimize_output=True) # Cycle-average full solution and interpolate sonic solution along same time vector data['cycleavg'] = data['full'].cycleAveraged(1 / Fdrive) data['sonic'] = data['sonic'].interpolate(data['cycleavg'].time) # Compute normalized charge density profiles and add them to dataset for simkey, simdata in data.items(): for nodekey, nodedata in simdata.items(): nodedata['Qnorm'] = nodedata['Qm'] / model.refpneuron.Cm0 * 1e3 # mV # Return dataset return data, meta + def getQnorms(self, data, k, cut_bounds=True): + ''' Get node-specific list of cycle-averaged and sonic normalized charge vectors + (with, by default, discarding of bounding artefact elements). + ''' + Qnorms = np.array([data[simkey][k]['Qnorm'].values for simkey in ['cycleavg', 'sonic']]) + if cut_bounds: + Qnorms = Qnorms[:, 1:-1] + return Qnorms + def computeGamma(self, data, *args): - ''' Perform per-node gamma evaluation on charge density profiles. ''' + ''' Evaluate per-node gamma on charge density profiles. ''' gamma_dict = {} resolution = list(data['cycleavg'].values())[0].dt for k in data['cycleavg'].keys(): - Qnorms = [data[simkey][k]['Qnorm'].values for simkey in ['cycleavg', 'sonic']] - gamma_dict[k] = gamma(*[x[1:-1] for x in Qnorms], *args, resolution) - # Discard 1st and last indexes of evaluation - gamma_dict[k] = np.hstack(([np.nan], gamma_dict[k], [np.nan])) + # Get normalized charge vectors (discarding 1st and last indexes) and compute gamma + gamma_dict[k] = gamma(*self.getQnorms(data, k), *args, resolution) + # Pad gamma with nan on each side to ensure size consistency with time vector + gamma_dict[k] = np.pad(gamma_dict[k], 1, mode='constant', constant_values=(np.nan,)) return gamma_dict def computeRMSE(self, data): - ''' Perform per-node RMSE evaluation on charge density profiles. ''' - RMSE_dict = {} + ''' Evaluate per-node RMSE on charge density profiles. ''' + return {k: rmse(*self.getQnorms(data, k)) for k in data['cycleavg'].keys()} + + def computeSteadyStateDeviation(self, data): + ''' Evaluate per-node steady-state absolute deviation on charge density profiles. ''' + return {k: np.abs(np.squeeze(np.diff(self.getQnorms(data, k), axis=0)))[-1] + for k in data['cycleavg'].keys()} + + def computeAreaUnderCurveDeviation(self, data): + ''' Evaluate per-node steady-state absolute deviation on charge density profiles. ''' + d = {} for k in data['cycleavg'].keys(): - Qnorms = [data[simkey][k]['Qnorm'].values for simkey in ['cycleavg', 'sonic']] - RMSE_dict[k] = rmse(*[x[1:-1] for x in Qnorms]) - return RMSE_dict + y = self.getQnorms(data, k) + # Rescale signals linearly between 0 and 100 + ynorms = np.array([rescale(yy) for yy in y]) * 100 + # Compute absolute difference signal + ydiff = np.squeeze(np.abs(np.diff(ynorms, axis=0))) + # Compute area under the curve of difference signal (in %.s) + d[k] = np.sum(ydiff) * data['cycleavg'][k].dt + return d def computeDivergence(self, data, eval_mode, *args): ''' Compute divergence according to given eval_mode. ''' if eval_mode == 'rmse': div_dict = self.computeRMSE(data) elif eval_mode == 'gamma': div_dict = {k: np.nanmax(v) for k, v in self.computeGamma(data, *args).items()} + elif eval_mode == 'ss': + div_dict = self.computeSteadyStateDeviation(data) + elif eval_mode == 'Anorm': + div_dict = self.computeAreaUnderCurveDeviation(data) return max(div_dict.values()) def plotQnorm(self, ax, data): ''' Plot normalized charge density signals on an axis. ''' markers = {'full': '-', 'cycleavg': '--', 'sonic': '-'} alphas = {'full': 0.5, 'cycleavg': 1., 'sonic': 1.} for simkey, simdata in data.items(): for i, (nodekey, nodedata) in enumerate(simdata.items()): - ax.plot(nodedata.time * 1e3, nodedata['Qnorm'], markers[simkey], c=f'C{i}', + y = nodedata['Qnorm'].values + y[-1] = y[-2] + ax.plot(nodedata.time * 1e3, y, markers[simkey], c=f'C{i}', alpha=alphas[simkey], label=f'{simkey} - {nodekey}', clip_on=False) def plotGamma(self, ax, data, *gamma_args): gamma_dict = self.computeGamma(data, *gamma_args) tplt = list(data['cycleavg'].values())[0].time * 1e3 for i, (nodekey, nodegamma) in enumerate(gamma_dict.items()): ax.plot(tplt, nodegamma, c=f'C{i}', label=nodekey, clip_on=False) ax.axhline(1, linestyle='--', c='k') class PassiveBenchmark(Benchmark): def __init__(self, a, nnodes, Cm0, ELeak, **kwargs): super().__init__(a, nnodes, **kwargs) self.Cm0 = Cm0 self.ELeak = ELeak def pdict(self): return { **super().pdict(), 'Cm0': f'{self.Cm0 * 1e2:.1f} uF/cm2', 'ELeak': f'{self.ELeak} mV', } def getModelAndRunSims(self, drives, covs, taum, tauax): ''' Create passive model for a combination of time constants. ''' gLeak = self.Cm0 / taum ga = self.Cm0 / tauax pneuron = passiveNeuron(self.Cm0, gLeak, self.ELeak) model = CoupledSonophores([ NeuronalBilayerSonophore(self.a, pneuron) for i in range(self.nnodes)], ga) return self.runSims(model, drives, None, covs) def runSimsOverTauSpace(self, drives, covs, taum_range, tauax_range, mpi=False): ''' Run simulations over 2D time constant space. ''' queue = [[drives, covs] + x for x in Batch.createQueue(taum_range, tauax_range)] batch = Batch(self.getModelAndRunSims, queue) # batch.printQueue(queue) output = batch.run(mpi=mpi) results = [x[0] for x in output] # removing meta return np.reshape(results, (taum_range.size, tauax_range.size)).T def plotSignalsOverTauSpace(self, taum_range, tauax_range, results, *gamma_args, fs=10): ''' Plot signals over 2D time constants space. ''' plt_gamma = len(gamma_args) > 0 title = 'gamma evaluation' if plt_gamma else 'normalized charge densities' if plt_gamma: pltfunc = lambda *args: self.plotGamma(*args, *gamma_args) else: pltfunc = self.plotQnorm yunit = ''if plt_gamma else 'mV' fig, axes = plt.subplots(taum_range.size, tauax_range.size) fig.suptitle(f'passive neuron - {title}', fontsize=fs + 2) fig.supxlabel('taum', fontsize=fs + 2) fig.supylabel('tauax', fontsize=fs + 2) i = 0 tmin = np.inf for i, axrow in enumerate(axes[::-1]): for j, ax in enumerate(axrow): pltfunc(ax, results[i, j]) hideSpines(ax) hideTicks(ax) ax.margins(0) tmin = min(tmin, np.ptp(ax.get_xlim())) # addXscale(ax, 0, 0, unit='ms', fmt='.2f', fs=fs) for axrow in axes[::-1]: for ax in axrow: trans = (ax.transData + ax.transAxes.inverted()) xpoints = [trans.transform([x, 0])[0] for x in [0, tmin]] ax.plot(xpoints, [-0.05] * 2, c='k', lw=2, transform=ax.transAxes, clip_on=False) harmonizeAxesLimits(axes, dim='y') addYscale(axes[-1, -1], 0.05, 0, unit=yunit, fmt='.0f', fs=fs) for ax, tauax in zip(axes[-1, :], tauax_range): ax.set_xlabel(f'{si_format(tauax)}s', labelpad=15, fontsize=fs + 2) for ax, taum in zip(axes[:, 0], taum_range[::-1]): ax.set_ylabel(f'{si_format(taum)}s', labelpad=15, fontsize=fs + 2) fig.tight_layout() return fig + + +class FiberBenchmark(Benchmark): + + def __init__(self, a, nnodes, pneuron, ga, **kwargs): + super().__init__(a, nnodes, **kwargs) + self.model = CoupledSonophores([ + NeuronalBilayerSonophore(self.a, pneuron) for i in range(self.nnodes)], ga) + + def pdict(self): + return { + **super().pdict(), + 'ga': self.model.gastr, + 'pneuron': self.model.refpneuron, + } + + def getModelAndRunSims(self, Fdrive, tstim, covs, A1, A2): + ''' Create passive model for a combination of time constants. ''' + drives = AcousticDriveArray([AcousticDrive(Fdrive, A1), AcousticDrive(Fdrive, A2)]) + return self.runSims(self.model, drives, tstim, covs) + + def runSimsOverAmplitudeSpace(self, Fdrive, tstim, covs, A_range, mpi=False): + ''' Run simulations over 2D time constant space. ''' + amps = [] + for i, A in enumerate(A_range): + for j in range(i, len(A_range)): + amps.append([A, A_range[j]]) + queue = [[Fdrive, tstim, covs] + x for x in amps] + Batch.printQueue(queue) + batch = Batch(self.getModelAndRunSims, queue) + output = batch.run(mpi=mpi) + results = [x[0] for x in output] # removing meta + return np.reshape(results, (A_range.size, A_range.size)).T