diff --git a/PySONIC/core/model.py b/PySONIC/core/model.py index c3c0bb0..bc5675b 100644 --- a/PySONIC/core/model.py +++ b/PySONIC/core/model.py @@ -1,102 +1,116 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-03 11:53:04 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-02 13:23:59 +# @Last Modified time: 2019-06-02 15:34:25 +import os import pickle import abc import inspect import numpy as np from .batches import createQueue -from ..utils import logger +from ..utils import logger, loadData class Model(metaclass=abc.ABCMeta): ''' Generic model interface. ''' @property @abc.abstractmethod def tscale(self): ''' Relevant temporal scale of the model. ''' raise NotImplementedError @property @abc.abstractmethod def __repr__(self): raise NotImplementedError def params(self): ''' Gather all model parameters in a dictionary ''' def toAvoid(p): return (p.startswith('__') and p.endswith('__')) or p.startswith('_abc_') class_attrs = inspect.getmembers(self.__class__, lambda a: not(inspect.isroutine(a))) inst_attrs = inspect.getmembers(self, lambda a: not(inspect.isroutine(a))) class_attrs = [a for a in class_attrs if not toAvoid(a[0])] inst_attrs = [a for a in inst_attrs if not toAvoid(a[0]) and a not in class_attrs] params_dict = {a[0]: a[1] for a in class_attrs + inst_attrs} return params_dict @property @abc.abstractmethod def filecode(self, *args): raise NotImplementedError def getDesc(self): return inspect.getdoc(self).splitlines()[0] @property @abc.abstractmethod def getPltScheme(self): raise NotImplementedError @property @abc.abstractmethod def getPltVars(self, *args, **kwargs): raise NotImplementedError @property @abc.abstractmethod def checkInputs(self, *args): raise NotImplementedError @property @abc.abstractmethod def meta(self, *args): raise NotImplementedError @property @abc.abstractmethod def simulate(self, *args, **kwargs): raise NotImplementedError def simQueue(self, *args): ''' Create a simulation queue from a combination of simulation parameters. ''' return createQueue(*args) def runAndSave(self, outdir, *args): ''' Simulate system and save results in a PKL file. ''' # If no amplitude provided, perform titration to find it if None in args: iA = args.index(None) new_args = [x for x in args if x is not None] Athr = self.titrate(*new_args) if np.isnan(Athr): logger.error('Could not find threshold excitation amplitude') return None new_args.insert(iA, Athr) args = new_args - # Simulate model, save inf file and return file path + # Simulate model, save file and return file path data, tcomp = self.simulate(*args) meta = self.meta(*args) meta['tcomp'] = tcomp - outpath = '{}/{}.pkl'.format(outdir, self.filecode(*args)) - with open(outpath, 'wb') as fh: + fpath = '{}/{}.pkl'.format(outdir, self.filecode(*args)) + with open(fpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) - logger.debug('simulation data exported to "%s"', outpath) - return outpath + logger.debug('simulation data exported to "%s"', fpath) + return fpath + + def load(self, outdir, *args): + ''' Load output data for a specific parameters combination. ''' + + # Get file path from simulation parameters + fpath = '{}/{}.pkl'.format(outdir, self.filecode(*args)) + + # If output file does not exist, run simulation to generate it + if not os.path.isfile(fpath): + self.runAndSave(outdir, *args) + + # Return data and meta-data + return loadData(fpath) diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index 40bf83e..cc345a9 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,646 +1,647 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-02 13:22:11 +# @Last Modified time: 2019-06-02 15:27:53 from copy import deepcopy import logging import numpy as np import pandas as pd from scipy.interpolate import interp1d +from scipy.integrate import solve_ivp -from .simulators import PWSimulator, HybridSimulator, PeriodicSimulator +from .simulators import PWSimulator, HybridSimulator from .bls import BilayerSonophore from .pneuron import PointNeuron from .batches import createQueue from ..utils import * from ..constants import * from ..postpro import getFixedPoints class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model def __init__(self, a, neuron, Fdrive=None, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param neuron: neuron object :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Check validity of input parameters if not isinstance(neuron, PointNeuron): raise ValueError('Invalid neuron type: "{}" (must inherit from PointNeuron class)' .format(neuron.name)) self.neuron = neuron # Initialize BilayerSonophore parent object BilayerSonophore.__init__(self, a, neuron.Cm0, neuron.Cm0 * neuron.Vm0 * 1e-3, embedding_depth) def __repr__(self): s = '{}({:.1f} nm, {}'.format(self.__class__.__name__, self.a * 1e9, self.neuron) if self.d > 0.: s += ', d={}m'.format(si_format(self.d, precision=1, space=' ')) return s + ')' def params(self): params = super().params() params.update(self.neuron.params()) return params def getPltVars(self, wrapleft='df["', wrapright='"]'): pltvars = super().getPltVars(wrapleft, wrapright) pltvars.update(self.neuron.getPltVars(wrapleft, wrapright)) return pltvars def getPltScheme(self): return self.neuron.getPltScheme() - def filecode(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method): + def filecode(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method='sonic'): return 'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.2f}kPa_{:.0f}ms_{}{}'.format( self.neuron.name, 'CW' if DC == 1 else 'PW', self.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(PRF, DC * 1e2) if DC < 1. else '', method) def fullDerivatives(self, y, t, Adrive, Fdrive, phi): ''' Compute the derivatives of the (n+3) ODE full NBLS system variables. :param y: vector of state variables :param t: specific instant in time (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives(self, y[:3], t, Adrive, Fdrive, y[3], phi) dydt_elec = self.neuron.Qderivatives(y[3:], t, self.Capct(y[1])) return dydt_mech + dydt_elec def effDerivatives(self, y, t, lkp): ''' Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t ''' # Split input vector explicitly Qm, *states = y # Compute charge and channel states variation Vmeff = self.neuron.interpVmeff(Qm, lkp) dQmdt = - self.neuron.iNet(Vmeff, states) * 1e-3 dstates = self.neuron.derEffStates(Qm, states, lkp) # Return derivatives vector return [dQmdt, *[dstates[k] for k in self.neuron.states]] def interpEffVariable(self, key, Qm, stim, lkps1D): ''' Interpolate Q-dependent effective variable along solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkps1D: dictionary of lookups for ON and OFF states :return: interpolated effective variable vector ''' x = np.zeros(stim.size) x[stim == 0] = np.interp( Qm[stim == 0], lkps1D['ON']['Q'], lkps1D['ON'][key], left=np.nan, right=np.nan) x[stim == 1] = np.interp( Qm[stim == 1], lkps1D['ON']['Q'], lkps1D['OFF'][key], left=np.nan, right=np.nan) return x def runFull(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the full electro-mechanical system for a specific set of US stimulation parameters, using a classic integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the ODE system is solved iteratively until completion. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param phi: acoustic drive phase (rad) :return: 2-tuple with the output dataframe and computation time. ''' # Determine time step dt = 1 / (NPC_FULL * Fdrive) # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) Pac = self.Pacoustic(dt, Adrive, Fdrive, phi) Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.concatenate(( [0., Z0, self.ng0, self.Qm0], [steady_states[k] for k in self.neuron.states])) # Initialize simulator and compute solution logger.debug('Computing detailed solution') simulator = PWSimulator( lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.)) (t, y, stim), tcomp = simulator( y0, dt, tstim, toffset, PRF, DC, print_progress=logger.getEffectiveLevel() <= logging.INFO, target_dt=CLASSIC_TARGET_DT, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 4] # Return dataframe and computation time return data, tcomp def runHybrid(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the system for a specific set of US stimulation parameters, using a hybrid integration scheme. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the solution matrix and a state vector ''' # Determine time steps dt_dense, dt_sparse = [1. / (n * Fdrive) for n in [NPC_FULL, NPC_HH]] # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) Pac = self.Pacoustic(dt_dense, Adrive, Fdrive, phi) Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.concatenate(( [0., Z0, self.ng0, self.Qm0], [steady_states[k] for k in self.neuron.states], )) is_dense_var = np.array([True] * 3 + [False] * (len(self.neuron.states) + 1)) # Initialize simulator and compute solution logger.debug('Computing hybrid solution') simulator = HybridSimulator( lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.), lambda t, y, Cm: self.neuron.Qderivatives(y, t, Cm), lambda yref: self.Capct(yref[1]), is_dense_var, ivars_to_check=[1, 2]) (t, y, stim), tcomp = simulator( y0, dt_dense, dt_sparse, 1. / Fdrive, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 4] # Return dataframe and computation time return data, tcomp def computeEffVars(self, Fdrive, Adrive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' # Run simulation and retrieve deflection and gas content vectors from last cycle data, tcomp = BilayerSonophore.simulate(self, Fdrive, Adrive, Qm) Z_last = data.loc[-NPC_FULL:, 'Z'].values # m Cm_last = self.v_Capct(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = x * Cm_last + (1 - x) * self.Cm0 # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({'V': np.mean(Vm)}) effvars[-1].update(self.neuron.computeEffRates(Vm)) # Log process log = '{}: lookups @ {}Hz, {}Pa, {:.2f} nC/cm2'.format( self, *si_format([Fdrive, Adrive], precision=1, space=' '), Qm * 1e5) if len(fs) > 1: log += ', fs = {:.0f} - {:.0f}%'.format(fs.min() * 1e2, fs.max() * 1e2) log += ', tcomp = {:.3f} s'.format(tcomp) logger.info(log) # Return effective coefficients return [tcomp, effvars] def runSONIC(self, Fdrive, Adrive, tstim, toffset, PRF, DC): ''' Compute solutions of the system for a specific set of US stimulation parameters, using charge-predicted "effective" coefficients to solve the HH equations at each step. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 3-tuple with the time profile, the effective solution matrix and a state vector ''' # Load appropriate 2D lookups Aref, Qref, lkps2D, _ = getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive) # Check that acoustic amplitude is within lookup range Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) # Interpolate 2D lookups at zero and US amplitude logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', Adrive * 1e-3) lkps1D = {state: {key: interp1d(Aref, y2D, axis=0)(val) for key, y2D in lkps2D.items()} for state, val in {'ON': Adrive, 'OFF': 0.}.items()} # Add reference charge vector to 1D lookup dictionaries for state in lkps1D.keys(): lkps1D[state]['Q'] = Qref # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.insert(np.array([steady_states[k] for k in self.neuron.states]), 0, self.Qm0) # Initialize simulator and compute solution logger.debug('Computing effective solution') simulator = PWSimulator( lambda y, t: self.effDerivatives(y, t, lkps1D['ON']), lambda y, t: self.effDerivatives(y, t, lkps1D['OFF'])) (t, y, stim), tcomp = simulator(y0, DT_EFF, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Qm': y[:, 0] }) for key in ['ng', 'V']: data[key] = self.interpEffVariable(key, data['Qm'].values, stim, lkps1D) data['Z'] = np.array([self.balancedefQS(ng, Qm) for ng, Qm in zip( data['ng'].values, data['Qm'].values)]) # m data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 1] # Return dataframe and computation time return data, tcomp def meta(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method): ''' Return information about object and simulation parameters. :param Fdrive: US frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param method: integration method :return: meta-data dictionary ''' return { 'neuron': self.neuron.name, 'a': self.a, 'd': self.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'method': method } def simulate(self, Fdrive, Adrive, tstim, toffset, PRF=100., DC=1.0, method='sonic'): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param method: selected integration method :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: simulation @ f = %sHz, A = %sPa, t = %ss (%ss offset)%s', self, si_format(Fdrive, 0, space=' '), si_format(Adrive, 2, space=' '), *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # Check validity of stimulation parameters BilayerSonophore.checkInputs(self, Fdrive, Adrive, 0.0, 0.0) self.neuron.checkInputs(Adrive, tstim, toffset, PRF, DC) # Call appropriate simulation function try: simfunc = { 'full': self.runFull, 'hybrid': self.runHybrid, 'sonic': self.runSONIC }[method] except KeyError: raise ValueError('Invalid integration method: "{}"'.format(method)) data, tcomp = simfunc(Fdrive, Adrive, tstim, toffset, PRF, DC) # Log number of detected spikes nspikes = self.neuron.getNSpikes(data) logger.debug('{} spike{} detected'.format(nspikes, plural(nspikes))) # Return dataframe and computation time return data, tcomp @cache(os.path.join(os.path.split(__file__)[0], 'astim_titrations.log')) def titrate(self, Fdrive, tstim, toffset, PRF=100., DC=1., method='sonic', xfunc=None, Arange=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency, duration, PRF and duty cycle. :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param method: integration method :param xfunc: function determining whether condition is reached from simulation output :param Arange: search interval for Adrive, iteratively refined :return: determined threshold amplitude (Pa) ''' # Default output function if xfunc is None: xfunc = self.neuron.titrationFunc # Default amplitude interval if Arange is None: Arange = (0, getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive)[0].max()) return binarySearch( lambda x: xfunc(self.simulate(*x)[0]), [Fdrive, tstim, toffset, PRF, DC, method], 1, Arange, TITRATION_ASTIM_DA_MAX ) def simQueue(self, freqs, amps, durations, offsets, PRFs, DCs, method): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :params method: integration method :return: list of parameters (list) for each simulation ''' if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += createQueue(freqs, amps, durations, offsets, min(PRFs), 1.0) if np.any(DCs != 1.0): queue += createQueue(freqs, amps, durations, offsets, PRFs, DCs[DCs != 1.0]) for item in queue: if np.isnan(item[1]): item[1] = None item.append(method) return queue def quasiSteadyStates(self, Fdrive, amps=None, charges=None, DCs=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities and duty cycles, at a specific US frequency. :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DCs: duty cycle value(s) :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges amps, charges, lookups = getLookupsDCavg( self.neuron.name, self.a, Fdrive, amps, charges, DCs) # Compute QSS states using these lookups nA, nQ, nDC = lookups['V'].shape QSS = {k: np.empty((nA, nQ, nDC)) for k in self.neuron.states} for iA in range(nA): for iDC in range(nDC): QSS_1D = self.neuron.quasiSteadyStates( {k: v[iA, :, iDC] for k, v in lookups.items()}) for k in QSS.keys(): QSS[k][iA, :, iDC] = QSS_1D[k] # Compress outputs if needed if squeeze_output: QSS = {k: v.squeeze() for k, v in QSS.items()} lookups = {k: v.squeeze() for k, v in lookups.items()} # Return reference inputs and outputs return amps, charges, lookups, QSS def iNetQSS(self, Qm, Fdrive, Adrive, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' _, _, lookups, QSS = self.quasiSteadyStates( Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) return self.neuron.iNet(lookups['V'], np.array(list(QSS.values()))) # mA/m2 def evaluateStability(self, Qm0, states0, lkp): ''' Integrate the effective differential system from a given starting point, until clear convergence or clear divergence is found. :param Qm0: initial membrane charge density (C/m2) :param states0: dictionary of initial states values :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: boolean indicating convergence state ''' # Initialize y0 vector - # t0 = 0. + t0 = 0. y0 = np.array([Qm0] + list(states0.values())) - # Initialize simulator and compute solution - simulator = PeriodicSimulator( - lambda y, t: self.effDerivatives(y, t, lkp), - ivars_to_check=[0]) - simulator.stopfunc = simulator.isAsymptoticallyStable - nmax = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) - t, y, stim = simulator.compute(y0, DT_EFF, QSS_INTEGRATION_INTERVAL, nmax=nmax) - logger.debug('completed in %ss', si_format(tcomp, 1)) - conv = t[-1] < QSS_HISTORY_INTERVAL - - # # Initializing empty list to record evolution of charge deviation - # n = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) # size of history - # dQ = [] - - # # As long as there is no clear charge convergence or divergence - # conv, div = False, False - # tf, yf = t0, y0 - # while not conv and not div: - - # # Integrate system for small interval and retrieve final charge deviation - # t0, y0 = tf, yf - # sol = solve_ivp( - # lambda t, y: self.effDerivatives(y, t, lkp), - # [t0, t0 + QSS_INTEGRATION_INTERVAL], y0, - # method='LSODA' - # ) - # tf, yf = sol.t[-1], sol.y[:, -1] - # dQ.append(yf[0] - Qm0) - - # # logger.debug('{:.0f} ms: dQ = {:.5f} nC/cm2, avg dQ = {:.5f} nC/cm2'.format( - # # tf * 1e3, dQ[-1] * 1e5, np.mean(dQ[-n:]) * 1e5)) - - # # If last charge deviation is too large -> divergence - # if np.abs(dQ[-1]) > QSS_Q_DIV_THR: - # div = True - - # # If last charge deviation or average deviation in recent history - # # is small enough -> convergence - # for x in [dQ[-1], np.mean(dQ[-n:])]: - # if np.abs(x) < QSS_Q_CONV_THR: - # conv = True - - # # If max integration duration is been reached -> error - # if tf > QSS_MAX_INTEGRATION_DURATION: - # raise ValueError('too many iterations') - - # logger.debug('{}vergence after {:.0f} ms: dQ = {:.5f} nC/cm2'.format( - # {True: 'con', False: 'di'}[conv], tf * 1e3, dQ[-1] * 1e5)) + # # Initialize simulator and compute solution + # simulator = PeriodicSimulator( + # lambda y, t: self.effDerivatives(y, t, lkp), + # ivars_to_check=[0]) + # simulator.stopfunc = simulator.isAsymptoticallyStable + # nmax = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) + # t, y, stim = simulator.compute(y0, DT_EFF, QSS_INTEGRATION_INTERVAL, nmax=nmax) + # logger.debug('completed in %ss', si_format(tcomp, 1)) + # conv = t[-1] < QSS_HISTORY_INTERVAL + + # Initializing empty list to record evolution of charge deviation + n = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) # size of history + dQ = [] + + # As long as there is no clear charge convergence or divergence + conv, div = False, False + tf, yf = t0, y0 + while not conv and not div: + + # Integrate system for small interval and retrieve final charge deviation + t0, y0 = tf, yf + sol = solve_ivp( + lambda t, y: self.effDerivatives(y, t, lkp), + [t0, t0 + QSS_INTEGRATION_INTERVAL], y0, + method='LSODA' + ) + tf, yf = sol.t[-1], sol.y[:, -1] + dQ.append(yf[0] - Qm0) + + # logger.debug('{:.0f} ms: dQ = {:.5f} nC/cm2, avg dQ = {:.5f} nC/cm2'.format( + # tf * 1e3, dQ[-1] * 1e5, np.mean(dQ[-n:]) * 1e5)) + + # If last charge deviation is too large -> divergence + if np.abs(dQ[-1]) > QSS_Q_DIV_THR: + div = True + + # If last charge deviation or average deviation in recent history + # is small enough -> convergence + for x in [dQ[-1], np.mean(dQ[-n:])]: + if np.abs(x) < QSS_Q_CONV_THR: + conv = True + + # If max integration duration is been reached -> error + if tf > QSS_MAX_INTEGRATION_DURATION: + raise ValueError('too many iterations') + + logger.debug('{}vergence after {:.0f} ms: dQ = {:.5f} nC/cm2'.format( + {True: 'con', False: 'di'}[conv], tf * 1e3, dQ[-1] * 1e5)) return conv def fixedPointsQSS(self, Fdrive, Adrive, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' logger.debug('A = {:.2f} kPa, DC = {:.0f}%'.format(Adrive * 1e-3, DC * 1e2)) # Extract stable and unstable fixed points from QSS charge variation profile dfunc = lambda Qm: - self.iNetQSS(Qm, Fdrive, Adrive, DC) SFP_candidates = getFixedPoints(lkp['Q'], dQdt, filter='stable', der_func=dfunc).tolist() UFPs = getFixedPoints(lkp['Q'], dQdt, filter='unstable', der_func=dfunc).tolist() SFPs = [] pltvars = self.getPltVars() # For each candidate SFP for i, Qm in enumerate(SFP_candidates): logger.debug('Q-SFP = {:.2f} nC/cm2'.format(Qm * 1e5)) # Re-compute QSS *_, QSS_FP = self.quasiSteadyStates(Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) # Simulate from unperturbed QSS and evaluate stability if not self.evaluateStability(Qm, QSS_FP, lkp): logger.warning('diverging system at ({:.2f} kPa, {:.2f} nC/cm2)'.format( Adrive * 1e-3, Qm * 1e5)) UFPs.append(Qm) else: # For each state unstable_states = [] for x in self.neuron.states: pltvar = pltvars[x] unit_str = pltvar.get('unit', '') factor = pltvar.get('factor', 1) is_stable_direction = [] for sign in [-1, +1]: # Perturb state with small offset QSS_perturbed = deepcopy(QSS_FP) QSS_perturbed[x] *= (1 + sign * QSS_REL_OFFSET) # If gating state, bound within [0., 1.] if self.neuron.isVoltageGated(x): QSS_perturbed[x] = np.clip(QSS_perturbed[x], 0., 1.) logger.debug('{}: {:.5f} -> {:.5f} {}'.format( x, QSS_FP[x] * factor, QSS_perturbed[x] * factor, unit_str)) # Simulate from perturbed QSS and evaluate stability is_stable_direction.append( self.evaluateStability(Qm, QSS_perturbed, lkp)) # Check if system shows stability upon x-state perturbation # in both directions if not np.all(is_stable_direction): unstable_states.append(x) # Classify fixed point as stable only if all states show stability is_stable_FP = len(unstable_states) == 0 {True: SFPs, False: UFPs}[is_stable_FP].append(Qm) logger.info('{}stable fixed-point at ({:.2f} kPa, {:.2f} nC/cm2){}'.format( '' if is_stable_FP else 'un', Adrive * 1e-3, Qm * 1e5, '' if is_stable_FP else ', caused by {} states'.format(unstable_states))) return SFPs, UFPs diff --git a/PySONIC/core/pneuron.py b/PySONIC/core/pneuron.py index 80b64ca..e66fa63 100644 --- a/PySONIC/core/pneuron.py +++ b/PySONIC/core/pneuron.py @@ -1,588 +1,588 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-03 11:53:04 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-02 13:21:40 +# @Last Modified time: 2019-06-02 15:38:45 import abc import inspect import re import numpy as np import pandas as pd from .batches import createQueue from .model import Model from .simulators import PWSimulator from ..postpro import findPeaks from ..constants import * from ..utils import si_format, logger, plural, binarySearch class PointNeuron(Model): ''' Generic point-neuron model interface. ''' tscale = 'ms' # relevant temporal scale of the model def __init__(self): self.Qm0 = self.Cm0 * self.Vm0 * 1e-3 # C/cm2 def __repr__(self): return self.__class__.__name__ def filecode(self, Astim, tstim, toffset, PRF, DC): ''' File naming convention. ''' return 'ESTIM_{}_{}_{:.1f}mA_per_m2_{:.0f}ms{}'.format( self.name, 'CW' if DC == 1 else 'PW', Astim, tstim * 1e3, '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1. else '') @property @abc.abstractmethod def name(self): raise NotImplementedError @property @abc.abstractmethod def Cm0(self): raise NotImplementedError @property @abc.abstractmethod def Vm0(self): raise NotImplementedError @abc.abstractmethod def currents(self, Vm, states): ''' Compute all ionic currents per unit area. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: dictionary of ionic currents per unit area (mA/m2) ''' def iNet(self, Vm, states): ''' net membrane current :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: current per unit area (mA/m2) ''' return sum(self.currents(Vm, states).values()) def dQdt(self, Vm, states): ''' membrane charge density variation rate :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: variation rate (mA/m2) ''' return -self.iNet(Vm, states) def titrationFunc(self, *args, **kwargs): ''' Default titration function. ''' return self.isExcited(*args, **kwargs) def currentToConcentrationRate(self, z_ion, depth): ''' Compute the conversion factor from a specific ionic current (in mA/m2) into a variation rate of submembrane ion concentration (in M/s). :param: z_ion: ion valence :param depth: submembrane depth (m) :return: conversion factor (Mmol.m-1.C-1) ''' return 1e-6 / (z_ion * depth * FARADAY) def nernst(self, z_ion, Cion_in, Cion_out, T): ''' Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 def vtrap(self, x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) def efun(self, x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) def ghkDrive(self, Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * self.efun(-x) # M eCout = Cion_out * self.efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 def getCurrentsNames(self): return list(self.currents(np.nan, [np.nan] * len(self.states)).keys()) def getPltScheme(self): pltscheme = { 'Q_m': ['Qm'], 'V_m': ['Vm'] } pltscheme['I'] = self.getCurrentsNames() + ['iNet'] for cname in self.getCurrentsNames(): if 'Leak' not in cname: key = 'i_{{{}}}\ kin.'.format(cname[1:]) cargs = inspect.getargspec(getattr(self, cname))[0][1:] pltscheme[key] = [var for var in cargs if var not in ['Vm', 'Cai']] return pltscheme def getPltVars(self, wrapleft='df["', wrapright='"]'): ''' Return a dictionary with information about all plot variables related to the neuron. ''' pltvars = { 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'bounds': (-100, 50) }, 'Vm': { 'desc': 'membrane potential', 'label': 'V_m', 'unit': 'mV', 'y0': self.Vm0, 'bounds': (-150, 70) }, 'ELeak': { 'constant': 'obj.ELeak', 'desc': 'non-specific leakage current resting potential', 'label': 'V_{leak}', 'unit': 'mV', 'ls': '--', 'color': 'k' } } for cname in self.getCurrentsNames(): cfunc = getattr(self, cname) cargs = inspect.getargspec(cfunc)[0][1:] pltvars[cname] = { 'desc': inspect.getdoc(cfunc).splitlines()[0], 'label': 'I_{{{}}}'.format(cname[1:]), 'unit': 'A/m^2', 'factor': 1e-3, 'func': '{}({})'.format(cname, ', '.join(['{}{}{}'.format(wrapleft, a, wrapright) for a in cargs])) } for var in cargs: if var not in ['Vm', 'Cai']: vfunc = getattr(self, 'der{}{}'.format(var[0].upper(), var[1:])) desc = cname + re.sub( '^Evolution of', '', inspect.getdoc(vfunc).splitlines()[0]) pltvars[var] = { 'desc': desc, 'label': var, 'bounds': (-0.1, 1.1) } pltvars['iNet'] = { 'desc': inspect.getdoc(getattr(self, 'iNet')).splitlines()[0], 'label': 'I_{net}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'iNet({0}Vm{1}, {2}{3}{4}.values.T)'.format( wrapleft, wrapright, wrapleft[:-1], self.states, wrapright[1:]), 'ls': '--', 'color': 'black' } pltvars['dQdt'] = { 'desc': inspect.getdoc(getattr(self, 'dQdt')).splitlines()[0], 'label': 'dQ_m/dt', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'dQdt({0}Vm{1}, {2}{3}{4}.values.T)'.format( wrapleft, wrapright, wrapleft[:-1], self.states, wrapright[1:]), 'ls': '--', 'color': 'black' } for x in self.getGates(): for rate in ['alpha', 'beta']: pltvars['{}{}'.format(rate, x)] = { 'label': '\\{}_{{{}}}'.format(rate, x), 'unit': 'ms^{-1}', 'factor': 1e-3 } return pltvars def getRatesNames(self, states): ''' Return a list of names of the alpha and beta rates of the neuron. ''' return list(sum( [['alpha{}'.format(x.lower()), 'beta{}'.format(x.lower())] for x in states], [])) @abc.abstractmethod def steadyStates(self, Vm): ''' Compute the steady-state values for a specific membrane potential value. :param Vm: membrane potential (mV) :return: dictionary of steady-states ''' @abc.abstractmethod def derStates(self, Vm, states): ''' Compute the derivatives of channel states. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: current per unit area (mA/m2) ''' @abc.abstractmethod def computeEffRates(self, Vm): ''' Get the effective rate constants of ion channels, averaged along an acoustic cycle, for future use in effective simulations. :param Vm: array of membrane potential values for an acoustic cycle (mV) :return: a dictionary of rate average constants (s-1) ''' def interpEffRates(self, Qm, lkp, keys=None): ''' Interpolate effective rate constants for a given charge density using reference lookup vectors. :param Qm: membrane charge density (C/m2) :states: state probabilities of the ion channels :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of interpolated rate constants ''' if keys is None: keys = self.rates return {k: np.interp(Qm, lkp['Q'], lkp[k], left=np.nan, right=np.nan) for k in keys} def interpVmeff(self, Qm, lkp): ''' Interpolate the effective membrane potential for a given charge density using reference lookup vectors. :param Qm: membrane charge density (C/m2) :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of interpolated rate constants ''' return np.interp(Qm, lkp['Q'], lkp['V'], left=np.nan, right=np.nan) @abc.abstractmethod def derEffStates(self, Qm, states, lkp): ''' Compute the effective derivatives of channel states, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param Qm: membrane charge density (C/m2) :states: state probabilities of the ion channels :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. ''' def Qbounds(self): ''' Determine bounds of membrane charge physiological range for a given neuron. ''' return np.array([np.round(self.Vm0 - 25.0), 50.0]) * self.Cm0 * 1e-3 # C/m2 def isVoltageGated(self, state): ''' Determine whether a given state is purely voltage-gated or not.''' return 'alpha{}'.format(state.lower()) in self.rates def getGates(self): ''' Retrieve the names of the neuron's states that match an ion channel gating. ''' gates = [] for x in self.states: if self.isVoltageGated(x): gates.append(x) return gates def qsStates(self, lkp, states): ''' Compute a collection of quasi steady states using the standard xinf = ax / (ax + Bx) equation. :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of quasi-steady states ''' return { x: lkp['alpha{}'.format(x)] / (lkp['alpha{}'.format(x)] + lkp['beta{}'.format(x)]) for x in states } @abc.abstractmethod def quasiSteadyStates(self, lkp): ''' Compute the quasi-steady states of a neuron for a range of membrane charge densities, based on 1-dimensional lookups interpolated at a given sonophore diameter, US frequency, US amplitude and duty cycle. :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of quasi-steady states ''' def getRates(self, Vm): ''' Compute the ion channels rate constants for a given membrane potential. :param Vm: membrane potential (mV) :return: a dictionary of rate constants and their values at the given potential. ''' rates = {} for x in self.getGates(): x = x.lower() alpha_str, beta_str = ['{}{}'.format(s, x.lower()) for s in ['alpha', 'beta']] inf_str, tau_str = ['{}inf'.format(x.lower()), 'tau{}'.format(x.lower())] if hasattr(self, 'alpha{}'.format(x)): alphax = getattr(self, alpha_str)(Vm) betax = getattr(self, beta_str)(Vm) elif hasattr(self, '{}inf'.format(x)): xinf = getattr(self, inf_str)(Vm) taux = getattr(self, tau_str)(Vm) alphax = xinf / taux betax = 1 / taux - alphax rates[alpha_str] = alphax rates[beta_str] = betax return rates def Vderivatives(self, y, t, Iinj): ''' Compute the derivatives of a V-cast HH system for a specific value of injected current. :param y: vector of HH system variables at time t :param t: time value (s, unused) :param Iinj: injected current (mA/m2) :return: vector of HH system derivatives at time t ''' Vm, *states = y Iionic = self.iNet(Vm, states) # mA/m2 dVmdt = (- Iionic + Iinj) / self.Cm0 # mV/s dstates = self.derStates(Vm, states) return [dVmdt, *[dstates[k] for k in self.states]] def Qderivatives(self, y, t, Cm=None): ''' Compute the derivatives of the n-ODE HH system variables, based on a value of membrane capacitance. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param Cm: membrane capacitance (F/m2) :return: vector of HH system derivatives at time t ''' if Cm is None: Cm = self.Cm0 Qm, *states = y Vm = Qm / Cm * 1e3 # mV dQmdt = - self.iNet(Vm, states) * 1e-3 # A/m2 dstates = self.derStates(Vm, states) return [dQmdt, *[dstates[k] for k in self.states]] def checkInputs(self, Astim, tstim, toffset, PRF, DC): ''' Check validity of electrical stimulation parameters. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) ''' # Check validity of stimulation parameters if not all(isinstance(param, float) for param in [Astim, tstim, toffset, DC]): raise TypeError('Invalid stimulation parameters (must be float typed)') if tstim <= 0: raise ValueError('Invalid stimulus duration: {} ms (must be strictly positive)' .format(tstim * 1e3)) if toffset < 0: raise ValueError('Invalid stimulus offset: {} ms (must be positive or null)' .format(toffset * 1e3)) if DC <= 0.0 or DC > 1.0: raise ValueError('Invalid duty cycle: {} (must be within ]0; 1])'.format(DC)) if DC < 1.0: if not isinstance(PRF, float): raise TypeError('Invalid PRF value (must be float typed)') if PRF is None: raise AttributeError('Missing PRF value (must be provided when DC < 1)') if PRF < 1 / tstim: raise ValueError('Invalid PRF: {} Hz (PR interval exceeds stimulus duration)' .format(PRF)) def meta(self, Astim, tstim, toffset, PRF, DC): ''' Return information about object and simulation parameters. :param Astim: stimulus amplitude (mA/m2) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :return: meta-data dictionary ''' return { 'neuron': self.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC } def simulate(self, Astim, tstim, toffset, PRF=100., DC=1.0): ''' Simulate a specific neuron model for a specific set of electrical parameters, and return output data in a dataframe. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: simulation @ A = %sA/m2, t = %ss (%ss offset)%s', self, si_format(Astim, 2, space=' '), *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # Check validity of stimulation parameters self.checkInputs(Astim, tstim, toffset, PRF, DC) # Set initial conditions steady_states = self.steadyStates(self.Vm0) y0 = np.array([self.Vm0, *[steady_states[k] for k in self.states]]) # Initialize simulator and compute solution logger.debug('Computing solution') simulator = PWSimulator( lambda y, t: self.Vderivatives(y, t, Astim), lambda y, t: self.Vderivatives(y, t, 0.)) (t, y, stim), tcomp = simulator(y0, DT_ESTIM, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Vm': y[:, 0], 'Qm': y[:, 0] * self.Cm0 * 1e-3 }) data['Qm'] = data['Vm'].values * self.Cm0 * 1e-3 for i in range(len(self.states)): data[self.states[i]] = y[:, i + 1] # Log number of detected spikes nspikes = self.getNSpikes(data) logger.debug('{} spike{} detected'.format(nspikes, plural(nspikes))) # Return dataframe and computation time return data, tcomp def simQueue(self, amps, durations, offsets, PRFs, DCs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: list of parameters (list) for each simulation ''' if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += createQueue(amps, durations, offsets, min(PRFs), 1.0) if np.any(DCs != 1.0): queue += createQueue(amps, durations, offsets, PRFs, DCs[DCs != 1.0]) for item in queue: if np.isnan(item[0]): item[0] = None return queue def getNSpikes(self, data): ''' Compute number of spikes in charge profile of simulation output. :param data: dataframe containing output time series :return: number of detected spikes ''' dt = np.diff(data.ix[:1, 't'].values)[0] ipeaks, *_ = findPeaks( data['Qm'].values, SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM ) return ipeaks.size def getStabilizationValue(self, data): ''' Determine stabilization value from the charge profile of a simulation output. :param data: dataframe containing output time series :return: charge stabilization value (or np.nan if no stabilization detected) ''' # Extract charge signal posterior to observation window t, Qm = [data[key].values for key in ['t', 'Qm']] - Qm = y[2, t > TMIN_STABILIZATION] + Qm = Qm[t > TMIN_STABILIZATION] # Compute variation range Qm_range = np.ptp(Qm) - logger.debug('%.2f nC/cm2 variation range over the last %.0f ms', - Qm_range * 1e5, TMIN_STABILIZATION * 1e3) + logger.debug('%.2f nC/cm2 variation range over the last %.0f ms, Qmf = %.2f nC/cm2', + Qm_range * 1e5, TMIN_STABILIZATION * 1e3, Qm[-1] * 1e5) # Return final value only if stabilization is detected if np.ptp(Qm) < QSS_Q_DIV_THR: return Qm[-1] else: return np.nan def isExcited(self, data): ''' Determine if neuron is excited from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is excited or not ''' return self.getNSpikes(data) > 0 def isSilenced(self, data): ''' Determine if neuron is silenced from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is silenced or not ''' return not np.isinan(self.getStabilizationValue(data)) def titrate(self, tstim, toffset, PRF, DC, xfunc=None, Arange=(0., 2 * TITRATION_ESTIM_A_MAX)): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given duration, PRF and duty cycle. :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param xfunc: function determining whether condition is reached from simulation output :param Arange: search interval for Astim, iteratively refined :return: excitation threshold amplitude (mA/m2) ''' # Default output function if xfunc is None: xfunc = self.titrationFunc return binarySearch( lambda x: xfunc(self.simulate(*x)[0]), [tstim, toffset, PRF, DC], 0, Arange, TITRATION_ESTIM_DA_MAX ) diff --git a/PySONIC/plt/QSS.py b/PySONIC/plt/QSS.py index d6951e3..510ca18 100644 --- a/PySONIC/plt/QSS.py +++ b/PySONIC/plt/QSS.py @@ -1,464 +1,454 @@ import inspect +import logging import pandas as pd import numpy as np import matplotlib.pyplot as plt from matplotlib import cm, colors from ..postpro import getFixedPoints from ..core import NeuronalBilayerSonophore, Batch from .pltutils import * from ..utils import logger def plotVarQSSDynamics(neuron, a, Fdrive, Adrive, charges, varname, varrange, fs=12): ''' Plot the QSS-approximated derivative of a specific variable as function of the variable itself, as well as equilibrium values, for various membrane charge densities at a given acoustic amplitude. :param neuron: neuron object :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param charges: charge density vector (C/m2) :param varname: name of variable to plot :param varrange: range over which to compute the derivative :return: figure handle ''' # Extract information about variable to plot pltvar = neuron.getPltVars()[varname] # Get methods to compute derivative and steady-state of variable of interest derX_func = getattr(neuron, 'der{}{}'.format(varname[0].upper(), varname[1:])) Xinf_func = getattr(neuron, '{}inf'.format(varname)) derX_args = inspect.getargspec(derX_func)[0][1:] Xinf_args = inspect.getargspec(Xinf_func)[0][1:] # Get dictionary of charge and amplitude dependent QSS variables nbls = NeuronalBilayerSonophore(a, neuron, Fdrive) _, Qref, lookups, QSS = nbls.quasiSteadyStates( Fdrive, amps=Adrive, charges=charges, squeeze_output=True) df = QSS df['Vm'] = lookups['V'] # Create figure fig, ax = plt.subplots(figsize=(6, 4)) ax.set_title('{} neuron - QSS {} dynamics @ {:.2f} kPa'.format( neuron.name, pltvar['desc'], Adrive * 1e-3), fontsize=fs) ax.set_xscale('log') for key in ['top', 'right']: ax.spines[key].set_visible(False) ax.set_xlabel('$\\rm {}\ ({})$'.format(pltvar['label'], pltvar.get('unit', '')), fontsize=fs) ax.set_ylabel('$\\rm QSS\ d{}/dt\ ({}/s)$'.format(pltvar['label'], pltvar.get('unit', '1')), fontsize=fs) ax.set_ylim(-40, 40) ax.axhline(0, c='k', linewidth=0.5) y0_str = '{}0'.format(varname) if hasattr(neuron, y0_str): ax.axvline(getattr(neuron, y0_str) * pltvar.get('factor', 1), label=y0_str, c='k', linewidth=0.5) # For each charge value icolor = 0 for j, Qm in enumerate(charges): lbl = 'Q = {:.0f} nC/cm2'.format(Qm * 1e5) # Compute variable derivative as a function of its value, as well as equilibrium value, # keeping other variables at quasi steady-state derX_inputs = [varrange if arg == varname else df[arg][j] for arg in derX_args] Xinf_inputs = [df[arg][j] for arg in Xinf_args] dX_QSS = neuron.derCai(*derX_inputs) Xeq_QSS = neuron.Caiinf(*Xinf_inputs) # Plot variable derivative and its root as a function of the variable itself c = 'C{}'.format(icolor) ax.plot(varrange * pltvar.get('factor', 1), dX_QSS * pltvar.get('factor', 1), c=c, label=lbl) ax.axvline(Xeq_QSS * pltvar.get('factor', 1), linestyle='--', c=c) icolor += 1 ax.legend(frameon=False, fontsize=fs - 3) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) fig.tight_layout() fig.canvas.set_window_title('{}_QSS_{}_dynamics_{:.2f}kPa'.format( neuron.name, varname, Adrive * 1e-3)) return fig def plotQSSvars(neuron, a, Fdrive, Adrive, fs=12): ''' Plot effective membrane potential, quasi-steady states and resulting membrane currents as a function of membrane charge density, for a given acoustic amplitude. :param neuron: neuron object :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :return: figure handle ''' # Get neuron-specific pltvars pltvars = neuron.getPltVars() # Compute neuron-specific charge and amplitude dependent QS states at this amplitude nbls = NeuronalBilayerSonophore(a, neuron, Fdrive) _, Qref, lookups, QSS = nbls.quasiSteadyStates(Fdrive, amps=Adrive, squeeze_output=True) Vmeff = lookups['V'] # Compute QSS currents currents = neuron.currents(Vmeff, np.array([QSS[k] for k in neuron.states])) iNet = sum(currents.values()) # Compute fixed points in dQdt profile dQdt = -iNet Q_SFPs = getFixedPoints(Qref, dQdt, filter='stable') Q_UFPs = getFixedPoints(Qref, dQdt, filter='unstable') # Extract dimensionless states norm_QSS = {} for x in neuron.states: if 'unit' not in pltvars[x]: norm_QSS[x] = QSS[x] # Create figure fig, axes = plt.subplots(3, 1, figsize=(7, 9)) axes[-1].set_xlabel('$\\rm Q_m\ (nC/cm^2)$', fontsize=fs) for ax in axes: for skey in ['top', 'right']: ax.spines[skey].set_visible(False) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) for item in ax.get_xticklabels(minor=True): item.set_visible(False) figname = '{} neuron - QSS dynamics @ {:.2f} kPa'.format(neuron.name, Adrive * 1e-3) fig.suptitle(figname, fontsize=fs) # Subplot: Vmeff ax = axes[0] ax.set_ylabel('$V_m^*$ (mV)', fontsize=fs) ax.plot(Qref * 1e5, Vmeff, color='k') ax.axhline(neuron.Vm0, linewidth=0.5, color='k') # Subplot: dimensionless quasi-steady states cset = plt.get_cmap('Dark2').colors + plt.get_cmap('tab10').colors ax = axes[1] ax.set_ylabel('QSS gating variables (-)', fontsize=fs) ax.set_yticks([0, 0.5, 1]) ax.set_ylim([-0.05, 1.05]) for i, (label, QS_state) in enumerate(norm_QSS.items()): ax.plot(Qref * 1e5, QS_state, label=label, c=cset[i]) # Subplot: currents ax = axes[2] cset = plt.get_cmap('tab10').colors ax.set_ylabel('QSS currents ($\\rm A/m^2$)', fontsize=fs) for i, (k, I) in enumerate(currents.items()): ax.plot(Qref * 1e5, -I * 1e-3, '--', c=cset[i], label='$\\rm -{}$'.format(neuron.getPltVars()[k]['label'])) ax.plot(Qref * 1e5, -iNet * 1e-3, color='k', label='$\\rm -I_{Net}$') ax.axhline(0, color='k', linewidth=0.5) if Q_SFPs.size > 0: ax.plot(Q_SFPs * 1e5, np.zeros(Q_SFPs.size), 'o', c='k', markersize=5, zorder=2) if Q_SFPs.size > 0: ax.plot(Q_UFPs * 1e5, np.zeros(Q_UFPs.size), 'o', c='k', markersize=5, mfc='none', zorder=2) fig.tight_layout() fig.subplots_adjust(right=0.8) for ax in axes[1:]: ax.legend(loc='center right', fontsize=fs, frameon=False, bbox_to_anchor=(1.3, 0.5)) for ax in axes[:-1]: ax.set_xticklabels([]) fig.canvas.set_window_title( '{}_QSS_states_vs_Qm_{:.2f}kPa'.format(neuron.name, Adrive * 1e-3)) return fig def plotQSSVarVsAmp(neuron, a, Fdrive, varname, amps=None, DC=1., fs=12, cmap='viridis', yscale='lin', zscale='lin'): ''' Plot a specific QSS variable (state or current) as a function of membrane charge density, for various acoustic amplitudes. :param neuron: neuron object :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param DC: duty cycle (-) :param varname: extraction key for variable to plot :return: figure handle ''' # Determine stimulation modality if a is None and Fdrive is None: stim_type = 'elec' a = 32e-9 Fdrive = 500e3 else: stim_type = 'US' # Extract information about variable to plot pltvar = neuron.getPltVars()[varname] Qvar = neuron.getPltVars()['Qm'] Afactor = {'US': 1e-3, 'elec': 1.}[stim_type] # Q_SFPs = [] # Q_UFPs = [] log = 'plotting {} neuron QSS {} vs. amp for {} stimulation @ {:.0f}% DC'.format( neuron.name, varname, stim_type, DC * 1e2) logger.info(log) nbls = NeuronalBilayerSonophore(a, neuron, Fdrive) # Get reference dictionaries for zero amplitude _, Qref, lookups0, QSS0 = nbls.quasiSteadyStates(Fdrive, amps=0., squeeze_output=True) Vmeff0 = lookups0['V'] if stim_type == 'elec': # if E-STIM case, compute steady states with constant capacitance Vmeff0 = Qref / neuron.Cm0 * 1e3 QSS0 = neuron.steadyStates(Vmeff0) df0 = QSS0 df0['Vm'] = Vmeff0 # Create figure fig, ax = plt.subplots(figsize=(6, 4)) title = '{} neuron - {}steady-state {}'.format( neuron.name, 'quasi-' if amps is not None else '', pltvar['desc']) if amps is not None: title += '\nvs. {} amplitude @ {:.0f}% DC'.format(stim_type, DC * 1e2) ax.set_title(title, fontsize=fs) ax.set_xlabel('$\\rm {}\ ({})$'.format(Qvar['label'], Qvar['unit']), fontsize=fs) ax.set_ylabel('$\\rm QSS\ {}\ ({})$'.format(pltvar['label'], pltvar.get('unit', '')), fontsize=fs) if yscale == 'log': ax.set_yscale('log') for key in ['top', 'right']: ax.spines[key].set_visible(False) # Plot y-variable reference line, if any y0 = None y0_str = '{}0'.format(varname) if hasattr(neuron, y0_str): y0 = getattr(neuron, y0_str) * pltvar.get('factor', 1) elif varname in neuron.getCurrentsNames() + ['iNet', 'dQdt']: y0 = 0. y0_str = '' if y0 is not None: ax.axhline(y0, label=y0_str, c='k', linewidth=0.5) # Plot reference QSS profile of variable as a function of charge density var0 = extractPltVar( neuron, pltvar, pd.DataFrame({k: df0[k] for k in df0.keys()}), name=varname) ax.plot(Qref * Qvar['factor'], var0, '--', c='k', zorder=1, label='$\\rm A_{{{}}}=0$'.format(stim_type)) # if varname == 'dQdt': # Q_SFPs += getFixedPoints(Qref, var0, filter='stable').tolist() # Q_UFPs += getFixedPoints(Qref, var0, filter='unstable').tolist() # Define color code mymap = plt.get_cmap(cmap) zref = amps * Afactor if zscale == 'lin': norm = colors.Normalize(zref.min(), zref.max()) elif zscale == 'log': norm = colors.LogNorm(zref.min(), zref.max()) sm = cm.ScalarMappable(norm=norm, cmap=mymap) sm._A = [] # Get amplitude-dependent QSS dictionary if stim_type == 'US': # Get dictionary of charge and amplitude dependent QSS variables _, Qref, lookups, QSS = nbls.quasiSteadyStates( Fdrive, amps=amps, DCs=DC, squeeze_output=True) df = QSS df['Vm'] = lookups['V'] else: # Repeat zero-amplitude QSS dictionary for all amplitudes df = {k: np.tile(df0[k], (amps.size, 1)) for k in df0} # Plot QSS profiles for various amplitudes for i, A in enumerate(amps): var = extractPltVar( neuron, pltvar, pd.DataFrame({k: df[k][i] for k in df.keys()}), name=varname) if varname == 'dQdt' and stim_type == 'elec': var += A * DC * pltvar['factor'] ax.plot(Qref * Qvar['factor'], var, c=sm.to_rgba(A * Afactor), zorder=0) # if varname == 'dQdt': # # mark eq. point if starting point provided, otherwise mark all FPs # Q_SFPs += getFixedPoints(Qref, var, filter='stable').tolist() # Q_UFPs += getFixedPoints(Qref, var, filter='unstable').tolist() # # Plot fixed-points, if any # if len(Q_SFPs) > 0: # ax.plot(np.array(Q_SFPs) * Qvar['factor'], np.zeros(len(Q_SFPs)), 'o', c='k', # markersize=5, zorder=2) # if len(Q_UFPs) > 0: # ax.plot(np.array(Q_UFPs) * Qvar['factor'], np.zeros(len(Q_UFPs)), 'x', c='k', # markersize=5, zorder=2) # Add legend and adjust layout ax.legend(frameon=False, fontsize=fs) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) fig.tight_layout() fig.subplots_adjust(bottom=0.15, top=0.9, right=0.80, hspace=0.5) # Plot amplitude colorbar if amps is not None: cbarax = fig.add_axes([0.85, 0.15, 0.03, 0.75]) fig.colorbar(sm, cax=cbarax) cbarax.set_ylabel( 'Amplitude ({})'.format({'US': 'kPa', 'elec': 'mA/m2'}[stim_type]), fontsize=fs) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) title = '{}_{}SS_{}'.format(neuron.name, 'Q' if amps is not None else '', varname) if amps is not None: title += '_vs_{}A_{}_{:.0f}%DC'.format(zscale, stim_type, DC * 1e2) fig.canvas.set_window_title(title) return fig -def plotEqChargeVsAmp(neurons, a, Fdrive, amps=None, tstim=250e-3, toffset=50e-3, PRF=100.0, - DCs=[1.], fs=12, xscale='lin', titrate=False, mpi=False): +def plotEqChargeVsAmp(neuron, a, Fdrive, amps=None, tstim=250e-3, toffset=50e-3, PRF=100.0, + DCs=[1.], fs=12, xscale='lin', compdir=None, mpi=False, + loglevel=logging.INFO): ''' Plot the equilibrium membrane charge density as a function of acoustic amplitude, given an initial value of membrane charge density. - :param neurons: neuron objects + :param neuron: neuron object :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :return: figure handle ''' # Determine stimulation modality if a is None and Fdrive is None: stim_type = 'elec' a = 32e-9 Fdrive = 500e3 else: stim_type = 'US' logger.info('plotting equilibrium charges for %s stimulation', stim_type) # Create figure fig, ax = plt.subplots(figsize=(6, 4)) - figname = 'charge stability vs. amplitude' + figname = '{} neuron - charge stability vs. amplitude'.format(neuron.name) ax.set_title(figname) ax.set_xlabel('Amplitude ({})'.format({'US': 'kPa', 'elec': 'mA/m2'}[stim_type]), fontsize=fs) ax.set_ylabel('$\\rm Q_m\ (nC/cm^2)$', fontsize=fs) if xscale == 'log': ax.set_xscale('log') for skey in ['top', 'right']: ax.spines[skey].set_visible(False) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) Qrange = (np.inf, -np.inf) icolor = 0 - for i, neuron in enumerate(neurons): - - nbls = NeuronalBilayerSonophore(a, neuron, Fdrive) - - # Compute reference charge variation array for zero amplitude - _, Qref, lookups0, QSS0 = nbls.quasiSteadyStates(Fdrive, amps=0., squeeze_output=True) - Qrange = (min(Qrange[0], Qref.min()), max(Qrange[1], Qref.max())) - Vmeff0 = lookups0['V'] - if stim_type == 'elec': # if E-STIM case, compute steady states with constant capacitance - Vmeff0 = Qref / neuron.Cm0 * 1e3 - QSS0 = neuron.steadyStates(Vmeff0) - dQdt0 = -neuron.iNet(Vmeff0, np.array([QSS0[k] for k in neuron.states])) # mA/m2 - - # Compute 3D QSS charge variation array - if stim_type == 'US': - _, _, lookups, QSS = nbls.quasiSteadyStates(Fdrive, amps=amps, DCs=DCs) - dQdt = -neuron.iNet(lookups['V'], np.array([QSS[k] for k in neuron.states])) # mA/m2 - Afactor = 1e-3 - else: - Afactor = 1. - dQdt = np.empty((amps.size, Qref.size, DCs.size)) - for iA, A in enumerate(amps): - for iDC, DC in enumerate(DCs): - dQdt[iA, :, iDC] = dQdt0 + A * DC - - # For each duty cycle - for iDC, DC in enumerate(DCs): - color = 'k' if len(neurons) * len(DCs) == 1 else 'C{}'.format(icolor) - - # Initialize containers for stable and unstable fixed points - SFPs = [] - UFPs = [] - - stab_points = [] - - # Generate QSS batch queue - QSS_queue = [] - for iA, Adrive in enumerate(amps): - lookups1D = {k: v[iA, :, iDC] for k, v in lookups.items()} - lookups1D['Q'] = Qref - QSS_queue.append([Fdrive, Adrive, DC, lookups1D, dQdt[iA, :, iDC]]) - - # Run batch to find stable and unstable fixed points at each amplitude - QSS_batch = Batch(nbls.fixedPointsQSS, QSS_queue) - QSS_output = QSS_batch(mpi=mpi) - - # Generate simulations batch queue - sim_queue = nbls.simQueue([Fdrive], amps, [tstim], [toffset], [PRF], [DC], method) - for item in sim_queue: - item.insert(0, outdir) - - # Run batch to find stabilization points at each amplitude - sim_batch = Batch(nbls.runIfNone, sim_queue) - sim_output = sim_batch(mpi=mpi) - - # Retrieve batch output - for i, Adrive in enumerate(amps): - SFPs += [(Adrive, Qm) for Qm in QSS_output[i][0]] - UFPs += [(Adrive, Qm) for Qm in QSS_output[i][1]] - - # TODO: get stabilization point from simulation, if any - - - # Plot charge SFPs and UFPs for each acoustic amplitude - lbl = '{} neuron - {{}}stable fixed points @ {:.0f} % DC'.format( - neuron.name, DC * 1e2) - if len(SFPs) > 0: - A_SFPs, Q_SFPs = np.array(SFPs).T - ax.plot(np.array(A_SFPs) * Afactor, np.array(Q_SFPs) * 1e5, 'o', c=color, - markersize=3, label=lbl.format('')) - if len(UFPs) > 0: - A_UFPs, Q_UFPs = np.array(UFPs).T - ax.plot(np.array(A_UFPs) * Afactor, np.array(Q_UFPs) * 1e5, 'x', c=color, - markersize=3, label=lbl.format('un')) - - # If specified, compute and plot the threshold excitation amplitude - if titrate: - if stim_type == 'US': - Athr = nbls.titrate(Fdrive, tstim, toffset, PRF=PRF, DC=DC) - ax.axvline(Athr * Afactor, c=color, linestyle='--') - else: - for Arange, ls in zip([(0., amps.max(amps.min(), 0.)), ()], ['--', '-.']): - Athr = neuron.titrate(tstim, toffset, PRF=PRF, DC=DC, Arange=Arange) - ax.axvline(Athr * Afactor, c=color, linestyle=ls) - icolor += 1 + + nbls = NeuronalBilayerSonophore(a, neuron, Fdrive) + + # Compute reference charge variation array for zero amplitude + _, Qref, lookups0, QSS0 = nbls.quasiSteadyStates(Fdrive, amps=0., squeeze_output=True) + Qrange = (min(Qrange[0], Qref.min()), max(Qrange[1], Qref.max())) + Vmeff0 = lookups0['V'] + if stim_type == 'elec': # if E-STIM case, compute steady states with constant capacitance + Vmeff0 = Qref / neuron.Cm0 * 1e3 + QSS0 = neuron.steadyStates(Vmeff0) + dQdt0 = -neuron.iNet(Vmeff0, np.array([QSS0[k] for k in neuron.states])) # mA/m2 + + # Compute 3D QSS charge variation array + if stim_type == 'US': + _, _, lookups, QSS = nbls.quasiSteadyStates(Fdrive, amps=amps, DCs=DCs) + dQdt = -neuron.iNet(lookups['V'], np.array([QSS[k] for k in neuron.states])) # mA/m2 + Afactor = 1e-3 + else: + Afactor = 1. + dQdt = np.empty((amps.size, Qref.size, DCs.size)) + for iA, A in enumerate(amps): + for iDC, DC in enumerate(DCs): + dQdt[iA, :, iDC] = dQdt0 + A * DC + + # For each duty cycle + for iDC, DC in enumerate(DCs): + color = 'k' if len(DCs) == 1 else 'C{}'.format(icolor) + + # Initialize containers for stable and unstable fixed points + SFPs = [] + UFPs = [] + + stab_points = [] + + # Generate QSS batch queue + QSS_queue = [] + for iA, Adrive in enumerate(amps): + lookups1D = {k: v[iA, :, iDC] for k, v in lookups.items()} + lookups1D['Q'] = Qref + QSS_queue.append([Fdrive, Adrive, DC, lookups1D, dQdt[iA, :, iDC]]) + + # Run batch to find stable and unstable fixed points at each amplitude + QSS_batch = Batch(nbls.fixedPointsQSS, QSS_queue) + QSS_output = QSS_batch(mpi=mpi, loglevel=loglevel) + + # Retrieve batch output + for i, Adrive in enumerate(amps): + SFPs += [(Adrive, Qm) for Qm in QSS_output[i][0]] + UFPs += [(Adrive, Qm) for Qm in QSS_output[i][1]] + + # TODO: get stabilization point from simulation, if any + if compdir is not None: + data, _ = nbls.load(compdir, Fdrive, Adrive, tstim, toffset, PRF, DC, 'sonic') + stab_points.append((Adrive, nbls.neuron.getStabilizationValue(data))) + + # Plot charge SFPs and UFPs for each acoustic amplitude + lbl = '{} neuron - {{}}stable fixed points @ {:.0f} % DC'.format( + neuron.name, DC * 1e2) + if len(SFPs) > 0: + A_SFPs, Q_SFPs = np.array(SFPs).T + ax.plot(np.array(A_SFPs) * Afactor, np.array(Q_SFPs) * 1e5, 'o', c=color, + markersize=3, label=lbl.format('')) + if len(UFPs) > 0: + A_UFPs, Q_UFPs = np.array(UFPs).T + ax.plot(np.array(A_UFPs) * Afactor, np.array(Q_UFPs) * 1e5, 'x', c=color, + markersize=3, label=lbl.format('un')) + + if len(stab_points) > 0: + A_stab, Q_stab = np.array(stab_points).T + ax.plot(np.array(A_stab) * Afactor, np.array(Q_stab) * 1e5, '*', c=color, + markersize=3, label='stabilization points') + + icolor += 1 # Post-process figure ax.set_ylim(np.array([Qrange[0], 0]) * 1e5) ax.legend(frameon=False, fontsize=fs) fig.tight_layout() - fig.canvas.set_window_title('QSS_Qstab_vs_{}A_{}_{}_{}%DC{}'.format( + fig.canvas.set_window_title('{}_QSS_Qstab_vs_{}A_{}_{}%DC{}'.format( + neuron.name, xscale, - '_'.join([n.name for n in neurons]), stim_type, '_'.join(['{:.0f}'.format(DC * 1e2) for DC in DCs]), - '_with_thresholds' if titrate else '' + '_with_comp' if compdir is not None else '' )) return fig diff --git a/PySONIC/utils.py b/PySONIC/utils.py index ec5b2c8..4f87063 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,815 +1,823 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-19 22:30:46 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-01 16:49:00 +# @Last Modified time: 2019-06-02 15:22:51 ''' Definition of generic utility functions used in other modules ''' import csv from functools import wraps import operator import time import os import math import pickle from tqdm import tqdm import logging import tkinter as tk from tkinter import filedialog import numpy as np import colorlog from scipy.interpolate import interp1d # Package logger my_log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%' ) def setHandler(logger, handler): for h in logger.handlers: logger.removeHandler(h) logger.addHandler(handler) return logger def setLogger(name, formatter): handler = colorlog.StreamHandler() handler.setFormatter(formatter) logger = colorlog.getLogger(name) logger.addHandler(handler) return logger class TqdmHandler(logging.StreamHandler): def __init__(self, formatter): logging.StreamHandler.__init__(self) self.setFormatter(formatter) def emit(self, record): msg = self.format(record) tqdm.write(msg) logger = setLogger('PySONIC', my_log_formatter) titrations_logfile = os.path.join(os.path.split(__file__)[0], 'neurons', 'titrations.log') # Figure naming conventions def figtitle(meta): ''' Return appropriate title based on simulation metadata. ''' if 'Cm0' in meta: return '{:.0f}nm radius BLS structure: MECH-STIM {:.0f}kHz, {:.2f}kPa, {:.1f}nC/cm2'.format( meta['a'] * 1e9, meta['Fdrive'] * 1e-3, meta['Adrive'] * 1e-3, meta['Qm'] * 1e5) else: if meta['DC'] < 1: wavetype = 'PW' suffix = ', {:.2f}Hz PRF, {:.0f}% DC'.format(meta['PRF'], meta['DC'] * 1e2) else: wavetype = 'CW' suffix = '' if 'Astim' in meta: return '{} neuron: {} E-STIM {:.2f}mA/m2, {:.0f}ms{}'.format( meta['neuron'], wavetype, meta['Astim'], meta['tstim'] * 1e3, suffix) else: return '{} neuron ({:.1f}nm): {} A-STIM {:.0f}kHz {:.2f}kPa, {:.0f}ms{} - {} model'.format( meta['neuron'], meta['a'] * 1e9, wavetype, meta['Fdrive'] * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3, suffix, meta['method']) # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } def loadData(fpath, frequency=1): ''' Load dataframe and metadata dictionary from pickle file. ''' logger.info('Loading data from "%s"', os.path.basename(fpath)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'].iloc[::frequency] meta = frame['meta'] return df, meta def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): if x == 0: factor = 1e0 prefix = '' else: sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) vals = [tmp[1] for tmp in sorted_si_prefixes] # vals = list(si_prefixes.values()) ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 factor = vals[ix] prefix = sorted_si_prefixes[ix][0] # prefix = list(si_prefixes.keys())[ix] return '{{:.{}f}}{}{}'.format(precision, space, prefix).format(x / factor) elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: print(type(x)) def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' ret_string = '{0:.{1:d}e}'.format(number, precision) a, b = ret_string.split("e") a = float(a) b = int(b) return '{}10^{{{}}}'.format('{} * '.format(a) if a != 1. else '', b) def rmse(x1, x2): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean()) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) +def getInDict(d, key, func): + ''' Return value of specific dictionary key, or function return alias if not there. ''' + if key in d: + return d[key] + else: + return func() + + def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames(filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname) if filenames: par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) else: par_dir = None return (filenames, par_dir) def selectDirDialog(): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() return filedialog.askdirectory() def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. :param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) return filename_out def downsample(t_dense, y, nsparse): ''' Decimate periodic signals to a specified number of samples.''' if(y.ndim) > 1: nsignals = y.shape[0] else: nsignals = 1 y = np.array([y]) # determine time step and period of input signal T = t_dense[-1] - t_dense[0] dt_dense = t_dense[1] - t_dense[0] # resample time vector linearly t_ds = np.linspace(t_dense[0], t_dense[-1], nsparse) # create MAV window nmav = int(0.03 * T / dt_dense) if nmav % 2 == 0: nmav += 1 mav = np.ones(nmav) / nmav # determine signals padding npad = int((nmav - 1) / 2) # determine indexes of sampling on convolved signals ids = np.round(np.linspace(0, t_dense.size - 1, nsparse)).astype(int) y_ds = np.empty((nsignals, nsparse)) # loop through signals for i in range(nsignals): # pad, convolve and resample pad_left = y[i, -(npad + 2):-2] pad_right = y[i, 1:npad + 1] y_ext = np.concatenate((pad_left, y[i, :], pad_right), axis=0) y_mav = np.convolve(y_ext, mav, mode='valid') y_ds[i, :] = y_mav[ids] if nsignals == 1: y_ds = y_ds[0, :] return (t_ds, y_ds) def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def getNeuronLookupsFile(mechname, a=None, Fdrive=None, Adrive=None, fs=False): fpath = os.path.join( os.path.split(__file__)[0], 'neurons', '{}_lookups'.format(mechname) ) if a is not None: fpath += '_{:.0f}nm'.format(a * 1e9) if Fdrive is not None: fpath += '_{:.0f}kHz'.format(Fdrive * 1e-3) if Adrive is not None: fpath += '_{:.0f}kPa'.format(Adrive * 1e-3) if fs is True: fpath += '_fs' return '{}.pkl'.format(fpath) def getLookups4D(mechname): ''' Retrieve 4D lookup tables and reference vectors for a given membrane mechanism. :param mechname: name of membrane density mechanism :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and one other variable), a dictionary of associated 2D lookup numpy arrays, and a dictionary with information about the other variable. ''' # Check lookup file existence lookup_path = getNeuronLookupsFile(mechname) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) # Load lookups dictionary # logger.debug('Loading %s lookup table', mechname) with open(lookup_path, 'rb') as fh: df = pickle.load(fh) inputs = df['input'] lookups4D = df['lookup'] # Retrieve 1D inputs from lookups dictionary aref = inputs['a'] Fref = inputs['f'] Aref = inputs['A'] Qref = inputs['Q'] return aref, Fref, Aref, Qref, lookups4D def getLookupsOff(mechname): ''' Retrieve appropriate US-OFF lookup tables and reference vectors for a given membrane mechanism. :param mechname: name of membrane density mechanism :return: 2-tuple with 1D numpy array of reference charge density and dictionary of associated 1D lookup numpy arrays. ''' # Get 4D lookups and input vectors aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) # Perform 2D projection in appropriate dimensions logger.debug('Interpolating lookups at A = 0') lookups_off = {key: y4D[0, 0, 0, :] for key, y4D in lookups4D.items()} return Qref, lookups_off def getLookups2D(mechname, a=None, Fdrive=None, Adrive=None): ''' Retrieve appropriate 2D lookup tables and reference vectors for a given membrane mechanism, projected at a specific combination of sonophore radius, US frequency and/or acoustic pressure amplitude. :param mechname: name of membrane density mechanism :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param Adrive: Acoustic peak pressure amplitude (Hz) :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and one other variable), a dictionary of associated 2D lookup numpy arrays, and a dictionary with information about the other variable. ''' # Get 4D lookups and input vectors aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) # Check that inputs are within lookup range if a is not None: a = isWithin('radius', a, (aref.min(), aref.max())) if Fdrive is not None: Fdrive = isWithin('frequency', Fdrive, (Fref.min(), Fref.max())) if Adrive is not None: Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) # Determine projection dimensions based on inputs var_a = {'name': 'a', 'label': 'sonophore radius', 'val': a, 'unit': 'm', 'factor': 1e9, 'ref': aref, 'axis': 0} var_Fdrive = {'name': 'f', 'label': 'frequency', 'val': Fdrive, 'unit': 'Hz', 'factor': 1e-3, 'ref': Fref, 'axis': 1} var_Adrive = {'name': 'A', 'label': 'amplitude', 'val': Adrive, 'unit': 'Pa', 'factor': 1e-3, 'ref': Aref, 'axis': 2} if not isinstance(Adrive, float): var1 = var_a var2 = var_Fdrive var3 = var_Adrive elif not isinstance(Fdrive, float): var1 = var_a var2 = var_Adrive var3 = var_Fdrive elif not isinstance(a, float): var1 = var_Fdrive var2 = var_Adrive var3 = var_a # Perform 2D projection in appropriate dimensions # logger.debug('Interpolating lookups at (%s = %s%s, %s = %s%s)', # var1['name'], si_format(var1['val'], space=' '), var1['unit'], # var2['name'], si_format(var2['val'], space=' '), var2['unit']) lookups3D = {key: interp1d(var1['ref'], y4D, axis=var1['axis'])(var1['val']) for key, y4D in lookups4D.items()} if var2['axis'] > var1['axis']: var2['axis'] -= 1 lookups2D = {key: interp1d(var2['ref'], y3D, axis=var2['axis'])(var2['val']) for key, y3D in lookups3D.items()} if var3['val'] is not None: logger.debug('Interpolating lookups at %d new %s values between %s%s and %s%s', len(var3['val']), var3['name'], si_format(min(var3['val']), space=' '), var3['unit'], si_format(max(var3['val']), space=' '), var3['unit']) lookups2D = {key: interp1d(var3['ref'], y2D, axis=0)(var3['val']) for key, y2D in lookups2D.items()} var3['ref'] = np.array(var3['val']) return var3['ref'], Qref, lookups2D, var3 def getLookups2Dfs(mechname, a, Fdrive, fs): # Check lookup file existence lookup_path = getNeuronLookupsFile(mechname, a=a, Fdrive=Fdrive, fs=True) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) # Load lookups dictionary logger.debug('Loading %s lookup table with fs = %.0f%%', mechname, fs * 1e2) with open(lookup_path, 'rb') as fh: df = pickle.load(fh) inputs = df['input'] lookups3D = df['lookup'] # Retrieve 1D inputs from lookups dictionary fsref = inputs['fs'] Aref = inputs['A'] Qref = inputs['Q'] # Check that fs is within lookup range fs = isWithin('coverage', fs, (fsref.min(), fsref.max())) # Perform projection at fs logger.debug('Interpolating lookups at fs = %s%%', fs * 1e2) lookups2D = {key: interp1d(fsref, y3D, axis=2)(fs) for key, y3D in lookups3D.items()} return Aref, Qref, lookups2D def getLookupsDCavg(mechname, a, Fdrive, amps=None, charges=None, DCs=1.0): ''' Get the DC-averaged lookups of a specific neuron for a combination of US amplitudes, charge densities and duty cycles, at a specific US frequency. :param mechname: name of membrane density mechanism :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DCs: duty cycle value(s) :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get lookups for specific (a, f, A) combination Aref, Qref, lookups2D, _ = getLookups2D(mechname, a=a, Fdrive=Fdrive) if 'ng' in lookups2D: lookups2D.pop('ng') # Derive inputs from lookups reference if not provided if amps is None: amps = Aref if charges is None: charges = Qref # Transform inputs into arrays if single value provided if isinstance(amps, float): amps = np.array([amps]) if isinstance(charges, float): charges = np.array([charges]) if isinstance(DCs, float): DCs = np.array([DCs]) nA, nQ, nDC = amps.size, charges.size, DCs.size cs = {True: 's', False: ''} # logger.debug('%u amplitude%s, %u charge%s, %u DC%s', # nA, cs[nA > 1], nQ, cs[nQ > 1], nDC, cs[nDC > 1]) # Re-interpolate lookups at input charges lookups2D = {key: interp1d(Qref, y2D, axis=1)(charges) for key, y2D in lookups2D.items()} # Interpolate US-ON (for each input amplitude) and US-OFF (A = 0) lookups amps = isWithin('amplitude', amps, (Aref.min(), Aref.max())) lookups_on = {key: interp1d(Aref, y2D, axis=0)(amps) for key, y2D in lookups2D.items()} lookups_off = {key: interp1d(Aref, y2D, axis=0)(0.0) for key, y2D in lookups2D.items()} # Compute DC-averaged lookups lookups_DCavg = {} for key in lookups2D.keys(): x_on, x_off = lookups_on[key], lookups_off[key] x_avg = np.empty((nA, nQ, nDC)) for iA, Adrive in enumerate(amps): for iDC, DC in enumerate(DCs): x_avg[iA, :, iDC] = x_on[iA, :] * DC + x_off * (1 - DC) lookups_DCavg[key] = x_avg return amps, charges, lookups_DCavg def isWithin(name, val, bounds, rel_tol=1e-9): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. :param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' if isinstance(val, list) or isinstance(val, np.ndarray) or isinstance(val, tuple): return [isWithin(name, v, bounds, rel_tol) for v in val] if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError('{} value ({}) out of [{}, {}] interval'.format( name, val, bounds[0], bounds[1])) def getLookupsCompTime(mechname): # Check lookup file existence lookup_path = getNeuronLookupsFile(mechname) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) # Load lookups dictionary logger.debug('Loading comp times') with open(lookup_path, 'rb') as fh: df = pickle.load(fh) tcomps4D = df['tcomp'] return np.sum(tcomps4D) def getLowIntensitiesSTN(): ''' Return an array of acoustic intensities (W/m2) used to study the STN neuron in Tarnaud, T., Joseph, W., Martens, L., and Tanghe, E. (2018). Computational Modeling of Ultrasonic Subthalamic Nucleus Stimulation. IEEE Trans Biomed Eng. ''' return np.hstack(( np.arange(10, 101, 10), np.arange(101, 131, 1), np.array([140]) )) # W/m2 def getDistribution(xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return getDistribution(xmin, xmax, nx, scale=scale) def parseUSAmps(args, defaults): # Check if several mutually exclusive arguments were provided Aparams = ['Arange', 'Irange', 'amp', 'intensity'] if sum([x in args for x in Aparams]) > 1: raise ValueError('You must provide only one of the following arguments: {}'.format( ', '.join(Aparams))) if 'Arange' in args: return getDistFromList(args['Arange']) * 1e3 # Pa elif 'Irange' in args: return Intensity2Pressure(getDistFromList(args['Irange']) * 1e4) # Pa elif 'amp' in args: return np.array(args['amp']) * 1e3 # Pa elif 'intensity' in args: return Intensity2Pressure(np.array(args['intensity']) * 1e4) # Pa return np.array(defaults['amp']) * 1e3 # Pa def parseElecAmps(args, defaults): # Check if several mutually exclusive arguments were provided Aparams = ['Arange', 'amp'] if sum([x in args for x in Aparams]) > 1: raise ValueError('You must provide only one of the following arguments: {}'.format( ', '.join(Aparams))) if 'Arange' in args: return getDistFromList(args['Arange']) # mA/m2 elif 'amp' in args: return np.array(args['amp']) # mA/m2 return np.array(defaults['amp']) # mA/m2 def getIndex(container, value): ''' Return the index of a float / string value in a list / array :param container: list / 1D-array of elements :param value: value to search for :return: index of value (if found) ''' if isinstance(value, float): container = np.array(container) imatches = np.where(np.isclose(container, value, rtol=1e-9, atol=1e-16))[0] if len(imatches) == 0: raise ValueError('{} not found in {}'.format(value, container)) return imatches[0] elif isinstance(value, str): return container.index(value) def debug(func): ''' Print the function signature and return value. ''' @wraps(func) def wrapper_debug(*args, **kwargs): args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr)) print('Calling {}'.format(signature)) value = func(*args, **kwargs) print(f"{func.__name__!r} returned {value!r}") return value return wrapper_debug def timer(func): ''' Monitor and return the runtime of the decorated function. ''' @wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() value = func(*args, **kwargs) end_time = time.perf_counter() run_time = end_time - start_time return value, run_time return wrapper def cache(fpath, delimiter='\t', out_type=float): ''' Add an extra IO memoization functionality to a function using file caching, to avoid repetitions of tedious computations with identical inputs. ''' def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # If function has history -> do not log if 'history' in kwargs: return func(*args, **kwargs) # Translate function arguments into string signature args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr)) # If entry present in log, return corresponding output if os.path.isfile(fpath): with open(fpath, 'r', newline='') as f: reader = csv.reader(f, delimiter=delimiter) for row in reader: if row[0] == signature: logger.info('entry found in "{}"'.format(os.path.basename(fpath))) return out_type(row[1]) # Otherwise, compute output and log it into file before returning out = func(*args, **kwargs) with open(fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=delimiter) writer.writerow([signature, str(out)]) return out return wrapper return wrapper_with_args def binarySearch(bool_func, args, ix, xbounds, dx_thr, history=None): ''' Use a binary search to determine the threshold satisfying a given condition within a continuous search interval. :param bool_func: boolean function returning whether condition is satisfied :param args: list of function arguments other than refined value :param xbounds: search interval for threshold (progressively refined) :param dx_thr: accuracy criterion for threshold :return: excitation threshold ''' # Assign empty history if first function call if history is None: history = [] # Compute function output at interval mid-point x = (xbounds[0] + xbounds[1]) / 2 sim_args = args[:] sim_args.insert(ix, x) history.append(bool_func(sim_args)) # If titration interval is small enough conv = False if (xbounds[1] - xbounds[0]) <= dx_thr: logger.debug('titration interval smaller than defined threshold') # If both conditions have been encountered during titration process, # we're going towards convergence if (0 in history and 1 in history): logger.debug('converging around threshold') # If current value satisfies condition, convergence is achieved # -> return threshold if history[-1]: logger.debug('currently satisfying condition -> convergence') return x # If only one condition has been encountered during titration process, # then no titration is impossible within the defined interval -> return NaN else: logger.warning('titration does not converge within this interval') return np.nan # Return threshold if convergence is reached, otherwise refine interval and iterate if conv: return x else: if x > 0.: xbounds = (xbounds[0], x) if history[-1] else (x, xbounds[1]) else: xbounds = (x, xbounds[1]) if history[-1] else (xbounds[0], x) return binarySearch(bool_func, args, ix, xbounds, dx_thr, history=history) def resolveDependencies(deps, join_items=True): ''' Solve a dictionary of dependencies. :param arg: dependency dictionary in which the values are the dependencies of their respective keys. :param join_items: boolean specifying whether or not to serialize output :return: list of inter-dependent elements in resolved order ''' # Transform input dictionary of lists into dictionary of sets, # while removing circular (auto) dependencies deps = dict((k, set([x for x in deps[k] if x != k])) for k in deps) # Initialize empty list of resolved dependencies resolved_deps = [] # Iterate while dependencies not entirely resolved while deps: # Extract latest items without dependencies (values that are not in keys # and keys without value) into a set nd_items = set(i for v in deps.values() for i in v) - set(deps.keys()) nd_items.update(k for k, v in deps.items() if not v) # Append new set of non-dependent items to output list resolved_deps.append(nd_items) # Remove those items from remaining dependencies in input dictionary deps = dict(((k, v - nd_items) for k, v in deps.items() if v)) # If specified, merge list of sets into a unique list (while preserving order) if join_items: tmp = [] for item in resolved_deps: tmp += list(item) resolved_deps = tmp return resolved_deps def plural(n): if n < 0: raise ValueError('Cannot format negative integer (n = {})'.format(n)) if n == 0: return '' else: return 's' diff --git a/scripts/plot_QSS_IQ.py b/scripts/plot_QSS_IQ.py index 07fed99..e3fa197 100644 --- a/scripts/plot_QSS_IQ.py +++ b/scripts/plot_QSS_IQ.py @@ -1,137 +1,117 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-09-28 16:13:34 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-23 18:55:37 +# @Last Modified time: 2019-06-02 15:25:45 ''' Phase-plane analysis of neuron behavior under quasi-steady state approximation. ''' import os import numpy as np import matplotlib.pyplot as plt from argparse import ArgumentParser import logging -from PySONIC.utils import logger, selectDirDialog +from PySONIC.utils import logger, selectDirDialog, parseUSAmps, getInDict from PySONIC.neurons import getNeuronsDict from PySONIC.plt import plotQSSvars, plotQSSVarVsAmp, plotEqChargeVsAmp def main(): ap = ArgumentParser() - # Stimulation parameters - ap.add_argument('-n', '--neurons', type=str, nargs='+', default=None, help='Neuron types') - ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') + # Runtime options ap.add_argument('-c', '--cmap', type=str, default='viridis', help='Colormap name') ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-s', '--save', default=False, action='store_true', help='Save output figures') - ap.add_argument('--titrate', default=False, action='store_true', - help='Titrate excitation threshold') - ap.add_argument('-A', '--amp', nargs='+', type=float, default=None, - help='Amplitude (kPa or mA/m2)') - ap.add_argument('--tstim', type=float, default=500., - help='Stimulus duration for titration (ms)') - ap.add_argument('--toffset', type=float, default=500., - help='Offset duration for titration (ms)') - ap.add_argument('--PRF', type=float, default=100., - help='Pulse-repetition-frequency for titration (Hz)') - ap.add_argument('--DC', type=float, nargs='+', default=None, help='Duty cycle (%)') - ap.add_argument('--Ascale', type=str, default='lin', - help='Scale type for acoustic amplitude ("lin" or "log")') - ap.add_argument('--Amin', type=float, default=None, help='Amplitude lower bound (kPa or mA/m2)') - ap.add_argument('--Amax', type=float, default=None, help='Amplitude upper bound (kPa or mA/m2)') - ap.add_argument('--nA', type=float, default=100, help='Number of amplitude values') - ap.add_argument('--stim', type=str, default='US', help='Stimulation type ("US" or "elec")') + ap.add_argument('--comp', default=False, action='store_true', + help='Compare with simulations') ap.add_argument('--vars', type=str, nargs='+', default=None, help='Variables to plot') ap.add_argument('--mpi', default=False, action='store_true', help='Use multiprocessing') + ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') + ap.add_argument('-i', '--inputdir', type=str, default=None, help='Input directory') + + # Stimulation parameters + ap.add_argument('-n', '--neurons', type=str, nargs='+', default=None, help='Neuron types') + ap.add_argument('-a', '--radius', type=float, default=32., help='Sonophore radius (nm)') + ap.add_argument('-f', '--freq', type=float, default=500., help='US frequency (kHz)') + ap.add_argument('-A', '--amp', nargs='+', type=float, help='Acoustic pressure amplitude (kPa)') + ap.add_argument('--Arange', type=str, nargs='+', help='Amplitude range [scale min max n] (kPa)') + ap.add_argument('-I', '--intensity', nargs='+', type=float, help='Acoustic intensity (W/cm2)') + ap.add_argument('--Irange', type=str, nargs='+', + help='Intensity range [scale min max n] (W/cm2)') + ap.add_argument('--tstim', type=float, default=1000., help='Stimulus duration (ms)') + ap.add_argument('--toffset', type=float, default=0., help='Offset duration (ms)') + ap.add_argument('--PRF', type=float, default=100., help='Pulse-repetition-frequency (Hz)') + ap.add_argument('--DC', type=float, nargs='+', default=None, help='Duty cycle (%)') # Parse arguments - args = ap.parse_args() - logger.setLevel(logging.DEBUG if args.verbose else logging.INFO) - neurons = ['RS', 'LTS'] if args.neurons is None else args.neurons - neurons = [getNeuronsDict()[n]() for n in neurons] - - # US parameters - a = 32e-9 # m - Fdrive = 500e3 # Hz - AUS_range = (1., 600.) # kPa - - # E-STIM parameters - Aelec_range = (-20., 20.) # mA/m2 - - # Pulsing parameters - tstim = args.tstim * 1e-3 # s - toffset = args.toffset * 1e-3 # s - PRF = args.PRF # Hz - DCs = [100.] if args.DC is None else args.DC # % - DCs = np.array(DCs) * 1e-2 # (-) - - if args.stim == 'US': - if args.amp is not None: - amps = np.array(args.amp) * 1e3 - else: - Arange = list(AUS_range) - for i, val in enumerate([args.Amin, args.Amax]): - if val is not None: - Arange[i] = val - amps = { - 'lin': np.linspace(Arange[0], Arange[1], args.nA), - 'log': np.logspace(np.log10(Arange[0]), np.log10(Arange[1]), args.nA) - }[args.Ascale] * 1e3 # Pa - cmap = args.cmap - else: - a = None - Fdrive = None - if args.amp is not None: - amps = np.array(args.amp) # mA/m2 - else: - Arange = list(Aelec_range) - for i, val in enumerate([args.Amin, args.Amax]): - if val is not None: - Arange[i] = val - amps = np.linspace(Arange[0], Arange[1], args.nA) # mA/m2 - cmap = 'RdBu_r' + args = {key: value for key, value in vars(ap.parse_args()).items() if value is not None} + loglevel = logging.DEBUG if args['verbose'] is True else logging.INFO + logger.setLevel(loglevel) + mpi = args['mpi'] + comp = args['comp'] + save = args['save'] + cmap = args['cmap'] + pltvars = args.get('vars', ['dQdt']) + Ascale = args.get('Arange', ['lin'])[0] + + if comp: + indir = getInDict(args, 'inputdir', selectDirDialog) + if indir == '': + logger.error('no input directory') + quit() + if save: + outdir = getInDict(args, 'outputdir', selectDirDialog) + if outdir == '': + logger.error('no output directory') + quit() - if args.vars is None: - args.vars = ['dQdt'] + neurons = [getNeuronsDict()[n]() for n in args.get('neurons', ['RS', 'LTS'])] + a = args['radius'] * 1e-9 # m + Fdrive = args['freq'] * 1e3 # Hz + amps = parseUSAmps(args, np.linspace(1., 600., 3) * 1e3) # Pa + tstim = args['tstim'] * 1e-3 # s + toffset = args['toffset'] * 1e-3 # s + PRF = args['PRF'] # Hz + DCs = np.array(args.get('DC', [100.])) * 1e-2 # (-) figs = [] # Plot iNet vs Q for different amplitudes for each neuron and DC for i, neuron in enumerate(neurons): for DC in DCs: if amps.size == 1: figs.append( plotQSSvars(neuron, a, Fdrive, amps[0])) else: - for var in args.vars: + for var in pltvars: figs.append(plotQSSVarVsAmp( - neuron, a, Fdrive, var, amps=amps, DC=DC, cmap=cmap, zscale=args.Ascale)) - - # Plot equilibrium charge as a function of amplitude for each neuron - if amps.size > 1 and 'dQdt' in args.vars: - figs.append( - plotEqChargeVsAmp( - neurons, a, Fdrive, amps=amps, tstim=tstim, PRF=PRF, DCs=DCs, toffset=toffset, - xscale=args.Ascale, titrate=args.titrate, mpi=args.mpi)) - - if args.save: - outputdir = args.outputdir if args.outputdir is not None else selectDirDialog() - if outputdir == '': + neuron, a, Fdrive, var, amps=amps, DC=DC, cmap=cmap, zscale=Ascale)) + + # Plot equilibrium charge as a function of amplitude for each neuron + if amps.size > 1 and 'dQdt' in pltvars: + figs.append( + plotEqChargeVsAmp( + neuron, a, Fdrive, amps=amps, tstim=tstim, PRF=PRF, DCs=DCs, toffset=toffset, + xscale=Ascale, compdir=indir, mpi=mpi, loglevel=loglevel)) + + if save: + outdir = args['outputdir'] if 'outputdir' in args else selectDirDialog() + if outdir == '': logger.error('no output directory') else: for fig in figs: s = fig.canvas.get_window_title() s = s.replace('(', '- ').replace('/', '_').replace(')', '') figname = '{}.png'.format(s) - fig.savefig(os.path.join(outputdir, figname), transparent=True) + fig.savefig(os.path.join(outdir, figname), transparent=True) else: plt.show() if __name__ == '__main__': main()