diff --git a/PySONIC/core/timeseries.py b/PySONIC/core/timeseries.py index fe4aaa6..20aaf24 100644 --- a/PySONIC/core/timeseries.py +++ b/PySONIC/core/timeseries.py @@ -1,207 +1,219 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2021-05-15 11:01:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-05-17 21:48:19 +# @Last Modified time: 2021-05-18 12:47:48 import pandas as pd import numpy as np from scipy.interpolate import interp1d from ..utils import cycleAvg class TimeSeries(pd.DataFrame): ''' Wrapper around pandas DataFrame to store timeseries data. ''' time_key = 't' stim_key = 'stimstate' def __init__(self, t, stim, dout): super().__init__(data={ self.time_key: t, self.stim_key: stim, **dout }) @property def time(self): return self[self.time_key].values @property def tbounds(self): return self.time.min(), self.time.max() @property def stim(self): return self[self.stim_key].values @property def inputs(self): return [self.time_key, self.stim_key] @property def outputs(self): return list(set(self.columns.values) - set(self.inputs)) def addColumn(self, key, arr, preceding_key=None): ''' Add a new column to the timeseries dataframe, right after a specific column. ''' self[key] = arr if preceding_key is not None: cols = self.columns.tolist()[:-1] preceding_index = cols.index(preceding_key) new_cols = cols[:preceding_index + 1] + [key] + cols[preceding_index + 1:] self.reindex(columns=new_cols) # self = self[cols[:preceding_index + 1] + [key] + cols[preceding_index + 1:]] def interpCol(self, t, k): ''' Interpolate a column according to a new time vector. ''' kind = 'nearest' if k == self.stim_key else 'linear' return interp1d(self.time, self[k].values, kind=kind)(t) def interpolate(self, t): ''' Interpolate the entire dataframe according to a new time vector. ''' stim = self.interpCol(t, self.stim_key) outputs = {k: self.interpCol(t, k) for k in self.outputs} return self.__class__(t, stim, outputs) def resample(self, dt): ''' Resample dataframe at regular time step. ''' tmin, tmax = self.tbounds n = int((tmax - tmin) / dt) + 1 return self.interpolate(np.linspace(tmin, tmax, n)) def cycleAveraged(self, T): ''' Cycle-average a periodic solution. ''' t = np.arange(self.time[0], self.time[-1], T) stim = interp1d(self.time, self.stim, kind='nearest')(t) outputs = {k: cycleAvg(self.time, self[k].values, T) for k in self.outputs} outputs = {k: interp1d(t + T / 2, v, kind='linear', fill_value='extrapolate')(t) for k, v in outputs.items()} return self.__class__(t, stim, outputs) def prepend(self, t0=0): ''' Repeat first row outputs for a preceding time. ''' if t0 > self.time.min(): raise ValueError('t0 greater than minimal time value') self.loc[-1] = self.iloc[0] # repeat first row self.index = self.index + 1 # shift index self.sort_index(inplace=True) self[self.time_key][0] = t0 self[self.stim_key][0] = 0 def bound(self, tbounds): ''' Restrict all columns of dataframe to indexes corresponding to time values within specific bounds. ''' tmin, tmax = tbounds return self[np.logical_and(self.time >= tmin, self.time <= tmax)].reset_index(drop=True) def checkAgainst(self, other): assert isinstance(other, self.__class__), 'classes do not match' assert all(self.keys() == other.keys()), 'differing keys' for k in self.inputs: assert all(self[k].values == other[k].values), f'{k} vectors do not match' def operate(self, other, op): ''' Generic arithmetic operator. ''' self.checkAgainst(other) return self.__class__( self.time, self.stim, {k: getattr(self[k].values, op)(other[k].values) for k in self.outputs} ) def __add__(self, other): ''' Addition operator. ''' return self.operate(other, '__add__') def __sub__(self, other): ''' Subtraction operator. ''' return self.operate(other, '__sub__') def __mul__(self, other): ''' Multiplication operator. ''' return self.operate(other, '__mul__') def __truediv__(self, other): ''' Division operator. ''' return self.operate(other, '__truediv__') + def dump(self, keys): + for k in keys: + del self[k] + + def dumpOutputsOtherThan(self, storekeys): + self.dump(list(filter(lambda x: x not in storekeys, self.outputs))) + + class SpatiallyExtendedTimeSeries: def __init__(self, data): self.data = data def __iter__(self): raise ValueError(f'{self.__class__.__name__} is not iterable') def keys(self): return self.data.keys() def values(self): return self.data.values() def items(self): return self.data.items() def __getitem__(self, key): return self.data[key] def __delitem__(self, key): del self.data[key] def __setitem__(self, key, value): self.data[key] = value def checkAgainst(self, other): assert isinstance(other, self.__class__), 'differing classes' assert self.keys() == other.keys(), 'differing keys' for k in self.keys(): self.data[k].checkAgainst(other.data[k]) def operate(self, other, op): self.checkAgainst(other) return self.__class__({ k: getattr(self.data[k], op)(other.data[k]) for k in self.keys()}) def __add__(self, other): ''' Addition operator. ''' return self.operate(other, '__add__') def __sub__(self, other): ''' Subtraction operator. ''' return self.operate(other, '__sub__') def __mul__(self, other): ''' Multiplication operator. ''' return self.operate(other, '__mul__') def __truediv__(self, other): ''' Division operator. ''' return self.operate(other, '__truediv__') def cycleAveraged(self, *args, **kwargs): return self.__class__({k: v.cycleAveraged(*args, **kwargs) for k, v in self.items()}) def prepend(self, *args, **kwargs): for k in self.keys(): self.data[k].prepend(*args, **kwargs) def getArray(self, varkey, prefix=None): section_keys = list(self.keys()) if prefix is not None: section_keys = list(filter(lambda x: x.startswith(prefix), section_keys)) return np.array([self[k][varkey].values for k in section_keys]) @property def refkey(self): return list(self.keys())[0] @property def time(self): return self.data[self.refkey].time @property def stim(self): return self.data[self.refkey].stim + + def dumpOutputsOtherThan(self, *args, **kwargs): + for k, v in self.items(): + v.dumpOutputsOtherThan(*args, **kwargs) diff --git a/PySONIC/multicomp/coupled_nbls.py b/PySONIC/multicomp/coupled_nbls.py index 6bb69de..14382a5 100644 --- a/PySONIC/multicomp/coupled_nbls.py +++ b/PySONIC/multicomp/coupled_nbls.py @@ -1,266 +1,302 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2021-05-14 17:50:14 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-05-17 22:19:01 +# @Last Modified time: 2021-05-18 14:24:12 +import os +import pickle import logging import numpy as np from ..utils import logger, isWithin from ..core import Model, EventDrivenSolver, TimeSeries, SpatiallyExtendedTimeSeries from ..constants import CLASSIC_TARGET_DT, MAX_NSAMPLES_EFFECTIVE class CoupledSonophores: ''' Interface allowing to run benchmark simulations of a two-compartment coupled NBLS model. ''' simkey = 'COUPLED_ASTIM' # keyword used to characterize simulations made with this model ga_bounds = [1e-10, 1e10] # S/m2 def __init__(self, nodes, ga): ''' Initialization. :param nodes: list of nbls objects :param ga: axial conductance (S/m2) ''' assert all(x.pneuron == nodes[0].pneuron for x in nodes), 'differing point-neuron models' self.nodes = nodes self.nnodes = len(nodes) self.ga = ga def normalizedConductanceMatrix(self): ones = np.ones(self.nnodes) return np.diag(ones, 0) + np.diag(-ones[:-1], -1) + np.diag(-ones[:-1], 1) def copy(self): return self.__class__(self.nodes, self.ga) @property def meta(self): return { 'nodes': self.nodes, 'ga': self.ga } @property def refnode(self): return self.nodes[0] @property def refpneuron(self): return self.refnode.pneuron @property def gastr(self): return f'{self.ga:.2e} S/m2' @property def mechstr(self): return self.refnode.pneuron.name def __repr__(self): params = [f'ga = {self.gastr}'] return f'{self.__class__.__name__}({self.mechstr} dynamics, {", ".join(params)})' @property def ga(self): return self._ga @ga.setter def ga(self, value): if value != 0.: assert isWithin('ga', value, self.ga_bounds) self._ga = value self.ga_matrix = self.normalizedConductanceMatrix() * value def Iax(self, Vm): ''' Compute array of axial currents in each compartment based on array of potentials. ''' return -self.ga_matrix.dot(Vm) # mA/m2 def serialize(self, y): ''' Serialize a single state vector into a state-per-node matrix. ''' return np.reshape(y.copy(), (self.npernode * self.nnodes)) def deserialize(self, y): ''' Deserialize a state per node matrix into a single state vector. ''' return np.reshape(y.copy(), (self.nnodes, self.npernode)) def fullDerivatives(self, t, y, drives, fs): ''' full derivatives method. ''' # Deserialize states vector y = self.deserialize(y) # Compute derivatives array for uncoupled nbls systems dydt = np.vstack([self.nodes[i].fullDerivatives(t, y[i], drives[i], fs[i]) for i in range(self.nnodes)]) # Compute membrane capacitance and potential profiles iZ, iQ = 1, 3 Cm = np.array([x.spatialAverage(fs[i], x.capacitance(y[i, iZ]), x.Cm0) for i, x in enumerate(self.nodes)]) # F/m2 Vm = y[:, iQ] / Cm * 1e3 # mV # Add axial currents to charge derivatives column dydt[:, iQ] += self.Iax(Vm) * 1e-3 # A/m2 # Return serialized derivatives vector return self.serialize(dydt) def effDerivatives(self, t, y, lkps1d): ''' effective derivatives method. ''' # Deserialize states vector y = self.deserialize(y) # Compute derivatives array for uncoupled nbls systems iQ = 0 dydt = np.vstack([self.nodes[i].effDerivatives(t, y[i], lkps1d[i], []) for i in range(self.nnodes)]) # Interpolate membrane potentials Vm = np.array([x.interpolate1D(Qm)['V'] for x, Qm in zip(lkps1d, y[:, iQ])]) # mV # Add axial currents to charge derivatives column dydt[:, iQ] += self.Iax(Vm) * 1e-3 # A/m2 # Return serialized derivatives vector return self.serialize(dydt) def deserializeSolution(self, data): ''' Re-arrange solution per node. ''' inputs = [data.time, data.stim] output_keys = { f'node{i + 1}': list(filter(lambda x: x.endswith(f'_{i + 1}'), data.outputs)) for i in range(self.nnodes)} outputs = {k: {x[:-2]: data[x].values for x in v} for k, v in output_keys.items()} return SpatiallyExtendedTimeSeries({ k: TimeSeries(*inputs, v) for k, v in outputs.items()}) def __simFull(self, drives, pp, fs): # Determine time step assert all(x.dt == drives[0].dt for x in drives), 'differing time steps' dt = drives[0].dt # Compute and serialize initial conditions y0 = [self.nodes[i].fullInitialConditions(drives[i], self.nodes[i].Qm0, dt) for i in range(self.nnodes)] self.npernode = len(y0[0]) y0 = {f'{k}_{i + 1}': v for i, x in enumerate(y0) for k, v in x.items()} # Define event function def updateDrives(obj, x): for sd, d in zip(obj.drives, drives): sd.xvar = d.xvar * x # Initialize solver solver = EventDrivenSolver( lambda x: updateDrives(solver, x), y0.keys(), lambda t, y: self.fullDerivatives(t, y, solver.drives, fs), event_params={'drives': [drive.copy().updatedX(0.) for drive in drives]}, dt=dt) # Compute serialized solution data = solver( y0, pp.stimEvents(), pp.tstop, target_dt=CLASSIC_TARGET_DT, log_period=pp.tstop / 100 if logger.getEffectiveLevel() <= logging.INFO else None, # logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' ) # Re-arrange solution per node data = self.deserializeSolution(data) # Remove velocity and add voltage timeseries to solution for i, (nodekey, nodedata) in enumerate(data.items()): del nodedata['U'] nodedata.addColumn( 'Vm', self.nodes[i].deflectionDependentVm(nodedata['Qm'], nodedata['Z'], fs[i])) # Return solution return data def __simSonic(self, drives, pp, fs): # Determine time step # dt = self.refpneuron.chooseTimeStep() # s dt = drives[0].periodicity # s # Load appropriate 2D lookups lkps = [self.nodes[i].getLookup2D(drives[i].f, fs[i]) for i in range(self.nnodes)] # Compute and serialize initial conditions y0 = [{'Qm': x.Qm0, **{k: v(x.pneuron.Vm0) for k, v in x.pneuron.steadyStates().items()}} for x in self.nodes] self.npernode = len(y0[0]) y0 = {f'{k}_{i + 1}': v for i, x in enumerate(y0) for k, v in x.items()} # Define event function def updateLookups(obj, x): for i, (lkp, drive) in enumerate(zip(lkps, drives)): obj.lkps[i] = lkp.project('A', drive.xvar * x) # Initialize solver solver = EventDrivenSolver( lambda x: updateLookups(solver, x), y0.keys(), lambda t, y: self.effDerivatives(t, y, solver.lkps), event_params={'lkps': [lkp.project('A', 0.) for lkp in lkps]}, dt=dt) # Compute serialized solution data = solver( y0, pp.stimEvents(), pp.tstop, log_period=pp.tstop / 100 if pp.tstop >= 5 else None, max_nsamples=MAX_NSAMPLES_EFFECTIVE) # Re-arrange solution per node data = self.deserializeSolution(data) # Add voltage timeseries to solution (interpolated along Qm) and dummy Z and ng vectors for i, (nodekey, nodedata) in enumerate(data.items()): nodedata.addColumn('Vm', self.nodes[i].interpEffVariable( 'V', nodedata['Qm'], nodedata.stim * drives[i].A, lkps[i])) for key in ['Z', 'ng']: nodedata[key] = np.full(nodedata.time.size, np.nan) # Return solution dataframe return data def intMethods(self): ''' Listing of model integration methods. ''' return { 'full': self.__simFull, 'sonic': self.__simSonic } def desc(self, meta): method = meta['method'] if 'method' in meta else meta['model']['method'] fs = meta['fs'] if 'fs' in meta else meta['model']['fs'] drives_str = ', '.join([x.desc for x in meta['drives']]) fs_str = f'fs = ({", ".join([f"{x * 1e2:.2f}%" for x in fs])})' return f'{self}: {method} simulation @ ({drives_str}), {meta["pp"].desc}, {fs_str}' @Model.addMeta @Model.logDesc def simulate(self, drives, pp, fs, method='sonic'): ''' Simulate the coupled-nbls model for a specific set of US stimulation parameters, and coverage fractions, and return spatially-distributed output data. :param drives: list of acoustic drive objects :param pp: pulse protocol object :param fs: list of sonophore membrane coverage fractions (-) :param method: selected integration method :return: SpatiallyExtendedTimeSeries object ''' # Check that inputs dimensions matches number of nodes assert len(drives) == self.nnodes, 'number of drives does not match number of nodes' assert len(fs) == self.nnodes, 'number of coverage inputs does not match number of nodes' simfunc = self.intMethods()[method] return simfunc(drives, pp, fs) + def filecodes(self, drives, pp, fs, method): + codes = { + 'simkey': self.simkey, + 'neuron': self.refpneuron.name, + 'nnodes': f'{self.nnodes}node{"s" if self.nnodes > 1 else ""}', + 'a': f'a{"_".join([f"{x.a * 1e9:.0f}nm" for x in self.nodes])}' + } + for i, drive in enumerate(drives): + codes.update({f'{k}_{i}': v for k, v in drive.filecodes.items()}) + codes.update(pp.filecodes) + codes['fs'] = f'fs{"_".join([f"{x * 1e2:.0f}%" for x in fs])}' + codes['method'] = method + return codes + + def filecode(self, *args): + return '_'.join([x for x in self.filecodes(*args).values() if x is not None]) + + def simAndSave(self, *args, outdir='.', overwrite=False, minimize_output=True): + fname = f'{self.filecode(*args)}.pkl' + fpath = os.path.join(outdir, fname) + if os.path.isfile(fpath) and not overwrite: + logger.info(f'Loading data from "{os.path.basename(fpath)}"') + with open(fpath, 'rb') as fh: + frame = pickle.load(fh) + data, meta = frame['data'], frame['meta'] + else: + data, meta = self.simulate(*args) + if minimize_output: + data.dumpOutputsOtherThan(['Qm', 'Vm']) + with open(fpath, 'wb') as fh: + pickle.dump({'meta': meta, 'data': data}, fh) + logger.debug(f'simulation data exported to "{fpath}"') + return data, meta + @property def tauax(self): ''' Axial time constant (s). ''' return self.refnode.Cm0 / self.ga @property def taum(self): ''' Passive membrane time constant (s). ''' return self.refpneuron.tau_pas @property def taumax(self): ''' Maximal time constant of the model (s). ''' return max(self.taum, self.tauax)