diff --git a/PySONIC/core/model.py b/PySONIC/core/model.py index 6d1413c..8869666 100644 --- a/PySONIC/core/model.py +++ b/PySONIC/core/model.py @@ -1,243 +1,227 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-03 11:53:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-17 11:41:54 +# @Last Modified time: 2020-04-17 19:31:15 from functools import wraps from inspect import getdoc import abc import inspect import numpy as np from .batches import Batch from ..threshold import titrate from ..utils import * class Model(metaclass=abc.ABCMeta): ''' Generic model interface. ''' @property @abc.abstractmethod def tscale(self): ''' Relevant temporal scale of the model. ''' raise NotImplementedError @property @abc.abstractmethod def simkey(self): ''' Keyword used to characterize simulations made with the model. ''' raise NotImplementedError @abc.abstractmethod def __repr__(self): ''' String representation. ''' raise NotImplementedError def params(self): ''' Return a dictionary of all model parameters (class and instance attributes) ''' def toAvoid(p): return (p.startswith('__') and p.endswith('__')) or p.startswith('_abc_') class_attrs = inspect.getmembers(self.__class__, lambda a: not(inspect.isroutine(a))) inst_attrs = inspect.getmembers(self, lambda a: not(inspect.isroutine(a))) class_attrs = [a for a in class_attrs if not toAvoid(a[0])] inst_attrs = [a for a in inst_attrs if not toAvoid(a[0]) and a not in class_attrs] params_dict = {a[0]: a[1] for a in class_attrs + inst_attrs} return params_dict @classmethod def description(cls): return getdoc(cls).split('\n', 1)[0].strip() @abc.abstractmethod def copy(self): raise NotImplementedError @staticmethod @abc.abstractmethod def inputs(): ''' Return an informative dictionary on input variables used to simulate the model. ''' raise NotImplementedError @abc.abstractmethod def filecodes(self, *args): ''' Return a dictionary of string-encoded inputs used for file naming. ''' raise NotImplementedError def filecode(self, *args): return filecode(self, *args) @classmethod @abc.abstractmethod def getPltVars(self, *args, **kwargs): ''' Return a dictionary with information about all plot variables related to the model. ''' raise NotImplementedError @property @abc.abstractmethod def pltScheme(self): ''' Return a dictionary model plot variables grouped by context. ''' raise NotImplementedError @staticmethod def checkOutputDir(queuefunc): ''' Check if an output directory is provided in input arguments, and if so, add it to each item of the returned queue (along with an "overwrite" boolean). ''' @wraps(queuefunc) def wrapper(self, *args, **kwargs): outputdir = kwargs.get('outputdir') queue = queuefunc(self, *args, **kwargs) if outputdir is not None: overwrite = kwargs.get('overwrite', True) queue = queuefunc(self, *args, **kwargs) for i, params in enumerate(queue): position_args, keyword_args = Batch.resolve(params) keyword_args['overwrite'] = overwrite keyword_args['outputdir'] = outputdir queue[i] = (position_args, keyword_args) else: if len(queue) > 5: logger.warning('Running more than 5 simulations without file saving') return queue return wrapper @classmethod @abc.abstractmethod def simQueue(cls, *args, outputdir=None, overwrite=True): raise NotImplementedError @staticmethod @abc.abstractmethod def checkInputs(self, *args): ''' Check the validity of simulation input parameters. ''' raise NotImplementedError @abc.abstractmethod def derivatives(self, *args, **kwargs): ''' Compute ODE derivatives for a specific set of ODE states and external parameters. ''' raise NotImplementedError @abc.abstractmethod def simulate(self, *args, **kwargs): ''' Simulate the model's differential system for specific input parameters and return output data in a dataframe. ''' raise NotImplementedError @classmethod @abc.abstractmethod def meta(self, *args): ''' Return an informative dictionary about model and simulation parameters. ''' raise NotImplementedError @staticmethod def addMeta(simfunc): - ''' Add an informative dictionary about model and simulation parameters to simulation output ''' - + ''' Add informative dictionary to simulation output ''' @wraps(simfunc) def wrapper(self, *args, **kwargs): data, tcomp = timer(simfunc)(self, *args, **kwargs) logger.debug('completed in %ss', si_format(tcomp, 1)) meta_dict = getMeta(self, simfunc, *args, **kwargs) - - # target_args = resolveFuncArgs(simfunc, model, *args, **kwargs) - # # Try to retrieve meta information - # try: - # meta_params_names = list(signature(self.meta).parameters.keys()) - # meta_params = [target_args[k] for k in meta_params_names] - # meta = self.meta(*meta_params) - # except KeyError as err: - # logger.error(f'Could not find {err} parameter in "{simfunc.__name__}" function') - # meta = {} - - # Add computation time to it meta_dict['tcomp'] = tcomp - - # Return data with meta dict return data, meta_dict - return wrapper @staticmethod def logNSpikes(simfunc): ''' Log number of detected spikes on charge profile of simulation output. ''' @wraps(simfunc) def wrapper(self, *args, **kwargs): out = simfunc(self, *args, **kwargs) if out is None: return None data, meta = out nspikes = self.getNSpikes(data) logger.debug(f'{nspikes} spike{plural(nspikes)} detected') return data, meta return wrapper @staticmethod def checkSimParams(simfunc): ''' Check simulation parameters before launching simulation. ''' @wraps(simfunc) def wrapper(self, *args, **kwargs): args, kwargs = alignWithMethodDef(simfunc, args, kwargs) self.checkInputs(*args, *list(kwargs.values())) return simfunc(self, *args, **kwargs) return wrapper @staticmethod def logDesc(simfunc): ''' Log description of model and simulation parameters. ''' @wraps(simfunc) def wrapper(self, *args, **kwargs): args, kwargs = alignWithMethodDef(simfunc, args, kwargs) logger.info(self.desc(getMeta(self, simfunc, *args, **kwargs))) return simfunc(self, *args, **kwargs) return wrapper def titrate(self, *args, **kwargs): return titrate(self, *args, **kwargs) @staticmethod def checkTitrate(simfunc): ''' If unresolved drive provided in the list of input parameters, perform a titration to find the threshold drive, and simulate with resolved drive. ''' @wraps(simfunc) def wrapper(self, *args, **kwargs): # Extract drive object from args drive, *other_args = args # If drive is titratable and not fully resolved if drive.is_searchable: if not drive.is_resolved: # Titrate xthr = self.titrate(*args) # If no threshold was found, return None if np.isnan(xthr): logger.error( f'Could not find threshold {drive.inputs()[drive.xkey]["desc"]}') return None # Otherwise, update args list with resovled drive args = (drive.updatedX(xthr), *other_args) # Execute simulation function return simfunc(self, *args, **kwargs) return wrapper def simAndSave(self, *args, **kwargs): return simAndSave(self, *args, **kwargs) def getOutput(self, *args, **kwargs): ''' Get simulation output data for a specific parameters combination, by looking for an output file into a specific directory. If a corresponding output file is not found in the specified directory, the model is first run and results are saved in the output file. ''' fpath = self.simAndSave(*args, overwrite=False, **kwargs) return loadData(fpath) diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index a93f52e..b744a49 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,667 +1,664 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-17 15:23:12 +# @Last Modified time: 2020-04-17 18:06:52 import logging import numpy as np from .solvers import EventDrivenSolver, HybridSolver from .bls import BilayerSonophore from .pneuron import PointNeuron from .model import Model from .drives import Drive, AcousticDrive -from .protocols import TimeProtocol, PulsedProtocol +from .protocols import * from ..utils import * from ..constants import * from ..postpro import getFixedPoints from .lookups import EffectiveVariablesLookup from ..neurons import getPointNeuron NEURONS_LOOKUP_DIR = os.path.abspath(os.path.split(__file__)[0] + "/../lookups/") class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ASTIM' # keyword used to characterize simulations made with this model def __init__(self, a, pneuron, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param pneuron: point-neuron model :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' self.pneuron = pneuron super().__init__(a, pneuron.Cm0, pneuron.Qm0, embedding_depth=embedding_depth) @property def a_str(self): return f'{self.a * 1e9:.1f} nm' def __repr__(self): s = f'{self.__class__.__name__}({self.a_str}, {self.pneuron}' if self.d > 0.: s += f', d={si_format(self.d, precision=1)}m' return f'{s})' def copy(self): return self.__class__(self.a, self.pneuron, embedding_depth=self.d) def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.a == other.a and self.pneuron == other.pneuron and self.d == other.d @property def pneuron(self): return self._pneuron @pneuron.setter def pneuron(self, value): if not isinstance(value, PointNeuron): raise ValueError(f'{value} is not a valid PointNeuron instance') if not hasattr(self, '_pneuron') or value != self._pneuron: self._pneuron = value if hasattr(self, 'a'): super().__init__(self.a, self.pneuron.Cm0, self.pneuron.Qm0, embedding_depth=self.d) @property def meta(self): return { 'neuron': self.pneuron.name, 'a': self.a, 'd': self.d, } @classmethod def initFromMeta(cls, meta): return cls(meta['a'], getPointNeuron(meta['neuron']), embedding_depth=meta['d']) def params(self): return {**super().params(), **self.pneuron.params()} def getPltVars(self, wrapleft='df["', wrapright='"]'): return {**super().getPltVars(wrapleft, wrapright), **self.pneuron.getPltVars(wrapleft, wrapright)} @property def pltScheme(self): return self.pneuron.pltScheme def filecode(self, *args): return Model.filecode(self, *args) @staticmethod def inputs(): # Get parent input vars and supress irrelevant entries inputvars = BilayerSonophore.inputs() del inputvars['Qm'] # Fill in current input vars in appropriate order inputvars.update({ **AcousticDrive.inputs(), 'fs': { 'desc': 'sonophore membrane coverage fraction', 'label': 'f_s', 'unit': '\%', 'factor': 1e2, 'precision': 0 }, 'method': None }) return inputvars def filecodes(self, drive, pp, fs, method, qss_vars): codes = { 'simkey': self.simkey, 'neuron': self.pneuron.name, 'nature': pp.nature, 'a': f'{self.a * 1e9:.0f}nm', **drive.filecodes, **pp.filecodes, } codes['fs'] = f'fs{fs * 1e2:.0f}%' if fs < 1 else None codes['method'] = method codes['qss_vars'] = qss_vars return codes @staticmethod def interpEffVariable(key, Qm, stim, lkp): ''' Interpolate Q-dependent effective variable along various stimulation states of a solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkp: 2D lookup object :return: interpolated effective variable vector ''' x = np.zeros(stim.size) stim_vals = np.unique(stim) for s in stim_vals: x[stim == s] = lkp.project('A', s).interpVar1D(Qm[stim == s], key) return x @staticmethod def spatialAverage(fs, x, x0): ''' fs-modulated spatial averaging. ''' return fs * x + (1 - fs) * x0 @timer def computeEffVars(self, drive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific acoustic stimulus and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param drive: acoustic drive object :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' # Run simulation and retrieve deflection and gas content vectors from last cycle data = super().simCycles(drive, Qm) Z_last = data.loc[-NPC_DENSE:, 'Z'].values # m Cm_last = self.v_capacitance(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = self.spatialAverage(x, Cm_last, self.Cm0) # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({**{'V': np.mean(Vm)}, **self.pneuron.getEffRates(Vm)}) # Log process log = f'{self}: lookups @ {drive.desc}, {Qm * 1e5:.2f} nC/cm2' if len(fs) > 1: log += f', fs = {fs.min() * 1e2:.0f} - {fs.max() * 1e2:.0f}%' logger.info(log) # Return effective coefficients return effvars def getLookupFileName(self, a=None, f=None, A=None, fs=False): fname = f'{self.pneuron.name}_lookups' if a is not None: fname += f'_{a * 1e9:.0f}nm' if f is not None: fname += f'_{f * 1e-3:.0f}kHz' if A is not None: fname += f'_{A * 1e-3:.0f}kPa' if fs is True: fname += '_fs' return f'{fname}.pkl' def getLookupFilePath(self, *args, **kwargs): return os.path.join(NEURONS_LOOKUP_DIR, self.getLookupFileName(*args, **kwargs)) def getLookup(self, *args, **kwargs): keep_tcomp = kwargs.pop('keep_tcomp', False) lookup_path = self.getLookupFilePath(*args, **kwargs) lkp = EffectiveVariablesLookup.fromPickle(lookup_path) if not keep_tcomp: del lkp.tables['tcomp'] return lkp def getLookup2D(self, f, fs): proj_kwargs = {'a': self.a, 'f': f, 'fs': fs} if fs < 1.: kwargs = proj_kwargs.copy() kwargs['fs'] = True else: kwargs = {} return self.getLookup(**kwargs).projectN(proj_kwargs) def fullDerivatives(self, t, y, drive, fs): ''' Compute the full system derivatives. :param t: specific instant in time (s) :param y: vector of state variables :param drive: acoustic drive object :param fs: sonophore membrane coverage fraction (-) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives( self, t, y[:3], drive, y[3]) dydt_elec = self.pneuron.derivatives( t, y[3:], Cm=self.spatialAverage(fs, self.capacitance(y[1]), self.Cm0)) return dydt_mech + dydt_elec def effDerivatives(self, t, y, lkp1d, qss_vars): ''' Compute the derivatives of the n-ODE effective system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :param qss_vars: list of QSS variables :return: vector of effective system derivatives at time t ''' # Unpack values and interpolate lookup at current charge density Qm, *states = y lkp0d = lkp1d.interpolate1D(Qm) # Compute states dictionary from differential and QSS variables states_dict = {} i = 0 for k in self.pneuron.statesNames(): if k in qss_vars: states_dict[k] = self.pneuron.quasiSteadyStates()[k](lkp0d) else: states_dict[k] = states[i] i += 1 # Compute charge density derivative dQmdt = - self.pneuron.iNet(lkp0d['V'], states_dict) * 1e-3 # Compute states derivative vector only for differential variable dstates = [] for k in self.pneuron.statesNames(): if k not in qss_vars: dstates.append(self.pneuron.derEffStates()[k](lkp0d, states_dict)) return [dQmdt, *dstates] def deflectionDependentVm(self, Qm, Z, fs): ''' Compute deflection (and sonopphore coverage fraction) dependent voltage profile. ''' return Qm / self.spatialAverage(fs, self.v_capacitance(Z), self.Cm0) * 1e3 # mV def fullInitialConditions(self, *args, **kwargs): ''' Compute simulation initial conditions. ''' y0 = super().initialConditions(*args, **kwargs) y0.update({ 'Qm': [self.Qm0] * 2, **{k: [self.pneuron.steadyStates()[k](self.pneuron.Vm0)] * 2 for k in self.pneuron.statesNames()} }) return y0 def __simFull(self, drive, pp, fs): # Compute initial conditions y0 = self.fullInitialConditions(drive, self.Qm0, drive.dt) # Initialize solver and compute solution solver = EventDrivenSolver( lambda x: setattr(solver.drive, 'A', drive.A * x), # eventfunc y0.keys(), # variables list lambda t, y: self.fullDerivatives(t, y, solver.drive, fs), # dfunc event_params={'drive': drive.copy().updatedX(0.)}, # event parameters dt=drive.dt) # time step data = solver( y0, pp.stimEvents(), pp.ttotal, target_dt=CLASSIC_TARGET_DT, log_period=pp.ttotal / 100 if logger.getEffectiveLevel() < logging.INFO else None, # logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' ) # Remove velocity and add voltage timeseries to solution del data['U'] data = addColumn( data, 'Vm', self.deflectionDependentVm(data['Qm'], data['Z'], fs), preceding_key='Qm') # Return solution dataframe return data def __simHybrid(self, drive, pp, fs): # Compute initial conditions y0 = self.fullInitialConditions(drive, self.Qm0, drive.dt) # Initialize solver and compute solution solver = HybridSolver( y0.keys(), lambda t, y: self.fullDerivatives(t, y, solver.drive, fs), # dfunc lambda t, y, Cm: self.pneuron.derivatives( t, y, Cm=self.spatialAverage(fs, Cm, self.Cm0)), # dfunc_sparse lambda yref: self.capacitance(yref[1]), # predfunc lambda x: setattr(solver.drive, 'A', drive.A * x), # eventfunc drive.periodicity, # periodicity ['U', 'Z', 'ng'], # fast-evolving variables drive.dt, # dense time step drive.dt_sparse, # sparse time step event_params={'drive': drive.copy().updatedX(0.)}, # event parameters primary_vars=['Z', 'ng'] # primary variables ) data = solver( y0, pp.stimEvents(), pp.ttotal, HYBRID_UPDATE_INTERVAL, target_dt=CLASSIC_TARGET_DT, log_period=pp.ttotal / 100 if logger.getEffectiveLevel() < logging.INFO else None, logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' ) # Remove velocity and add voltage timeseries to solution del data['U'] data = addColumn( data, 'Vm', self.deflectionDependentVm(data['Qm'], data['Z'], fs), preceding_key='Qm') # Return solution dataframe return data def __simSonic(self, drive, pp, fs, qss_vars=None, pavg=False): # Load appropriate 2D lookup lkp = self.getLookup2D(drive.f, fs) # Adapt lookup and pulsing protocol if pulse-average mode is selected if pavg: lkp = lkp * pp.DC + lkp.project('A', 0.).tile('A', lkp.refs['A']) * (1 - pp.DC) tstim = (int(pp.tstim * pp.PRF) - 1 + pp.DC) / pp.PRF pp = TimeProtocol(tstim, pp.tstim + pp.toffset - tstim) # Determine QSS and differential variables, and create optional QSS lookup if qss_vars is None: qss_vars = [] else: lkp_QSS = EffectiveVariablesLookup( lkp.refs, {k: self.pneuron.quasiSteadyStates()[k](lkp) for k in qss_vars}) diff_vars = [item for item in self.pneuron.statesNames() if item not in qss_vars] # Set initial conditions y0 = { 'Qm': self.Qm0, **{k: self.pneuron.steadyStates()[k](self.pneuron.Vm0) for k in diff_vars} } # Initialize solver and compute solution solver = EventDrivenSolver( lambda x: setattr(solver, 'lkp', lkp.project('A', drive.A * x)), # eventfunc y0.keys(), # variables list lambda t, y: self.effDerivatives(t, y, solver.lkp, qss_vars), # dfunc dt=self.pneuron.chooseTimeStep()) # time step data = solver(y0, pp.stimEvents(), pp.ttotal) # Interpolate Vm and QSS variables along charge vector and store them in solution dataframe data = addColumn( data, 'Vm', self.interpEffVariable('V', data['Qm'], data['stimstate'] * drive.A, lkp), preceding_key='Qm') for k in qss_vars: data[k] = self.interpEffVariable(k, data['Qm'], data['stimstate'] * drive.A, lkp_QSS) # Add dummy deflection and gas content vectors to solution for key in ['Z', 'ng']: data[key] = np.full(data['t'].size, np.nan) # Return solution dataframe return data def intMethods(self): ''' Listing of model integration methods. ''' return { 'full': self.__simFull, 'hybrid': self.__simHybrid, 'sonic': self.__simSonic } @classmethod @Model.checkOutputDir def simQueue(cls, freqs, amps, durations, offsets, PRFs, DCs, fs, methods, qss_vars, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :param fs: sonophore membrane coverage fractions (-) :params methods: integration methods :param qss_vars: QSS variables :return: list of parameters (list) for each simulation ''' if ('full' in methods or 'hybrid' in methods) and kwargs['outputdir'] is None: logger.warning('Running cumbersome simulation(s) without file saving') if amps is None: amps = [None] drives = AcousticDrive.createQueue(freqs, amps) protocols = PulsedProtocol.createQueue(durations, offsets, PRFs, DCs) queue = [] for drive in drives: for pp in protocols: for cov in fs: for method in methods: queue.append([drive, pp, cov, method, qss_vars]) return queue def checkInputs(self, drive, pp, fs, method, qss_vars): - if not isinstance(drive, Drive): - raise TypeError(f'Invalid "drive" parameter (must be an "Drive" object)') - if not isinstance(pp, PulsedProtocol): - raise TypeError('Invalid pulsed protocol (must be "PulsedProtocol" instance)') + PointNeuron.checkInputs(drive, pp) if not isinstance(fs, float): raise TypeError(f'Invalid "fs" parameter (must be float typed)') if qss_vars is not None: if not isIterable(qss_vars) or not isinstance(qss_vars[0], str): raise ValueError('Invalid QSS variables: must be None or an iterable of strings') sn = self.pneuron.statesNames() for item in qss_vars: if item not in sn: raise ValueError(f'Invalid QSS variable: {item} (must be in {sn}') if method not in list(self.intMethods().keys()): raise ValueError(f'Invalid integration method: "{method}"') @Model.logNSpikes @Model.checkTitrate @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, pp, fs=1., method='sonic', qss_vars=None): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param drive: acoustic drive object :param pp: pulse protocol object :param fs: sonophore membrane coverage fraction (-) :param method: selected integration method :return: output dataframe ''' # Set the tissue elastic modulus self.setTissueModulus(drive) # Call appropriate simulation function and return simfunc = self.intMethods()[method] simargs = [drive, pp, fs] if method == 'sonic': simargs.append(qss_vars) return simfunc(*simargs) def desc(self, meta): method = meta['method'] if 'method' in meta else meta['model']['method'] fs = meta['fs'] if 'fs' in meta else meta['model']['fs'] s = f'{self}: {method} simulation @ {meta["drive"].desc}, {meta["pp"].desc}' if fs < 1.0: s += f', fs = {(fs * 1e2):.2f}%' if 'qss_vars' in meta and meta['qss_vars'] is not None: s += f" - QSS ({', '.join(meta['qss_vars'])})" return s @staticmethod def getNSpikes(data): return PointNeuron.getNSpikes(data) def getArange(self, drive): return (0., self.getLookup().refs['A'].max()) @property def titrationFunc(self): return self.pneuron.titrationFunc @logCache(os.path.join(os.path.split(__file__)[0], 'astim_titrations.log')) def titrate(self, drive, pp, fs=1., method='sonic', qss_vars=None, xfunc=None, Arange=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency and pulsed protocol. :param drive: unresolved acoustic drive object :param pp: pulse protocol object :param fs: sonophore membrane coverage fraction (-) :param method: integration method :return: determined threshold amplitude (Pa) ''' return super().titrate(drive, pp, fs=fs, method=method, qss_vars=qss_vars, xfunc=xfunc, Arange=Arange) def getQuasiSteadyStates(self, f, amps=None, charges=None, DC=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities, at a specific US frequency and duty cycle. :param f: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DC: duty cycle :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges lkp = self.getLookup().projectDC(amps=amps, DC=DC).projectN({'a': self.a, 'f': f}) if charges is not None: lkp = lkp.project('Q', charges) # Specify dimensions with A as the first axis A_axis = lkp.getAxisIndex('A') lkp.move('A', 0) nA = lkp.dims()[0] # Compute QSS states using these lookups QSS = EffectiveVariablesLookup( lkp.refs, {k: v(lkp) for k, v in self.pneuron.quasiSteadyStates().items()}) # Compress outputs if needed if squeeze_output: QSS = QSS.squeeze() lkp = lkp.squeeze() return lkp, QSS def iNetQSS(self, Qm, f, A, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param f: US frequency (Hz) :param A: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' lkp, QSS = self.getQuasiSteadyStates( f, amps=A, charges=Qm, DC=DC, squeeze_output=True) return self.pneuron.iNet(lkp['V'], QSS) # mA/m2 def fixedPointsQSS(self, f, A, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param f: US frequency (Hz) :param A: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' pltvars = self.getPltVars() logger.debug(f'A = {A * 1e-3:.2f} kPa, DC = {DC * 1e2:.0f}%') # Extract fixed points from QSS charge variation profile def dfunc(Qm): return - self.iNetQSS(Qm, f, A, DC) fixed_points = getFixedPoints( lkp.refs['Q'], dQdt, filter='both', der_func=dfunc).tolist() dfunc = lambda x: np.array(self.effDerivatives(_, x, lkp)) # classified_fixed_points = {'stable': [], 'unstable': [], 'saddle': []} classified_fixed_points = [] np.set_printoptions(precision=2) # For each fixed point for i, Qm in enumerate(fixed_points): # Re-compute QSS at fixed point *_, QSS = self.getQuasiSteadyStates(f, amps=A, charges=Qm, DC=DC, squeeze_output=True) # Classify fixed point stability by numerically evaluating its Jacobian and # computing its eigenvalues x = np.array([Qm, *QSS.tables.values()]) eigvals, key = classifyFixedPoint(x, dfunc) # classified_fixed_points[key].append(Qm) classified_fixed_points.append((x, eigvals, key)) # eigenvalues.append(eigvals) logger.debug(f'{key} point @ Q = {(Qm * 1e5):.1f} nC/cm2') # eigenvalues = np.array(eigenvalues).T # print(eigenvalues.shape) return classified_fixed_points def isStableQSS(self, f, A, DC): lookups, QSS = self.getQuasiSteadyStates( f, amps=A, DCs=DC, squeeze_output=True) dQdt = -self.pneuron.iNet(lookups['V'], QSS.tables) # mA/m2 classified_fixed_points = self.fixedPointsQSS(f, A, DC, lookups, dQdt) return len(classified_fixed_points['stable']) > 0 class DrivenNeuronalBilayerSonophore(NeuronalBilayerSonophore): simkey = 'DASTIM' # keyword used to characterize simulations made with this model def __init__(self, Idrive, *args, **kwargs): self.Idrive = Idrive super().__init__(*args, **kwargs) def __repr__(self): return super().__repr__()[:-1] + f', Idrive = {self.Idrive:.2f} mA/m2)' @classmethod def initFromMeta(cls, meta): return cls(meta['Idrive'], meta['a'], getPointNeuron(meta['neuron']), embedding_depth=meta['d']) def params(self): return {**{'Idrive': self.Idrive}, **super().params()} @staticmethod def inputs(): inputvars = NeuronalBilayerSonophore.inputs() inputvars['Idrive'] = { 'desc': 'driving current density', 'label': 'I_{drive}', 'unit': 'mA/m2', 'factor': 1e0, 'precision': 0 } return inputvars def filecodes(self, *args): codes = super().filecodes(*args) codes['Idrive'] = f'Idrive{self.Idrive:.1f}mAm2' return codes def fullDerivatives(self, *args): dydt = super().fullDerivatives(*args) dydt[3] += self.Idrive * 1e-3 return dydt def effDerivatives(self, *args): dQmdt, *dstates = super().effDerivatives(*args) dQmdt += self.Idrive * 1e-3 return [dQmdt, *dstates] def meta(self, drive, pp, fs, method, qss_vars): d = super().meta(drive, pp, fs, method, qss_vars) d['Idrive'] = self.Idrive return d \ No newline at end of file diff --git a/PySONIC/core/pneuron.py b/PySONIC/core/pneuron.py index 553d843..3101aed 100644 --- a/PySONIC/core/pneuron.py +++ b/PySONIC/core/pneuron.py @@ -1,545 +1,545 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-03 11:53:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-16 12:14:52 +# @Last Modified time: 2020-04-17 19:20:05 import abc import inspect import numpy as np -from .protocols import PulsedProtocol +from .protocols import * from .model import Model from .lookups import EffectiveVariablesLookup from .solvers import EventDrivenSolver from .drives import Drive, ElectricDrive from ..postpro import detectSpikes, computeFRProfile from ..constants import * from ..utils import * class PointNeuron(Model): ''' Generic point-neuron model interface. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ESTIM' # keyword used to characterize simulations made with this model def __repr__(self): return self.__class__.__name__ def copy(self): return self.__class__() def __eq__(self, other): if not isinstance(other, PointNeuron): return False return self.name == other.name @property @classmethod @abc.abstractmethod def name(cls): ''' Neuron name. ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Cm0(cls): ''' Neuron's resting capacitance (F/m2). ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Vm0(cls): ''' Neuron's resting membrane potential(mV). ''' raise NotImplementedError @property def Qm0(self): return self.Cm0 * self.Vm0 * 1e-3 # C/m2 @property def meta(self): return {'neuron': self.name} @staticmethod def inputs(): return ElectricDrive.inputs() @classmethod def filecodes(cls, drive, pp): return { 'simkey': cls.simkey, 'neuron': cls.name, 'nature': pp.nature, **drive.filecodes, **pp.filecodes } @classmethod def normalizedQm(cls, Qm): ''' Compute membrane charge density normalized by resting capacitance. :param Qm: membrane charge density (Q/m2) :return: normalized charge density (mV) ''' return Qm / cls.Cm0 * 1e3 @classmethod def getPltVars(cls, wrapleft='df["', wrapright='"]'): pltvars = { 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'bounds': ((cls.Vm0 - 20.0) * cls.Cm0 * 1e2, 60) }, 'Qm/Cm0': { 'desc': 'membrane charge density over resting capacitance', 'label': 'Q_m / C_{m0}', 'unit': 'mV', 'bounds': (-150, 70), 'func': f"normalizedQm({wrapleft}Qm{wrapright})" }, 'Vm': { 'desc': 'membrane potential', 'label': 'V_m', 'unit': 'mV', 'bounds': (-150, 70) }, 'ELeak': { 'constant': 'obj.ELeak', 'desc': 'non-specific leakage current resting potential', 'label': 'V_{leak}', 'unit': 'mV', 'ls': '--', 'color': 'k' } } for cname in cls.getCurrentsNames(): cfunc = getattr(cls, cname) cargs = inspect.getargspec(cfunc)[0][1:] pltvars[cname] = { 'desc': inspect.getdoc(cfunc).splitlines()[0], 'label': f'I_{{{cname[1:]}}}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f"{cname}({', '.join([f'{wrapleft}{a}{wrapright}' for a in cargs])})" } for var in cargs: if var != 'Vm': pltvars[var] = { 'desc': cls.states[var], 'label': var, 'bounds': (-0.1, 1.1) } pltvars['iNet'] = { 'desc': inspect.getdoc(getattr(cls, 'iNet')).splitlines()[0], 'label': 'I_{net}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f'iNet({wrapleft}Vm{wrapright}, {wrapleft[:-1]}{cls.statesNames()}{wrapright[1:]})', 'ls': '--', 'color': 'black' } pltvars['dQdt'] = { 'desc': inspect.getdoc(getattr(cls, 'dQdt')).splitlines()[0], 'label': 'dQ_m/dt', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f'dQdt({wrapleft}Vm{wrapright}, {wrapleft[:-1]}{cls.statesNames()}{wrapright[1:]})', 'ls': '--', 'color': 'black' } pltvars['iCap'] = { 'desc': inspect.getdoc(getattr(cls, 'iCap')).splitlines()[0], 'label': 'I_{cap}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f'iCap({wrapleft}t{wrapright}, {wrapleft}Vm{wrapright})' } for rate in cls.rates: if 'alpha' in rate: prefix, suffix = 'alpha', rate[5:] else: prefix, suffix = 'beta', rate[4:] pltvars[rate] = { 'label': '\\{}_{{{}}}'.format(prefix, suffix), 'unit': 'ms^{-1}', 'factor': 1e-3 } pltvars['FR'] = { 'desc': 'riring rate', 'label': 'FR', 'unit': 'Hz', 'factor': 1e0, # 'bounds': (0, 1e3), 'func': f'firingRateProfile({wrapleft[:-2]})' } return pltvars @classmethod def iCap(cls, t, Vm): ''' Capacitive current. ''' dVdt = np.insert(np.diff(Vm) / np.diff(t), 0, 0.) return cls.Cm0 * dVdt @property def pltScheme(self): pltscheme = { 'Q_m': ['Qm'], 'V_m': ['Vm'] } pltscheme['I'] = self.getCurrentsNames() + ['iNet'] for cname in self.getCurrentsNames(): if 'Leak' not in cname: key = f'i_{{{cname[1:]}}}\ kin.' cargs = inspect.getargspec(getattr(self, cname))[0][1:] pltscheme[key] = [var for var in cargs if var not in ['Vm', 'Cai']] return pltscheme @classmethod def statesNames(cls): ''' Return a list of names of all state variables of the model. ''' return list(cls.states.keys()) @classmethod @abc.abstractmethod def derStates(cls): ''' Dictionary of states derivatives functions ''' raise NotImplementedError @classmethod def getDerStates(cls, Vm, states): ''' Compute states derivatives array given a membrane potential and states dictionary ''' return np.array([cls.derStates()[k](Vm, states) for k in cls.statesNames()]) @classmethod @abc.abstractmethod def steadyStates(cls): ''' Return a dictionary of steady-states functions ''' raise NotImplementedError @classmethod def getSteadyStates(cls, Vm): ''' Compute array of steady-states for a given membrane potential ''' return np.array([cls.steadyStates()[k](Vm) for k in cls.statesNames()]) @classmethod def getDerEffStates(cls, lkp, states): ''' Compute effective states derivatives array given lookups and states dictionaries. ''' return np.array([ cls.derEffStates()[k](lkp, states) for k in cls.statesNames()]) @classmethod def getEffRates(cls, Vm): ''' Compute array of effective rate constants for a given membrane potential vector. ''' return {k: np.mean(np.vectorize(v)(Vm)) for k, v in cls.effRates().items()} def getLookup(self): ''' Get lookup of membrane potential rate constants interpolated along the neuron's charge physiological range. ''' Qmin, Qmax = expandRange(*self.Qbounds, exp_factor=10.) Qref = np.arange(Qmin, Qmax, 1e-5) # C/m2 Vref = Qref / self.Cm0 * 1e3 # mV tables = {k: np.vectorize(v)(Vref) for k, v in self.effRates().items()} return EffectiveVariablesLookup({'Q': Qref}, {'V': Vref, **tables}) @classmethod @abc.abstractmethod def currents(cls): ''' Dictionary of ionic currents functions (returning current densities in mA/m2) ''' @classmethod def iNet(cls, Vm, states): ''' net membrane current :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: current per unit area (mA/m2) ''' return sum([cfunc(Vm, states) for cfunc in cls.currents().values()]) @classmethod def dQdt(cls, Vm, states): ''' membrane charge density variation rate :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: variation rate (mA/m2) ''' return -cls.iNet(Vm, states) @classmethod def titrationFunc(cls, *args, **kwargs): ''' Default titration function. ''' return cls.isExcited(*args, **kwargs) @staticmethod def currentToConcentrationRate(z_ion, depth): ''' Compute the conversion factor from a specific ionic current (in mA/m2) into a variation rate of submembrane ion concentration (in M/s). :param: z_ion: ion valence :param depth: submembrane depth (m) :return: conversion factor (Mmol.m-1.C-1) ''' return 1e-6 / (z_ion * depth * FARADAY) @staticmethod def nernst(z_ion, Cion_in, Cion_out, T): ''' Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 @staticmethod def vtrap(x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) @staticmethod def efun(x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) @classmethod def ghkDrive(cls, Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * cls.efun(-x) # M eCout = Cion_out * cls.efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 @classmethod def xBG(cls, Vref, Vm): ''' Compute dimensionless Borg-Graham ratio for a given voltage. :param Vref: reference voltage membrane (mV) :param Vm: membrane potential (mV) :return: dimensionless ratio ''' return (Vm - Vref) * FARADAY / (Rg * cls.T) * 1e-3 # [-] @classmethod def alphaBG(cls, alpha0, zeta, gamma, Vref, Vm): ''' Compute the activation rate constant for a given voltage and temperature, using a Borg-Graham formalism. :param alpha0: pre-exponential multiplying factor :param zeta: effective valence of the gating particle :param gamma: normalized position of the transition state within the membrane :param Vref: membrane voltage at which alpha = alpha0 (mV) :param Vm: membrane potential (mV) :return: rate constant (in alpha0 units) ''' return alpha0 * np.exp(-zeta * gamma * cls.xBG(Vref, Vm)) @classmethod def betaBG(cls, beta0, zeta, gamma, Vref, Vm): ''' Compute the inactivation rate constant for a given voltage and temperature, using a Borg-Graham formalism. :param beta0: pre-exponential multiplying factor :param zeta: effective valence of the gating particle :param gamma: normalized position of the transition state within the membrane :param Vref: membrane voltage at which beta = beta0 (mV) :param Vm: membrane potential (mV) :return: rate constant (in beta0 units) ''' return beta0 * np.exp(zeta * (1 - gamma) * cls.xBG(Vref, Vm)) @classmethod def getCurrentsNames(cls): return list(cls.currents().keys()) @staticmethod def firingRateProfile(*args, **kwargs): return computeFRProfile(*args, **kwargs) @property def Qbounds(self): ''' Determine bounds of membrane charge physiological range for a given neuron. ''' return np.array([np.round(self.Vm0 - 25.0), 50.0]) * self.Cm0 * 1e-3 # C/m2 @classmethod def isVoltageGated(cls, state): ''' Determine whether a given state is purely voltage-gated or not.''' return f'alpha{state.lower()}' in cls.rates @classmethod @Model.checkOutputDir def simQueue(cls, amps, durations, offsets, PRFs, DCs, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: list of parameters (list) for each simulation ''' if amps is None: amps = [None] drives = ElectricDrive.createQueue(amps) protocols = PulsedProtocol.createQueue(durations, offsets, PRFs, DCs) queue = [] for drive in drives: for pp in protocols: queue.append([drive, pp]) return queue @staticmethod def checkInputs(drive, pp): ''' Check validity of electrical stimulation parameters. :param drive: electric drive object :param pp: pulse protocol object ''' if not isinstance(drive, Drive): raise TypeError(f'Invalid "drive" parameter (must be an "Drive" object)') - if not isinstance(pp, PulsedProtocol): - raise TypeError('Invalid pulsed protocol (must be "PulsedProtocol" instance)') + if not isinstance(pp, TimeProtocol): + raise TypeError('Invalid time protocol (must be "TimeProtocol" instance)') def chooseTimeStep(self): ''' Determine integration time step based on intrinsic temporal properties. ''' return DT_EFFECTIVE @classmethod def derivatives(cls, t, y, Cm=None, Iinj=0.): ''' Compute system derivatives for a given membrane capacitance and injected current. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param Cm: membrane capacitance (F/m2) :param Iinj: injected current (mA/m2) :return: vector of system derivatives at time t ''' if Cm is None: Cm = cls.Cm0 Qm, *states = y Vm = Qm / Cm * 1e3 # mV states_dict = dict(zip(cls.statesNames(), states)) dQmdt = (Iinj - cls.iNet(Vm, states_dict)) * 1e-3 # A/m2 return [dQmdt, *cls.getDerStates(Vm, states_dict)] @Model.logNSpikes @Model.checkTitrate @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, pp): ''' Simulate a specific neuron model for a set of simulation parameters, and return output data in a dataframe. :param drive: electric drive object :param pp: pulse protocol object :return: output DataFrame ''' # Set initial conditions y0 = { 'Qm': self.Qm0, **{k: self.steadyStates()[k](self.Vm0) for k in self.statesNames()} } # Initialize solver and compute solution solver = EventDrivenSolver( lambda x: setattr(solver, 'A', drive.I * x), # eventfunc y0.keys(), # variables lambda t, y: self.derivatives(t, y, Iinj=solver.A), # dfunc dt=self.chooseTimeStep()) # time step data = solver(y0, pp.stimEvents(), pp.ttotal) # Add Vm timeries to solution data = addColumn(data, 'Vm', data['Qm'].values / self.Cm0 * 1e3, preceding_key='Qm') # Return solution dataframe return data def desc(self, meta): return f'{self}: simulation @ {meta["drive"].desc}, {meta["pp"].desc}' @staticmethod def getNSpikes(data): ''' Compute number of spikes in charge profile of simulation output. :param data: dataframe containing output time series :return: number of detected spikes ''' return detectSpikes(data)[0].size @staticmethod def getStabilizationValue(data): ''' Determine stabilization value from the charge profile of a simulation output. :param data: dataframe containing output time series :return: charge stabilization value (or np.nan if no stabilization detected) ''' # Extract charge signal posterior to observation window t, Qm = [data[key].values for key in ['t', 'Qm']] if t.max() <= TMIN_STABILIZATION: raise ValueError('solution length is too short to assess stabilization') Qm = Qm[t > TMIN_STABILIZATION] # Compute variation range Qm_range = np.ptp(Qm) logger.debug('%.2f nC/cm2 variation range over the last %.0f ms, Qmf = %.2f nC/cm2', Qm_range * 1e5, TMIN_STABILIZATION * 1e3, Qm[-1] * 1e5) # Return final value only if stabilization is detected if np.ptp(Qm) < QSS_Q_DIV_THR: return Qm[-1] else: return np.nan @classmethod def isExcited(cls, data): ''' Determine if neuron is excited from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is excited or not ''' return cls.getNSpikes(data) > 0 @classmethod def isSilenced(cls, data): ''' Determine if neuron is silenced from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is silenced or not ''' return not np.isnan(cls.getStabilizationValue(data)) def getArange(self, drive): return drive.xvar_range diff --git a/PySONIC/core/protocols.py b/PySONIC/core/protocols.py index ac5baad..0a80865 100644 --- a/PySONIC/core/protocols.py +++ b/PySONIC/core/protocols.py @@ -1,241 +1,355 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2019-11-12 18:04:45 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-15 20:43:24 +# @Last Modified time: 2020-04-17 18:31:40 import numpy as np from ..utils import si_format, StimObject from .batches import Batch class TimeProtocol(StimObject): def __init__(self, tstim, toffset): ''' Class constructor. :param tstim: pulse duration (s) :param toffset: offset duration (s) ''' self.tstim = tstim self.toffset = toffset @property def tstim(self): return self._tstim @tstim.setter def tstim(self, value): value = self.checkFloat('tstim', value) self.checkPositiveOrNull('tstim', value) self._tstim = value @property def toffset(self): return self._toffset @toffset.setter def toffset(self, value): value = self.checkFloat('toffset', value) self.checkPositiveOrNull('toffset', value) self._toffset = value @property def ttotal(self): return self.tstim + self.toffset + @property + def nature(self): + return 'CW' + def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.tstim == other.tstim and self.toffset == other.toffset + def paramStr(self, k, strict_nfigs=False): + val = getattr(self, k) * self.inputs()[k].get('factor', 1.) + precision = self.inputs()[k].get('precision', 0) + unit = self.inputs()[k].get('unit', '') + formatted_val = si_format(val, precision=precision, space='') + if strict_nfigs: + minfigs = self.inputs()[k].get('minfigs', None) + if minfigs is not None: + nfigs = len(formatted_val.split('.')[0]) + if nfigs < minfigs: + formatted_val = '0' * (minfigs - nfigs) + formatted_val + return f'{formatted_val}{unit}' + + def pdict(self, sf='{key}={value}', **kwargs): + d = {k: sf.format(key=k, value=self.paramStr(k, **kwargs)) for k in self.inputs().keys()} + if self.toffset == 0.: + del d['toffset'] + return d + def __repr__(self): - params = [f'{si_format(x, 1, space="")}s' for x in [self.tstim, self.toffset]] - return f'{self.__class__.__name__}({", ".join(params)})' + return f'{self.__class__.__name__}({", ".join(self.pdict().values())})' @property def desc(self): - return f'{si_format(self.tstim, 1)}s stim, {si_format(self.toffset, 1)}s offset' + return ', '.join(self.pdict(sf='{value} {key}').values()) @property def filecodes(self): - return {'tstim': f'{(self.tstim * 1e3):.0f}ms', 'toffset': None} + return self.pdict(sf='{key}_{value}', strict_nfigs=True) @staticmethod def inputs(): return { 'tstim': { 'desc': 'stimulus duration', 'label': 't_{stim}', 'unit': 'ms', 'factor': 1e3, 'precision': 0 }, 'toffset': { 'desc': 'offset duration', 'label': 't_{offset}', 'unit': 'ms', 'factor': 1e3, 'precision': 0 } } - @classmethod - def createQueue(cls, durations, offsets): - ''' Create a serialized 2D array of all parameter combinations for a series of individual - parameter sweeps. - - :param durations: list (or 1D-array) of stimulus durations - :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) - :return: list of parameters (list) for each simulation - ''' - return [cls(*item) for item in Batch.createQueue(durations, offsets)] - def tOFFON(self): ''' Return vector of times of OFF-ON transitions (in s). ''' return np.array([0.]) def tONOFF(self): ''' Return vector of times of ON-OFF transitions (in s). ''' return np.array([self.tstim]) def stimEvents(self): ''' Return time-value pairs for each transition in stimulation state. ''' t_on_off = self.tONOFF() t_off_on = self.tOFFON() pairs = list(zip(t_off_on, [1] * len(t_off_on))) + list(zip(t_on_off, [0] * len(t_on_off))) return sorted(pairs, key=lambda x: x[0]) + @classmethod + def createQueue(cls, durations, offsets): + ''' Create a serialized 2D array of all parameter combinations for a series of individual + parameter sweeps. + + :param durations: list (or 1D-array) of stimulus durations + :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) + :return: list of parameters (list) for each simulation + ''' + return [cls(*item) for item in Batch.createQueue(durations, offsets)] + class PulsedProtocol(TimeProtocol): def __init__(self, tstim, toffset, PRF=100., DC=1.): ''' Class constructor. :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) ''' super().__init__(tstim, toffset) self.DC = DC self.PRF = PRF @property def DC(self): return self._DC @DC.setter def DC(self, value): value = self.checkFloat('DC', value) self.checkBounded('DC', value, (0., 1.)) self._DC = value @property def PRF(self): return self._PRF @PRF.setter def PRF(self, value): value = self.checkFloat('PRF', value) self.checkPositiveOrNull('PRF', value) if self.DC < 1.: self.checkBounded('PRF', value, (1 / self.tstim, np.inf)) self._PRF = value def __eq__(self, other): if not isinstance(other, self.__class__): return False return super().__eq__(other) and self.PRF == other.PRF and self.DC == other.DC - def __repr__(self): - params = [f'{si_format(self.PRF, 1, space="")}Hz', f'{self.DC:.2f}'] - return f'{super().__repr__()[:-1]}, {", ".join(params)})' + def pdict(self, **kwargs): + d = super().pdict(**kwargs) + if self.isCW: + del d['PRF'] + del d['DC'] + return d @property def T_ON(self): return self.DC / self.PRF @property def T_OFF(self): return (1 - self.DC) / self.PRF @property def npulses(self): return int(np.round(self.tstim * self.PRF)) - @property - def desc(self): - s = super().desc - if self.DC < 1: - s += f', {si_format(self.PRF, 2)}Hz PRF, {(self.DC * 1e2):.1f}% DC' - return s - @property def isCW(self): return self.DC == 1. @property def nature(self): return 'CW' if self.isCW else 'PW' - @property - def filecodes(self): - if self.isCW: - d = {'PRF': None, 'DC': None} - else: - d = {'PRF': f'PRF{self.PRF:.2f}Hz', 'DC': f'DC{self.DC * 1e2:04.1f}%'} - return {**super().filecodes, **d} - @staticmethod def inputs(): d = { 'PRF': { 'desc': 'pulse repetition frequency', 'label': 'PRF', 'unit': 'Hz', 'factor': 1e0, 'precision': 0 }, 'DC': { 'desc': 'duty cycle', 'label': 'DC', 'unit': '%', 'factor': 1e2, - 'precision': 2 + 'precision': 2, + 'minfigs': 2 } } return {**TimeProtocol.inputs(), **d} + def tOFFON(self): + if self.isCW: + return super().tOFFON() + else: + return np.arange(self.npulses) / self.PRF + + def tONOFF(self): + if self.isCW: + return super().tONOFF() + else: + return (np.arange(self.npulses) + self.DC) / self.PRF + @classmethod def createQueue(cls, durations, offsets, PRFs, DCs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: list of parameters (list) for each simulation ''' DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += Batch.createQueue(durations, offsets, min(PRFs), 1.0) if np.any(DCs != 1.0): queue += Batch.createQueue(durations, offsets, PRFs, DCs[DCs != 1.0]) queue = [cls(*item) for item in queue] return queue + +class BurstProtocol(PulsedProtocol): + + def __init__(self, tburst, BRF, nbursts=1, **kwargs): + ''' Class constructor. + + :param tburst: burst duration (s) + :param BRF: burst repetition frequency (Hz) + :param nbursts: number of bursts + ''' + super().__init__(tburst, 1 / BRF - tburst, **kwargs) + self.BRF = BRF + self.nbursts = nbursts + + @property + def tburst(self): + return self.tstim + + @property + def ttotal(self): + return self.nbursts / self.BRF + + @property + def BRF(self): + return self._BRF + + @BRF.setter + def BRF(self, value): + value = self.checkFloat('BRF', value) + self.checkPositiveOrNull('BRF', value) + self.checkBounded('BRF', value, (0, 1 / self.tburst)) + self._BRF = value + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + return super().__eq__(other) and self.BRF == other.BRF and self.nbursts == other.nbursts + + @staticmethod + def inputs(): + d = PulsedProtocol.inputs() + for k in ['tstim', 'toffset']: + del d[k] + d.update({ + 'tburst': { + 'desc': 'burst duration', + 'label': 't_{burst}', + 'unit': 'ms', + 'factor': 1e3, + 'precision': 0 + }, + 'BRF': { + 'desc': 'burst repetition frequency', + 'label': 'BRF', + 'unit': 'Hz', + 'precision': 0 + }, + 'nbursts': { + 'desc': 'number of bursts', + 'label': 'n_{bursts}' + } + }) + return { + 'tburst': d.pop('tburst'), + **{k: v for k, v in d.items()} + } + + def repeatBurstArray(self, tburst): + return np.ravel(np.array([tburst + i / self.BRF for i in range(self.nbursts)])) + def tOFFON(self): - if self.DC == 1: - return super().tOFFON() - else: - return np.arange(self.npulses) / self.PRF + return self.repeatBurstArray(super().tOFFON()) def tONOFF(self): - if self.DC == 1: - return super().tONOFF() - else: - return self.tOFFON() + self.DC / self.PRF + return self.repeatBurstArray(super().tONOFF()) + + @classmethod + def createQueue(cls, durations, PRFs, DCs, BRFs, nbursts): + ''' Create a serialized 2D array of all parameter combinations for a series of individual + parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. + + :param durations: list (or 1D-array) of stimulus durations + :param PRFs: list (or 1D-array) of pulse-repetition frequencies + :param DCs: list (or 1D-array) of duty cycle values + :param BRFs: list (or 1D-array) of burst-repetition frequencies + :param nbursts: list (or 1D-array) of number of bursts + :return: list of parameters (list) for each simulation + ''' + # Get pulsed protocols queue (with unique zero offset) + pp_queue = PulsedProtocol.createQueue(durations, [0.], PRFs, DCs) + + # Extract parameters (without offset) + pp_queue = [[x.tstim, x.PRF, x.DC] for x in pp_queue] + + # Complete queue with each BRF-nburts combination + queue = [] + for nb in nbursts: + for BRF in BRFs: + queue.append(pp_queue + [BRF, nbursts]) + + # Construct and return objects queue + return [cls(*item) for item in queue] diff --git a/PySONIC/parsers.py b/PySONIC/parsers.py index 6201cda..1180ea4 100644 --- a/PySONIC/parsers.py +++ b/PySONIC/parsers.py @@ -1,731 +1,739 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2019-06-04 18:24:29 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-13 18:10:14 +# @Last Modified time: 2020-04-17 18:28:54 import os import logging import pprint import numpy as np import matplotlib.pyplot as plt from argparse import ArgumentParser from .utils import Intensity2Pressure, selectDirDialog, OpenFilesDialog, isIterable from .neurons import getPointNeuron, CorticalRS from .plt import GroupedTimeSeries, CompTimeSeries DEFAULT_OUTPUT_FOLDER = os.path.abspath(os.path.split(__file__)[0] + '../../../../dump') class Parser(ArgumentParser): ''' Generic parser interface. ''' dist_str = '[scale min max n]' def __init__(self): super().__init__() self.pp = pprint.PrettyPrinter(indent=4) self.defaults = {} self.allowed = {} self.factors = {} self.to_parse = {} self.addPlot() self.addVerbose() def pprint(self, args): self.pp.pprint(args) def getDistribution(self, xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(self, xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return self.getDistribution(xmin, xmax, nx, scale=scale) def addRangeParam(self, key, desc, shortcut=None): if shortcut is not None: args = [f'-{shortcut}'] rangekey = shortcut else: args = [] rangekey = key args.append(f'--{key}') self.add_argument(*args, nargs='+', type=float, help=desc) self.add_argument( f'--{rangekey}range', type=str, nargs='+', help=f'Range of {desc}: {self.dist_str}') self.to_parse[key] = self.parseAmplitude def parseRangeParam(self, args, key): rangekey = f'{key}range' params = [key, rangekey] self.restrict(args) if key in args: return np.array(args[key]) * self.factors[key] elif rangekey in args: return self.getDistFromList(args[rangekey]) * self.factors[key] def addVerbose(self): self.add_argument( '-v', '--verbose', default=False, action='store_true', help='Increase verbosity') self.to_parse['loglevel'] = self.parseLogLevel def addPlot(self): self.add_argument( '-p', '--plot', type=str, nargs='+', help='Variables to plot') self.to_parse['pltscheme'] = self.parsePltScheme def addPhase(self): self.add_argument( '--phase', default=False, action='store_true', help='Phase plot') def addMPI(self): self.add_argument( '--mpi', default=False, action='store_true', help='Use multiprocessing') def addTest(self): self.add_argument( '--test', default=False, action='store_true', help='Run test configuration') def addSave(self): self.add_argument( '-s', '--save', default=False, action='store_true', help='Save output(s)') def addCheckForOutput(self): self.add_argument( '--checkout', default=False, action='store_true', help='Run only simulations for which there is no output file in the output directory') self.to_parse['overwrite'] = self.parseOverwrite def addOverwrite(self): self.add_argument( '--overwrite', default=False, action='store_true', help='Overwrite pre-existing simulation files with new output') def addFigureExtension(self): self.add_argument( '--figext', type=str, default='png', help='Figure type (extension)') def addCompare(self, desc='Comparative graph'): self.add_argument( '--compare', default=False, action='store_true', help=desc) def addSamplingRate(self): self.add_argument( '--sr', type=int, default=1, help='Sampling rate for plot') def addSpikes(self): self.add_argument( '--spikes', type=str, default='none', help='How to indicate spikes on charge profile ("none", "marks" or "details")') def addNColumns(self): self.add_argument( '--ncol', type=int, default=1, help='Number of columns in figure') def addNLevels(self): self.add_argument( '--nlevels', type=int, default=10, help='Number of levels') def addHideOutput(self): self.add_argument( '--hide', default=False, action='store_true', help='Hide output') def addTimeRange(self, default=None): self.add_argument( '--trange', type=float, nargs=2, default=default, help='Time lower and upper bounds (ms)') self.to_parse['trange'] = self.parseTimeRange def addZvar(self, default): self.add_argument( '-z', '--zvar', type=str, default=default, help='z-variable type') def addYscale(self, default='lin'): self.add_argument( '--yscale', type=str, choices=('lin', 'log'), default=default, help='y-scale type ("lin" or "log")') def addZscale(self, default='lin'): self.add_argument( '--zscale', type=str, choices=('lin', 'log'), default=default, help='z-scale type ("lin" or "log")') def addZbounds(self, default): self.add_argument( '--zbounds', type=float, nargs=2, default=default, help='z-scale lower and upper bounds') def addCmap(self, default=None): self.add_argument( '--cmap', type=str, default=default, help='Colormap name') def addCscale(self, default='lin'): self.add_argument( '--cscale', type=str, default=default, choices=('lin', 'log'), help='Color scale ("lin" or "log")') def addInputDir(self, dep_key=None): self.inputdir_dep_key = dep_key self.add_argument( '-i', '--inputdir', type=str, help='Input directory') self.to_parse['inputdir'] = self.parseInputDir def addOutputDir(self, dep_key=None): self.outputdir_dep_key = dep_key self.add_argument( '-o', '--outputdir', type=str, help='Output directory') self.to_parse['outputdir'] = self.parseOutputDir def addInputFiles(self, dep_key=None): self.inputfiles_dep_key = dep_key self.add_argument( '-i', '--inputfiles', type=str, help='Input files') self.to_parse['inputfiles'] = self.parseInputFiles def addPatches(self): self.add_argument( '--patches', type=str, default='one', help='Stimulus patching mode ("none", "one", all", or a boolean list)') self.to_parse['patches'] = self.parsePatches def addThresholdCurve(self): self.add_argument( '--threshold', default=False, action='store_true', help='Show threshold amplitudes') def addNeuron(self): self.add_argument( '-n', '--neuron', type=str, nargs='+', help='Neuron name (string)') self.to_parse['neuron'] = self.parseNeuron def parseNeuron(self, args): return [getPointNeuron(n) for n in args['neuron']] def addInteractive(self): self.add_argument( '--interactive', default=False, action='store_true', help='Make interactive') def addLabels(self): self.add_argument( '--labels', type=str, nargs='+', default=None, help='Labels') def addRelativeTimeBounds(self): self.add_argument( '--rel_tbounds', type=float, nargs='+', default=None, help='Relative time lower and upper bounds') def addPretty(self): self.add_argument( '--pretty', default=False, action='store_true', help='Make figure pretty') def addSubset(self, choices): self.add_argument( '--subset', type=str, nargs='+', default=['all'], choices=choices + ['all'], help='Run specific subset(s)') self.subset_choices = choices self.to_parse['subset'] = self.parseSubset def parseSubset(self, args): if args['subset'] == ['all']: return self.subset_choices else: return args['subset'] def parseTimeRange(self, args): if args['trange'] is None: return None return np.array(args['trange']) * 1e-3 def parsePatches(self, args): if args['patches'] not in ('none', 'one', 'all'): return eval(args['patches']) else: return args['patches'] def parseInputFiles(self, args): if self.inputfiles_dep_key is not None and not args[self.inputfiles_dep_key]: return None elif args['inputfiles'] is None: return OpenFilesDialog('pkl')[0] def parseDir(self, key, args, title, dep_key=None): if dep_key is not None and args[dep_key] is False: return None try: if args[key] is not None: return os.path.abspath(args[key]) else: return selectDirDialog(title=title) except ValueError: raise ValueError(f'No {key} selected') def parseInputDir(self, args): return self.parseDir( 'inputdir', args, 'Select input directory', self.inputdir_dep_key) def parseOutputDir(self, args): if hasattr(self, 'outputdir') and self.outputdir is not None: return self.outputdir else: if args['outputdir'] is not None and args['outputdir'] == 'dump': return DEFAULT_OUTPUT_FOLDER else: return self.parseDir( 'outputdir', args, 'Select output directory', self.outputdir_dep_key) def parseLogLevel(self, args): return logging.DEBUG if args.pop('verbose') else logging.INFO def parsePltScheme(self, args): if args['plot'] is None or args['plot'] == ['all']: return None else: return {x: [x] for x in args['plot']} def parseOverwrite(self, args): check_for_output = args.pop('checkout') return not check_for_output def restrict(self, args, keys): if sum([args[x] is not None for x in keys]) > 1: raise ValueError( f'You must provide only one of the following arguments: {", ".join(keys)}') def parse2array(self, args, key, factor=1): return np.array(args[key]) * factor def parse(self): args = vars(super().parse_args()) for k, v in self.defaults.items(): if k in args and args[k] is None: args[k] = v if isIterable(v) else [v] for k, parse_method in self.to_parse.items(): args[k] = parse_method(args) return args @staticmethod def parsePlot(args, output): render_args = {} if 'spikes' in args: render_args['spikes'] = args['spikes'] if args['compare']: if args['plot'] == ['all']: logger.error('Specific variables must be specified for comparative plots') return for key in ['cmap', 'cscale']: if key in args: render_args[key] = args[key] for pltvar in args['plot']: comp_plot = CompTimeSeries(output, pltvar) comp_plot.render(**render_args) else: scheme_plot = GroupedTimeSeries(output, pltscheme=args['pltscheme']) scheme_plot.render(**render_args) # phase_plot = PhaseDiagram(output, args['plot'][0]) # phase_plot.render( # # trange=args['trange'], # # rel_tbounds=args['rel_tbounds'], # labels=args['labels'], # prettify=args['pretty'], # cmap=args['cmap'], # cscale=args['cscale'] # ) plt.show() class TestParser(Parser): def __init__(self, valid_subsets): super().__init__() self.addProfiling() self.addSubset(valid_subsets) def addProfiling(self): self.add_argument( '--profile', default=False, action='store_true', help='Run with profiling') class FigureParser(Parser): def __init__(self, valid_subsets): super().__init__() self.addSubset(valid_subsets) self.addSave() self.addOutputDir(dep_key='save') class PlotParser(Parser): def __init__(self): super().__init__() self.addHideOutput() self.addInputFiles() self.addOutputDir(dep_key='save') self.addSave() self.addFigureExtension() self.addCmap() self.addPretty() self.addTimeRange() self.addCscale() self.addLabels() class TimeSeriesParser(PlotParser): def __init__(self): super().__init__() self.addSpikes() self.addSamplingRate() self.addCompare() self.addPatches() class SimParser(Parser): ''' Generic simulation parser interface. ''' def __init__(self, outputdir=None): super().__init__() self.outputdir = outputdir self.addMPI() self.addOutputDir(dep_key='save') self.addSave() self.addCheckForOutput() self.addCompare() self.addCmap() self.addCscale() def parse(self): args = super().parse() return args class MechSimParser(SimParser): ''' Parser to run mechanical simulations from the command line. ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.defaults.update({ 'radius': 32.0, # nm 'embedding': 0., # um 'Cm0': CorticalRS().Cm0 * 1e2, # uF/m2 'Qm0': CorticalRS().Qm0 * 1e5, # nC/m2 'freq': 500.0, # kHz 'amp': 100.0, # kPa 'charge': 0., # nC/cm2 'fs': 100. # % }) self.factors.update({ 'radius': 1e-9, 'embedding': 1e-6, 'Cm0': 1e-2, 'Qm0': 1e-5, 'freq': 1e3, 'amp': 1e3, 'charge': 1e-5, 'fs': 1e-2 }) self.addRadius() self.addEmbedding() self.addCm0() self.addQm0() self.addFrequency() self.addAmplitude() self.addCharge() self.addFs() def addRadius(self): self.add_argument( '-a', '--radius', nargs='+', type=float, help='Sonophore radius (nm)') def addEmbedding(self): self.add_argument( '--embedding', nargs='+', type=float, help='Embedding depth (um)') def addCm0(self): self.add_argument( '--Cm0', type=float, nargs='+', help='Resting membrane capacitance (uF/cm2)') def addQm0(self): self.add_argument( '--Qm0', type=float, nargs='+', help='Resting membrane charge density (nC/cm2)') def addFrequency(self): self.add_argument( '-f', '--freq', nargs='+', type=float, help='US frequency (kHz)') def addAmplitude(self): self.add_argument( '-A', '--amp', nargs='+', type=float, help='Acoustic pressure amplitude (kPa)') self.add_argument( '--Arange', type=str, nargs='+', help=f'Amplitude range {self.dist_str} (kPa)') self.add_argument( '-I', '--intensity', nargs='+', type=float, help='Acoustic intensity (W/cm2)') self.add_argument( '--Irange', type=str, nargs='+', help=f'Intensity range {self.dist_str} (W/cm2)') self.to_parse['amp'] = self.parseAmplitude def addCharge(self): self.add_argument( '-Q', '--charge', nargs='+', type=float, help='Membrane charge density (nC/cm2)') def addFs(self): self.add_argument( '--fs', nargs='+', type=float, help='Sonophore coverage fraction (%%)') self.add_argument( '--spanFs', default=False, action='store_true', help='Span Fs from 1 to 100%%') self.to_parse['fs'] = self.parseFs def parseAmplitude(self, args): params = ['Irange', 'Arange', 'intensity', 'amp'] self.restrict(args, params[:-1]) Irange, Arange, Int, A = [args.pop(k) for k in params] if Irange is not None: amps = Intensity2Pressure(self.getDistFromList(Irange) * 1e4) # Pa elif Int is not None: amps = Intensity2Pressure(np.array(Int) * 1e4) # Pa elif Arange is not None: amps = self.getDistFromList(Arange) * self.factors['amp'] # Pa else: amps = np.array(A) * self.factors['amp'] # Pa return amps def parseFs(self, args): if args.pop('spanFs', False): return np.arange(1, 101) * self.factors['fs'] # (-) else: return np.array(args['fs']) * self.factors['fs'] # (-) def parse(self): args = super().parse() for key in ['radius', 'embedding', 'Cm0', 'Qm0', 'freq', 'charge']: args[key] = self.parse2array(args, key, factor=self.factors[key]) return args @staticmethod def parseSimInputs(args): return [args[k] for k in ['freq', 'amp', 'charge']] class NeuronSimParser(SimParser): def __init__(self): super().__init__() self.defaults.update({ 'neuron': 'RS', 'tstim': 100.0, # ms 'toffset': 50. # ms }) self.factors.update({ 'tstim': 1e-3, 'toffset': 1e-3 }) self.addNeuron() self.addTstim() self.addToffset() def addTstim(self): self.add_argument( - '-t', '--tstim', nargs='+', type=float, help='Stimulus duration (ms)') + '-t', '--tstim', nargs='+', type=float, help='Stimulus / burst duration (ms)') def addToffset(self): self.add_argument( '--toffset', nargs='+', type=float, help='Offset duration (ms)') class VClampParser(NeuronSimParser): def __init__(self): super().__init__() self.defaults.update({ 'vhold': -70.0, # mV 'vstep': 0.0 # mV }) self.factors.update({ 'vhold': 1., 'vstep': 1. }) self.addVhold() self.addVstep() def addVhold(self): self.add_argument( '--vhold', nargs='+', type=float, help='Held membrane potential (mV)') def addVstep(self): self.add_argument( '--vstep', nargs='+', type=float, help='Step membrane potential (mV)') self.add_argument( '--vsteprange', type=str, nargs='+', help=f'Step membrane potential range {self.dist_str} (mV)') self.to_parse['vstep'] = self.parseVstep def parseVstep(self, args): vstep_range, vstep = [args.pop(k) for k in ['vsteprange', 'vstep']] if vstep_range is not None: vsteps = self.getDistFromList(vstep_range) # mV else: vsteps = np.array(vstep) # mV return vsteps def parse(self, args=None): if args is None: args = super().parse() for key in ['vhold', 'vstep', 'tstim', 'toffset']: args[key] = self.parse2array(args, key, factor=self.factors[key]) return args @staticmethod def parseSimInputs(args): return [args[k] for k in ['vhold', 'vstep', 'tstim', 'toffset']] class PWSimParser(NeuronSimParser): ''' Generic parser interface to run PW patterned simulations from the command line. ''' def __init__(self): super().__init__() self.defaults.update({ 'PRF': 100.0, # Hz - 'DC': 100.0 # % + 'DC': 100.0, # % }) self.factors.update({ 'PRF': 1., 'DC': 1e-2 }) self.allowed.update({ 'DC': range(101) }) self.addPRF() self.addDC() self.addTitrate() self.addSpikes() def addPRF(self): self.add_argument( '--PRF', nargs='+', type=float, help='PRF (Hz)') def addDC(self): self.add_argument( '--DC', nargs='+', type=float, help='Duty cycle (%%)') self.add_argument( '--spanDC', default=False, action='store_true', help='Span DC from 1 to 100%%') self.to_parse['DC'] = self.parseDC + # def addBRF(self): + # self.add_argument( + # '--BRF', nargs='+', type=float, help='Burst repetition frequency (Hz)') + + # def addNBursts(self): + # self.add_argument( + # '--nbursts', nargs='+', type=float, help='Number of bursts') + def addTitrate(self): self.add_argument( '--titrate', default=False, action='store_true', help='Perform titration') def parseAmplitude(self, args): raise NotImplementedError def parseDC(self, args): if args.pop('spanDC'): return np.arange(1, 101) * self.factors['DC'] # (-) else: return np.array(args['DC']) * self.factors['DC'] # (-) def parse(self, args=None): if args is None: args = super().parse() for key in ['tstim', 'toffset', 'PRF']: args[key] = self.parse2array(args, key, factor=self.factors[key]) return args @staticmethod def parseSimInputs(args): return [args[k] for k in ['amp', 'tstim', 'toffset', 'PRF', 'DC']] class EStimParser(PWSimParser): ''' Parser to run E-STIM simulations from the command line. ''' def __init__(self): super().__init__() self.defaults.update({'amp': 10.0}) # mA/m2 self.factors.update({'amp': 1.}) self.addAmplitude() def addAmplitude(self): self.add_argument( '-A', '--amp', nargs='+', type=float, help='Amplitude of injected current density (mA/m2)') self.add_argument( '--Arange', type=str, nargs='+', help=f'Amplitude range {self.dist_str} (mA/m2)') self.to_parse['amp'] = self.parseAmplitude def addVext(self): self.add_argument( '--Vext', nargs='+', type=float, help='Extracellular potential (mV)') def parseAmplitude(self, args): if args.pop('titrate'): return None Arange, A = [args.pop(k) for k in ['Arange', 'amp']] if Arange is not None: amps = self.getDistFromList(Arange) * self.factors['amp'] # mA/m2 else: amps = np.array(A) * self.factors['amp'] # mA/m2 return amps class AStimParser(PWSimParser, MechSimParser): ''' Parser to run A-STIM simulations from the command line. ''' def __init__(self): MechSimParser.__init__(self) PWSimParser.__init__(self) self.defaults.update({'method': 'sonic'}) self.allowed.update({'method': ['full', 'hybrid', 'sonic']}) self.addMethod() self.addQSSVars() def addMethod(self): self.add_argument( '-m', '--method', nargs='+', type=str, help=f'Numerical integration method ({", ".join(self.allowed["method"])})') self.to_parse['method'] = self.parseMethod def parseMethod(self, args): for item in args['method']: if item not in self.allowed['method']: raise ValueError(f'Unknown method type: "{item}"') return args['method'] def addQSSVars(self): self.add_argument( '--qss', nargs='+', type=str, help='QSS variables') def parseAmplitude(self, args): if args.pop('titrate'): return None return MechSimParser.parseAmplitude(self, args) def parse(self): args = PWSimParser.parse(self, args=MechSimParser.parse(self)) for k in ['Cm0', 'Qm0', 'embedding', 'charge']: del args[k] return args @staticmethod def parseSimInputs(args): return [args['freq']] + PWSimParser.parseSimInputs(args) + [args[k] for k in ['fs', 'method', 'qss']] diff --git a/README.md b/README.md index 96fdbc1..7e757b8 100644 --- a/README.md +++ b/README.md @@ -1,228 +1,242 @@ # Description `PySONIC` is a Python implementation of the **multi-Scale Optimized Neuronal Intramembrane Cavitation (SONIC) model [1]**, a computationally efficient and interpretable model of neuronal intramembrane cavitation. It allows to simulate the responses of various neuron types to ultrasonic (and electrical) stimuli. ## Content of repository ### Models The package contains four model classes: - `Model` defines the generic interface of a model, including mandatory attributes and methods for simulating it. - `BilayerSonophore` defines the underlying **biomechanical model of intramembrane cavitation**. - `PointNeuron` defines an abstract generic interface to **conductance-based point-neuron electrical models**. It is inherited by classes defining the different neuron types with specific membrane dynamics. - `NeuronalBilayerSonophore` defines the **full electromechanical model for any given neuron type**. To do so, it inherits from `BilayerSonophore` and receives a specific `PointNeuron` object at initialization. All model classes contain a `simulate` method to simulate the underlying model's behavior for a given set of stimulation and physiological parameters. The `NeuronalBilayerSonophore.simulate` method contains an additional `method` argument defining whether to perform a detailed (`full`), coarse-grained (`sonic`) or hybrid (`hybrid`) integration of the differential system. ### Solvers Numerical integration routines are implemented outside the models, in a separate `solvers` module: - `PeriodicSolver` integrates a differential system periodically until a stable periodic behavior is detected. - `EventDrivenSolver` integrates a differential system across a specific set of "events" that modulate the stimuluation drive over time. - `HybridSolver` inherits from both `PeriodicSolver`and `EventDrivenSolver`. It integrates a differential system using a hybrid scheme inside each "ON" or "OFF" period: 1. The full ODE system is integrated for a few cycles with a dense time granularity until a periodic stabilization detection 2. The profiles of all variables over the last cycle are resampled to a far lower (i.e. sparse) sampling rate 3. A subset of the ODE system is integrated with a sparse time granularity, while the remaining variables are periodically expanded from their last cycle profile, until the end of the period or that of an predefined update interval. 4. The process is repeated from step 1 ### Neurons Several conductance-based point-neuron models are implemented that inherit from the `PointNeuron` generic interface. Among them, several CNS models: - `CorticalRS`: cortical regular spiking (`RS`) neuron - `CorticalFS`: cortical fast spiking (`FS`) neuron - `CorticalLTS`: cortical low-threshold spiking (`LTS`) neuron - `CorticalIB`: cortical intrinsically bursting (`IB`) neuron - `ThalamicRE`: thalamic reticular (`RE`) neuron - `ThalamoCortical`: thalamo-cortical (`TC`) neuron - `OstukaSTN`: subthalamic nucleus (`STN`) neuron But also some valuable models used in peripheral axon models: - `FrankenhaeuserHuxleyNode`: Amphibian (Xenopus) myelinated fiber node (`FHnode`) - `SweeneyNode`: Mammalian (rabbit) myelinated motor fiber node (`SWnode`) - `MRGNode`: Mammalian (human / cat) myelinated fiber node (`MRGnode`) - `SundtSegment`: Mammalian unmyelinated C-fiber segment (`SUseg`) ### Other modules - `batches`: a generic interface to run simulation batches with or without multiprocessing - `parsers`: command line parsing utilities - `plt`: graphing utilities - `postpro`: post-processing utilities (mostly signal features detection) - `constants`: algorithmic constants used across modules and classes - `utils`: generic utilities # Requirements - Python 3.6+ - Package dependencies (numpy, scipy, ...) are installed automatically upon installation of the package. # Installation - Open a terminal. - Install [git-lfs](https://github.com/git-lfs/git-lfs/wiki/Installation) if not already done - Activate a Python3 environment if needed, e.g. on the tnesrv5 machine: ```source /opt/apps/anaconda3/bin activate``` - Check that the appropriate version of pip is activated: ```pip --version``` - Clone the repository and install the python package: ```git clone https://c4science.ch/diffusion/4670/pysonic.git``` ```cd pysonic``` ```pip install -e .``` # Usage ## Python scripts -You can easily run simulations of any implemented point-neuron model under both electrical and ultrasonic stimuli, and visualize the simulation results, in just a few lines of code: +You can easily run simulations of any implemented point-neuron model under both electrical and ultrasonic stimuli with various temporal protocols, and visualize the simulation results, in just a few lines of code: ```python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch -# @Date: 2020-04-04 15:26:28 +# @Date: 2020-04-17 16:09:42 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-13 16:34:56 +# @Last Modified time: 2020-04-17 18:11:12 + +''' Example script showing how to simulate a point-neuron model upon application + of both electrical and ultrasonic stimuli, with various temporal protocols. +''' import logging import matplotlib.pyplot as plt -from PySONIC.core import PulsedProtocol, NeuronalBilayerSonophore, ElectricDrive, AcousticDrive -from PySONIC.neurons import getPointNeuron from PySONIC.utils import logger +from PySONIC.neurons import getPointNeuron +from PySONIC.core import NeuronalBilayerSonophore, ElectricDrive, AcousticDrive +from PySONIC.core.protocols import * from PySONIC.plt import GroupedTimeSeries # Set logging level logger.setLevel(logging.INFO) # Define point-neuron model and corresponding neuronal bilayer sonophore model pneuron = getPointNeuron('RS') a = 32e-9 # sonophore radius (m) nbls = NeuronalBilayerSonophore(a, pneuron) # Define electric and ultrasonic drives ELdrive = ElectricDrive(20.) # mA/m2 USdrive = AcousticDrive( 500e3, # Hz 100e3) # Pa -# Set pulsing protocol -tstim = 250e-3 # s -toffset = 50e-3 # s +# Pulsing parameters +tburst = 100e-3 # s PRF = 100. # Hz DC = 0.5 # - -pp = PulsedProtocol(tstim, toffset, PRF, DC) - -# Run simulation upon electrical stimulation, and plot results -data, meta = pneuron.simulate(ELdrive, pp) -GroupedTimeSeries([(data, meta)]).render() - -# Run simulation upon ultrasonic stimulation, and plot results -data, meta = nbls.simulate(USdrive, pp) -GroupedTimeSeries([(data, meta)]).render() +BRF = 1.0 # Hz +nbursts = 3 # - + +# Protocols +protocols = [ + TimeProtocol(tburst, 1 / BRF - tburst), + PulsedProtocol(tburst, 1 / BRF - tburst, PRF=PRF, DC=DC), + BurstProtocol(tburst, BRF, nbursts=nbursts, PRF=PRF, DC=DC) +] + +# For each protocol +for p in protocols: + # Run simulation upon electrical stimulation, and plot results + data, meta = pneuron.simulate(ELdrive, p) + GroupedTimeSeries([(data, meta)]).render() + + # Run simulation upon ultrasonic stimulation, and plot results + data, meta = nbls.simulate(USdrive, p) + GroupedTimeSeries([(data, meta)]).render() # Show figures plt.show() ``` ## From the command line ### Running simulations and batches You can easily run simulations of all 3 model types using the dedicated command line scripts. To do so, open a terminal in the `scripts` directory. - Use `run_mech.py` for simulations of the **mechanical model** upon **ultrasonic stimulation**. For instance, for a 32 nm radius bilayer sonophore sonicated at 500 kHz and 100 kPa: ```python run_mech.py -a 32 -f 500 -A 100 -p Z``` - Use `run_estim.py` for simulations of **point-neuron models** upon **intracellular electrical stimulation**. For instance, a regular-spiking (RS) neuron injected with 10 mA/m2 intracellular current for 30 ms: ```python run_estim.py -n RS -A 10 --tstim 30 -p Vm``` - Use `run_astim.py` for simulations of **point-neuron models** upon **ultrasonic stimulation**. For instance, for a coarse-grained simulation of a 32 nm radius bilayer sonophore within a regular-spiking (RS) neuron membrane, sonicated at 500 kHz and 100 kPa for 150 ms: ```python run_astim.py -n RS -a 32 -f 500 -A 100 --tstim 150 --method sonic -p Qm``` Additionally, you can run batches of simulations by specifying more than one value for any given stimulation parameter (e.g. `-A 100 200` for sonication with 100 and 200 kPa respectively). These batches can be parallelized using multiprocessing to optimize performance, with the extra argument `--mpi`. ### Saving and visualizing results By default, simulation results are neither shown, nor saved. To view results directly upon simulation completion, you can use the `-p [xxx]` option, where `[xxx]` can be `all` (to plot all resulting variables) or a given variable name (e.g. `Z` for membrane deflection, `Vm` for membrane potential, `Qm` for membrane charge density). To save simulation results in binary `.pkl` files, you can use the `-s` option. You will be prompted to choose an output directory, unless you also specify it with the `-o ` option. Output files are automatically named from model and simulation parameters to avoid ambiguity. When running simulation batches, it is highly advised to specify the `-s` option in order to save results of each simulation. You can then visualize results at a later stage. To visualize results, use the `plot_timeseries.py` script. You will be prompted to select the output files containing the simulation(s) results. By default, separate figures will be created for each simulation, showing the time profiles of all resulting variables. Here again, you can choose to show only a subset of variables using the `-p [xxx]` option. Moreover, if you select a subset of variables, you can visualize resulting profiles across simulations in comparative figures wih the `--compare` option. Several more options are available. To view them, type in: ```python -h``` # Extend the package ## Add other neuron types You can easily add other neuron types into the package, providing their ion channel populations and underlying voltage-gated dynamics equations are known. To add a new point-neuron model, follow this procedure: 1. Create a new file, and save it in the `neurons` sub-folder, with an explicit name (e.g. `my_neuron.py`). 2. Copy-paste the content of the `template.py` file (also located in the `neurons` sub-folder) into your file. 3. In your file, change the **class name** from `TemplateNeuron` to something more explicit (e.g. `MyNeuron`), and change the **neuron name** accordingly (e.g. `myneuron`). This name is a keyword used to refer to the model from outside the class. Also, comment out the `addSonicFeatures` decorator temporarily. 4. Modify/add **biophysical parameters** of your model (resting parameters, reversal potentials, channel conductances, ionic concentrations, temperatures, diffusion constants, etc...) as class attributes. If some parameters are not fixed and must be computed, assign them to the class inside a `__new__` method, taking the class (`cls`) as sole attribute. 5. Specify a **dictionary of names:descriptions of your different differential states** (i.e. all the differential variables of your model, except for the membrane potential). 6. Modify/add **gating states kinetics** (`alphax` and `betax` methods) that define the voltage-dependent activation and inactivation rates of the different ion channnels gates of your model. Those methods take the membrane potential `Vm` as input and return a rate in `s-1`. Alternatively, your can use steady-state open-probabilties (`xinf`) and adaptation time constants (`taux`) methods. 7. Modify the `derStates` method that defines the **derivatives of your different state variables**. These derivatives are defined inside a dictionary, where each state key is paired to a lambda function that takes the membrane potential `Vm` and a states vector `x` as inputs, and returns the associated state derivative (in `/s`). 8. Modify the `steadyStates` method that defines the **steady-state values of your different state variables**. These steady-states are defined inside a dictionary, where each state key is paired to a lambda function that takes the membrane potential `Vm` as only input, and returns the associated steady-state value (in ``). If some steady-states depend on the values of other-steady states, you can proceed as follows: - define all independent steady-states functions in a dictionary called `lambda_dict` - add dependent steady-state functions to the dictionary, calling `lambda_dict[k](Vm)` for each state `k` whose value is required. 9. Modify/add **membrane currents** (`iXX` methods) of your model. Those methods take relevant gating states and the membrane potential `Vm` as inputs, and must return a current density in `mA/m2`. **You also need to modify the docstring accordingly, as this information is used by the package**. 10. Modify the `currents` method that defines the **membrane currents of your model**. These currents are defined inside a dictionary, where each current key is paired to a lambda function that takes the membrane potential `Vm` and a states vector `x` as inputs, and returns the associated current (in `mA/m2`). 11. Add the neuron class to the package, by importing it in the `__init__.py` file of the `neurons` sub-folder: ```python from .my_neuron import MyNeuron ``` 12. Verify your point-neuron model by running simulations under various electrical stimuli and comparing the output to the neurons's expected behavior. Implement required corrections if any. 13. Uncomment the `addSonicFeatures` decorator on top of your class. **This decorator will automatically parse the `derStates`, `steadyStates` and `currents` methods in order to adapt your neuron model to US stimulation. For this to work, you need to make sure to**: - **keep them as class methods** - **check that all calls to functions that depend solely on `Vm` appear directly in the methods' lambda expressions and are not hidden inside nested function calls.** 14. Pre-compute lookup tables required to run coarse-grained simulations of the neuron model upon ultrasonic stimulation. To do so, go to the `scripts` directory and run the `run_lookups.py` script with the neuron's name as command line argument, e.g.: ```python run_lookups.py -n myneuron --mpi``` If possible, use the `--mpi` argument to enable multiprocessing, as lookups pre-computation greatly benefits from parallelization. That's it! You can now run simulations of your point-neuron model upon ultrasonic stimulation. # Authors Code written and maintained by Theo Lemaire (theo.lemaire@epfl.ch). # License & citation This project is licensed under the MIT License - see the LICENSE file for details. If PySONIC contributes to a project that leads to a scientific publication, please acknowledge this fact by citing [Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation. J. Neural Eng.](https://iopscience.iop.org/article/10.1088/1741-2552/ab1685) DOI: 10.1088/1741-2552/ab1685 # References [1] Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation. J. Neural Eng. diff --git a/README.mdpp b/README.mdpp index 1bfee88..730f3dc 100644 --- a/README.mdpp +++ b/README.mdpp @@ -1,180 +1,180 @@ # Description `PySONIC` is a Python implementation of the **multi-Scale Optimized Neuronal Intramembrane Cavitation (SONIC) model [1]**, a computationally efficient and interpretable model of neuronal intramembrane cavitation. It allows to simulate the responses of various neuron types to ultrasonic (and electrical) stimuli. ## Content of repository ### Models The package contains four model classes: - `Model` defines the generic interface of a model, including mandatory attributes and methods for simulating it. - `BilayerSonophore` defines the underlying **biomechanical model of intramembrane cavitation**. - `PointNeuron` defines an abstract generic interface to **conductance-based point-neuron electrical models**. It is inherited by classes defining the different neuron types with specific membrane dynamics. - `NeuronalBilayerSonophore` defines the **full electromechanical model for any given neuron type**. To do so, it inherits from `BilayerSonophore` and receives a specific `PointNeuron` object at initialization. All model classes contain a `simulate` method to simulate the underlying model's behavior for a given set of stimulation and physiological parameters. The `NeuronalBilayerSonophore.simulate` method contains an additional `method` argument defining whether to perform a detailed (`full`), coarse-grained (`sonic`) or hybrid (`hybrid`) integration of the differential system. ### Solvers Numerical integration routines are implemented outside the models, in a separate `solvers` module: - `PeriodicSolver` integrates a differential system periodically until a stable periodic behavior is detected. - `EventDrivenSolver` integrates a differential system across a specific set of "events" that modulate the stimuluation drive over time. - `HybridSolver` inherits from both `PeriodicSolver`and `EventDrivenSolver`. It integrates a differential system using a hybrid scheme inside each "ON" or "OFF" period: 1. The full ODE system is integrated for a few cycles with a dense time granularity until a periodic stabilization detection 2. The profiles of all variables over the last cycle are resampled to a far lower (i.e. sparse) sampling rate 3. A subset of the ODE system is integrated with a sparse time granularity, while the remaining variables are periodically expanded from their last cycle profile, until the end of the period or that of an predefined update interval. 4. The process is repeated from step 1 ### Neurons Several conductance-based point-neuron models are implemented that inherit from the `PointNeuron` generic interface. Among them, several CNS models: - `CorticalRS`: cortical regular spiking (`RS`) neuron - `CorticalFS`: cortical fast spiking (`FS`) neuron - `CorticalLTS`: cortical low-threshold spiking (`LTS`) neuron - `CorticalIB`: cortical intrinsically bursting (`IB`) neuron - `ThalamicRE`: thalamic reticular (`RE`) neuron - `ThalamoCortical`: thalamo-cortical (`TC`) neuron - `OstukaSTN`: subthalamic nucleus (`STN`) neuron But also some valuable models used in peripheral axon models: - `FrankenhaeuserHuxleyNode`: Amphibian (Xenopus) myelinated fiber node (`FHnode`) - `SweeneyNode`: Mammalian (rabbit) myelinated motor fiber node (`SWnode`) - `MRGNode`: Mammalian (human / cat) myelinated fiber node (`MRGnode`) - `SundtSegment`: Mammalian unmyelinated C-fiber segment (`SUseg`) ### Other modules - `batches`: a generic interface to run simulation batches with or without multiprocessing - `parsers`: command line parsing utilities - `plt`: graphing utilities - `postpro`: post-processing utilities (mostly signal features detection) - `constants`: algorithmic constants used across modules and classes - `utils`: generic utilities # Requirements - Python 3.6+ - Package dependencies (numpy, scipy, ...) are installed automatically upon installation of the package. # Installation - Open a terminal. - Install [git-lfs](https://github.com/git-lfs/git-lfs/wiki/Installation) if not already done - Activate a Python3 environment if needed, e.g. on the tnesrv5 machine: ```source /opt/apps/anaconda3/bin activate``` - Check that the appropriate version of pip is activated: ```pip --version``` - Clone the repository and install the python package: ```git clone https://c4science.ch/diffusion/4670/pysonic.git``` ```cd pysonic``` ```pip install -e .``` # Usage ## Python scripts -You can easily run simulations of any implemented point-neuron model under both electrical and ultrasonic stimuli, and visualize the simulation results, in just a few lines of code: +You can easily run simulations of any implemented point-neuron model under both electrical and ultrasonic stimuli with various temporal protocols, and visualize the simulation results, in just a few lines of code: -!INCLUDECODE "examples/run.py" (python) +!INCLUDECODE "examples/sims.py" (python) ## From the command line ### Running simulations and batches You can easily run simulations of all 3 model types using the dedicated command line scripts. To do so, open a terminal in the `scripts` directory. - Use `run_mech.py` for simulations of the **mechanical model** upon **ultrasonic stimulation**. For instance, for a 32 nm radius bilayer sonophore sonicated at 500 kHz and 100 kPa: ```python run_mech.py -a 32 -f 500 -A 100 -p Z``` - Use `run_estim.py` for simulations of **point-neuron models** upon **intracellular electrical stimulation**. For instance, a regular-spiking (RS) neuron injected with 10 mA/m2 intracellular current for 30 ms: ```python run_estim.py -n RS -A 10 --tstim 30 -p Vm``` - Use `run_astim.py` for simulations of **point-neuron models** upon **ultrasonic stimulation**. For instance, for a coarse-grained simulation of a 32 nm radius bilayer sonophore within a regular-spiking (RS) neuron membrane, sonicated at 500 kHz and 100 kPa for 150 ms: ```python run_astim.py -n RS -a 32 -f 500 -A 100 --tstim 150 --method sonic -p Qm``` Additionally, you can run batches of simulations by specifying more than one value for any given stimulation parameter (e.g. `-A 100 200` for sonication with 100 and 200 kPa respectively). These batches can be parallelized using multiprocessing to optimize performance, with the extra argument `--mpi`. ### Saving and visualizing results By default, simulation results are neither shown, nor saved. To view results directly upon simulation completion, you can use the `-p [xxx]` option, where `[xxx]` can be `all` (to plot all resulting variables) or a given variable name (e.g. `Z` for membrane deflection, `Vm` for membrane potential, `Qm` for membrane charge density). To save simulation results in binary `.pkl` files, you can use the `-s` option. You will be prompted to choose an output directory, unless you also specify it with the `-o ` option. Output files are automatically named from model and simulation parameters to avoid ambiguity. When running simulation batches, it is highly advised to specify the `-s` option in order to save results of each simulation. You can then visualize results at a later stage. To visualize results, use the `plot_timeseries.py` script. You will be prompted to select the output files containing the simulation(s) results. By default, separate figures will be created for each simulation, showing the time profiles of all resulting variables. Here again, you can choose to show only a subset of variables using the `-p [xxx]` option. Moreover, if you select a subset of variables, you can visualize resulting profiles across simulations in comparative figures wih the `--compare` option. Several more options are available. To view them, type in: ```python -h``` # Extend the package ## Add other neuron types You can easily add other neuron types into the package, providing their ion channel populations and underlying voltage-gated dynamics equations are known. To add a new point-neuron model, follow this procedure: 1. Create a new file, and save it in the `neurons` sub-folder, with an explicit name (e.g. `my_neuron.py`). 2. Copy-paste the content of the `template.py` file (also located in the `neurons` sub-folder) into your file. 3. In your file, change the **class name** from `TemplateNeuron` to something more explicit (e.g. `MyNeuron`), and change the **neuron name** accordingly (e.g. `myneuron`). This name is a keyword used to refer to the model from outside the class. Also, comment out the `addSonicFeatures` decorator temporarily. 4. Modify/add **biophysical parameters** of your model (resting parameters, reversal potentials, channel conductances, ionic concentrations, temperatures, diffusion constants, etc...) as class attributes. If some parameters are not fixed and must be computed, assign them to the class inside a `__new__` method, taking the class (`cls`) as sole attribute. 5. Specify a **dictionary of names:descriptions of your different differential states** (i.e. all the differential variables of your model, except for the membrane potential). 6. Modify/add **gating states kinetics** (`alphax` and `betax` methods) that define the voltage-dependent activation and inactivation rates of the different ion channnels gates of your model. Those methods take the membrane potential `Vm` as input and return a rate in `s-1`. Alternatively, your can use steady-state open-probabilties (`xinf`) and adaptation time constants (`taux`) methods. 7. Modify the `derStates` method that defines the **derivatives of your different state variables**. These derivatives are defined inside a dictionary, where each state key is paired to a lambda function that takes the membrane potential `Vm` and a states vector `x` as inputs, and returns the associated state derivative (in `/s`). 8. Modify the `steadyStates` method that defines the **steady-state values of your different state variables**. These steady-states are defined inside a dictionary, where each state key is paired to a lambda function that takes the membrane potential `Vm` as only input, and returns the associated steady-state value (in ``). If some steady-states depend on the values of other-steady states, you can proceed as follows: - define all independent steady-states functions in a dictionary called `lambda_dict` - add dependent steady-state functions to the dictionary, calling `lambda_dict[k](Vm)` for each state `k` whose value is required. 9. Modify/add **membrane currents** (`iXX` methods) of your model. Those methods take relevant gating states and the membrane potential `Vm` as inputs, and must return a current density in `mA/m2`. **You also need to modify the docstring accordingly, as this information is used by the package**. 10. Modify the `currents` method that defines the **membrane currents of your model**. These currents are defined inside a dictionary, where each current key is paired to a lambda function that takes the membrane potential `Vm` and a states vector `x` as inputs, and returns the associated current (in `mA/m2`). 11. Add the neuron class to the package, by importing it in the `__init__.py` file of the `neurons` sub-folder: ```python from .my_neuron import MyNeuron ``` 12. Verify your point-neuron model by running simulations under various electrical stimuli and comparing the output to the neurons's expected behavior. Implement required corrections if any. 13. Uncomment the `addSonicFeatures` decorator on top of your class. **This decorator will automatically parse the `derStates`, `steadyStates` and `currents` methods in order to adapt your neuron model to US stimulation. For this to work, you need to make sure to**: - **keep them as class methods** - **check that all calls to functions that depend solely on `Vm` appear directly in the methods' lambda expressions and are not hidden inside nested function calls.** 14. Pre-compute lookup tables required to run coarse-grained simulations of the neuron model upon ultrasonic stimulation. To do so, go to the `scripts` directory and run the `run_lookups.py` script with the neuron's name as command line argument, e.g.: ```python run_lookups.py -n myneuron --mpi``` If possible, use the `--mpi` argument to enable multiprocessing, as lookups pre-computation greatly benefits from parallelization. That's it! You can now run simulations of your point-neuron model upon ultrasonic stimulation. # Authors Code written and maintained by Theo Lemaire (theo.lemaire@epfl.ch). # License & citation This project is licensed under the MIT License - see the LICENSE file for details. If PySONIC contributes to a project that leads to a scientific publication, please acknowledge this fact by citing [Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation. J. Neural Eng.](https://iopscience.iop.org/article/10.1088/1741-2552/ab1685) DOI: 10.1088/1741-2552/ab1685 # References [1] Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation. J. Neural Eng. diff --git a/examples/run.py b/examples/run.py deleted file mode 100644 index 61833a2..0000000 --- a/examples/run.py +++ /dev/null @@ -1,46 +0,0 @@ -# -*- coding: utf-8 -*- -# @Author: Theo Lemaire -# @Email: theo.lemaire@epfl.ch -# @Date: 2020-04-04 15:26:28 -# @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-13 16:34:56 - -import logging -import matplotlib.pyplot as plt - -from PySONIC.core import PulsedProtocol, NeuronalBilayerSonophore, ElectricDrive, AcousticDrive -from PySONIC.neurons import getPointNeuron -from PySONIC.utils import logger -from PySONIC.plt import GroupedTimeSeries - -# Set logging level -logger.setLevel(logging.INFO) - -# Define point-neuron model and corresponding neuronal bilayer sonophore model -pneuron = getPointNeuron('RS') -a = 32e-9 # sonophore radius (m) -nbls = NeuronalBilayerSonophore(a, pneuron) - -# Define electric and ultrasonic drives -ELdrive = ElectricDrive(20.) # mA/m2 -USdrive = AcousticDrive( - 500e3, # Hz - 100e3) # Pa - -# Set pulsing protocol -tstim = 250e-3 # s -toffset = 50e-3 # s -PRF = 100. # Hz -DC = 0.5 # - -pp = PulsedProtocol(tstim, toffset, PRF, DC) - -# Run simulation upon electrical stimulation, and plot results -data, meta = pneuron.simulate(ELdrive, pp) -GroupedTimeSeries([(data, meta)]).render() - -# Run simulation upon ultrasonic stimulation, and plot results -data, meta = nbls.simulate(USdrive, pp) -GroupedTimeSeries([(data, meta)]).render() - -# Show figures -plt.show() diff --git a/examples/sims.py b/examples/sims.py new file mode 100644 index 0000000..3bb8a3c --- /dev/null +++ b/examples/sims.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# @Author: Theo Lemaire +# @Email: theo.lemaire@epfl.ch +# @Date: 2020-04-17 16:09:42 +# @Last Modified by: Theo Lemaire +# @Last Modified time: 2020-04-17 18:11:12 + +''' Example script showing how to simulate a point-neuron model upon application + of both electrical and ultrasonic stimuli, with various temporal protocols. +''' + +import logging +import matplotlib.pyplot as plt + +from PySONIC.utils import logger +from PySONIC.neurons import getPointNeuron +from PySONIC.core import NeuronalBilayerSonophore, ElectricDrive, AcousticDrive +from PySONIC.core.protocols import * +from PySONIC.plt import GroupedTimeSeries + +# Set logging level +logger.setLevel(logging.INFO) + +# Define point-neuron model and corresponding neuronal bilayer sonophore model +pneuron = getPointNeuron('RS') +a = 32e-9 # sonophore radius (m) +nbls = NeuronalBilayerSonophore(a, pneuron) + +# Define electric and ultrasonic drives +ELdrive = ElectricDrive(20.) # mA/m2 +USdrive = AcousticDrive( + 500e3, # Hz + 100e3) # Pa + +# Pulsing parameters +tburst = 100e-3 # s +PRF = 100. # Hz +DC = 0.5 # - +BRF = 1.0 # Hz +nbursts = 3 # - + +# Protocols +protocols = [ + TimeProtocol(tburst, 1 / BRF - tburst), + PulsedProtocol(tburst, 1 / BRF - tburst, PRF=PRF, DC=DC), + BurstProtocol(tburst, BRF, nbursts=nbursts, PRF=PRF, DC=DC) +] + +# For each protocol +for p in protocols: + # Run simulation upon electrical stimulation, and plot results + data, meta = pneuron.simulate(ELdrive, p) + GroupedTimeSeries([(data, meta)]).render() + + # Run simulation upon ultrasonic stimulation, and plot results + data, meta = nbls.simulate(USdrive, p) + GroupedTimeSeries([(data, meta)]).render() + +# Show figures +plt.show() diff --git a/scripts/run_astim.py b/scripts/run_astim.py index c4a6cb3..8c488f5 100644 --- a/scripts/run_astim.py +++ b/scripts/run_astim.py @@ -1,40 +1,40 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-02-13 18:16:09 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-09 15:28:43 +# @Last Modified time: 2020-04-17 19:31:23 ''' Run A-STIM simulations of a specific point-neuron. ''' from PySONIC.core import NeuronalBilayerSonophore, Batch from PySONIC.utils import logger from PySONIC.parsers import AStimParser def main(): # Parse command line arguments parser = AStimParser() args = parser.parse() logger.setLevel(args['loglevel']) # Create simulation queue queue = NeuronalBilayerSonophore.simQueue( *parser.parseSimInputs(args), outputdir=args['outputdir'], overwrite=args['overwrite']) # Run A-STIM batch logger.info("Starting A-STIM simulation batch") output = [] for a in args['radius']: for pneuron in args['neuron']: nbls = NeuronalBilayerSonophore(a, pneuron) batch = Batch(nbls.simAndSave if args['save'] else nbls.simulate, queue) output += batch(mpi=args['mpi'], loglevel=args['loglevel']) # Plot resulting profiles if args['plot'] is not None: parser.parsePlot(args, output) if __name__ == '__main__': main()