diff --git a/PySONIC/multicomp/benchmarks.py b/PySONIC/multicomp/benchmarks.py index 0fdcdf4..b30d35e 100644 --- a/PySONIC/multicomp/benchmarks.py +++ b/PySONIC/multicomp/benchmarks.py @@ -1,444 +1,444 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2021-05-14 19:42:00 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-06-19 14:13:30 +# @Last Modified time: 2021-06-20 16:27:31 import os import numpy as np import matplotlib.pyplot as plt from ..core import NeuronalBilayerSonophore, PulsedProtocol, Batch from ..core.drives import AcousticDrive, AcousticDriveArray from ..utils import si_format, rmse, rescale, logger, bounds from ..neurons import passiveNeuron from ..postpro import gamma from ..plt import harmonizeAxesLimits, hideSpines, hideTicks, addYscale, addXscale from .coupled_nbls import CoupledSonophores class Benchmark: tsparse_bounds = (1, -2) def __init__(self, a, nnodes, outdir=None, nodecolors=None): self.a = a self.nnodes = nnodes self.outdir = outdir if not os.path.isdir(self.outdir): os.mkdir(self.outdir) if nodecolors is None: nodecolors = plt.get_cmap('Dark2').colors self.nodecolors = nodecolors def pdict(self): return { 'a': f'{self.a * 1e9:.0f} nm', 'nnodes': f'{self.nnodes} nodes', } def pstr(self): l = [] for k, v in self.pdict().items(): if k == 'nnodes': l.append(v) else: l.append(f'{k} = {v}') return ', '.join(l) def __repr__(self): return f'{self.__class__.__name__}({self.pstr()})' def code(self): s = self.__repr__() for k in ['/', '(', ',']: s = s.replace(k, '_') for k in ['=', ' ', ')']: s = s.replace(k, '') return s def runSims(self, model, drives, tstim, covs): ''' Run full and sonic simulations for a specific combination drives, pulsed protocol and coverage fractions, and harmonize outputs. ''' Fdrive = drives[0].f assert all(x.f == Fdrive for x in drives), 'frequencies do not match' assert len(covs) == model.nnodes, 'coverages do not match model dimensions' assert len(drives) == model.nnodes, 'drives do not match model dimensions' # If not provided, compute stimulus duration from model passive properties min_ncycles = 10 ntaumax_conv = 5 if tstim is None: tstim = max(ntaumax_conv * model.taumax, min_ncycles / Fdrive) # Recast stimulus duration as finite multiple of acoustic period tstim = int(np.ceil(tstim * Fdrive)) / Fdrive # s # Pulsed protocol pp = PulsedProtocol(tstim, 0) # Simulate/Load with full and sonic methods data, meta = {}, {} for method in ['full', 'sonic']: data[method], meta[method] = model.simAndSave( drives, pp, covs, method, outdir=self.outdir, - overwrite=False, minimize_output=True) + overwrite=False, full_output=False) # Cycle-average full solution and interpolate sonic solution along same time vector data['cycleavg'] = data['full'].cycleAveraged(1 / Fdrive) data['sonic'] = data['sonic'].interpolate(data['cycleavg'].time) # # Compute normalized charge density profiles and add them to dataset # for simkey, simdata in data.items(): # for nodekey, nodedata in simdata.items(): # nodedata['Qnorm'] = nodedata['Qm'] / model.refpneuron.Cm0 * 1e3 # mV # Return dataset return data, meta def getTime(self, data): ''' Get time vector compatible with cycle-averaged and sonic charge density vectors (with, by default, discarding of bounding artefact elements). 
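For illustration, the default slicing is equivalent to the sketch below (assuming `data` is a dataset returned by runSims, whose 'cycleavg' entry exposes a 1D time array):

    # tsparse_bounds = (1, -2): drop the first sample and the last two,
    # which carry cycle-averaging / interpolation boundary artefacts
    t_full = data['cycleavg'].time
    t = t_full[1:-2]  # same result as self.getTime(data)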
''' return data['cycleavg'].time[self.tsparse_bounds[0]:self.tsparse_bounds[1]] def getCharges(self, data, k, cut_bounds=True): ''' Get node-specific list of cycle-averaged and sonic charge density vectors (with, by default, discarding of bounding artefact elements). ''' Qms = np.array([data[simkey][k]['Qm'].values for simkey in ['cycleavg', 'sonic']]) if cut_bounds: Qms = Qms[:, self.tsparse_bounds[0]:self.tsparse_bounds[1]] return Qms def computeRMSE(self, data): ''' Evaluate per-node RMSE on charge density profiles. ''' return {k: rmse(*self.getCharges(data, k)) for k in data['cycleavg'].keys()} def eval_funcs(self): return { 'rmse': (self.computeRMSE, 'nC/cm2') } def computeDivergence(self, data, eval_mode, *args): ''' Compute divergence according to given eval_mode, and return max value across nodes. ''' divs = list(self.eval_funcs()[eval_mode][0](data, *args).values()) if any(np.isnan(x) for x in divs): return np.nan return max(divs) def plotQm(self, ax, data): ''' Plot charge density signals on an axis. ''' markers = {'full': '-', 'cycleavg': '--', 'sonic': '-'} alphas = {'full': 0.5, 'cycleavg': 1., 'sonic': 1.} # tplt = TimeSeriesPlot.getTimePltVar('ms') # yplt = self.model.refpneuron.getPltVars()['Qm/Cm0'] # mode = 'details' for simkey, simdata in data.items(): for i, (nodekey, nodedata) in enumerate(simdata.items()): y = nodedata['Qm'].values y[-1] = y[-2] ax.plot(nodedata.time * 1e3, y * 1e5, markers[simkey], c=self.nodecolors[i], alpha=alphas[simkey], label=f'{simkey} - {nodekey}') # if simkey == 'cycleavg': # TimeSeriesPlot.materializeSpikes(ax, nodedata, tplt, yplt, c, mode) def plotSignalsOver2DSpace(self, gridxkey, gridxvec, gridxunit, gridykey, gridyvec, gridyunit, results, pltfunc, *args, yunit='', title=None, fs=10, flipud=True, fliplr=False): ''' Plot signals over 2D space. 
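A hedged usage sketch, mirroring the plotSignalsOverTauSpace wrapper defined below (`taum_range` and `tauax_range` are assumed 1D numpy arrays, and `results` a matching 2D object array of simulation datasets):

    fig = benchmark.plotSignalsOver2DSpace(
        'taum', taum_range, 's',    # grid x: key, values, unit
        'tauax', tauax_range, 's',  # grid y: key, values, unit
        results, benchmark.plotQm,  # per-axis plotting callback
        yunit='nC/cm2', title='Qm')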
''' # Create grid-like figure fig, axes = plt.subplots(gridxvec.size, gridyvec.size, figsize=(6, 5)) # Re-arrange axes and labels if flipud/fliplr option is set supylabel_args = {} supxlabel_args = {'y': 1.0, 'va': 'top'} if flipud: axes = axes[::-1] supxlabel_args = {} if fliplr: axes = axes[:, ::-1] supylabel_args = {'x': 1.0, 'ha': 'right'} # Add title and general axes labels if title is not None: fig.suptitle(title, fontsize=fs + 2) fig.supxlabel(gridxkey, fontsize=fs + 2, **supxlabel_args) fig.supylabel(gridykey, fontsize=fs + 2, **supylabel_args) # Loop through the axes and plot results, while storing time ranges i = 0 tranges = [] for i, axrow in enumerate(axes): for j, ax in enumerate(axrow): hideSpines(ax) hideTicks(ax) ax.margins(0) if results[i, j] is not None: pltfunc(ax, results[i, j], *args) tranges.append(np.ptp(ax.get_xlim())) if len(np.unique(tranges)) > 1: # If more than one time range, add common x-scale to all axes tmin = min(tranges) for axrow in axes[::-1]: for ax in axrow: trans = (ax.transData + ax.transAxes.inverted()) xpoints = [trans.transform([x, 0])[0] for x in [0, tmin]] ax.plot(xpoints, [-0.05] * 2, c='k', lw=2, transform=ax.transAxes) else: # Otherwise, add x-scale only to axis opposite to origin side = 'top' if flipud else 'bottom' addXscale(axes[-1, -1], 0, 0.05, unit='ms', fmt='.0f', fs=fs, side=side) # Harmonize y-limits across all axes, and add y-scale to axis opposite to origin harmonizeAxesLimits(axes, dim='y') side = 'left' if fliplr else 'right' if yunit is not None: addYscale(axes[-1, -1], 0.05, 0, unit=yunit, fmt='.0f', fs=fs, side=side) # Set labels for xvec and yvec values along the two figure grid dimensions for ax, x in zip(axes[0, :], gridxvec): ax.set_xlabel(f'{si_format(x)}{gridxunit}', labelpad=15, fontsize=fs + 2) if not flipud: ax.xaxis.set_label_position('top') for ax, y in zip(axes[:, 0], gridyvec): if fliplr: ax.yaxis.set_label_position('right') ax.set_ylabel(f'{si_format(y)}{gridyunit}', labelpad=15, fontsize=fs + 2) # Return figure object return fig class PassiveBenchmark(Benchmark): def __init__(self, a, nnodes, Cm0, ELeak, **kwargs): super().__init__(a, nnodes, **kwargs) self.Cm0 = Cm0 self.ELeak = ELeak def pdict(self): return { **super().pdict(), 'Cm0': f'{self.Cm0 * 1e2:.1f} uF/cm2', 'ELeak': f'{self.ELeak} mV', } def getModelAndRunSims(self, drives, covs, taum, tauax): ''' Create passive model for a combination of time constants. ''' gLeak = self.Cm0 / taum ga = self.Cm0 / tauax pneuron = passiveNeuron(self.Cm0, gLeak, self.ELeak) model = CoupledSonophores([ NeuronalBilayerSonophore(self.a, pneuron) for i in range(self.nnodes)], ga) return self.runSims(model, drives, None, covs) def runSimsOverTauSpace(self, drives, covs, taum_range, tauax_range, mpi=False): ''' Run simulations over 2D time constant space. ''' queue = [[drives, covs] + x for x in Batch.createQueue(taum_range, tauax_range)] batch = Batch(self.getModelAndRunSims, queue) # batch.printQueue(queue) output = batch.run(mpi=mpi) results = [x[0] for x in output] # removing meta return np.reshape(results, (taum_range.size, tauax_range.size)).T def computeSteadyStateDivergence(self, data): ''' Evaluate per-node steady-state absolute deviation on charge density profiles. 
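Equivalently, for a single node key `k` (a sketch; getCharges returns a 2-row array holding the cycle-averaged and sonic charge vectors):

    Qm_cycleavg, Qm_sonic = self.getCharges(data, k)
    ss_dev = np.abs(Qm_sonic - Qm_cycleavg)[-1]  # absolute deviation at last sample (C/m2)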
''' return {k: np.abs(np.squeeze(np.diff(self.getCharges(data, k), axis=0)))[-1] for k in data['cycleavg'].keys()} @staticmethod def computeAreaRatio(yref, yeval, dt): # Compute absolute differential signals: between reference solution and unit steady-state, # and between the two solutions signals = [np.ones_like(yref), yeval] diffsignals = [np.abs(y - yref) for y in signals] # Compute related areas areas = [np.sum(y) * dt for y in diffsignals] # Return ratio of the two areas ratio = areas[1] / areas[0] logger.debug( f"{', '.join([f'{x * 1e5:.2f}%.ms' for x in areas])}, ratio = {ratio * 1e2:.2f}%") return ratio def isExponentialChargeBuildup(self, Qm): ''' Check if charge signal corresponds to an exponential build-up. ''' if np.ptp(Qm) < 1e-5: # C/m2 logger.debug('too narrow') return False Qmin, Qmax = bounds(Qm) Qbounds_check = dict(atol=1e-7, rtol=1e-5) # if not np.isclose(Qm[0], Qmin, **Qbounds_check): # logger.debug('not starting from bottom') # return False if not np.isclose(Qm[-1], Qmax, **Qbounds_check): logger.debug('not finishing on top') return False return True def computeTransientDivergence(self, data): ''' Evaluate per-node mean absolute difference on [0, 1] normalized charge profiles. ''' d = {} t = self.getTime(data) dt = t[1] - t[0] for k in data['cycleavg'].keys(): y = self.getCharges(data, k) # If cycle-avg charge profile corresponds to an exponential build-up if self.isExponentialChargeBuildup(y[0]): # Rescale signals linearly between 0 and 1 ynorms = np.array([rescale(yy) for yy in y]) # Restrict signals to transient phase tthr = self.getConvergenceTime(t, ynorms[0]) ynorms = [yy[t <= tthr] for yy in ynorms] # Compute ratio between the cycle-avg steady-state convergence area and the # difference area between cycle-avg and sonic solutions d[k] = self.computeAreaRatio(*ynorms, dt) * 1e2 else: d[k] = np.nan return d def eval_funcs(self): return { **super().eval_funcs(), 'ss': (self.computeSteadyStateDivergence, 'nC/cm2', 1e5), 'transient': (self.computeTransientDivergence, '%', 1e0) } def plotSignalsOverTauSpace(self, taum_range, tauax_range, results, pltfunc=None, fs=10): if pltfunc is None: pltfunc = 'plotQm' yunit = {'plotQm': 'nC/cm2', 'plotQnorm': None}[pltfunc] title = pltfunc[4:] pltfunc = getattr(self, pltfunc) return self.plotSignalsOver2DSpace( 'taum', taum_range, 's', 'tauax', tauax_range, 's', results, pltfunc, title=title, yunit=yunit) @staticmethod def getConvergenceTime(t, y, ythr=0.999): i = np.where(y > ythr)[0][0] # return np.interp(ythr, y[i - 1:i + 1], t[i - 1:i + 1]) return t[i] def plotQnorm(self, ax, data): t = self.getTime(data) for i, (k, nodedata) in enumerate(data['cycleavg'].items()): dt = t[1] - t[0] y = self.getCharges(data, k) c = self.nodecolors[i] ynorms = np.array([rescale(yy) for yy in y]) for yn, marker in zip(ynorms, ['--', '-']): ax.plot(t * 1e3, yn, marker, c=c) ax.axhline(1., ls='--', color='k') if self.isExponentialChargeBuildup(y[0]): tthr = self.getConvergenceTime(t, ynorms[0]) t_fill = t[t <= tthr] ynorms_fill = [yy[t <= tthr] for yy in ynorms] ax.axvline(tthr * 1e3, ls='--', color=c) ax.fill_between(t_fill * 1e3, *ynorms_fill, alpha=0.5, color=c) eps = self.computeAreaRatio(*ynorms_fill, dt) else: eps = np.nan ax.text(0.5, 0.3 * (i + 1), f'{eps * 1e2:.2f}%', c=c, transform=ax.transAxes) class FiberBenchmark(Benchmark): def __init__(self, a, nnodes, pneuron, ga, **kwargs): super().__init__(a, nnodes, **kwargs) self.model = CoupledSonophores([ NeuronalBilayerSonophore(self.a, pneuron) for i in range(self.nnodes)], ga) def 
pdict(self): return { **super().pdict(), 'ga': self.model.gastr, 'pneuron': self.model.refpneuron, } def getModelAndRunSims(self, Fdrive, tstim, covs, A1, A2): ''' Assemble acoustic drive array for a given amplitude combination, and run simulations. ''' drives = AcousticDriveArray([AcousticDrive(Fdrive, A1), AcousticDrive(Fdrive, A2)]) return self.runSims(self.model, drives, tstim, covs) def runSimsOverAmplitudeSpace(self, Fdrive, tstim, covs, A_range, mpi=False, subset=None): ''' Run simulations over 2D amplitude space. ''' # Generate 2D amplitudes meshgrid A_combs = np.meshgrid(A_range, A_range) # Set elements below main diagonal to NaN tril_idxs = np.tril_indices(A_range.size, -1) for x in A_combs: x[tril_idxs] = np.nan # Flatten the meshgrid and assemble into list of tuples A_combs = list(zip(*[x.flatten().tolist() for x in A_combs])) # Remove NaN elements A_combs = list(filter(lambda x: not any(np.isnan(xx) for xx in x), A_combs)) # Assemble queue queue = [[Fdrive, tstim, covs] + list(x) for x in A_combs] # Restrict queue if subset is specified if subset is not None: queue = queue[subset[0]:subset[1] + 1] batch = Batch(self.getModelAndRunSims, queue) output = batch.run(mpi=mpi) results = [x[0] for x in output] # removing meta # Re-organize results into upper-triangle matrix new_results = np.empty((A_range.size, A_range.size), dtype=object) triu_idxs = np.triu_indices(A_range.size, 0) for *idx, res in zip(*triu_idxs, results): new_results[idx[0], idx[1]] = res return new_results def computeGamma(self, data, *args): ''' Evaluate per-node gamma on charge density profiles. ''' gamma_dict = {} resolution = list(data['cycleavg'].values())[0].dt for k in data['cycleavg'].keys(): # Get charge vectors (discarding 1st and last indexes) and compute gamma gamma_dict[k] = gamma(*self.getCharges(data, k), *args, resolution) return gamma_dict def plotQm(self, ax, data, *gamma_args): super().plotQm(ax, data) if len(gamma_args) > 0: gamma_dict = self.computeGamma(data, *gamma_args) tplt = self.getTime(data) * 1e3 for i, (nodekey, nodegamma) in enumerate(gamma_dict.items()): # Identify indexes of transitions across the gamma = 1 threshold isgamma = nodegamma >= 1 itransitions = np.where(np.abs(np.diff(isgamma)) > 0)[0] + 1 if len(itransitions) > 0: if isgamma[0]: itransitions = np.hstack(([0], itransitions)) if isgamma[-1]: itransitions = np.hstack((itransitions, [isgamma.size - 1])) spans = list(zip(tplt[itransitions[:-1]], tplt[itransitions[1:]])) for span in spans: ax.axvspan(*span, ec='none', fc=self.nodecolors[i], alpha=0.2) def plotGamma(self, ax, data, *gamma_args): gamma_dict = self.computeGamma(data, *gamma_args) tplt = self.getTime(data) * 1e3 for i, (nodekey, nodegamma) in enumerate(gamma_dict.items()): ax.plot(tplt, nodegamma, c=self.nodecolors[i], label=nodekey) ax.axhline(1, linestyle='--', c='k') def plotSignalsOverAmplitudeSpace(self, A_range, results, *args, pltfunc=None, fs=10): if pltfunc is None: pltfunc = 'plotQm' yunit = {'plotQm': 'nC/cm2', 'plotGamma': ''}[pltfunc] title = pltfunc[4:] pltfunc = getattr(self, pltfunc) return self.plotSignalsOver2DSpace( 'A1', A_range, 'Pa', 'A2', A_range, 'Pa', results, pltfunc, *args, title=title, yunit=yunit) def computeGammaDivergence(self, data, *args): return {k: np.nanmax(v) for k, v in self.computeGamma(data, *args).items()} def eval_funcs(self): return { **super().eval_funcs(), 'gamma': (self.computeGammaDivergence, '', 1e0) } diff --git a/PySONIC/multicomp/coupled_nbls.py b/PySONIC/multicomp/coupled_nbls.py index 5e82796..9068ef7 100644 ---
a/PySONIC/multicomp/coupled_nbls.py +++ b/PySONIC/multicomp/coupled_nbls.py @@ -1,319 +1,319 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2021-05-14 17:50:14 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-06-09 17:20:02 +# @Last Modified time: 2021-06-20 16:30:47 import os import pickle import logging import numpy as np from ..utils import logger, isWithin, os_name from ..core import Model, NeuronalBilayerSonophore, EventDrivenSolver from ..core.timeseries import TimeSeries, SpatiallyExtendedTimeSeries from ..constants import CLASSIC_TARGET_DT, MAX_NSAMPLES_EFFECTIVE class CoupledSonophores: ''' Interface for running benchmark simulations of a two-compartment coupled NBLS model. ''' simkey = 'COUPLED_ASTIM' # keyword used to characterize simulations made with this model ga_bounds = [1e-10, 1e10] # S/m2 def __init__(self, nodes, ga): ''' Initialization. :param nodes: list of nbls objects :param ga: axial conductance (S/m2) ''' assert all(x.pneuron == nodes[0].pneuron for x in nodes), 'differing point-neuron models' self.nodes = nodes self.nnodes = len(nodes) self.ga = ga def normalizedConductanceMatrix(self): ones = np.ones(self.nnodes) return np.diag(ones, 0) + np.diag(-ones[:-1], -1) + np.diag(-ones[:-1], 1) def copy(self): return self.__class__(self.nodes, self.ga) @property def meta(self): return { 'nodes': [x.meta for x in self.nodes], 'ga': self.ga } @classmethod def initFromMeta(cls, meta): try: nodes, ga = meta['nodes'], meta['ga'] except KeyError: meta = meta['model'] nodes, ga = meta['nodes'], meta['ga'] nodes = [NeuronalBilayerSonophore.initFromMeta(x) for x in nodes] return cls(nodes, ga) @property def refnode(self): return self.nodes[0] @property def refpneuron(self): return self.refnode.pneuron @property def gastr(self): return f'{self.ga:.2e} S/m2' @property def mechstr(self): return self.refnode.pneuron.name def __repr__(self): params = [f'ga = {self.gastr}'] return f'{self.__class__.__name__}({self.mechstr} dynamics, {", ".join(params)})' @property def ga(self): return self._ga @ga.setter def ga(self, value): if value != 0.: assert isWithin('ga', value, self.ga_bounds) self._ga = value self.ga_matrix = self.normalizedConductanceMatrix() * value def Iax(self, Vm): ''' Compute array of axial currents in each compartment based on array of potentials. ''' return -self.ga_matrix.dot(Vm) # mA/m2 def serialize(self, y): ''' Serialize a state-per-node matrix into a single state vector. ''' return np.reshape(y.copy(), (self.npernode * self.nnodes)) def deserialize(self, y): ''' Deserialize a single state vector into a state-per-node matrix. ''' return np.reshape(y.copy(), (self.nnodes, self.npernode)) def fullDerivatives(self, t, y, drives, fs): ''' Full derivatives method. ''' # Deserialize states vector y = self.deserialize(y) # Compute derivatives array for uncoupled nbls systems dydt = np.vstack([self.nodes[i].fullDerivatives(t, y[i], drives[i], fs[i]) for i in range(self.nnodes)]) # Compute membrane capacitance and potential profiles iZ, iQ = 1, 3 Cm = np.array([x.spatialAverage(fs[i], x.capacitance(y[i, iZ]), x.Cm0) for i, x in enumerate(self.nodes)]) # F/m2 Vm = y[:, iQ] / Cm * 1e3 # mV # Add axial currents to charge derivatives column dydt[:, iQ] += self.Iax(Vm) * 1e-3 # A/m2 # Return serialized derivatives vector return self.serialize(dydt) def effDerivatives(self, t, y, lkps1d): ''' Effective derivatives method.
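As an illustration of the axial coupling term added below: for nnodes = 3, normalizedConductanceMatrix() yields a tridiagonal matrix, so Iax couples each node to its nearest neighbors (a sketch with assumed values for ga and Vm):

    ga_matrix = ga * np.array([
        [1, -1, 0],
        [-1, 1, -1],
        [0, -1, 1],
    ])
    Iax = -ga_matrix.dot(Vm)  # mA/m2, for a 3-element Vm vector in mV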
''' # Deserialize states vector y = self.deserialize(y) # Compute derivatives array for uncoupled nbls systems iQ = 0 dydt = np.vstack([self.nodes[i].effDerivatives(t, y[i], lkps1d[i], []) for i in range(self.nnodes)]) # Interpolate membrane potentials Vm = np.array([x.interpolate1D(Qm)['V'] for x, Qm in zip(lkps1d, y[:, iQ])]) # mV # Add axial currents to charge derivatives column dydt[:, iQ] += self.Iax(Vm) * 1e-3 # A/m2 # Return serialized derivatives vector return self.serialize(dydt) def deserializeSolution(self, data): ''' Re-arrange solution per node. ''' inputs = [data.time, data.stim] output_keys = { f'node{i + 1}': list(filter(lambda x: x.endswith(f'_{i + 1}'), data.outputs)) for i in range(self.nnodes)} outputs = {k: {x[:-2]: data[x].values for x in v} for k, v in output_keys.items()} return SpatiallyExtendedTimeSeries({ k: TimeSeries(*inputs, v) for k, v in outputs.items()}) def __simFull(self, drives, pp, fs): # Determine time step assert drives.is_monofrequency(), 'differing carrier frequencies' dt = drives.dt # Compute and serialize initial conditions y0 = [self.nodes[i].fullInitialConditions(drives[i], self.nodes[i].Qm0, dt) for i in range(self.nnodes)] self.npernode = len(y0[0]) y0 = {f'{k}_{i + 1}': v for i, x in enumerate(y0) for k, v in x.items()} # Define event function def updateDrives(obj, x): for sd, d in zip(obj.drives, drives): sd.xvar = d.xvar * x # Initialize solver solver = EventDrivenSolver( lambda x: updateDrives(solver, x), y0.keys(), lambda t, y: self.fullDerivatives(t, y, solver.drives, fs), event_params={'drives': drives.nullCopy()}, dt=dt) # Compute serialized solution data = solver( y0, pp.stimEvents(), pp.tstop, target_dt=CLASSIC_TARGET_DT, log_period=pp.tstop / 100 if logger.getEffectiveLevel() <= logging.INFO else None, # logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' ) # Re-arrange solution per node data = self.deserializeSolution(data) # Remove velocity and add voltage timeseries to solution for i, (nodekey, nodedata) in enumerate(data.items()): del nodedata['U'] nodedata.addColumn( 'Vm', self.nodes[i].deflectionDependentVm(nodedata['Qm'], nodedata['Z'], fs[i])) # Return solution return data def __simSonic(self, drives, pp, fs): # Determine time step assert drives.is_monofrequency(), 'differing carrier frequencies' dt = drives.periodicity # s # Load appropriate 2D lookups lkps = [self.nodes[i].getLookup2D(drives[i].f, fs[i]) for i in range(self.nnodes)] # Compute and serialize initial conditions y0 = [{'Qm': x.Qm0, **{k: v(x.pneuron.Vm0) for k, v in x.pneuron.steadyStates().items()}} for x in self.nodes] self.npernode = len(y0[0]) y0 = {f'{k}_{i + 1}': v for i, x in enumerate(y0) for k, v in x.items()} # Define event function def updateLookups(obj, x): for i, (lkp, drive) in enumerate(zip(lkps, drives)): obj.lkps[i] = lkp.project('A', drive.xvar * x) # Initialize solver solver = EventDrivenSolver( lambda x: updateLookups(solver, x), y0.keys(), lambda t, y: self.effDerivatives(t, y, solver.lkps), event_params={'lkps': [lkp.project('A', 0.) 
for lkp in lkps]}, dt=dt) # Compute serialized solution data = solver( y0, pp.stimEvents(), pp.tstop, log_period=pp.tstop / 100 if pp.tstop >= 5 else None, max_nsamples=MAX_NSAMPLES_EFFECTIVE) # Re-arrange solution per node data = self.deserializeSolution(data) # Add voltage timeseries to solution (interpolated along Qm) and dummy Z and ng vectors for i, (nodekey, nodedata) in enumerate(data.items()): nodedata.addColumn('Vm', self.nodes[i].interpEffVariable( 'V', nodedata['Qm'], nodedata.stim * drives[i].A, lkps[i])) for key in ['Z', 'ng']: nodedata[key] = np.full(nodedata.time.size, np.nan) # Return solution dataframe return data def intMethods(self): ''' Listing of model integration methods. ''' return { 'full': self.__simFull, 'sonic': self.__simSonic } def desc(self, meta): method = meta['method'] if 'method' in meta else meta['model']['method'] fs = meta['fs'] if 'fs' in meta else meta['model']['fs'] drives_str = meta['drives'].desc fs_str = f'fs = ({", ".join([f"{x * 1e2:.2f}%" for x in fs])})' return f'{self}: {method} simulation @ ({drives_str}), {meta["pp"].desc}, {fs_str}' @Model.addMeta @Model.logDesc def simulate(self, drives, pp, fs, method='sonic'): ''' Simulate the coupled-nbls model for a specific set of US stimulation parameters, and coverage fractions, and return spatially-distributed output data. :param drives: acoustic drive array :param pp: pulse protocol object :param fs: list of sonophore membrane coverage fractions (-) :param method: selected integration method :return: SpatiallyExtendedTimeSeries object ''' # Check that inputs dimensions matches number of nodes assert len(drives) == self.nnodes, 'number of drives does not match number of nodes' assert len(fs) == self.nnodes, 'number of coverage inputs does not match number of nodes' simfunc = self.intMethods()[method] return simfunc(drives, pp, fs) def filecodes(self, drives, pp, fs, method): codes = { 'simkey': self.simkey, 'neuron': self.refpneuron.name, 'nnodes': f'{self.nnodes}node{"s" if self.nnodes > 1 else ""}', 'ga': f'ga{self.ga:.2e}S_m2', 'a': f'a{"_".join([f"{x.a * 1e9:.0f}nm" for x in self.nodes])}', **drives.filecodes, **pp.filecodes } codes['fs'] = f'fs{"_".join([f"{x * 1e2:.0f}%" for x in fs])}' codes['method'] = method return codes def filecode(self, *args): return '_'.join([x for x in self.filecodes(*args).values() if x is not None]) - def simAndSave(self, *args, outdir=None, overwrite=False, minimize_output=True): + def simAndSave(self, *args, outdir=None, overwrite=False, full_output=False): runsim = True if outdir is not None: fname = f'{self.filecode(*args)}.pkl' fpath = os.path.join(outdir, fname) if os_name == 'Windows': fpath = f'\\\\?\\{fpath}' if os.path.isfile(fpath) and not overwrite: logger.info(f'Loading data from "{os.path.basename(fpath)}"') with open(fpath, 'rb') as fh: frame = pickle.load(fh) data, meta = frame['data'], frame['meta'] runsim = False if runsim: data, meta = self.simulate(*args) - if minimize_output: + if not full_output: data.dumpOutputsOtherThan(['Qm', 'Vm']) if outdir is not None: with open(fpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) logger.debug(f'simulation data exported to "{fpath}"') return data, meta @property def tauax(self): ''' Axial time constant (s). ''' return self.refnode.Cm0 / self.ga @property def taum(self): ''' Passive membrane time constant (s). ''' return self.refpneuron.tau_pas @property def taumax(self): ''' Maximal time constant of the model (s). 
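A numerical sketch with assumed values: for Cm0 = 1e-2 F/m2 (i.e. 1 uF/cm2) and ga = 1 S/m2, the axial time constant is 10 ms, which sets taumax if the passive membrane time constant is shorter:

    Cm0, ga = 1e-2, 1.0        # F/m2, S/m2 (illustrative)
    tauax = Cm0 / ga           # 1e-2 s
    taumax = max(5e-3, tauax)  # -> 1e-2 s, assuming tau_pas = 5 ms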
''' return max(self.taum, self.tauax) diff --git a/PySONIC/utils.py b/PySONIC/utils.py index d533fd1..fa04c43 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,1087 +1,1092 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-19 22:30:46 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-06-16 18:13:56 +# @Last Modified time: 2021-06-20 16:20:02 ''' Definition of generic utility functions used in other modules ''' import sys import platform import itertools import csv from functools import wraps import operator import time from inspect import signature import os from shutil import get_terminal_size import lockfile import math from fractions import Fraction import pickle import json from tqdm import tqdm import logging import tkinter as tk from tkinter import filedialog import base64 import datetime import numpy as np from scipy.optimize import brentq from scipy import linalg import colorlog from pushbullet import Pushbullet os_name = platform.system() # Package logger my_log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%') def setHandler(logger, handler): for h in logger.handlers: logger.removeHandler(h) logger.addHandler(handler) return logger def setLogger(name, formatter): handler = colorlog.StreamHandler() handler.setFormatter(formatter) handler.stream = sys.stdout logger = colorlog.getLogger(name) logger.addHandler(handler) return logger class TqdmHandler(logging.StreamHandler): def __init__(self, formatter): logging.StreamHandler.__init__(self) self.setFormatter(formatter) def emit(self, record): msg = self.format(record) tqdm.write(msg) logger = setLogger('PySONIC', my_log_formatter) LOOKUP_DIR = os.path.abspath(os.path.split(__file__)[0] + "/lookups/") def fillLine(text, char='-', totlength=None): ''' Surround a text with repetitions of a specific character in order to fill a line to a given total length. :param text: text to be surrounded :param char: surrounding character :param totlength: target number of characters in filled text line :return: filled text line ''' if totlength is None: totlength = get_terminal_size().columns - 1 ndashes = totlength - len(text) - 2 if ndashes < 2: return text else: nside = ndashes // 2 nleft, nright = nside, nside if ndashes % 2 == 1: nright += 1 return f'{char * nleft} {text} {char * nright}' # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) def getSIpair(x, scale='lin'): ''' Get the correct SI factor and prefix for a floating point number. 
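Illustrative behavior, derived from the selection logic below (input values chosen for the example):

    getSIpair(3.2e-5)  # -> (1e-06, 'u')
    getSIpair(1e3)     # -> (1000.0, 'k'): exact powers map to their own prefix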
''' if isIterable(x): # If iterable, get a representative number of the distribution x = np.asarray(x) x = x.prod()**(1.0 / x.size) if scale == 'log' else np.mean(x) if x == 0: return 1e0, '' else: vals = [tmp[1] for tmp in sorted_si_prefixes] ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 return vals[ix], sorted_si_prefixes[ix][0] def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): factor, prefix = getSIpair(x) return f'{x / factor:.{precision}f}{space}{prefix}' elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: raise ValueError(f'cannot si_format {type(x)} objects') def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' sci_string = f'{number:.{precision}e}' value, exponent = sci_string.split("e") value, exponent = float(value), int(exponent) val_str = f'{value} * ' if value != 1. else '' return f'{val_str}10^{{{exponent}}}' def frac_format(x, key=None): frac = Fraction(x).limit_denominator(100) if frac.numerator != 1 or key is None: s = str(frac.numerator) else: s = '' if key is not None: s = f'{s}{key}' if frac.denominator != 1: s = f'{s}/{frac.denominator}' return s def rmse(x1, x2, axis=None): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean(axis=axis)) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def convertPKL2JSON(): for pkl_filepath in OpenFilesDialog('pkl')[0]: logger.info(f'Processing {pkl_filepath} ...') json_filepath = f'{os.path.splitext(pkl_filepath)[0]}.json' with open(pkl_filepath, 'rb') as fpkl, open(json_filepath, 'w') as fjson: data = pickle.load(fpkl) json.dump(data, fjson, ensure_ascii=False, sort_keys=True, indent=4) logger.info('All done!') def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames( filetypes=[(filetype + " files", '.' 
+ filetype)], initialdir=dirname ) if len(filenames) == 0: raise ValueError('no input file selected') par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) return filenames, par_dir def selectDirDialog(title='Select directory'): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() directory = filedialog.askdirectory(title=title) if directory == '': raise ValueError('no directory selected') return directory def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. :param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) if len(filename_out) == 0: raise ValueError('no output filepath selected') return filename_out def loadData(fpath, frequency=1): ''' Load dataframe and metadata dictionary from pickle file. ''' logger.info('Loading data from "%s"', os.path.basename(fpath)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'].iloc[::frequency] meta = frame['meta'] return df, meta def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def expandRange(xmin, xmax, exp_factor=2): ''' Expand a range by a specific factor around its mid-point. ''' if xmin > xmax: raise ValueError('values must be provided in (min, max) order') xptp = xmax - xmin xmid = (xmin + xmax) / 2 xdev = xptp * exp_factor / 2 return (xmid - xdev, xmid + xdev) def isIterable(x): for t in [list, tuple, np.ndarray]: if isinstance(x, t): return True return False def isWithin(name, val, bounds, rel_tol=1e-9, raise_warning=True): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. 
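For example (values chosen to illustrate the rounding tolerance):

    isWithin('fs', 1 + 1e-10, (0., 1.))  # -> 1.0, with a rounding warning
    isWithin('fs', 1.1, (0., 1.))        # -> raises ValueError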
:param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' if isIterable(val): return np.array([isWithin(name, v, bounds, rel_tol, raise_warning) for v in val]) if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): if raise_warning: logger.warning( 'Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): if raise_warning: logger.warning( 'Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError(f'{name} value ({val}) out of [{bounds[0]}, {bounds[1]}] interval') def getDistribution(xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return getDistribution(xmin, xmax, nx, scale=scale) def getIndex(container, value): ''' Return the index of a float / string value in a list / array :param container: list / 1D-array of elements :param value: value to search for :return: index of value (if found) ''' if isinstance(value, float): container = np.array(container) imatches = np.where(np.isclose(container, value, rtol=1e-9, atol=1e-16))[0] if len(imatches) == 0: raise ValueError(f'{value} not found in {container}') return imatches[0] elif isinstance(value, str): return container.index(value) def funcSig(func, args, kwargs): args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] return f'{func.__name__}({", ".join(args_repr + kwargs_repr)})' def debug(func): ''' Print the function signature and return value. ''' @wraps(func) def wrapper_debug(*args, **kwargs): print(f'Calling {funcSig(func, args, kwargs)}') value = func(*args, **kwargs) print(f"{func.__name__!r} returned {value!r}") return value return wrapper_debug def timer(func): ''' Monitor and return the runtime of the decorated function. ''' @wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() value = func(*args, **kwargs) end_time = time.perf_counter() run_time = end_time - start_time return value, run_time return wrapper def alignWithFuncDef(func, args, kwargs): ''' Align a set of provided positional and keyword arguments with the arguments signature in a specific function definition. 
:param func: function object :param args: list of provided positional arguments :param kwargs: dictionary of provided keyword arguments :return: 2-tuple with the modified positional and keyword arguments ''' # Get positional and keyword arguments from function signature sig_params = {k: v for k, v in signature(func).parameters.items()} sig_args = list(filter(lambda x: x.default == x.empty, sig_params.values())) sig_kwargs = {k: v.default for k, v in sig_params.items() if v.default != v.empty} sig_nargs = len(sig_args) kwarg_keys = list(sig_kwargs.keys()) # Restrict provided positional arguments to those that are also positional in signature new_args = args[:sig_nargs] # Construct hybrid keyword arguments dictionary from: # - remaining positional arguments # - provided keyword arguments # - default keyword arguments new_kwargs = sig_kwargs for i, x in enumerate(args[sig_nargs:]): new_kwargs[kwarg_keys[i]] = x for k, v in kwargs.items(): new_kwargs[k] = v return new_args, new_kwargs def alignWithMethodDef(method, args, kwargs): args, kwargs = alignWithFuncDef(method, [None] + list(args), kwargs) return tuple(args[1:]), kwargs def logCache(fpath, delimiter='\t', out_type=float): ''' Add an extra IO memoization functionality to a function using file caching, to avoid repetitions of tedious computations with identical inputs. ''' def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # If function has history -> do not log if 'history' in kwargs: return func(*args, **kwargs) # Modify positional and keyword arguments to match function signature, if needed args, kwargs = alignWithFuncDef(func, args, kwargs) # Translate args and kwargs into string signature fsignature = funcSig(func, args, kwargs) # If entry present in log, return corresponding output if os.path.isfile(fpath): with open(fpath, 'r', newline='') as f: reader = csv.reader(f, delimiter=delimiter) for row in reader: if row[0] == fsignature: logger.debug(f'entry found in "{os.path.basename(fpath)}"') return out_type(row[1]) # Otherwise, compute output and log it into file before returning out = func(*args, **kwargs) lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=delimiter) writer.writerow([fsignature, str(out)]) lock.release() return out return wrapper return wrapper_with_args def fileCache(root, fcode_func, ext='json'): def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # Get load and dump functions from file extension try: load_func = { 'json': json.load, 'pkl': pickle.load, 'csv': lambda f: np.loadtxt(f, delimiter=',') }[ext] dump_func = { 'json': json.dump, 'pkl': pickle.dump, 'csv': lambda x, f: np.savetxt(f, x, delimiter=',') }[ext] except KeyError: raise ValueError('Unknown file extension') # Get read and write mode (text or binary) from file extension mode = 'b' if ext == 'pkl' else '' # Get file path from root and function arguments, using fcode function if callable(fcode_func): fcode = fcode_func(*args) else: fcode = fcode_func fpath = os.path.join(os.path.abspath(root), f'{fcode}.{ext}') # If file exists, load output from it if os.path.isfile(fpath): logger.info(f'loading data from "{fpath}"') with open(fpath, 'r' + mode) as f: out = load_func(f) # Otherwise, execute function and create the file to dump the output else: logger.warning(f'reference data file not found: "{fpath}"') out = func(*args, **kwargs) logger.info(f'dumping data in "{fpath}"') lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'w' + mode)
as f: dump_func(out, f) lock.release() return out return wrapper return wrapper_with_args def derivative(f, x, eps, method='central'): ''' Compute the difference formula for f'(x) with perturbation size eps. :param f: function to differentiate, taking an array of states and returning an array of derivatives :param x: states vector :param method: difference formula: 'forward', 'backward' or 'central' :param eps: perturbation vector (same size as states vector) :return: numerical approximation of the derivative around the fixed point ''' if isIterable(x): if not isIterable(eps) or len(eps) != len(x): raise ValueError('eps must be the same size as x') elif np.sum(eps != 0.) != 1: raise ValueError('eps must be zero-valued across all but one dimensions') eps_val = np.sum(eps) else: eps_val = eps if method == 'central': df = (f(x + eps) - f(x - eps)) / 2 elif method == 'forward': df = f(x + eps) - f(x) elif method == 'backward': df = f(x) - f(x - eps) else: raise ValueError("Method must be 'central', 'forward' or 'backward'.") return df / eps_val def jacobian(dfunc, x, rel_eps=None, abs_eps=None, method='central'): ''' Evaluate the Jacobian matrix of a (time-invariant) system, given a states vector and derivatives function. :param dfunc: derivatives function, taking an array of n states and returning an array of n derivatives :param x: n-states vector :return: n-by-n square Jacobian matrix ''' if sum(e is not None for e in [abs_eps, rel_eps]) != 1: raise ValueError('one (and only one) of "rel_eps" or "abs_eps" parameters must be provided') # Determine vector size x = np.asarray(x) n = x.size # Initialize Jacobian matrix J = np.empty((n, n)) # Create epsilon vector if rel_eps is not None: mode = 'relative' eps_vec = rel_eps else: mode = 'absolute' eps_vec = abs_eps if not isIterable(eps_vec): eps_vec = np.array([eps_vec] * n) if mode == 'relative': eps = x * eps_vec else: eps = eps_vec # Perturb each state by epsilon on both sides, re-evaluate derivatives # and assemble Jacobian matrix ei = np.zeros(n) for i in range(n): ei[i] = 1 J[:, i] = derivative(dfunc, x, eps * ei, method=method) ei[i] = 0 return J def classifyFixedPoint(x, dfunc): ''' Characterize the stability of a fixed point by numerically evaluating its Jacobian matrix and evaluating the sign of the real part of its associated eigenvalues. :param x: n-states vector :param dfunc: derivatives function, taking an array of n states and returning an array of n derivatives ''' # Compute Jacobian numerically # print(f'x = {x}, dfunx(x) = {dfunc(x)}') eps_machine = np.sqrt(np.finfo(float).eps) J = jacobian(dfunc, x, rel_eps=eps_machine, method='forward') # Compute eigenvalues and eigenvectors eigvals, eigvecs = linalg.eig(J) logger.debug(f"eigenvalues = {[f'({x.real:.2e} + {x.imag:.2e}j)' for x in eigvals]}") # Determine fixed point stability based on eigenvalues is_neg_eigvals = eigvals.real < 0 if is_neg_eigvals.all(): # all real parts negative -> stable key = 'stable' elif is_neg_eigvals.any(): # both positive and negative real parts -> saddle key = 'saddle' else: # all real parts positive -> unstable key = 'unstable' return eigvals, key def findModifiedEq(x0, dfunc, *args): ''' Find an equilibrium variable in a modified system by searching for its derivative root within an interval around its original equilibrium. :param x0: equilibrium value in original system. :param dfunc: derivative function, taking the variable as first parameter. :param *args: remaining arguments needed for the derivative function.
:return: variable equilibrium value in modified system. ''' is_iterable = [isIterable(arg) for arg in args] if any(is_iterable): if not all(is_iterable): raise ValueError('mix of iterables and non-iterables') lengths = [len(arg) for arg in args] if not all(n == lengths[0] for n in lengths): raise ValueError(f'inputs are not of the same size: {lengths}') n = lengths[0] res = [] for i in range(n): x = [arg[i] for arg in args] res.append(findModifiedEq(x0, dfunc, *x)) return np.array(res) else: return brentq(lambda x: dfunc(x, *args), x0 * 1e-4, x0 * 1e3, xtol=1e-16) def swapFirstLetterCase(s): if s[0].islower(): return s.capitalize() else: return s[0].lower() + s[1:] def getPow10(x, direction='up'): ''' Get the power of 10 that is closest to a number, in either direction("down" or "up"). ''' round_method = {'up': np.ceil, 'down': np.floor}[direction] return np.power(10, round_method(np.log10(x))) def rotAroundPoint2D(x, theta, p): ''' Rotate a 2D vector around a center point by a given angle. :param x: 2D coordinates vector :param theta: rotation angle (in rad) :param p: 2D center point coordinates :return: 2D rotated coordinates vector ''' n1, n2 = x.shape if n1 != 2: if n2 == 2: x = x.T else: raise ValueError('x should be a 2-by-n vector') # Rotation matrix R = np.array([ [np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)], ]) # Broadcast center point to input vector ptile = np.tile(p, (x.shape[1], 1)).T # Subtract, rotate and add return R.dot(x - ptile) + ptile def getKey(keyfile='pushbullet.key'): dir_path = os.path.dirname(os.path.realpath(__file__)) package_root = os.path.abspath(os.path.join(dir_path, os.pardir)) fpath = os.path.join(package_root, keyfile) if not os.path.isfile(fpath): raise FileNotFoundError('pushbullet API key file not found') with open(fpath) as f: encoded_key = f.readlines()[0] return base64.b64decode(str.encode(encoded_key)).decode() def sendPushNotification(msg): try: key = getKey() pb = Pushbullet(key) dt = datetime.datetime.now() s = dt.strftime('%Y-%m-%d %H:%M:%S') pb.push_note('Code Messenger', f'{s}\n{msg}') except FileNotFoundError: logger.error(f'Could not send push notification: "{msg}"') def alert(func): ''' Run a function, and send a push notification upon completion, or if an error is raised during its execution. ''' @wraps(func) def wrapper(*args, **kwargs): try: out = func(*args, **kwargs) sendPushNotification(f'completed "{func.__name__}" execution successfully') return out except BaseException as e: sendPushNotification(f'error during "{func.__name__}" execution: {e}') raise e return wrapper def sunflower(n, radius=1, alpha=1): ''' Generate a population of uniformly distributed 2D data points in a unit circle. :param n: number of data points :param alpha: coefficient determining evenness of the boundary :return: 2D matrix of Cartesian (x, y) positions ''' nbounds = np.round(alpha * np.sqrt(n)) # number of boundary points phi = (np.sqrt(5) + 1) / 2 # golden ratio k = np.arange(1, n + 1) # index vector theta = 2 * np.pi * k / phi**2 # angle vector r = np.sqrt((k - 1) / (n - nbounds - 1)) # radius vector r[r > 1] = 1 x = r * np.cos(theta) y = r * np.sin(theta) return radius * np.vstack((x, y)) def filecode(model, *args): ''' Generate file code given a specific combination of model input parameters. 
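A hedged sketch of the two supported call patterns (model object and simulation inputs assumed):

    filecode(model, *sim_args)  # from explicit simulation arguments (as accepted by model.filecodes)
    filecode(model, meta)       # from a meta dictionary, e.g. as stored in an output .pkl file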
''' # If meta dictionary was passed, generate inputs list from it if len(args) == 1 and isinstance(args[0], dict): meta = args[0].copy() if meta['simkey'] == 'ASTIM' and 'fs' not in meta: meta['fs'] = meta['model']['fs'] meta['method'] = meta['model']['method'] meta['qss_vars'] = None for k in ['simkey', 'model', 'tcomp', 'dt', 'atol']: if k in meta: del meta[k] args = list(meta.values()) # Otherwise, transform args tuple into list else: args = list(args) # If any argument is an iterable -> transform it to a continuous string for i in range(len(args)): if isIterable(args[i]): args[i] = ''.join([str(x) for x in args[i]]) # Create file code by joining string-encoded inputs with underscores codes = model.filecodes(*args).values() return '_'.join([x for x in codes if x is not None]) def simAndSave(model, *args, **kwargs): ''' Simulate the model and save the results in a specific output directory. :param *args: list of arguments provided to the simulation function :param **kwargs: optional arguments dictionary :return: output filepath ''' # Extract output directory and overwrite boolean from keyword arguments. outputdir = kwargs.pop('outputdir', '.') overwrite = kwargs.pop('overwrite', True) + full_output = kwargs.pop('full_output', True) # Set data and meta to None data, meta = None, None # Extract drive object from args drive, *other_args = args # If drive is searchable and not fully resolved if drive.is_searchable: if not drive.is_resolved: # Call simulate to perform titration out = model.simulate(*args) # If titration yields nothing -> no file produced -> return None if out is None: logger.warning('returning None') return None # Store data and meta data, meta = out # Update args list with resolved drive try: args = (meta['drive'], *other_args) except KeyError: args = (meta['source'], *other_args) # Check if an output file corresponding to sim inputs is found in the output directory # That check is performed prior to running the simulation, such that # it is not run if the file is present and overwrite is set to false. fname = f'{model.filecode(*args)}.pkl' fpath = os.path.join(outputdir, fname) if os_name == 'Windows': fpath = f'\\\\?\\{fpath}' existing_file_msg = f'File "{fname}" already present in directory "{outputdir}"' existing_file = os.path.isfile(fpath) # If file exists and overwrite is set to false -> return if existing_file and not overwrite: logger.warning(f'{existing_file_msg} -> preserving') return fpath # Run simulation if not already done (for titration cases) if data is None: data, meta = model.simulate(*args) + # Remove internal variables if specified + if not full_output: + data.dumpOutputsOtherThan(['Qm', 'Vm']) + # Raise warning if an existing file is overwritten if existing_file: logger.warning(f'{existing_file_msg} -> overwriting') # Save output file and return output filepath with open(fpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) logger.debug('simulation data exported to "%s"', fpath) return fpath def moveItem(l, value, itarget): ''' Move a list item to a specific target index. :param l: list object :param value: value of the item to move :param itarget: target index :return: re-ordered list.
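For example:

    moveItem(['a', 'b', 'c', 'd'], 'd', 0)   # -> ['d', 'a', 'b', 'c']
    moveItem(['a', 'b', 'c', 'd'], 'a', -1)  # -> ['b', 'c', 'd', 'a']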
''' # Get absolute target index if itarget < 0: itarget += len(l) assert itarget < len(l), f'target index {itarget} exceeds list size ({len(l)})' # Get index corresponding to element and delete entry from list iref = l.index(value) new_l = l.copy() del new_l[iref] # Return re-organized list return new_l[:itarget] + [value] + new_l[itarget:] def gaussian(x, mu=0., sigma=1., A=1.): return A * np.exp(-((x - mu) / sigma)**2 / 2) def isPickable(obj): try: pickle.dumps(obj) except Exception: return False return True def resolveFuncArgs(func, *args, **kwargs): ''' Return a dictionary of positional and keyword arguments upon function call, adding defaults from simfunc signature if not provided at call time. ''' bound_args = signature(func).bind(*args, **kwargs) bound_args.apply_defaults() return dict(bound_args.arguments) def getMeta(model, simfunc, *args, **kwargs): ''' Construct an informative dictionary about the model and simulation parameters. ''' # Retrieve function call arguments args_dict = resolveFuncArgs(simfunc, model, *args, **kwargs) # Construct meta dictionary meta = {'simkey': model.simkey} for k, v in args_dict.items(): if k == 'self': meta['model'] = v.meta else: meta[k] = v return meta def bounds(arr): ''' Return the bounds or a numpy array / list. ''' return (np.nanmin(arr), np.nanmax(arr)) def addColumn(df, key, arr, preceding_key=None): ''' Add a new column to a dataframe, right after a specific column. ''' df[key] = arr if preceding_key is not None: cols = df.columns.tolist()[:-1] preceding_index = cols.index(preceding_key) df = df[cols[:preceding_index + 1] + [key] + cols[preceding_index + 1:]] return df def integerSuffix(n): return 'th' if 4 <= n % 100 <= 20 else {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th') def customStrftime(fmt, dt_obj): return dt_obj.strftime(fmt).replace('{S}', str(dt_obj.day) + integerSuffix(dt_obj.day)) def friendlyLogspace(xmin, xmax, bases=None): ''' Define a "friendly" logspace between two bounds. ''' if bases is None: bases = [1, 2, 5] bases = np.asarray(bases) bounds = np.array([xmin, xmax]) logbounds = np.log10(bounds) bounds_orders = np.floor(logbounds) orders = np.arange(bounds_orders[0], bounds_orders[1] + 1) factors = np.power(10., np.floor(orders)) seq = np.hstack([bases * f for f in factors]) if xmax > seq.max(): seq = np.hstack((seq, xmax)) seq = seq[np.logical_and(seq >= xmin, seq <= xmax)] if xmin not in seq: seq = np.hstack((xmin, seq)) if xmax not in seq: seq = np.hstack((seq, xmax)) return seq def differing(d1, d2, subdkey=None, diff=None): ''' Find differences in values across two dictionaries (recursively). 
:param d1: first dictionary :param d2: second dictionary :param subdkey: specific sub-dictionary attribute key for objects :param diff: existing diff list to append to :return: list of (key, value1, value2) tuples for each pair of differing values ''' # Initialize diff list if diff is None: diff = [] # Check that the two dicts have the same structure if sorted(list(d1.keys())) != sorted(list(d2.keys())): raise ValueError('inconsistent inputs') # For each key - values triplet for k in d1.keys(): # If values are dicts themselves, loop recursively through them if isinstance(d1[k], dict): diff = differing(d1[k], d2[k], subdkey=subdkey, diff=diff) # If values are objects with a specific sub-dictionary attribute, # loop recursively through them elif hasattr(d1[k], subdkey): diff = differing(getattr(d1[k], subdkey), getattr(d2[k], subdkey), subdkey=subdkey, diff=diff) # Otherwise else: # If values differ, add the key - values triplet to the diff list if d1[k] != d2[k]: diff.append((k, d1[k], d2[k])) # Return the diff list return diff def extractCommonPrefix(labels): ''' Extract a common prefix and a list of suffixes from a list of labels. ''' prefix = os.path.commonprefix(labels) if len(prefix) == 0: return None return prefix, [s.split(prefix)[1] for s in labels] def cycleAvg(t, y, T): ''' Cycle-average a vector according to a given periodicity. :param t: time vector :param y: signal vector :param T: periodicity :return: cycle-averaged signal vector ''' t -= t[0] n = int(np.ceil(t[-1] / T)) return np.array([ np.mean(y[np.where((t >= i * T) & (t < (i + 1) * T))[0]]) for i in range(n)]) def pairwise(iterable): ''' s -> (s0,s1), (s1,s2), (s2, s3), ... ''' a, b = itertools.tee(iterable) next(b, None) return list(zip(a, b)) def padleft(x): return np.pad(x, (1, 0), 'edge') def padright(x): return np.pad(x, (0, 1), 'edge') def timeThreshold(t, y, dy_thr): ''' Find time interval required to reach a given threshold in a non-monotonic signal. ''' y -= y[0] # remove initial offset ifirst = np.where(y > dy_thr)[0][0] return np.interp(dy_thr, y[:ifirst + 1], t[:ifirst + 1]) def flatten(din): ''' Flatten a two-level dictionary. ''' dout = {} for k, v in din.items(): for k2, v2 in v.items(): dout[f'{k} - {k2}'] = v2 return dout def rangecode(x, label, unit): ''' String describing a range. ''' bounds_str = si_format([x.min(), x.max()], 1, space="") return '{0}{1}{3}-{2}{3}_{4}'.format(label.replace(' ', '_'), *bounds_str, unit, x.size) def getTimeStr(seconds): ss, subss = divmod(seconds, 1) ss, cs = int(ss), int(np.round(subss * 1e2)) mm, ss = divmod(ss, 60) hh, mm = divmod(mm, 60) dd, hh = divmod(hh, 24) s = f'{hh:d}:{mm:02d}:{ss:02d}.{cs:02d}' if dd > 0: def plural(n): return n, abs(n) != 1 and 's' or '' s = '{:d} day{}, {}'.format(*plural(dd), s) return s
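As a complement to the minimize_output -> full_output renaming above (note the inverted polarity of the flag), a minimal usage sketch of the two updated entry points (model objects and simulation inputs assumed):

    # Coupled-model method: light output (Qm and Vm only) remains the default
    data, meta = model.simAndSave(drives, pp, covs, method,
                                  outdir=outdir, overwrite=False, full_output=False)

    # Generic utils.simAndSave: full output remains the default
    fpath = simAndSave(model, drive, pp, outputdir=outdir, overwrite=False, full_output=True)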