diff --git a/PySONIC/core/pneuron.py b/PySONIC/core/pneuron.py index 0928226..59fb112 100644 --- a/PySONIC/core/pneuron.py +++ b/PySONIC/core/pneuron.py @@ -1,585 +1,585 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-03 11:53:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-09-04 11:43:36 +# @Last Modified time: 2019-09-10 14:45:22 import abc import inspect import numpy as np import pandas as pd from .batches import Batch from .model import Model from .lookups import SmartLookup from .simulators import PWSimulator from ..postpro import detectSpikes, computeFRProfile from ..constants import * from ..utils import * class PointNeuron(Model): ''' Generic point-neuron model interface. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ESTIM' # keyword used to characterize simulations made with this model def __repr__(self): return self.__class__.__name__ @property @classmethod @abc.abstractmethod def name(cls): ''' Neuron name. ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Cm0(cls): ''' Neuron's resting capacitance (F/cm2). ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Vm0(cls): ''' Neuron's resting membrane potential(mV). ''' raise NotImplementedError @classmethod def Qm0(cls): return cls.Cm0 * cls.Vm0 * 1e-3 # C/cm2 @staticmethod def inputs(): return { 'Astim': { 'desc': 'current density amplitude', 'label': 'A', 'unit': 'mA/m2', 'factor': 1e0, 'precision': 1 }, 'tstim': { 'desc': 'stimulus duration', 'label': 't_{stim}', 'unit': 'ms', 'factor': 1e3, 'precision': 0 }, 'toffset': { 'desc': 'offset duration', 'label': 't_{offset}', 'unit': 'ms', 'factor': 1e3, 'precision': 0 }, 'PRF': { 'desc': 'pulse repetition frequency', 'label': 'PRF', 'unit': 'Hz', 'factor': 1e0, 'precision': 0 }, 'DC': { 'desc': 'duty cycle', 'label': 'DC', 'unit': '%', 'factor': 1e2, 'precision': 2 } } @classmethod def filecodes(cls, Astim, tstim, toffset, PRF, DC): is_CW = DC == 1. return { 'simkey': cls.simkey, 'neuron': cls.name, 'nature': 'CW' if is_CW else 'PW', 'Astim': '{:.1f}mAm2'.format(Astim), 'tstim': '{:.0f}ms'.format(tstim * 1e3), 'toffset': None, 'PRF': 'PRF{:.2f}Hz'.format(PRF) if not is_CW else None, 'DC': 'DC{:.2f}%'.format(DC * 1e2) if not is_CW else None } @classmethod def getPltVars(cls, wrapleft='df["', wrapright='"]'): pltvars = { 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'bounds': ((cls.Vm0 - 20.0) * cls.Cm0 * 1e2, 60) }, 'Vm': { 'desc': 'membrane potential', 'label': 'V_m', 'unit': 'mV', 'bounds': (-150, 70) }, 'ELeak': { 'constant': 'obj.ELeak', 'desc': 'non-specific leakage current resting potential', 'label': 'V_{leak}', 'unit': 'mV', 'ls': '--', 'color': 'k' } } for cname in cls.getCurrentsNames(): cfunc = getattr(cls, cname) cargs = inspect.getargspec(cfunc)[0][1:] pltvars[cname] = { 'desc': inspect.getdoc(cfunc).splitlines()[0], 'label': 'I_{{{}}}'.format(cname[1:]), 'unit': 'A/m^2', 'factor': 1e-3, 'func': '{}({})'.format(cname, ', '.join(['{}{}{}'.format(wrapleft, a, wrapright) for a in cargs])) } for var in cargs: if var != 'Vm': pltvars[var] = { 'desc': cls.states[var], 'label': var, 'bounds': (-0.1, 1.1) } pltvars['iNet'] = { 'desc': inspect.getdoc(getattr(cls, 'iNet')).splitlines()[0], 'label': 'I_{net}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'iNet({0}Vm{1}, {2}{3}{4})'.format( wrapleft, wrapright, wrapleft[:-1], cls.statesNames(), wrapright[1:]), 'ls': '--', 'color': 'black' } pltvars['dQdt'] = { 'desc': inspect.getdoc(getattr(cls, 'dQdt')).splitlines()[0], 'label': 'dQ_m/dt', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'dQdt({0}Vm{1}, {2}{3}{4})'.format( wrapleft, wrapright, wrapleft[:-1], cls.statesNames(), wrapright[1:]), 'ls': '--', 'color': 'black' } pltvars['iCap'] = { 'desc': inspect.getdoc(getattr(cls, 'iCap')).splitlines()[0], 'label': 'I_{cap}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'iCap({0}t{1}, {0}Vm{1})'.format(wrapleft, wrapright) } for rate in cls.rates: if 'alpha' in rate: prefix, suffix = 'alpha', rate[5:] else: prefix, suffix = 'beta', rate[4:] pltvars['{}'.format(rate)] = { 'label': '\\{}_{{{}}}'.format(prefix, suffix), 'unit': 'ms^{-1}', 'factor': 1e-3 } pltvars['FR'] = { 'desc': 'riring rate', 'label': 'FR', 'unit': 'Hz', 'factor': 1e0, # 'bounds': (0, 1e3), 'func': f'firingRateProfile({wrapleft[:-2]})' } return pltvars @classmethod def iCap(cls, t, Vm): ''' Capacitive current. ''' dVdt = np.insert(np.diff(Vm) / np.diff(t), 0, 0.) return cls.Cm0 * dVdt @classmethod def getPltScheme(cls): pltscheme = { 'Q_m': ['Qm'], 'V_m': ['Vm'] } pltscheme['I'] = cls.getCurrentsNames() + ['iNet'] for cname in cls.getCurrentsNames(): if 'Leak' not in cname: key = 'i_{{{}}}\ kin.'.format(cname[1:]) cargs = inspect.getargspec(getattr(cls, cname))[0][1:] pltscheme[key] = [var for var in cargs if var not in ['Vm', 'Cai']] return pltscheme @classmethod def statesNames(cls): ''' Return a list of names of all state variables of the model. ''' return list(cls.states.keys()) @classmethod @abc.abstractmethod def derStates(cls): ''' Dictionary of states derivatives functions ''' raise NotImplementedError @classmethod def getDerStates(cls, Vm, states): ''' Compute states derivatives array given a membrane potential and states dictionary ''' return np.array([cls.derStates()[k](Vm, states) for k in cls.statesNames()]) @classmethod @abc.abstractmethod def steadyStates(cls): ''' Return a dictionary of steady-states functions ''' raise NotImplementedError @classmethod def getSteadyStates(cls, Vm): ''' Compute array of steady-states for a given membrane potential ''' return np.array([cls.steadyStates()[k](Vm) for k in cls.statesNames()]) @classmethod def getDerEffStates(cls, lkp, states): ''' Compute effective states derivatives array given lookups and states dictionaries. ''' return np.array([ cls.derEffStates()[k](lkp, states) for k in cls.statesNames()]) @classmethod def getEffRates(cls, Vm): ''' Compute array of effective rate constants for a given membrane potential vector. ''' return {k: np.mean(np.vectorize(v)(Vm)) for k, v in cls.effRates().items()} @classmethod def getLookup(cls): ''' Get lookup of membrane potential rate constants interpolated along the neuron's charge physiological range. ''' Qmin, Qmax = expandRange(*cls.Qbounds(), 10.) Qref = np.arange(Qmin, Qmax, 1e-5) # C/m2 Vref = Qref / cls.Cm0 * 1e3 # mV tables = {k: np.vectorize(v)(Vref) for k, v in cls.effRates().items()} return SmartLookup({'Q': Qref}, {**{'V': Vref}, **tables}) @classmethod @abc.abstractmethod def currents(cls): ''' Dictionary of ionic currents functions (returning current densities in mA/m2) ''' @classmethod def iNet(cls, Vm, states): ''' net membrane current :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: current per unit area (mA/m2) ''' return sum([cfunc(Vm, states) for cfunc in cls.currents().values()]) @classmethod def dQdt(cls, Vm, states): ''' membrane charge density variation rate :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: variation rate (mA/m2) ''' return -cls.iNet(Vm, states) @classmethod def titrationFunc(cls, *args, **kwargs): ''' Default titration function. ''' return cls.isExcited(*args, **kwargs) @staticmethod def currentToConcentrationRate(z_ion, depth): ''' Compute the conversion factor from a specific ionic current (in mA/m2) into a variation rate of submembrane ion concentration (in M/s). :param: z_ion: ion valence :param depth: submembrane depth (m) :return: conversion factor (Mmol.m-1.C-1) ''' return 1e-6 / (z_ion * depth * FARADAY) @staticmethod def nernst(z_ion, Cion_in, Cion_out, T): ''' Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 @staticmethod def vtrap(x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) @staticmethod def efun(x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) @classmethod def ghkDrive(cls, Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * cls.efun(-x) # M eCout = Cion_out * cls.efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 @classmethod def getCurrentsNames(cls): return list(cls.currents().keys()) @staticmethod def firingRateProfile(*args, **kwargs): return computeFRProfile(*args, **kwargs) @classmethod def Qbounds(cls): ''' Determine bounds of membrane charge physiological range for a given neuron. ''' return np.array([np.round(cls.Vm0 - 25.0), 50.0]) * cls.Cm0 * 1e-3 # C/m2 @classmethod def isVoltageGated(cls, state): ''' Determine whether a given state is purely voltage-gated or not.''' return 'alpha{}'.format(state.lower()) in cls.rates @classmethod def simQueue(cls, amps, durations, offsets, PRFs, DCs, outputdir=None): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: list of parameters (list) for each simulation ''' if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += Batch.createQueue(amps, durations, offsets, min(PRFs), 1.0) if np.any(DCs != 1.0): queue += Batch.createQueue(amps, durations, offsets, PRFs, DCs[DCs != 1.0]) for item in queue: if np.isnan(item[0]): item[0] = None return cls.checkOutputDir(queue, outputdir) @staticmethod def checkInputs(Astim, tstim, toffset, PRF, DC): ''' Check validity of electrical stimulation parameters. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) ''' if not all(isinstance(param, float) for param in [Astim, tstim, toffset, DC]): raise TypeError('Invalid stimulation parameters (must be float typed)') if tstim <= 0: raise ValueError('Invalid stimulus duration: {} ms (must be strictly positive)' .format(tstim * 1e3)) if toffset < 0: raise ValueError('Invalid stimulus offset: {} ms (must be positive or null)' .format(toffset * 1e3)) if DC <= 0.0 or DC > 1.0: raise ValueError('Invalid duty cycle: {} (must be within ]0; 1])'.format(DC)) if DC < 1.0: if not isinstance(PRF, float): raise TypeError('Invalid PRF value (must be float typed)') if PRF is None: raise AttributeError('Missing PRF value (must be provided when DC < 1)') if PRF < 1 / tstim: raise ValueError('Invalid PRF: {} Hz (PR interval exceeds stimulus duration)' .format(PRF)) def chooseTimeStep(self): ''' Determine integration time step based on intrinsic temporal properties. ''' dt = DT_EFFECTIVE - if self.name in ['FH', 'sweeney']: + if self.name in ['FH', 'SW']: dt /= 10 return dt @classmethod def derivatives(cls, t, y, Cm=None, Iinj=0.): ''' Compute system derivatives for a given mambrane capacitance and injected current. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param Cm: membrane capacitance (F/m2) :param Iinj: injected current (mA/m2) :return: vector of system derivatives at time t ''' if Cm is None: Cm = cls.Cm0 Qm, *states = y Vm = Qm / Cm * 1e3 # mV states_dict = dict(zip(cls.statesNames(), states)) dQmdt = (Iinj - cls.iNet(Vm, states_dict)) * 1e-3 # A/m2 return [dQmdt, *cls.getDerStates(Vm, states_dict)] @Model.logNSpikes @Model.checkTitrate('Astim') @Model.addMeta def simulate(self, Astim, tstim, toffset, PRF=100., DC=1.0): ''' Simulate a specific neuron model for a specific set of electrical parameters, and return output data in a dataframe. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: simulation @ A = %sA/m2, t = %ss (%ss offset)%s', self, si_format(Astim, 2, space=' '), *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # Check validity of stimulation parameters self.checkInputs(Astim, tstim, toffset, PRF, DC) # Set initial conditions y0 = np.array((self.Qm0(), *self.getSteadyStates(self.Vm0))) # Initialize simulator and compute solution logger.debug('Computing solution') simulator = PWSimulator( lambda t, y: self.derivatives(t, y, Iinj=Astim), lambda t, y: self.derivatives(t, y, Iinj=0.)) t, y, stim = simulator( y0, self.chooseTimeStep(), tstim, toffset, PRF, DC) # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim) # Store output in dataframe and return data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Qm': y[:, 0], 'Vm': y[:, 0] / self.Cm0 * 1e3, }) for i in range(len(self.states)): data[self.statesNames()[i]] = y[:, i + 1] return data @classmethod def meta(cls, Astim, tstim, toffset, PRF, DC): return { 'simkey': cls.simkey, 'neuron': cls.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC } @staticmethod def getNSpikes(data): ''' Compute number of spikes in charge profile of simulation output. :param data: dataframe containing output time series :return: number of detected spikes ''' return detectSpikes(data)[0].size @staticmethod def getStabilizationValue(data): ''' Determine stabilization value from the charge profile of a simulation output. :param data: dataframe containing output time series :return: charge stabilization value (or np.nan if no stabilization detected) ''' # Extract charge signal posterior to observation window t, Qm = [data[key].values for key in ['t', 'Qm']] if t.max() <= TMIN_STABILIZATION: raise ValueError('solution length is too short to assess stabilization') Qm = Qm[t > TMIN_STABILIZATION] # Compute variation range Qm_range = np.ptp(Qm) logger.debug('%.2f nC/cm2 variation range over the last %.0f ms, Qmf = %.2f nC/cm2', Qm_range * 1e5, TMIN_STABILIZATION * 1e3, Qm[-1] * 1e5) # Return final value only if stabilization is detected if np.ptp(Qm) < QSS_Q_DIV_THR: return Qm[-1] else: return np.nan @classmethod def isExcited(cls, data): ''' Determine if neuron is excited from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is excited or not ''' return cls.getNSpikes(data) > 0 @classmethod def isSilenced(cls, data): ''' Determine if neuron is silenced from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is silenced or not ''' return not np.isnan(cls.getStabilizationValue(data)) def titrate(self, tstim, toffset, PRF, DC, xfunc=None, Arange=(0., 2 * AMP_UPPER_BOUND_ESTIM)): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given duration, PRF and duty cycle. :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param xfunc: function determining whether condition is reached from simulation output :param Arange: search interval for Astim, iteratively refined :return: excitation threshold amplitude (mA/m2) ''' # Default output function if xfunc is None: xfunc = self.titrationFunc return binarySearch( lambda x: xfunc(self.simulate(*x)[0]), [tstim, toffset, PRF, DC], 0, Arange, THRESHOLD_CONV_RANGE_ESTIM ) diff --git a/PySONIC/neurons/sweeney.py b/PySONIC/neurons/sweeney.py index d1fb8b6..29c5afe 100644 --- a/PySONIC/neurons/sweeney.py +++ b/PySONIC/neurons/sweeney.py @@ -1,98 +1,102 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2019-06-11 15:58:38 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-08-26 14:45:26 +# @Last Modified time: 2019-09-10 14:59:35 import numpy as np from ..core import PointNeuron class SweeneyNode(PointNeuron): ''' Mammalian myelinated motor fiber fiber node. - Reference: + References: *Sweeney, J.D., Mortimer, J.T., and Durand, D. (1987). Modeling of mammalian myelinated nerve for functional neuromuscular stimulation. IEEE 9th Annual Conference of the Engineering in Medicine and Biology Society 3, 1577–1578.* + + Corrections of maximal conductances and alpham rate constant according to: + *Basser, P.J., and Roth, B.J. (1991). Stimulation of a myelinated nerve axon + by electromagnetic induction. Med Biol Eng Comput 29, 261–268.* ''' # Neuron name - name = 'sweeney' + name = 'SW' # ------------------------------ Biophysical parameters ------------------------------ # Resting parameters Cm0 = 2.5e-2 # Membrane capacitance (F/m2) Vm0 = -80.0 # Membrane potential (mV) # Reversal potentials (mV) ENa = 35.64 # Sodium ELeak = -80.01 # Non-specific leakage # Maximal channel conductances (S/m2) gNabar = 1445e1 # Sodium gLeak = 128e1 # Non-specific leakage # ------------------------------ States names & descriptions ------------------------------ states = { 'm': 'iNa activation gate', 'h': 'iNa inactivation gate', } # ------------------------------ Gating states kinetics ------------------------------ @classmethod def alpham(cls, Vm): - return (126 + 0.363 * Vm) / (1 + np.exp(-(Vm + 49) / 53)) * 1e3 # s-1 + return (126 + 0.363 * Vm) / (1 + np.exp(-(Vm + 49) / 5.3)) * 1e3 # s-1 @classmethod def betam(cls, Vm): return cls.alpham(Vm) / (np.exp((Vm + 56.2) / 4.17)) # s-1 @classmethod def betah(cls, Vm): return 15.6 / (1 + np.exp(-(Vm + 56) / 10)) * 1e3 # s-1 @classmethod def alphah(cls, Vm): return cls.betah(Vm) / np.exp((Vm + 74.5) / 5) # s-1 # ------------------------------ States derivatives ------------------------------ @classmethod def derStates(cls): return { 'm': lambda Vm, x: cls.alpham(Vm) * (1 - x['m']) - cls.betam(Vm) * x['m'], 'h': lambda Vm, x: cls.alphah(Vm) * (1 - x['h']) - cls.betah(Vm) * x['h'] } # ------------------------------ Steady states ------------------------------ @classmethod def steadyStates(cls): return { 'm': lambda Vm: cls.alpham(Vm) / (cls.alpham(Vm) + cls.betam(Vm)), 'h': lambda Vm: cls.alphah(Vm) / (cls.alphah(Vm) + cls.betah(Vm)) } # ------------------------------ Membrane currents ------------------------------ @classmethod def iNa(cls, m, h, Vm): ''' Sodium current ''' return cls.gNabar * m**2 * h * (Vm - cls.ENa) # mA/m2 @classmethod def iLeak(cls, Vm): ''' non-specific leakage current ''' return cls.gLeak * (Vm - cls.ELeak) # mA/m2 @classmethod def currents(cls): return { 'iNa': lambda Vm, x: cls.iNa(x['m'], x['h'], Vm), 'iLeak': lambda Vm, _: cls.iLeak(Vm) } diff --git a/PySONIC/neurons/sweeney_lookups.pkl b/PySONIC/neurons/sweeney_lookups.pkl deleted file mode 100644 index d6fb30b..0000000 Binary files a/PySONIC/neurons/sweeney_lookups.pkl and /dev/null differ diff --git a/PySONIC/parsers.py b/PySONIC/parsers.py index 6032df4..5569f45 100644 --- a/PySONIC/parsers.py +++ b/PySONIC/parsers.py @@ -1,666 +1,666 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2019-06-04 18:24:29 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-08-30 16:59:54 +# @Last Modified time: 2019-09-06 16:07:23 import os import logging import pprint import numpy as np import matplotlib.pyplot as plt from argparse import ArgumentParser from .utils import Intensity2Pressure, selectDirDialog, OpenFilesDialog, isIterable from .neurons import getPointNeuron, CorticalRS from .plt import GroupedTimeSeries, CompTimeSeries DEFAULT_OUTPUT_FOLDER = os.path.abspath(os.path.split(__file__)[0] + '../../../../dump') class Parser(ArgumentParser): ''' Generic parser interface. ''' dist_str = '[scale min max n]' def __init__(self): super().__init__() self.pp = pprint.PrettyPrinter(indent=4) self.defaults = {} self.allowed = {} self.factors = {} self.to_parse = {} self.addPlot() self.addVerbose() def pprint(self, args): self.pp.pprint(args) def getDistribution(self, xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(self, xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return self.getDistribution(xmin, xmax, nx, scale=scale) def addVerbose(self): self.add_argument( '-v', '--verbose', default=False, action='store_true', help='Increase verbosity') self.to_parse['loglevel'] = self.parseLogLevel def addPlot(self): self.add_argument( '-p', '--plot', type=str, nargs='+', help='Variables to plot') self.to_parse['pltscheme'] = self.parsePltScheme def addMPI(self): self.add_argument( '--mpi', default=False, action='store_true', help='Use multiprocessing') def addTest(self): self.add_argument( '--test', default=False, action='store_true', help='Run test configuration') def addSave(self): self.add_argument( '-s', '--save', default=False, action='store_true', help='Save output figure(s)') def addFigureExtension(self): self.add_argument( '--figext', type=str, default='png', help='Figure type (extension)') def addCompare(self, desc='Comparative graph'): self.add_argument( '--compare', default=False, action='store_true', help=desc) def addSamplingRate(self): self.add_argument( '--sr', type=int, default=1, help='Sampling rate for plot') def addSpikes(self): self.add_argument( '--spikes', type=str, default='none', help='How to indicate spikes on charge profile ("none", "marks" or "details")') def addNColumns(self): self.add_argument( '--ncol', type=int, default=1, help='Number of columns in figure') def addNLevels(self): self.add_argument( '--nlevels', type=int, default=10, help='Number of levels') def addHideOutput(self): self.add_argument( '--hide', default=False, action='store_true', help='Hide output') def addTimeRange(self, default=None): self.add_argument( '--trange', type=float, nargs=2, default=default, help='Time lower and upper bounds (ms)') self.to_parse['trange'] = self.parseTimeRange def addPotentialBounds(self, default=None): self.add_argument( '--Vbounds', type=float, nargs=2, default=default, help='Membrane potential lower and upper bounds (mV)') def addFiringRateBounds(self, default): self.add_argument( '--FRbounds', type=float, nargs=2, default=default, help='Firing rate lower and upper bounds (Hz)') def addFiringRateScale(self, default='lin'): self.add_argument( '--FRscale', type=str, choices=('lin', 'log'), default=default, help='Firing rate scale for plot ("lin" or "log")') def addCmap(self, default=None): self.add_argument( '--cmap', type=str, default=default, help='Colormap name') def addCscale(self, default='lin'): self.add_argument( '--cscale', type=str, default=default, choices=('lin', 'log'), help='Color scale ("lin" or "log")') def addInputDir(self, dep_key=None): self.inputdir_dep_key = dep_key self.add_argument( '-i', '--inputdir', type=str, help='Input directory') self.to_parse['inputdir'] = self.parseInputDir def addOutputDir(self, dep_key=None): self.outputdir_dep_key = dep_key self.add_argument( '-o', '--outputdir', type=str, help='Output directory') self.to_parse['outputdir'] = self.parseOutputDir def addInputFiles(self, dep_key=None): self.inputfiles_dep_key = dep_key self.add_argument( '-i', '--inputfiles', type=str, help='Input files') self.to_parse['inputfiles'] = self.parseInputFiles def addPatches(self): self.add_argument( '--patches', type=str, default='one', help='Stimulus patching mode ("none", "one", all", or a boolean list)') self.to_parse['patches'] = self.parsePatches def addThresholdCurve(self): self.add_argument( '--threshold', default=False, action='store_true', help='Show threshold amplitudes') def addNeuron(self): self.add_argument( '-n', '--neuron', type=str, nargs='+', help='Neuron name (string)') self.to_parse['neuron'] = self.parseNeuron def parseNeuron(self, args): return [getPointNeuron(n) for n in args['neuron']] def addInteractive(self): self.add_argument( '--interactive', default=False, action='store_true', help='Make interactive') def addLabels(self): self.add_argument( '--labels', type=str, nargs='+', default=None, help='Labels') def addRelativeTimeBounds(self): self.add_argument( '--rel_tbounds', type=float, nargs='+', default=None, help='Relative time lower and upper bounds') def addPretty(self): self.add_argument( '--pretty', default=False, action='store_true', help='Make figure pretty') def addSubset(self, choices): self.add_argument( '--subset', type=str, nargs='+', default=['all'], choices=choices + ['all'], help='Run specific subset(s)') self.subset_choices = choices self.to_parse['subset'] = self.parseSubset def parseSubset(self, args): if args['subset'] == ['all']: return self.subset_choices else: return args['subset'] def parseTimeRange(self, args): if args['trange'] is None: return None return np.array(args['trange']) * 1e-3 def parsePatches(self, args): if args['patches'] not in ('none', 'one', 'all'): return eval(args['patches']) else: return args['patches'] def parseInputFiles(self, args): if self.inputfiles_dep_key is not None and not args[self.inputfiles_dep_key]: return None elif args['inputfiles'] is None: return OpenFilesDialog('pkl')[0] def parseDir(self, key, args, title, dep_key=None): if dep_key is not None and args[dep_key] is False: return None try: if args[key] is not None: return os.path.abspath(args[key]) else: return selectDirDialog(title=title) except ValueError: raise ValueError('No {} selected'.format(key)) def parseInputDir(self, args): return self.parseDir( 'inputdir', args, 'Select input directory', self.inputdir_dep_key) def parseOutputDir(self, args): if hasattr(self, 'outputdir') and self.outputdir is not None: return self.outputdir else: if args['outputdir'] is not None and args['outputdir'] == 'dump': return DEFAULT_OUTPUT_FOLDER else: return self.parseDir( 'outputdir', args, 'Select output directory', self.outputdir_dep_key) def parseLogLevel(self, args): return logging.DEBUG if args.pop('verbose') else logging.INFO def parsePltScheme(self, args): if args['plot'] is None or args['plot'] == ['all']: return None else: return {x: [x] for x in args['plot']} def restrict(self, args, keys): if sum([args[x] is not None for x in keys]) > 1: raise ValueError( 'You must provide only one of the following arguments: {}'.format(', '.join(keys))) def parse2array(self, args, key, factor=1): return np.array(args[key]) * factor def parse(self): args = vars(super().parse_args()) for k, v in self.defaults.items(): if k in args and args[k] is None: args[k] = v if isIterable(v) else [v] for k, parse_method in self.to_parse.items(): args[k] = parse_method(args) return args @staticmethod def parsePlot(args, output): render_args = {} if 'spikes' in args: render_args['spikes'] = args['spikes'] if args['compare']: if args['plot'] == ['all']: logger.error('Specific variables must be specified for comparative plots') return for key in ['cmap', 'cscale']: if key in args: render_args[key] = args[key] for pltvar in args['plot']: comp_plot = CompTimeSeries(output, pltvar) comp_plot.render(**render_args) else: scheme_plot = GroupedTimeSeries(output, pltscheme=args['pltscheme']) scheme_plot.render(**render_args) plt.show() class TestParser(Parser): def __init__(self, valid_subsets): super().__init__() self.addProfiling() self.addSubset(valid_subsets) def addProfiling(self): self.add_argument( '--profile', default=False, action='store_true', help='Run with profiling') class FigureParser(Parser): def __init__(self, valid_subsets): super().__init__() self.addSubset(valid_subsets) self.addSave() self.addOutputDir(dep_key='save') class PlotParser(Parser): def __init__(self): super().__init__() self.addHideOutput() self.addInputFiles() self.addOutputDir(dep_key='save') self.addSave() self.addFigureExtension() self.addCmap() self.addPretty() self.addTimeRange() self.addCscale() self.addLabels() class TimeSeriesParser(PlotParser): def __init__(self): super().__init__() self.addSpikes() self.addSamplingRate() self.addCompare() self.addPatches() class SimParser(Parser): ''' Generic simulation parser interface. ''' def __init__(self, outputdir=None): super().__init__() self.outputdir = outputdir self.addMPI() self.addOutputDir(dep_key='save') self.addSave() self.addCompare() self.addCmap() self.addCscale() def parse(self): args = super().parse() return args class MechSimParser(SimParser): ''' Parser to run mechanical simulations from the command line. ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.defaults.update({ 'radius': 32.0, # nm 'embedding': 0., # um 'Cm0': CorticalRS.Cm0 * 1e2, # uF/m2 'Qm0': CorticalRS.Qm0() * 1e5, # nC/m2 'freq': 500.0, # kHz 'amp': 100.0, # kPa 'charge': 0., # nC/cm2 'fs': 100. # % }) self.factors.update({ 'radius': 1e-9, 'embedding': 1e-6, 'Cm0': 1e-2, 'Qm0': 1e-5, 'freq': 1e3, 'amp': 1e3, 'charge': 1e-5, 'fs': 1e-2 }) self.addRadius() self.addEmbedding() self.addCm0() self.addQm0() self.addFdrive() self.addAdrive() self.addCharge() self.addFs() def addRadius(self): self.add_argument( '-a', '--radius', nargs='+', type=float, help='Sonophore radius (nm)') def addEmbedding(self): self.add_argument( '--embedding', nargs='+', type=float, help='Embedding depth (um)') def addCm0(self): self.add_argument( '--Cm0', type=float, nargs='+', help='Resting membrane capacitance (uF/cm2)') def addQm0(self): self.add_argument( '--Qm0', type=float, nargs='+', help='Resting membrane charge density (nC/cm2)') def addFdrive(self): self.add_argument( '-f', '--freq', nargs='+', type=float, help='US frequency (kHz)') def addAdrive(self): self.add_argument( '-A', '--amp', nargs='+', type=float, help='Acoustic pressure amplitude (kPa)') self.add_argument( '--Arange', type=str, nargs='+', help='Amplitude range {} (kPa)'.format(self.dist_str)) self.add_argument( '-I', '--intensity', nargs='+', type=float, help='Acoustic intensity (W/cm2)') self.add_argument( '--Irange', type=str, nargs='+', help='Intensity range {} (W/cm2)'.format(self.dist_str)) self.to_parse['amp'] = self.parseAmp def addAscale(self, default='lin'): self.add_argument( '--Ascale', type=str, choices=('lin', 'log'), default=default, help='Amplitude scale for plot ("lin" or "log")') def addCharge(self): self.add_argument( '-Q', '--charge', nargs='+', type=float, help='Membrane charge density (nC/cm2)') def addFs(self): self.add_argument( '--fs', nargs='+', type=float, help='Sonophore coverage fraction (%%)') self.add_argument( '--spanFs', default=False, action='store_true', help='Span Fs from 1 to 100%%') self.to_parse['fs'] = self.parseFs def parseAmp(self, args): params = ['Irange', 'Arange', 'intensity', 'amp'] self.restrict(args, params[:-1]) Irange, Arange, Int, Adrive = [args.pop(k) for k in params] if Irange is not None: amps = Intensity2Pressure(self.getDistFromList(Irange) * 1e4) # Pa elif Int is not None: amps = Intensity2Pressure(np.array(Int) * 1e4) # Pa elif Arange is not None: amps = self.getDistFromList(Arange) * self.factors['amp'] # Pa else: amps = np.array(Adrive) * self.factors['amp'] # Pa return amps def parseFs(self, args): if args.pop('spanFs', False): return np.arange(1, 101) * self.factors['fs'] # (-) else: return np.array(args['fs']) * self.factors['fs'] # (-) def parse(self): args = super().parse() for key in ['radius', 'embedding', 'Cm0', 'Qm0', 'freq', 'charge']: args[key] = self.parse2array(args, key, factor=self.factors[key]) return args @staticmethod def parseSimInputs(args): return [args[k] for k in ['freq', 'amp', 'charge']] class NeuronSimParser(SimParser): def __init__(self): super().__init__() self.defaults.update({ 'neuron': 'RS', 'tstim': 100.0, # ms 'toffset': 50. # ms }) self.factors.update({ 'tstim': 1e-3, 'toffset': 1e-3 }) self.addNeuron() self.addTstim() self.addToffset() def addTstim(self): self.add_argument( '-t', '--tstim', nargs='+', type=float, help='Stimulus duration (ms)') def addToffset(self): self.add_argument( '--toffset', nargs='+', type=float, help='Offset duration (ms)') class VClampParser(NeuronSimParser): def __init__(self): super().__init__() self.defaults.update({ 'vhold': -70.0, # mV 'vstep': 0.0 # mV }) self.factors.update({ 'vhold': 1., 'vstep': 1. }) self.addVhold() self.addVstep() def addVhold(self): self.add_argument( '--vhold', nargs='+', type=float, help='Held membrane potential (mV)') def addVstep(self): self.add_argument( '--vstep', nargs='+', type=float, help='Step membrane potential (mV)') def parse(self, args=None): if args is None: args = super().parse() for key in ['vhold', 'vstep', 'tstim', 'toffset']: args[key] = self.parse2array(args, key, factor=self.factors[key]) return args @staticmethod def parseSimInputs(args): return [args[k] for k in ['vhold', 'vstep', 'tstim', 'toffset']] class PWSimParser(NeuronSimParser): ''' Generic parser interface to run PW patterned simulations from the command line. ''' def __init__(self): super().__init__() self.defaults.update({ 'PRF': 100.0, # Hz 'DC': 100.0 # % }) self.factors.update({ 'PRF': 1., 'DC': 1e-2 }) self.allowed.update({ 'DC': range(101) }) self.addPRF() self.addDC() self.addTitrate() self.addSpikes() def addPRF(self): self.add_argument( '--PRF', nargs='+', type=float, help='PRF (Hz)') def addDC(self): self.add_argument( '--DC', nargs='+', type=float, help='Duty cycle (%%)') self.add_argument( '--spanDC', default=False, action='store_true', help='Span DC from 1 to 100%%') self.to_parse['DC'] = self.parseDC def addTitrate(self): self.add_argument( '--titrate', default=False, action='store_true', help='Perform titration') def parseAmp(self, args): return NotImplementedError def parseDC(self, args): if args.pop('spanDC'): return np.arange(1, 101) * self.factors['DC'] # (-) else: return np.array(args['DC']) * self.factors['DC'] # (-) def parse(self, args=None): if args is None: args = super().parse() for key in ['tstim', 'toffset', 'PRF']: args[key] = self.parse2array(args, key, factor=self.factors[key]) return args @staticmethod def parseSimInputs(args): return [args[k] for k in ['amp', 'tstim', 'toffset', 'PRF', 'DC']] class EStimParser(PWSimParser): ''' Parser to run E-STIM simulations from the command line. ''' def __init__(self): super().__init__() self.defaults.update({'amp': 10.0}) # mA/m2 self.factors.update({'amp': 1.}) self.addAstim() def addAstim(self): self.add_argument( '-A', '--amp', nargs='+', type=float, help='Amplitude of injected current density (mA/m2)') self.add_argument( '--Arange', type=str, nargs='+', help='Amplitude range {} (mA/m2)'.format(self.dist_str)) self.to_parse['amp'] = self.parseAmp def addVext(self): self.add_argument( '--Vext', nargs='+', type=float, help='Extracellular potential (mV)') def parseAmp(self, args): if args.pop('titrate'): return None Arange, Astim = [args.pop(k) for k in ['Arange', 'amp']] if Arange is not None: amps = self.getDistFromList(Arange) * self.factors['amp'] # mA/m2 else: amps = np.array(Astim) * self.factors['amp'] # mA/m2 return amps class AStimParser(PWSimParser, MechSimParser): ''' Parser to run A-STIM simulations from the command line. ''' def __init__(self): MechSimParser.__init__(self) PWSimParser.__init__(self) self.defaults.update({'method': 'sonic'}) self.allowed.update({'method': ['full', 'hybrid', 'sonic']}) self.addMethod() def addMethod(self): self.add_argument( '-m', '--method', nargs='+', type=str, help='Numerical integration method ({})'.format(', '.join(self.allowed['method']))) self.to_parse['method'] = self.parseMethod def parseMethod(self, args): for item in args['method']: if item not in self.allowed['method']: raise ValueError('Unknown method type: "{}"'.format(item)) return args['method'] def parseAmp(self, args): if args.pop('titrate'): return None return MechSimParser.parseAmp(self, args) def parse(self): args = PWSimParser.parse(self, args=MechSimParser.parse(self)) for k in ['Cm0', 'Qm0', 'embedding', 'charge']: del args[k] return args @staticmethod def parseSimInputs(args): return [args['freq']] + PWSimParser.parseSimInputs(args) + [args[k] for k in ['fs', 'method']] diff --git a/tests/test_sims.py b/tests/test_sims.py index 18470e9..b6dd93b 100644 --- a/tests/test_sims.py +++ b/tests/test_sims.py @@ -1,103 +1,103 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-06-14 18:37:45 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-08-29 20:11:25 +# @Last Modified time: 2019-09-10 14:45:32 ''' Test the basic functionalities of the package. ''' from PySONIC.core import BilayerSonophore, NeuronalBilayerSonophore from PySONIC.utils import logger from PySONIC.neurons import getPointNeuron, getNeuronsDict from PySONIC.test import TestBase class TestSims(TestBase): def test_MECH(self, is_profiled=False): logger.info('Test: running MECH simulation') a = 32e-9 # m Qm0 = -80e-5 # membrane resting charge density (C/m2) Cm0 = 1e-2 # membrane resting capacitance (F/m2) Fdrive = 350e3 # Hz Adrive = 100e3 # Pa Qm = 50e-5 # C/m2 bls = BilayerSonophore(a, Cm0, Qm0) self.execute('bls.simulate(Fdrive, Adrive, Qm)', globals(), locals(), is_profiled) def test_ESTIM(self, is_profiled=False): logger.info('Test: running ESTIM simulation') Astim = 10.0 # mA/m2 tstim = 100e-3 # s toffset = 50e-3 # s pneuron = getPointNeuron('RS') self.execute('pneuron.simulate(Astim, tstim, toffset)', globals(), locals(), is_profiled) def test_ASTIM_sonic(self, is_profiled=False): logger.info('Test: ASTIM sonic simulation') a = 32e-9 # m Fdrive = 500e3 # Hz Adrive = 100e3 # Pa tstim = 50e-3 # s toffset = 10e-3 # s pneuron = getPointNeuron('RS') nbls = NeuronalBilayerSonophore(a, pneuron) # test error 1: sonophore radius outside of lookup range try: nbls = NeuronalBilayerSonophore(100e-9, pneuron) nbls.simulate(Fdrive, Adrive, tstim, toffset, method='sonic') except ValueError: logger.debug('Out of range radius: OK') # test error 2: frequency outside of lookups range try: nbls = NeuronalBilayerSonophore(a, pneuron) nbls.simulate(10e3, Adrive, tstim, toffset, method='sonic') except ValueError: logger.debug('Out of range frequency: OK') # test error 3: amplitude outside of lookups range try: nbls = NeuronalBilayerSonophore(a, pneuron) nbls.simulate(Fdrive, 1e6, tstim, toffset, method='sonic') except ValueError: logger.debug('Out of range amplitude: OK') # Run simulation on all neurons for name, neuron_class in getNeuronsDict().items(): - if name not in ('template', 'LeechP', 'LeechT', 'LeechR', 'sweeney'): + if name not in ('template', 'LeechP', 'LeechT', 'LeechR', 'SW'): pneuron = neuron_class() nbls = NeuronalBilayerSonophore(a, pneuron) self.execute("nbls.simulate(Fdrive, Adrive, tstim, toffset, method='sonic')", globals(), locals(), is_profiled) def test_ASTIM_full(self, is_profiled=False): logger.info('Test: running ASTIM detailed simulation') a = 32e-9 # m Fdrive = 500e3 # Hz Adrive = 100e3 # Pa tstim = 1e-6 # s toffset = 1e-6 # s pneuron = getPointNeuron('RS') nbls = NeuronalBilayerSonophore(a, pneuron) self.execute("nbls.simulate(Fdrive, Adrive, tstim, toffset, method='full')", globals(), locals(), is_profiled) def test_ASTIM_hybrid(self, is_profiled=False): logger.info('Test: running ASTIM hybrid simulation') a = 32e-9 # m Fdrive = 350e3 # Hz Adrive = 100e3 # Pa tstim = 0.6e-3 # s toffset = 0.1e-3 # s pneuron = getPointNeuron('RS') nbls = NeuronalBilayerSonophore(a, pneuron) self.execute("nbls.simulate(Fdrive, Adrive, tstim, toffset, method='hybrid')", globals(), locals(), is_profiled) if __name__ == '__main__': tester = TestSims() tester.main()