diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index 9b88530..210c46f 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,688 +1,682 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-05-02 17:10:32 +# @Last Modified time: 2020-05-02 17:13:13 import logging import numpy as np from .solvers import EventDrivenSolver, HybridSolver from .bls import BilayerSonophore from .pneuron import PointNeuron from .model import Model -from .drives import AcousticDrive +from .drives import AcousticDrive, ElectricDrive from .protocols import * from ..utils import * from ..constants import * from ..postpro import getFixedPoints from .lookups import EffectiveVariablesLookup from ..neurons import getPointNeuron NEURONS_LOOKUP_DIR = os.path.abspath(os.path.split(__file__)[0] + "/../lookups/") class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ASTIM' # keyword used to characterize simulations made with this model def __init__(self, a, pneuron, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param pneuron: point-neuron model :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' self.pneuron = pneuron super().__init__(a, pneuron.Cm0, pneuron.Qm0, embedding_depth=embedding_depth) @property def a_str(self): return f'{self.a * 1e9:.1f} nm' def __repr__(self): s = f'{self.__class__.__name__}({self.a_str}, {self.pneuron}' if self.d > 0.: s += f', d={si_format(self.d, precision=1)}m' return f'{s})' def copy(self): return self.__class__(self.a, self.pneuron, embedding_depth=self.d) def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.a == other.a and self.pneuron == other.pneuron and self.d == other.d @property def pneuron(self): return self._pneuron @pneuron.setter def pneuron(self, value): if not isinstance(value, PointNeuron): raise ValueError(f'{value} is not a valid PointNeuron instance') if not hasattr(self, '_pneuron') or value != self._pneuron: self._pneuron = value if hasattr(self, 'a'): super().__init__(self.a, self.pneuron.Cm0, self.pneuron.Qm0, embedding_depth=self.d) @property def meta(self): return { 'neuron': self.pneuron.name, 'a': self.a, 'd': self.d, } @classmethod def initFromMeta(cls, meta): return cls(meta['a'], getPointNeuron(meta['neuron']), embedding_depth=meta['d']) def params(self): return {**super().params(), **self.pneuron.params()} def getPltVars(self, wrapleft='df["', wrapright='"]'): return {**super().getPltVars(wrapleft, wrapright), **self.pneuron.getPltVars(wrapleft, wrapright)} @property def pltScheme(self): return self.pneuron.pltScheme def filecode(self, *args): return Model.filecode(self, *args) @staticmethod def inputs(): # Get parent input vars and supress irrelevant entries inputvars = BilayerSonophore.inputs() del inputvars['Qm'] # Fill in current input vars in appropriate order inputvars.update({ **AcousticDrive.inputs(), 'fs': { 'desc': 'sonophore membrane coverage fraction', 'label': 'f_s', 'unit': '\%', 'factor': 1e2, 'precision': 0 }, 'method': None }) return inputvars def filecodes(self, drive, pp, fs, method, qss_vars): codes = { 'simkey': self.simkey, 'neuron': self.pneuron.name, 'nature': pp.nature, 'a': f'{self.a * 1e9:.0f}nm', **drive.filecodes, **pp.filecodes, } codes['fs'] = f'fs{fs * 1e2:.0f}%' if fs < 1 else None codes['method'] = method codes['qss_vars'] = qss_vars return codes @staticmethod def interpEffVariable(key, Qm, stim, lkp): ''' Interpolate Q-dependent effective variable along various stimulation states of a solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkp: 2D lookup object :return: interpolated effective variable vector ''' x = np.zeros(stim.size) stim_vals = np.unique(stim) for s in stim_vals: x[stim == s] = lkp.project('A', s).interpVar1D(Qm[stim == s], key) return x @staticmethod def spatialAverage(fs, x, x0): ''' fs-modulated spatial averaging. ''' return fs * x + (1 - fs) * x0 @timer def computeEffVars(self, drive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific acoustic stimulus and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param drive: acoustic drive object :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' # Run simulation and retrieve deflection and gas content vectors from last cycle data = super().simCycles(drive, Qm) Z_last = data.loc[-NPC_DENSE:, 'Z'].values # m Cm_last = self.v_capacitance(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = self.spatialAverage(x, Cm_last, self.Cm0) # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({**{'V': np.mean(Vm)}, **self.pneuron.getEffRates(Vm)}) # Log process log = f'{self}: lookups @ {drive.desc}, {Qm * 1e5:.2f} nC/cm2' if len(fs) > 1: log += f', fs = {fs.min() * 1e2:.0f} - {fs.max() * 1e2:.0f}%' logger.info(log) # Return effective coefficients return effvars def getLookupFileName(self, a=None, f=None, A=None, fs=False): fname = f'{self.pneuron.name}_lookups' if a is not None: fname += f'_{a * 1e9:.0f}nm' if f is not None: fname += f'_{f * 1e-3:.0f}kHz' if A is not None: fname += f'_{A * 1e-3:.0f}kPa' if fs is True: fname += '_fs' return f'{fname}.pkl' def getLookupFilePath(self, *args, **kwargs): return os.path.join(NEURONS_LOOKUP_DIR, self.getLookupFileName(*args, **kwargs)) def getLookup(self, *args, **kwargs): keep_tcomp = kwargs.pop('keep_tcomp', False) lookup_path = self.getLookupFilePath(*args, **kwargs) lkp = EffectiveVariablesLookup.fromPickle(lookup_path) if not keep_tcomp: del lkp.tables['tcomp'] return lkp def getLookup2D(self, f, fs): proj_kwargs = {'a': self.a, 'f': f, 'fs': fs} if fs < 1.: kwargs = proj_kwargs.copy() kwargs['fs'] = True else: kwargs = {} return self.getLookup(**kwargs).projectN(proj_kwargs) def fullDerivatives(self, t, y, drive, fs): ''' Compute the full system derivatives. :param t: specific instant in time (s) :param y: vector of state variables :param drive: acoustic drive object :param fs: sonophore membrane coverage fraction (-) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives( self, t, y[:3], drive, y[3]) dydt_elec = self.pneuron.derivatives( t, y[3:], Cm=self.spatialAverage(fs, self.capacitance(y[1]), self.Cm0)) return dydt_mech + dydt_elec def effDerivatives(self, t, y, lkp1d, qss_vars): ''' Compute the derivatives of the n-ODE effective system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :param qss_vars: list of QSS variables :return: vector of effective system derivatives at time t ''' # Unpack values and interpolate lookup at current charge density Qm, *states = y lkp0d = lkp1d.interpolate1D(Qm) # Compute states dictionary from differential and QSS variables states_dict = {} i = 0 for k in self.pneuron.statesNames(): if k in qss_vars: states_dict[k] = self.pneuron.quasiSteadyStates()[k](lkp0d) else: states_dict[k] = states[i] i += 1 # Compute charge density derivative dQmdt = - self.pneuron.iNet(lkp0d['V'], states_dict) * 1e-3 # Compute states derivative vector only for differential variable dstates = [] for k in self.pneuron.statesNames(): if k not in qss_vars: dstates.append(self.pneuron.derEffStates()[k](lkp0d, states_dict)) return [dQmdt, *dstates] def deflectionDependentVm(self, Qm, Z, fs): ''' Compute deflection (and sonopphore coverage fraction) dependent voltage profile. ''' return Qm / self.spatialAverage(fs, self.v_capacitance(Z), self.Cm0) * 1e3 # mV def fullInitialConditions(self, *args, **kwargs): ''' Compute simulation initial conditions. ''' y0 = super().initialConditions(*args, **kwargs) y0.update({ 'Qm': [self.Qm0] * 2, **{k: [self.pneuron.steadyStates()[k](self.pneuron.Vm0)] * 2 for k in self.pneuron.statesNames()} }) return y0 def __simFull(self, drive, pp, fs): # Compute initial conditions y0 = self.fullInitialConditions(drive, self.Qm0, drive.dt) # Initialize solver and compute solution solver = EventDrivenSolver( lambda x: setattr(solver.drive, 'xvar', drive.xvar * x), # eventfunc y0.keys(), # variables list lambda t, y: self.fullDerivatives(t, y, solver.drive, fs), # dfunc event_params={'drive': drive.copy().updatedX(0.)}, # event parameters dt=drive.dt) # time step data = solver( y0, pp.stimEvents(), pp.tstop, target_dt=CLASSIC_TARGET_DT, log_period=pp.tstop / 100 if logger.getEffectiveLevel() <= logging.INFO else None, # logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' ) # Remove velocity and add voltage timeseries to solution del data['U'] data = addColumn( data, 'Vm', self.deflectionDependentVm(data['Qm'], data['Z'], fs), preceding_key='Qm') # Return solution dataframe return data def __simHybrid(self, drive, pp, fs): # Compute initial conditions y0 = self.fullInitialConditions(drive, self.Qm0, drive.dt) # Initialize solver and compute solution solver = HybridSolver( y0.keys(), lambda t, y: self.fullDerivatives(t, y, solver.drive, fs), # dfunc lambda t, y, Cm: self.pneuron.derivatives( t, y, Cm=self.spatialAverage(fs, Cm, self.Cm0)), # dfunc_sparse lambda yref: self.capacitance(yref[1]), # predfunc lambda x: setattr(solver.drive, 'xvar', drive.xvar * x), # eventfunc drive.periodicity, # periodicity ['U', 'Z', 'ng'], # fast-evolving variables drive.dt, # dense time step drive.dt_sparse, # sparse time step event_params={'drive': drive.copy().updatedX(0.)}, # event parameters primary_vars=['Z', 'ng'] # primary variables ) data = solver( y0, pp.stimEvents(), pp.tstop, HYBRID_UPDATE_INTERVAL, target_dt=CLASSIC_TARGET_DT, log_period=pp.tstop / 100 if logger.getEffectiveLevel() < logging.INFO else None, logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' ) # Remove velocity and add voltage timeseries to solution del data['U'] data = addColumn( data, 'Vm', self.deflectionDependentVm(data['Qm'], data['Z'], fs), preceding_key='Qm') # Return solution dataframe return data def __simSonic(self, drive, pp, fs, qss_vars=None, pavg=False): # Load appropriate 2D lookup lkp = self.getLookup2D(drive.f, fs) # Adapt lookup and pulsing protocol if pulse-average mode is selected if pavg: lkp = lkp * pp.DC + lkp.project('A', 0.).tile('A', lkp.refs['A']) * (1 - pp.DC) tstim = (int(pp.tstim * pp.PRF) - 1 + pp.DC) / pp.PRF pp = TimeProtocol(tstim, pp.tstim + pp.toffset - tstim) # Determine QSS and differential variables, and create optional QSS lookup if qss_vars is None: qss_vars = [] else: lkp_QSS = EffectiveVariablesLookup( lkp.refs, {k: self.pneuron.quasiSteadyStates()[k](lkp) for k in qss_vars}) diff_vars = [item for item in self.pneuron.statesNames() if item not in qss_vars] # Set initial conditions y0 = { 'Qm': self.Qm0, **{k: self.pneuron.steadyStates()[k](self.pneuron.Vm0) for k in diff_vars} } # Initialize solver and compute solution solver = EventDrivenSolver( lambda x: setattr(solver, 'lkp', lkp.project('A', drive.xvar * x)), # eventfunc y0.keys(), # variables list lambda t, y: self.effDerivatives(t, y, solver.lkp, qss_vars), # dfunc event_params={'lkp': lkp.project('A', 0.)}, # event parameters dt=self.pneuron.chooseTimeStep()) # time step data = solver( y0, pp.stimEvents(), pp.tstop, log_period=pp.tstop / 100 if pp.tstop >= 5 else None, max_nsamples=MAX_NSAMPLES_EFFECTIVE) # Interpolate Vm and QSS variables along charge vector and store them in solution dataframe data = addColumn( data, 'Vm', self.interpEffVariable('V', data['Qm'], data['stimstate'] * drive.A, lkp), preceding_key='Qm') for k in qss_vars: data[k] = self.interpEffVariable(k, data['Qm'], data['stimstate'] * drive.A, lkp_QSS) # Add dummy deflection and gas content vectors to solution for key in ['Z', 'ng']: data[key] = np.full(data['t'].size, np.nan) # Return solution dataframe return data def intMethods(self): ''' Listing of model integration methods. ''' return { 'full': self.__simFull, 'hybrid': self.__simHybrid, 'sonic': self.__simSonic } @classmethod @Model.checkOutputDir def simQueue(cls, freqs, amps, durations, offsets, PRFs, DCs, fs, methods, qss_vars, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :param fs: sonophore membrane coverage fractions (-) :params methods: integration methods :param qss_vars: QSS variables :return: list of parameters (list) for each simulation ''' if ('full' in methods or 'hybrid' in methods) and kwargs['outputdir'] is None: logger.warning('Running cumbersome simulation(s) without file saving') if amps is None: amps = [None] drives = AcousticDrive.createQueue(freqs, amps) protocols = PulsedProtocol.createQueue(durations, offsets, PRFs, DCs) queue = [] for drive in drives: for pp in protocols: for cov in fs: for method in methods: queue.append([drive, pp, cov, method, qss_vars]) return queue @classmethod @Model.checkOutputDir def simQueueBurst(cls, freqs, amps, durations, PRFs, DCs, BRFs, nbursts, fs, methods, qss_vars, **kwargs): if ('full' in methods or 'hybrid' in methods) and kwargs['outputdir'] is None: logger.warning('Running cumbersome simulation(s) without file saving') if amps is None: amps = [None] drives = AcousticDrive.createQueue(freqs, amps) protocols = BurstProtocol.createQueue(durations, PRFs, DCs, BRFs, nbursts) queue = [] for drive in drives: for pp in protocols: for cov in fs: for method in methods: queue.append([drive, pp, cov, method, qss_vars]) return queue def checkInputs(self, drive, pp, fs, method, qss_vars): PointNeuron.checkInputs(drive, pp) _, xevents, = zip(*pp.stimEvents()) if np.any(np.array([xevents]) < 0.): raise ValueError('Invalid time protocol: contains negative modulators') if not isinstance(fs, float): raise TypeError(f'Invalid "fs" parameter (must be float typed)') if qss_vars is not None: if not isIterable(qss_vars) or not isinstance(qss_vars[0], str): raise ValueError('Invalid QSS variables: must be None or an iterable of strings') sn = self.pneuron.statesNames() for item in qss_vars: if item not in sn: raise ValueError(f'Invalid QSS variable: {item} (must be in {sn}') if method not in list(self.intMethods().keys()): raise ValueError(f'Invalid integration method: "{method}"') @Model.logNSpikes @Model.checkTitrate @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, pp, fs=1., method='sonic', qss_vars=None): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param drive: acoustic drive object :param pp: pulse protocol object :param fs: sonophore membrane coverage fraction (-) :param method: selected integration method :return: output dataframe ''' # Set the tissue elastic modulus self.setTissueModulus(drive) # Call appropriate simulation function and return simfunc = self.intMethods()[method] simargs = [drive, pp, fs] if method == 'sonic': simargs.append(qss_vars) return simfunc(*simargs) def desc(self, meta): method = meta['method'] if 'method' in meta else meta['model']['method'] fs = meta['fs'] if 'fs' in meta else meta['model']['fs'] s = f'{self}: {method} simulation @ {meta["drive"].desc}, {meta["pp"].desc}' if fs < 1.0: s += f', fs = {(fs * 1e2):.2f}%' if 'qss_vars' in meta and meta['qss_vars'] is not None: s += f" - QSS ({', '.join(meta['qss_vars'])})" return s @staticmethod def getNSpikes(data): return PointNeuron.getNSpikes(data) def getArange(self, drive): return (0., self.getLookup().refs['A'].max()) @property def titrationFunc(self): return self.pneuron.titrationFunc @logCache(os.path.join(os.path.split(__file__)[0], 'astim_titrations.log')) def titrate(self, drive, pp, fs=1., method='sonic', qss_vars=None, xfunc=None, Arange=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency and pulsed protocol. :param drive: unresolved acoustic drive object :param pp: pulse protocol object :param fs: sonophore membrane coverage fraction (-) :param method: integration method :return: determined threshold amplitude (Pa) ''' return super().titrate(drive, pp, fs=fs, method=method, qss_vars=qss_vars, xfunc=xfunc, Arange=Arange) def getQuasiSteadyStates(self, f, amps=None, charges=None, DC=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities, at a specific US frequency and duty cycle. :param f: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DC: duty cycle :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges lkp = self.getLookup().projectDC(amps=amps, DC=DC).projectN({'a': self.a, 'f': f}) if charges is not None: lkp = lkp.project('Q', charges) # Specify dimensions with A as the first axis lkp.move('A', 0) # Compute QSS states using these lookups QSS = EffectiveVariablesLookup( lkp.refs, {k: v(lkp) for k, v in self.pneuron.quasiSteadyStates().items()}) # Compress outputs if needed if squeeze_output: QSS = QSS.squeeze() lkp = lkp.squeeze() return lkp, QSS def iNetQSS(self, Qm, f, A, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param f: US frequency (Hz) :param A: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' lkp, QSS = self.getQuasiSteadyStates( f, amps=A, charges=Qm, DC=DC, squeeze_output=True) return self.pneuron.iNet(lkp['V'], QSS) # mA/m2 def fixedPointsQSS(self, f, A, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param f: US frequency (Hz) :param A: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' logger.debug(f'A = {A * 1e-3:.2f} kPa, DC = {DC * 1e2:.0f}%') # Extract fixed points from QSS charge variation profile def dfunc(Qm): return - self.iNetQSS(Qm, f, A, DC) fixed_points = getFixedPoints( lkp.refs['Q'], dQdt, filter='both', der_func=dfunc).tolist() dfunc = lambda x: np.array(self.effDerivatives(_, x, lkp)) # classified_fixed_points = {'stable': [], 'unstable': [], 'saddle': []} classified_fixed_points = [] np.set_printoptions(precision=2) # For each fixed point for i, Qm in enumerate(fixed_points): # Re-compute QSS at fixed point *_, QSS = self.getQuasiSteadyStates(f, amps=A, charges=Qm, DC=DC, squeeze_output=True) # Classify fixed point stability by numerically evaluating its Jacobian and # computing its eigenvalues x = np.array([Qm, *QSS.tables.values()]) eigvals, key = classifyFixedPoint(x, dfunc) # classified_fixed_points[key].append(Qm) classified_fixed_points.append((x, eigvals, key)) # eigenvalues.append(eigvals) logger.debug(f'{key} point @ Q = {(Qm * 1e5):.1f} nC/cm2') # eigenvalues = np.array(eigenvalues).T # print(eigenvalues.shape) return classified_fixed_points def isStableQSS(self, f, A, DC): lookups, QSS = self.getQuasiSteadyStates( f, amps=A, DCs=DC, squeeze_output=True) dQdt = -self.pneuron.iNet(lookups['V'], QSS.tables) # mA/m2 classified_fixed_points = self.fixedPointsQSS(f, A, DC, lookups, dQdt) return len(classified_fixed_points['stable']) > 0 class DrivenNeuronalBilayerSonophore(NeuronalBilayerSonophore): simkey = 'DASTIM' # keyword used to characterize simulations made with this model def __init__(self, Idrive, *args, **kwargs): self.Idrive = Idrive super().__init__(*args, **kwargs) def __repr__(self): return super().__repr__()[:-1] + f', Idrive = {self.Idrive:.2f} mA/m2)' @classmethod def initFromMeta(cls, meta): return cls(meta['Idrive'], meta['a'], getPointNeuron(meta['neuron']), embedding_depth=meta['d']) def params(self): return {**{'Idrive': self.Idrive}, **super().params()} @staticmethod def inputs(): return { **NeuronalBilayerSonophore.inputs(), - 'Idrive': { - 'desc': 'driving current density', - 'label': 'I_{drive}', - 'unit': 'mA/m2', - 'factor': 1e0, - 'precision': 0 - } + 'Idrive': ElectricDrive.inputs()['I'] } @property def meta(self): return { **super().meta, 'Idrive': self.Idrive } def filecodes(self, *args): return { **super().filecodes(*args), 'Idrive': f'Idrive{self.Idrive:.1f}mAm2' } def fullDerivatives(self, *args): dydt = super().fullDerivatives(*args) dydt[3] += self.Idrive * 1e-3 return dydt def effDerivatives(self, *args): dQmdt, *dstates = super().effDerivatives(*args) dQmdt += self.Idrive * 1e-3 return [dQmdt, *dstates]