diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index 688520f..8165230 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,958 +1,865 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-29 09:48:36 +# @Last Modified time: 2019-05-29 16:16:26 from copy import deepcopy import time import logging import pickle import progressbar as pb import numpy as np import pandas as pd from scipy.integrate import ode, odeint, solve_ivp from scipy.interpolate import interp1d -from .simulators import PWSimulator +from .simulators import PWSimulator, HybridSimulator from .bls import BilayerSonophore from .pneuron import PointNeuron from ..utils import * from ..constants import * from ..postpro import findPeaks, getFixedPoints class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model defvar = 'Q' # default plot variable def __init__(self, a, neuron, Fdrive=None, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param neuron: neuron object :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Check validity of input parameters if not isinstance(neuron, PointNeuron): raise ValueError('Invalid neuron type: "{}" (must inherit from PointNeuron class)' .format(neuron.name)) self.neuron = neuron # Initialize BilayerSonophore parent object BilayerSonophore.__init__(self, a, neuron.Cm0, neuron.Cm0 * neuron.Vm0 * 1e-3, embedding_depth) def __repr__(self): return 'NeuronalBilayerSonophore({}m, {})'.format( si_format(self.a, precision=1, space=' '), self.neuron) def pprint(self): return '{}m radius NBLS - {} neuron'.format( si_format(self.a, precision=0, space=' '), self.neuron.name) def getPltVars(self, wrapleft='df["', wrapright='"]'): pltvars = super().getPltVars(wrapleft, wrapright) pltvars.update(self.neuron.getPltVars(wrapleft, wrapright)) return pltvars def getPltScheme(self): return self.neuron.getPltScheme() def filecode(self, Fdrive, Adrive, tstim, PRF, DC, method): return 'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.2f}kPa_{:.0f}ms_{}{}'.format( self.neuron.name, 'CW' if DC == 1 else 'PW', self.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(PRF, DC * 1e2) if DC < 1. else '', method) def fullDerivatives(self, y, t, Adrive, Fdrive, phi): ''' Compute the derivatives of the (n+3) ODE full NBLS system variables. :param y: vector of state variables :param t: specific instant in time (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives(self, y[:3], t, Adrive, Fdrive, y[3], phi) dydt_elec = self.neuron.Qderivatives(y[3:], t, self.Capct(y[1])) return dydt_mech + dydt_elec def effDerivatives(self, y, t, lkp): ''' Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t ''' # Split input vector explicitly Qm, *states = y # Compute charge and channel states variation Vmeff = self.neuron.interpVmeff(Qm, lkp) dQmdt = - self.neuron.iNet(Vmeff, states) * 1e-3 dstates = self.neuron.derEffStates(Qm, states, lkp) # Return derivatives vector return [dQmdt, *[dstates[k] for k in self.neuron.states]] def interpEffVariable(self, key, Qm, stim, lkp_on, lkp_off): ''' Interpolate Q-dependent effective variable along solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkp_on: lookups for ON states :param lkp_off: lookups for OFF states :return: interpolated effective variable vector ''' x = np.zeros(stim.size) x[stim == 0] = np.interp( Qm[stim == 0], lkp_on['Q'], lkp_on[key], left=np.nan, right=np.nan) x[stim == 1] = np.interp( Qm[stim == 1], lkp_off['Q'], lkp_off[key], left=np.nan, right=np.nan) return x def runFull(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the full electro-mechanical system for a specific set of US stimulation parameters, using a classic integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the ODE system is solved iteratively until completion. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the effective solution matrix and a state vector ''' # Determine time step dt = 1 / (NPC_FULL * Fdrive) # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) Pac = self.Pacoustic(dt, Adrive, Fdrive, phi) Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.concatenate(( [0., Z0, self.ng0, self.Qm0], [steady_states[k] for k in self.neuron.states], )) # Initialize simulator and compute solution logger.debug('Computing detailed solution') simulator = PWSimulator( lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.), ) t, y, stim = simulator.compute( y0, dt, tstim, toffset, PRF, DC, print_progress=logger.getEffectiveLevel() <= logging.INFO, target_dt=CLASSIC_TARGET_DT ) # Compute membrane potential vector (in mV) Qm = y[:, 3] Z = y[:, 1] Vm = Qm / self.v_Capct(Z) * 1e3 # mV # Add Vm to solution matrix y = np.hstack(( y[:, 1:4], np.array([Vm]).T, y[:, 4:] )) # Return output variables return t, y.T, stim def runSONIC(self, Fdrive, Adrive, tstim, toffset, PRF, DC, dt=DT_EFF): ''' Compute solutions of the system for a specific set of US stimulation parameters, using charge-predicted "effective" coefficients to solve the HH equations at each step. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param dt: integration time step (s) :return: 3-tuple with the time profile, the effective solution matrix and a state vector ''' # Load appropriate 2D lookups Aref, Qref, lookups2D, _ = getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive) # Check that acoustic amplitude is within lookup range Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) # Interpolate 2D lookups at zero and US amplitude logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', Adrive * 1e-3) lookups_on = {key: interp1d(Aref, y2D, axis=0)(Adrive) for key, y2D in lookups2D.items()} lookups_off = {key: interp1d(Aref, y2D, axis=0)(0.0) for key, y2D in lookups2D.items()} # Add reference charge vector to 1D lookup dictionaries lookups_on['Q'] = Qref lookups_off['Q'] = Qref # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.insert( np.array([steady_states[k] for k in self.neuron.states]), 0, self.Qm0 ) # Initialize simulator and compute solution logger.debug('Computing effective solution') simulator = PWSimulator( lambda y, t: self.effDerivatives(y, t, lookups_on), lambda y, t: self.effDerivatives(y, t, lookups_off) ) t, y, stim = simulator.compute(y0, dt, tstim, toffset, PRF, DC) Qm = y[:, 0] # Compute effective gas content and membrane potential vectors ng, Vm = [ self.interpEffVariable(key, Qm, stim, lookups_on, lookups_off) for key in ['ng', 'V'] ] # Compute quasi-steady deflection vector Z = np.array([self.balancedefQS(x1, x2) for x1, x2 in zip(ng, Qm)]) # m # Add Z, ng and Vm to solution matrix y = np.hstack(( np.array([Z, ng, Qm, Vm]).T, y[:, 1:] )) # Return output variables return t, y.T, stim - def runHybrid(self, Fdrive, Adrive, tstim, toffset, phi=np.pi): + def runHybrid(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the system for a specific set of US stimulation parameters, using a hybrid integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the NBLS ODE system is solved iteratively for "slices" of N-microseconds, in a 2-steps scheme: - First, the full (n+3) ODE system is integrated for a few acoustic cycles until Z and ng reach a stable periodic solution (limit cycle) - Second, the signals of the 3 mechanical variables over the last acoustic period are selected and resampled to a far lower sampling rate - Third, the HH n-ODE system is integrated for the remaining time of the slice, using periodic expansion of the mechanical signals to precompute the values of capacitance. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the solution matrix and a state vector .. warning:: This method cannot handle pulsed stimuli ''' - # Initialize full and HH systems solvers - solver_full = ode( - lambda t, y, Adrive, Fdrive, phi: self.fullDerivatives(y, t, Adrive, Fdrive, phi)) - solver_full.set_f_params(Adrive, Fdrive, phi) - solver_full.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) - solver_hh = ode(lambda t, y, Cm: self.neuron.Qderivatives(y, t, Cm)) - solver_hh.set_integrator('dop853', nsteps=SOLVER_NSTEPS, atol=1e-12) - - # Determine full and HH systems time steps - Tdrive = 1 / Fdrive - dt_full = Tdrive / NPC_FULL - dt_hh = Tdrive / NPC_HH - n_full_per_hh = int(NPC_FULL / NPC_HH) - t_full_cycle = np.linspace(0, Tdrive - dt_full, NPC_FULL) - t_hh_cycle = np.linspace(0, Tdrive - dt_hh, NPC_HH) - - # Determine number of samples in prediction vectors - npc_pred = NPC_FULL - n_full_per_hh + 1 - - # Solve quasi-steady equation to compute first deflection value - Z0 = 0.0 - ng0 = self.ng0 - Qm0 = self.Qm0 - Pac1 = self.Pacoustic(dt_full, Adrive, Fdrive, phi) - Z1 = self.balancedefQS(ng0, Qm0, Pac1) - - # Initialize global arrays - stimstate = np.array([1, 1]) - t = np.array([0., dt_full]) - y_membrane = np.array([[0., (Z1 - Z0) / dt_full], [Z0, Z1], [ng0, ng0], [Qm0, Qm0]]) + # Determine time step + dt_dense = 1 / (NPC_FULL * Fdrive) + dt_sparse = 1 / (NPC_HH * Fdrive) + + # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) + Pac = self.Pacoustic(dt_dense, Adrive, Fdrive, phi) + Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) + + # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) - y_channels = np.tile(np.array([steady_states[k] for k in self.neuron.states]), (2, 1)).T - y = np.vstack((y_membrane, y_channels)) - nvar = y.shape[0] - - # Initialize progress bar - if logger.getEffectiveLevel() == logging.DEBUG: - widgets = ['Running: ', pb.Percentage(), ' ', pb.Bar(), ' ', pb.ETA()] - pbar = pb.ProgressBar(widgets=widgets, max_value=1000) - pbar.start() - - # For each hybrid integration interval - irep = 0 - sim_error = False - while not sim_error and t[-1] < tstim + toffset: - - # Integrate full system for a few acoustic cycles until stabilization - periodic_conv = False - j = 0 - ng_last = None - Z_last = None - while not sim_error and not periodic_conv: - if t[-1] > tstim: - solver_full.set_f_params(0.0, 0.0, 0.0) - t_full = t_full_cycle + t[-1] + dt_full - y_full = np.empty((nvar, NPC_FULL)) - y0_full = y[:, -1] - solver_full.set_initial_value(y0_full, t[-1]) - k = 0 - while solver_full.successful() and k <= NPC_FULL - 1: - solver_full.integrate(t_full[k]) - y_full[:, k] = solver_full.y - k += 1 - - # Compare Z and ng signals over the last 2 acoustic periods - if j > 0 and rmse(Z_last, y_full[1, :]) < Z_ERR_MAX \ - and rmse(ng_last, y_full[2, :]) < NG_ERR_MAX: - periodic_conv = True - - # Update last vectors for next comparison - Z_last = y_full[1, :] - ng_last = y_full[2, :] - - # Concatenate time and solutions to global vectors - stimstate = np.concatenate([stimstate, np.ones(NPC_FULL)], axis=0) - t = np.concatenate([t, t_full], axis=0) - y = np.concatenate([y, y_full], axis=1) - - # Increment loop index - j += 1 - - # Retrieve last period of the 3 mechanical variables to propagate in HH system - t_last = t[-npc_pred:] - mech_last = y[0:3, -npc_pred:] - - # Downsample signals to specified HH system time step - (_, mech_pred) = downsample(t_last, mech_last, NPC_HH) - - # Integrate HH system until certain dQ or dT is reached - Q0 = y[3, -1] - dQ = 0.0 - t0_interval = t[-1] - dt_interval = 0.0 - j = 0 - if t[-1] < tstim: - tlim = tstim - else: - tlim = tstim + toffset - while (not sim_error and t[-1] < tlim and - (np.abs(dQ) < DQ_UPDATE or dt_interval < DT_UPDATE)): - t_hh = t_hh_cycle + t[-1] + dt_hh - y_hh = np.empty((nvar - 3, NPC_HH)) - y0_hh = y[3:, -1] - solver_hh.set_initial_value(y0_hh, t[-1]) - k = 0 - while solver_hh.successful() and k <= NPC_HH - 1: - solver_hh.set_f_params(self.Capct(mech_pred[1, k])) - solver_hh.integrate(t_hh[k]) - y_hh[:, k] = solver_hh.y - k += 1 - - # Concatenate time and solutions to global vectors - stimstate = np.concatenate([stimstate, np.zeros(NPC_HH)], axis=0) - t = np.concatenate([t, t_hh], axis=0) - y = np.concatenate([y, np.concatenate([mech_pred, y_hh], axis=0)], axis=1) - - # Compute charge variation from interval beginning - dQ = y[3, -1] - Q0 - dt_interval = t[-1] - t0_interval - - # Increment loop index - j += 1 - - # Update progress bar - if logger.getEffectiveLevel() == logging.DEBUG: - pbar.update(int(1000 * (t[-1] / (tstim + toffset)))) - - irep += 1 - - # Terminate progress bar - if logger.getEffectiveLevel() == logging.DEBUG: - pbar.finish() + y0 = np.concatenate(( + [0., Z0, self.ng0, self.Qm0], + [steady_states[k] for k in self.neuron.states], + )) + is_dense_var = np.array([True] * 3 + [False] * (len(self.neuron.states) + 1)) + + # Initialize simulator and compute solution + logger.debug('Computing hybrid solution') + simulator = HybridSimulator( + lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), + lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.), + lambda t, y, Cm: self.neuron.Qderivatives(y, t, Cm), + lambda yref: self.Capct(yref[1]), + is_dense_var=is_dense_var, + ivars_to_check=[1, 2] + ) + t, y, stim = simulator.compute( + y0, dt_dense, dt_sparse, Fdrive, tstim, toffset, PRF, DC, + print_progress=False + ) # Compute membrane potential vector (in mV) - Vm = y[3, :] / self.v_Capct(y[1, :]) * 1e3 # mV + Qm = y[:, 3] + Z = y[:, 1] + Vm = Qm / self.v_Capct(Z) * 1e3 # mV - # Return output variables with Vm - return (t, np.vstack([y[1:4, :], Vm, y[4:, :]]), stimstate) + # Add Vm to solution matrix + y = np.hstack(( + y[:, 1:4], + np.array([Vm]).T, + y[:, 4:] + )) + + # Return output variables + return t, y.T, stim def checkInputs(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method): ''' Check validity of simulation parameters. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param method: selected integration method :return: 3-tuple with the time profile, the solution matrix and a state vector ''' BilayerSonophore.checkInputs(self, Fdrive, Adrive, 0.0, 0.0) self.neuron.checkInputs(Adrive, tstim, toffset, PRF, DC) # Check validity of simulation type if method not in ('full', 'hybrid', 'sonic'): raise ValueError('Invalid integration method: "{}"'.format(method)) def simulate(self, Fdrive, Adrive, tstim, toffset, PRF=None, DC=1.0, method='sonic'): ''' Run simulation of the system for a specific set of US stimulation parameters. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param method: selected integration method :return: 3-tuple with the time profile, the solution matrix and a state vector ''' # Check validity of stimulation parameters self.checkInputs(Fdrive, Adrive, tstim, toffset, PRF, DC, method) # Call appropriate simulation function if method == 'full': return self.runFull(Fdrive, Adrive, tstim, toffset, PRF, DC) elif method == 'sonic': return self.runSONIC(Fdrive, Adrive, tstim, toffset, PRF, DC) elif method == 'hybrid': - if DC < 1.0: - raise ValueError('Pulsed protocol incompatible with hybrid integration method') - return self.runHybrid(Fdrive, Adrive, tstim, toffset) + # if DC < 1.0: + # raise ValueError('Pulsed protocol incompatible with hybrid integration method') + return self.runHybrid(Fdrive, Adrive, tstim, toffset, PRF, DC) def isExcited(self, Adrive, Fdrive, tstim, toffset, PRF, DC, method, return_val=False): ''' Run a simulation and determine if neuron is excited. :param Adrive: acoustic amplitude (Pa) :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: boolean stating whether neuron is excited or not ''' t, y, _ = self.simulate(Fdrive, Adrive, tstim, toffset, PRF, DC, method=method) dt = t[1] - t[0] ipeaks, *_ = findPeaks(y[2, :], SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM) nspikes = ipeaks.size logger.debug('A = %sPa ---> %s spike%s detected', si_format(Adrive, 2, space=' '), nspikes, "s" if nspikes > 1 else "") cond = nspikes > 0 if return_val: return {True: nspikes, False: np.nan}[cond] else: return cond def isSilenced(self, Adrive, Fdrive, tstim, toffset, PRF, DC, method, return_val=False): ''' Run a simulation and determine if neuron is silenced. :param Adrive: acoustic amplitude (Pa) :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: boolean stating whether neuron is silenced or not ''' if tstim <= TMIN_STABILIZATION: raise ValueError( 'stimulus duration must be greater than {:.0f} ms'.format(TMIN_STABILIZATION * 1e3)) # Simulate model without offset t, y, _ = self.simulate(Fdrive, Adrive, tstim, 0., PRF, DC, method=method) # Extract charge signal posterior to observation window Qm = y[2, t > TMIN_STABILIZATION] # Compute variation range Qm_range = np.ptp(Qm) logger.debug('A = %sPa ---> %.2f nC/cm2 variation range over the last %.0f ms', si_format(Adrive, 2, space=' '), Qm_range * 1e5, TMIN_STABILIZATION * 1e3) cond = np.ptp(Qm) < QSS_Q_DIV_THR if return_val: return {True: Qm[-1], False: np.nan}[cond] else: return cond def titrate(self, Fdrive, tstim, toffset, PRF=None, DC=1.0, Arange=None, method='sonic'): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency, duration, PRF and duty cycle. :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param Arange: search interval for Adrive, iteratively refined :return: determined threshold amplitude (Pa) ''' # Determine amplitude interval if needed if Arange is None: Arange = (0, getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive)[0].max()) # Determine output function if self.neuron.isTitratable(): xfunc = self.isExcited else: xfunc = self.isSilenced # Titrate return titrate(xfunc, (Fdrive, tstim, toffset, PRF, DC, method), Arange, TITRATION_ASTIM_DA_MAX) def run(self, Fdrive, tstim, toffset, PRF=None, DC=1.0, Adrive=None, method='sonic'): ''' Run a simulation of the full electro-mechanical system for a given neuron type with specific parameters, and return output data and metadata. :param Fdrive: US frequency (Hz) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param Adrive: acoustic pressure amplitude (Pa) :param method: integration method ''' logger.info( '%s: %s @ f = %sHz, %st = %ss (%ss offset)%s', self, 'titration' if Adrive is None else 'simulation', si_format(Fdrive, 0, space=' '), 'A = {}Pa, '.format(si_format(Adrive, 2, space=' ')) if Adrive is not None else '', *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # If no amplitude provided, perform titration if Adrive is None: Adrive = self.titrate(Fdrive, tstim, toffset, PRF, DC, method=method) if np.isnan(Adrive): logger.error('Could not find threshold excitation amplitude') return None # Run simulation tstart = time.time() t, y, stimstate = self.simulate(Fdrive, Adrive, tstim, toffset, PRF, DC, method=method) tcomp = time.time() - tstart Z, ng, Qm, Vm, *channels = y logger.debug('completed in %ss', si_format(tcomp, 1)) # Store dataframe and metadata data = pd.DataFrame({ 't': t, 'stimstate': stimstate, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Vm }) for j in range(len(self.neuron.states)): data[self.neuron.states[j]] = channels[j] meta = { 'neuron': self.neuron.name, 'a': self.a, 'd': self.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp, 'method': method } # Log number of detected spikes self.neuron.logNSpikes(data) return data, meta def runAndSave(self, outdir, Fdrive, tstim, toffset, PRF=None, DC=1.0, Adrive=None, method='sonic'): ''' Run a simulation of the full electro-mechanical system for a given neuron type with specific parameters, and save the results in a PKL file. :param outdir: full path to output directory :param Fdrive: US frequency (Hz) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param Adrive: acoustic pressure amplitude (Pa) :param method: integration method ''' data, meta = self.run(Fdrive, tstim, toffset, PRF, DC, Adrive, method) simcode = self.filecode(Fdrive, Adrive, tstim, PRF, DC, method) outpath = '{}/{}.pkl'.format(outdir, simcode) with open(outpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) logger.debug('simulation data exported to "%s"', outpath) return outpath # def runIfNone(self, outdir, Fdrive, Adrive, tstim, toffset, PRF=None, DC=1.0, Adrive=None, # method='sonic'): # ''' Run a simulation of the full electro-mechanical system for a given neuron type # with specific parameters, and save the results in a PKL file, # only if file not present. # :param outdir: full path to output directory # :param Fdrive: US frequency (Hz) # :param tstim: stimulus duration (s) # :param toffset: stimulus offset (s) # :param PRF: pulse repetition frequency (Hz) # :param DC: stimulus duty cycle (-) # :param Adrive: acoustic pressure amplitude (Pa) # :param method: integration method # ''' # fname = self.filecode(Fdrive, Adrive, tstim, PRF, DC, method) # fpath = os.path.join(outdir, fname) # if not os.path.isfile(fpath): # logger.warning('"{}"" file not found'.format(fname)) # self.runAndSave(outdir=Fdrive, tstim, toffset, PRF, DC, Adrive, method) # return loadData(fpath) def getStabPoints(): # Simulate model without offset t, y, _ = self.simulate(Fdrive, Adrive, tstim, 0., PRF, DC, method=method) # Extract charge signal posterior to observation window Qm = y[2, t > TMIN_STABILIZATION] # Compute variation range Qm_range = np.ptp(Qm) logger.debug('A = %sPa ---> %.2f nC/cm2 variation range over the last %.0f ms', si_format(Adrive, 2, space=' '), Qm_range * 1e5, TMIN_STABILIZATION * 1e3) cond = np.ptp(Qm) < QSS_Q_DIV_THR if return_val: return {True: Qm[-1], False: np.nan}[cond] else: return cond def quasiSteadyStates(self, Fdrive, amps=None, charges=None, DCs=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities and duty cycles, at a specific US frequency. :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DCs: duty cycle value(s) :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges amps, charges, lookups = getLookupsDCavg( self.neuron.name, self.a, Fdrive, amps, charges, DCs) # Compute QSS states using these lookups nA, nQ, nDC = lookups['V'].shape QSS = {k: np.empty((nA, nQ, nDC)) for k in self.neuron.states} for iA in range(nA): for iDC in range(nDC): QSS_1D = self.neuron.quasiSteadyStates( {k: v[iA, :, iDC] for k, v in lookups.items()}) for k in QSS.keys(): QSS[k][iA, :, iDC] = QSS_1D[k] # Compress outputs if needed if squeeze_output: QSS = {k: v.squeeze() for k, v in QSS.items()} lookups = {k: v.squeeze() for k, v in lookups.items()} # Return reference inputs and outputs return amps, charges, lookups, QSS def quasiSteadyStateiNet(self, Qm, Fdrive, Adrive, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' _, _, lookups, QSS = self.quasiSteadyStates( Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) return self.neuron.iNet(lookups['V'], np.array(list(QSS.values()))) # mA/m2 def evaluateStability(self, Qm0, states0, lkp): ''' Integrate the effective differential system from a given starting point, until clear convergence or clear divergence is found. :param Qm0: initial membrane charge density (C/m2) :param states0: dictionary of initial states values :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: boolean indicating convergence state ''' # Initialize y0 vector t0 = 0. y0 = np.array([Qm0] + list(states0.values())) # Initializing empty list to record evolution of charge deviation n = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) # size of history dQ = [] # As long as there is no clear charge convergence or divergence conv, div = False, False tf, yf = t0, y0 while not conv and not div: # Integrate system for small interval and retrieve final charge deviation t0, y0 = tf, yf sol = solve_ivp( lambda t, y: self.effDerivatives(y, t, lkp), [t0, t0 + QSS_INTEGRATION_INTERVAL], y0, method='LSODA' ) tf, yf = sol.t[-1], sol.y[:, -1] dQ.append(yf[0] - Qm0) # logger.debug('{:.0f} ms: dQ = {:.5f} nC/cm2, avg dQ = {:.5f} nC/cm2'.format( # tf * 1e3, dQ[-1] * 1e5, np.mean(dQ[-n:]) * 1e5)) # If last charge deviation is too large -> divergence if np.abs(dQ[-1]) > QSS_Q_DIV_THR: div = True # If last charge deviation or average deviation in recent history # is small enough -> convergence for x in [dQ[-1], np.mean(dQ[-n:])]: if np.abs(x) < QSS_Q_CONV_THR: conv = True # If max integration duration is been reached -> error if tf > QSS_MAX_INTEGRATION_DURATION: raise ValueError('too many iterations') logger.debug('{}vergence after {:.0f} ms: dQ = {:.5f} nC/cm2'.format( {True: 'con', False: 'di'}[conv], tf * 1e3, dQ[-1] * 1e5)) return conv def quasiSteadyStateFixedPoints(self, Fdrive, Adrive, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' logger.debug('A = {:.2f} kPa, DC = {:.0f}%'.format(Adrive * 1e-3, DC * 1e2)) # Extract stable and unstable fixed points from QSS charge variation profile dfunc = lambda Qm: - self.quasiSteadyStateiNet(Qm, Fdrive, Adrive, DC) SFP_candidates = getFixedPoints(lkp['Q'], dQdt, filter='stable', der_func=dfunc).tolist() UFPs = getFixedPoints(lkp['Q'], dQdt, filter='unstable', der_func=dfunc).tolist() SFPs = [] pltvars = self.getPltVars() # For each candidate SFP for i, Qm in enumerate(SFP_candidates): logger.debug('Q-SFP = {:.2f} nC/cm2'.format(Qm * 1e5)) # Re-compute QSS *_, QSS_FP = self.quasiSteadyStates(Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) # Simulate from unperturbed QSS and evaluate stability if not self.evaluateStability(Qm, QSS_FP, lkp): logger.warning('diverging system at ({:.2f} kPa, {:.2f} nC/cm2)'.format( Adrive * 1e-3, Qm * 1e5)) UFPs.append(Qm) else: # For each state unstable_states = [] for x in self.neuron.states: pltvar = pltvars[x] unit_str = pltvar.get('unit', '') factor = pltvar.get('factor', 1) is_stable_direction = [] for sign in [-1, +1]: # Perturb state with small offset QSS_perturbed = deepcopy(QSS_FP) QSS_perturbed[x] *= (1 + sign * QSS_REL_OFFSET) # If gating state, bound within [0., 1.] if self.neuron.isVoltageGated(x): QSS_perturbed[x] = np.clip(QSS_perturbed[x], 0., 1.) logger.debug('{}: {:.5f} -> {:.5f} {}'.format( x, QSS_FP[x] * factor, QSS_perturbed[x] * factor, unit_str)) # Simulate from perturbed QSS and evaluate stability is_stable_direction.append( self.evaluateStability(Qm, QSS_perturbed, lkp)) # Check if system shows stability upon x-state perturbation # in both directions if not np.all(is_stable_direction): unstable_states.append(x) # Classify fixed point as stable only if all states show stability is_stable_FP = len(unstable_states) == 0 {True: SFPs, False: UFPs}[is_stable_FP].append(Qm) logger.info('{}stable fixed-point at ({:.2f} kPa, {:.2f} nC/cm2){}'.format( '' if is_stable_FP else 'un', Adrive * 1e-3, Qm * 1e5, '' if is_stable_FP else ', caused by {} states'.format(unstable_states))) return SFPs, UFPs def findRheobaseAmps(self, DCs, Fdrive, Vthr): ''' Find the rheobase amplitudes (i.e. threshold acoustic amplitudes of infinite duration that would result in excitation) of a specific neuron for various duty cycles. :param DCs: duty cycles vector (-) :param Fdrive: acoustic drive frequency (Hz) :param Vthr: threshold membrane potential above which the neuron necessarily fires (mV) :return: rheobase amplitudes vector (Pa) ''' # Get threshold charge from neuron's spike threshold parameter Qthr = self.neuron.Cm0 * Vthr * 1e-3 # C/m2 # Get QSS variables for each amplitude at threshold charge Aref, _, Vmeff, QS_states = self.quasiSteadyStates(Fdrive, charges=Qthr, DCs=DCs) if DCs.size == 1: QS_states = QS_states.reshape((*QS_states.shape, 1)) Vmeff = Vmeff.reshape((*Vmeff.shape, 1)) # Compute 2D QSS charge variation array at Qthr dQdt = -self.neuron.iNet(Vmeff, QS_states) # Find the threshold amplitude that cancels dQdt for each duty cycle Arheobase = np.array([np.interp(0, dQdt[:, i], Aref, left=0., right=np.nan) for i in range(DCs.size)]) # Check if threshold amplitude is found for all DCs inan = np.where(np.isnan(Arheobase))[0] if inan.size > 0: if inan.size == Arheobase.size: logger.error( 'No rheobase amplitudes within [%s - %sPa] for the provided duty cycles', *si_format((Aref.min(), Aref.max()))) else: minDC = DCs[inan.max() + 1] logger.warning( 'No rheobase amplitudes within [%s - %sPa] below %.1f%% duty cycle', *si_format((Aref.min(), Aref.max())), minDC * 1e2) return Arheobase, Aref def computeEffVars(self, Fdrive, Adrive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' tstart = time.time() # Run simulation and retrieve deflection and gas content vectors from last cycle _, [Z, ng], _ = BilayerSonophore.simulate(self, Fdrive, Adrive, Qm) Z_last = Z[-NPC_FULL:] # m Cm_last = self.v_Capct(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = x * Cm_last + (1 - x) * self.Cm0 # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({'V': np.mean(Vm)}) effvars[-1].update(self.neuron.computeEffRates(Vm)) tcomp = time.time() - tstart # Log process log = '{}: lookups @ {}Hz, {}Pa, {:.2f} nC/cm2'.format( self, *si_format([Fdrive, Adrive], precision=1, space=' '), Qm * 1e5) if len(fs) > 1: log += ', fs = {:.0f} - {:.0f}%'.format(fs.min() * 1e2, fs.max() * 1e2) log += ', tcomp = {:.3f} s'.format(tcomp) logger.info(log) # Return effective coefficients return [tcomp, effvars] diff --git a/PySONIC/core/pneuron.py b/PySONIC/core/pneuron.py index 90c92a6..1419cac 100644 --- a/PySONIC/core/pneuron.py +++ b/PySONIC/core/pneuron.py @@ -1,600 +1,600 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-03 11:53:04 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-29 09:59:17 +# @Last Modified time: 2019-05-29 13:31:11 import time import pickle import abc import inspect import re import numpy as np from scipy.integrate import odeint import pandas as pd from ..postpro import findPeaks from ..constants import * from ..utils import si_format, logger, titrate from .simulators import PWSimulator class PointNeuron(metaclass=abc.ABCMeta): ''' Abstract class defining the common API (i.e. mandatory attributes and methods) of all subclasses implementing the channels mechanisms of specific point neurons. ''' tscale = 'ms' # relevant temporal scale of the model defvar = 'V' # default plot variable def __repr__(self): return self.__class__.__name__ def pprint(self): return '{} neuron'.format(self.__class__.__name__) def filecode(self, Astim, tstim, PRF, DC): ''' File naming convention. ''' return 'ESTIM_{}_{}_{:.1f}mA_per_m2_{:.0f}ms{}'.format( self.name, 'CW' if DC == 1 else 'PW', Astim, tstim * 1e3, '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1. else '') @property @abc.abstractmethod def name(self): return 'Should never reach here' @property @abc.abstractmethod def Cm0(self): return 'Should never reach here' @property @abc.abstractmethod def Vm0(self): return 'Should never reach here' @abc.abstractmethod def currents(self, Vm, states): ''' Compute all ionic currents per unit area. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: dictionary of ionic currents per unit area (mA/m2) ''' def iNet(self, Vm, states): ''' net membrane current :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: current per unit area (mA/m2) ''' return sum(self.currents(Vm, states).values()) def dQdt(self, Vm, states): ''' membrane charge density variation rate :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: variation rate (mA/m2) ''' return -self.iNet(Vm, states) def isTitratable(self): ''' Simple method returning whether the neuron can be titrated (defaults to True). ''' return True def currentToConcentrationRate(self, z_ion, depth): ''' Compute the conversion factor from a specific ionic current (in mA/m2) into a variation rate of submembrane ion concentration (in M/s). :param: z_ion: ion valence :param depth: submembrane depth (m) :return: conversion factor (Mmol.m-1.C-1) ''' return 1e-6 / (z_ion * depth * FARADAY) def nernst(self, z_ion, Cion_in, Cion_out, T): ''' Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 def vtrap(self, x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) def efun(self, x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) def ghkDrive(self, Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * self.efun(-x) # M eCout = Cion_out * self.efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 def getDesc(self): return inspect.getdoc(self).splitlines()[0] def getCurrentsNames(self): return list(self.currents(np.nan, [np.nan] * len(self.states)).keys()) def getPltScheme(self): pltscheme = { 'Q_m': ['Qm'], 'V_m': ['Vm'] } pltscheme['I'] = self.getCurrentsNames() + ['iNet'] for cname in self.getCurrentsNames(): if 'Leak' not in cname: key = 'i_{{{}}}\ kin.'.format(cname[1:]) cargs = inspect.getargspec(getattr(self, cname))[0][1:] pltscheme[key] = [var for var in cargs if var not in ['Vm', 'Cai']] return pltscheme def getPltVars(self, wrapleft='df["', wrapright='"]'): ''' Return a dictionary with information about all plot variables related to the neuron. ''' pltvars = { 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'bounds': (-100, 50) }, 'Vm': { 'desc': 'membrane potential', 'label': 'V_m', 'unit': 'mV', 'y0': self.Vm0, 'bounds': (-150, 70) }, 'ELeak': { 'constant': 'obj.ELeak', 'desc': 'non-specific leakage current resting potential', 'label': 'V_{leak}', 'unit': 'mV', 'ls': '--', 'color': 'k' } } for cname in self.getCurrentsNames(): cfunc = getattr(self, cname) cargs = inspect.getargspec(cfunc)[0][1:] pltvars[cname] = { 'desc': inspect.getdoc(cfunc).splitlines()[0], 'label': 'I_{{{}}}'.format(cname[1:]), 'unit': 'A/m^2', 'factor': 1e-3, 'func': '{}({})'.format(cname, ', '.join(['{}{}{}'.format(wrapleft, a, wrapright) for a in cargs])) } for var in cargs: if var not in ['Vm', 'Cai']: vfunc = getattr(self, 'der{}{}'.format(var[0].upper(), var[1:])) desc = cname + re.sub('^Evolution of', '', inspect.getdoc(vfunc).splitlines()[0]) pltvars[var] = { 'desc': desc, 'label': var, 'bounds': (-0.1, 1.1) } pltvars['iNet'] = { 'desc': inspect.getdoc(getattr(self, 'iNet')).splitlines()[0], 'label': 'I_{net}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'iNet({0}Vm{1}, {2}{3}{4}.values.T)'.format( wrapleft, wrapright, wrapleft[:-1], self.states, wrapright[1:]), 'ls': '--', 'color': 'black' } pltvars['dQdt'] = { 'desc': inspect.getdoc(getattr(self, 'dQdt')).splitlines()[0], 'label': 'dQ_m/dt', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'dQdt({0}Vm{1}, {2}{3}{4}.values.T)'.format( wrapleft, wrapright, wrapleft[:-1], self.states, wrapright[1:]), 'ls': '--', 'color': 'black' } for x in self.getGates(): for rate in ['alpha', 'beta']: pltvars['{}{}'.format(rate, x)] = { 'label': '\\{}_{{{}}}'.format(rate, x), 'unit': 'ms^{-1}', 'factor': 1e-3 } return pltvars def getRatesNames(self, states): return list(sum( [['alpha{}'.format(x.lower()), 'beta{}'.format(x.lower())] for x in states], [] )) def Qm0(self): ''' Return the resting charge density (in C/m2). ''' return self.Cm0 * self.Vm0 * 1e-3 # C/cm2 @abc.abstractmethod def steadyStates(self, Vm): ''' Compute the steady-state values for a specific membrane potential value. :param Vm: membrane potential (mV) :return: dictionary of steady-states ''' @abc.abstractmethod def derStates(self, Vm, states): ''' Compute the derivatives of channel states. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: current per unit area (mA/m2) ''' @abc.abstractmethod def computeEffRates(self, Vm): ''' Get the effective rate constants of ion channels, averaged along an acoustic cycle, for future use in effective simulations. :param Vm: array of membrane potential values for an acoustic cycle (mV) :return: a dictionary of rate average constants (s-1) ''' def interpEffRates(self, Qm, lkp, keys=None): ''' Interpolate effective rate constants for a given charge density using reference lookup vectors. :param Qm: membrane charge density (C/m2) :states: state probabilities of the ion channels :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of interpolated rate constants ''' if keys is None: keys = self.rates return {k: np.interp(Qm, lkp['Q'], lkp[k], left=np.nan, right=np.nan) for k in keys} def interpVmeff(self, Qm, lkp): ''' Interpolate the effective membrane potential for a given charge density using reference lookup vectors. :param Qm: membrane charge density (C/m2) :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of interpolated rate constants ''' return np.interp(Qm, lkp['Q'], lkp['V'], left=np.nan, right=np.nan) @abc.abstractmethod def derEffStates(self, Qm, states, lkp): ''' Compute the effective derivatives of channel states, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param Qm: membrane charge density (C/m2) :states: state probabilities of the ion channels :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. ''' def Qbounds(self): ''' Determine bounds of membrane charge physiological range for a given neuron. ''' return np.array([np.round(self.Vm0 - 25.0), 50.0]) * self.Cm0 * 1e-3 # C/m2 def isVoltageGated(self, state): ''' Determine whether a given state is purely voltage-gated or not.''' return 'alpha{}'.format(state.lower()) in self.rates def getGates(self): ''' Retrieve the names of the neuron's states that match an ion channel gating. ''' gates = [] for x in self.states: if self.isVoltageGated(x): gates.append(x) return gates def qsStates(self, lkp, states): ''' Compute a collection of quasi steady states using the standard xinf = ax / (ax + Bx) equation. :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of quasi-steady states ''' return { x: lkp['alpha{}'.format(x)] / (lkp['alpha{}'.format(x)] + lkp['beta{}'.format(x)]) for x in states } @abc.abstractmethod def quasiSteadyStates(self, lkp): ''' Compute the quasi-steady states of a neuron for a range of membrane charge densities, based on 1-dimensional lookups interpolated at a given sonophore diameter, US frequency, US amplitude and duty cycle. :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of quasi-steady states ''' def getRates(self, Vm): ''' Compute the ion channels rate constants for a given membrane potential. :param Vm: membrane potential (mV) :return: a dictionary of rate constants and their values at the given potential. ''' rates = {} for x in self.getGates(): x = x.lower() alpha_str, beta_str = ['{}{}'.format(s, x.lower()) for s in ['alpha', 'beta']] inf_str, tau_str = ['{}inf'.format(x.lower()), 'tau{}'.format(x.lower())] if hasattr(self, 'alpha{}'.format(x)): alphax = getattr(self, alpha_str)(Vm) betax = getattr(self, beta_str)(Vm) elif hasattr(self, '{}inf'.format(x)): xinf = getattr(self, inf_str)(Vm) taux = getattr(self, tau_str)(Vm) alphax = xinf / taux betax = 1 / taux - alphax rates[alpha_str] = alphax rates[beta_str] = betax return rates def Vderivatives(self, y, t, Iinj): ''' Compute the derivatives of a V-cast HH system for a specific value of injected current. :param y: vector of HH system variables at time t :param t: time value (s, unused) :param Iinj: injected current (mA/m2) :return: vector of HH system derivatives at time t ''' Vm, *states = y Iionic = self.iNet(Vm, states) # mA/m2 dVmdt = (- Iionic + Iinj) / self.Cm0 # mV/s dstates = self.derStates(Vm, states) return [dVmdt, *[dstates[k] for k in self.states]] def Qderivatives(self, y, t, Cm=None): ''' Compute the derivatives of the n-ODE HH system variables, based on a value of membrane capacitance. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param Cm: membrane capacitance (F/m2) :return: vector of HH system derivatives at time t ''' if Cm is None: Cm = self.Cm0 Qm, *states = y Vm = Qm / Cm * 1e3 # mV dQmdt = - self.iNet(Vm, states) * 1e-3 # A/m2 dstates = self.derStates(Vm, states) return [dQmdt, *[dstates[k] for k in self.states]] def checkInputs(self, Astim, tstim, toffset, PRF, DC): ''' Check validity of electrical stimulation parameters. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) ''' # Check validity of stimulation parameters if not all(isinstance(param, float) for param in [Astim, tstim, toffset, DC]): raise TypeError('Invalid stimulation parameters (must be float typed)') if tstim <= 0: raise ValueError('Invalid stimulus duration: {} ms (must be strictly positive)' .format(tstim * 1e3)) if toffset < 0: raise ValueError('Invalid stimulus offset: {} ms (must be positive or null)' .format(toffset * 1e3)) if DC <= 0.0 or DC > 1.0: raise ValueError('Invalid duty cycle: {} (must be within ]0; 1])'.format(DC)) if DC < 1.0: if not isinstance(PRF, float): raise TypeError('Invalid PRF value (must be float typed)') if PRF is None: raise AttributeError('Missing PRF value (must be provided when DC < 1)') if PRF < 1 / tstim: raise ValueError('Invalid PRF: {} Hz (PR interval exceeds stimulus duration)' .format(PRF)) def simulate(self, Astim, tstim, toffset, PRF=None, DC=1.0, dt=DT_ESTIM): ''' Compute solutions of a neuron's HH system for a specific set of electrical stimulation parameters, using a classic integration scheme. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param dt: integration time step (s) :return: 3-tuple with the time profile and solution matrix and a state vector ''' # Check validity of stimulation parameters self.checkInputs(Astim, tstim, toffset, PRF, DC) # Set initial conditions steady_states = self.steadyStates(self.Vm0) y0 = np.array([self.Vm0, *[steady_states[k] for k in self.states]]) # Initialize simulator and compute solution logger.debug('Computing solution') simulator = PWSimulator( lambda y, t: self.Vderivatives(y, t, Astim), lambda y, t: self.Vderivatives(y, t, 0.) ) t, y, stim = simulator.compute(y0, dt, tstim, toffset, PRF, DC) # Return output variables return t, y.T, stim def isExcited(self, Astim, tstim, toffset, PRF, DC): ''' Run a simulation and determine if neuron is excited. :param Astim: current amplitude (mA/m2) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: boolean stating whether neuron is excited or not ''' t, y, _ = self.simulate(Astim, tstim, toffset, PRF, DC) dt = t[1] - t[0] ipeaks, *_ = findPeaks(y[0, :], SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) nspikes = ipeaks.size logger.debug('A = %sA/m2 ---> %s spike%s detected', si_format(Astim * 1e-3, 2, space=' '), nspikes, "s" if nspikes > 1 else "") return nspikes > 0 def titrate(self, tstim, toffset, PRF=None, DC=1.0, Arange=(0., 2 * TITRATION_ESTIM_A_MAX), xfunc=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given duration, PRF and duty cycle. :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param Arange: search interval for Astim, iteratively refined :return: excitation threshold amplitude (mA/m2) ''' # Determine output function if xfunc is None: xfunc = self.isExcited return titrate(xfunc, (tstim, toffset, PRF, DC), Arange, TITRATION_ESTIM_DA_MAX) def logNSpikes(self, data): ''' Detect spikes on Qm signal. ''' dt = np.diff(data.ix[:1, 't'].values)[0] ipeaks, *_ = findPeaks( data['Qm'].values, SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM ) nspikes = ipeaks.size logger.debug('{} spike{} detected'.format(nspikes, 's' if nspikes > 1 else '')) def run(self, tstim, toffset, PRF=None, DC=1.0, Astim=None): ''' Run a simulation of the point-neuron Hodgkin-Huxley system with specific parameters, and return output data and metadata. :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param Astim: stimulus amplitude (mA/m2) ''' logger.info( '%s: %s @ %st = %ss (%ss offset)%s', self, 'titration' if Astim is None else 'simulation', 'A = {}A/m2, '.format(si_format(Astim, 2, space=' ')) if Astim is not None else '', *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) if Astim is None: Astim = self.titrate(tstim, toffset, PRF, DC) if np.isnan(Astim): logger.error('Could not find threshold excitation amplitude') return None # Run simulation tstart = time.time() t, y, stimstate = self.simulate(Astim, tstim, toffset, PRF, DC) Vm, *channels = y tcomp = time.time() - tstart logger.debug('completed in %ss', si_format(tcomp, 1)) # Store dataframe and metadata data = pd.DataFrame({ 't': t, 'stimstate': stimstate, 'Vm': Vm, 'Qm': Vm * self.Cm0 * 1e-3 }) for j in range(len(self.states)): data[self.states[j]] = channels[j] meta = { 'neuron': self.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp } # Log number of detected spikes self.logNSpikes(data) return data, meta def runAndSave(self, outdir, tstim, toffset, PRF=None, DC=1.0, Astim=None): ''' Run a simulation of the point-neuron Hodgkin-Huxley system with specific parameters, and save the results in a PKL file. :param outdir: full path to output directory :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param Astim: stimulus amplitude (mA/m2) ''' data, meta = self.run(tstim, toffset, PRF, DC, Astim) simcode = self.filecode(Astim, tstim, PRF, DC) outpath = '{}/{}.pkl'.format(outdir, simcode) with open(outpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) logger.debug('simulation data exported to "%s"', outpath) return outpath diff --git a/PySONIC/core/simulators.py b/PySONIC/core/simulators.py index f725282..f2873e4 100644 --- a/PySONIC/core/simulators.py +++ b/PySONIC/core/simulators.py @@ -1,250 +1,351 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2019-05-28 14:45:12 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-29 09:47:07 +# @Last Modified time: 2019-05-29 16:13:05 import numpy as np -from scipy.integrate import odeint +from scipy.integrate import ode, odeint import progressbar as pb from ..utils import * -from ..constants import MIN_SAMPLES_PER_PULSE_INT, MAX_RMSE_PTP_RATIO, NCYCLES_MAX +from ..constants import * class Simulator: ''' Generic interface to simulator object. ''' def __init__(self): pass + def resample(self, t, y, stim, target_dt): + ''' Resample a solution to a new target time step. + + :param t: time vector + :param y: solution matrix + :param stim: stimulation state vector + :target_dt: target time step after resampling + :return: 3-tuple with the resampled time vector, solution matrix and state vector + ''' + dt = t[1] - t[0] + rf = int(np.round(target_dt / dt)) + assert rf >= 1, 'Hyper-sampling not supported' + logger.debug( + 'Downsampling output arrays by factor %u (Fs = %sHz)', + rf, si_format(1 / (dt * rf), 2) + ) + t = t[::rf] + y = y[::rf, :] + stim = stim[::rf] + return t, y, stim + def integrate(self, t, y, stim, tnew, func, is_on): ''' Integrate system for a time interval and append to preceding solution arrays. :param t: preceding time vector :param y: preceding solution matrix :param stim: preceding stimulation state vector :param tnew: integration time vector for current interval :param func: derivative function for current interval :param is_on: stimulation state for current interval :return: 3-tuple with the appended time vector, solution matrix and state vector ''' if tnew.size > 0: ynew = odeint(func, y[-1], tnew) t = np.concatenate((t, tnew[1:])) y = np.concatenate((y, ynew[1:]), axis=0) stim = np.concatenate((stim, np.ones(tnew.size - 1) * is_on)) return t, y, stim def compute(self): pass class PeriodicSimulator(Simulator): def __init__(self, derf, ivars_to_check=None): ''' Initialize simulator with specific derivative function :param derf: derivative function :param ivars_to_check: solution indexes of variables to check for stability ''' self.derf = derf self.ivars_to_check = ivars_to_check - def getTimeVector(self, dt, f): + def getNPerCycle(self, dt, f): + ''' Compute number of samples per cycle given a time step and a specific periodicity. + + :param dt: integration time step (s) + :param f: periodic frequency (Hz) + :return: number of samples per cycle + ''' + return int(np.round(1 / (f * dt))) + 1 + + def getTimeReference(self, dt, f): ''' Compute reference integration time vector for a specific periodicity. :param dt: integration time step (s) :param f: periodic frequency (Hz) :return: time vector for 1 periodic cycle ''' - return np.linspace(0, 1 / f, int(np.round(1 / (f * dt))) + 1) + return np.linspace(0, 1 / f, self.getNPerCycle(dt, f)) def isPeriodicStable(self, y, ncycle, icycle): ''' Assess the periodic stabilization of a solution. :param y: solution matrix :param ncycle: number of samples per cycle :param icycle: index of cycle of interest :return: boolean stating whether the solution is stable or not ''' y_target = y[icycle * ncycle: (icycle + 1) * ncycle, :] y_prec = y[(icycle - 1) * ncycle: icycle * ncycle, :] x_ratios = [] for ivar in self.ivars_to_check: x_target, x_prec = y_target[:, ivar], y_prec[:, ivar] x_ptp = np.ptp(x_target) x_rmse = rmse(x_target, x_prec) x_ratios.append(x_rmse / x_ptp) is_periodically_stable = np.all(np.array(x_ratios) < MAX_RMSE_PTP_RATIO) logger.debug( 'step %u: ratios = [%s] -> %sstable', icycle, ', '.join(['{:.2e}'.format(r) for r in x_ratios]), '' if is_periodically_stable else 'un' ) return is_periodically_stable def compute(self, y0, dt, f): ''' Simulate system with a specific periodicity until stabilization. :param y0: 1D vector of initial conditions :param dt: integration time step (s) :param f: periodic frequency (Hz) :return: 3-tuple with the time profile, the effective solution matrix and a state vector ''' # Adjust variables to check if needed if self.ivars_to_check is None: self.ivars_to_check = range(y0.size) # Get reference time vector - tref = self.getTimeVector(dt, f) + tref = self.getTimeReference(dt, f) # Initialize global arrays t = np.array([0.0]) y = np.atleast_2d(y0) stim = np.array([1]) # Integrate system for a few cycles until stabilization icycle = 0 conv = False while not conv and icycle < NCYCLES_MAX: t, y, stim = self.integrate(t, y, stim, tref + icycle / f, self.derf, True) if icycle > 0: conv = self.isPeriodicStable(y, tref.size - 1, icycle) icycle += 1 # Log stopping criterion if icycle == NCYCLES_MAX: logger.warning('No convergence: stopping after %u cycles', icycle) else: logger.debug('Periodic convergence after %u cycles', icycle) # Return output variables return t, y, stim class PWSimulator(Simulator): def __init__(self, derf_on, derf_off): ''' Initialize simulator with specific derivative functions :param derf_on: derivative function for ON periods :param derf_off: derivative function for OFF periods ''' self.derf_on = derf_on self.derf_off = derf_off - def getTimeVectors(self, dt, tstim, toffset, PRF, DC): + def getTimeReference(self, dt, tstim, toffset, PRF, DC): ''' Compute reference integration time vectors for a specific stimulus application pattern. :param dt: integration time step (s) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 3-tuple with time vectors for stimulus ON and OFF periods and stimulus offset ''' # Compute vector sizes T_ON = DC / PRF T_OFF = (1 - DC) / PRF # For high-PRF pulsed protocols: adapt time step to ensure minimal # number of samples during TON or TOFF dt_warning_msg = 'high-PRF protocol: lowering time step to %.2e s to properly integrate %s' for key, T in {'TON': T_ON, 'TOFF': T_OFF}.items(): if T > 0 and T / dt < MIN_SAMPLES_PER_PULSE_INT: dt = T / MIN_SAMPLES_PER_PULSE_INT logger.warning(dt_warning_msg, dt, key) # Initializing accurate time vectors pulse ON and OFF periods, as well as offset t_on = np.linspace(0, T_ON, int(np.round(T_ON / dt)) + 1) t_off = np.linspace(T_ON, 1 / PRF, int(np.round(T_OFF / dt))) t_offset = np.linspace(tstim, tstim + toffset, int(np.round(toffset / dt))) return t_on, t_off, t_offset - def resample(self, t, y, stim, target_dt): - ''' Resample a solution to a new target time step. - - :param t: time vector - :param y: solution matrix - :param stim: stimulation state vector - :target_dt: target time step after resampling - :return: 3-tuple with the resampled time vector, solution matrix and state vector + def adjustPRF(self, tstim, PRF, DC, print_progress): + ''' Adjust the PRF in case of continuous wave stimulus, in order to obtain the desired + number of integration interval(s) during stimulus. ''' - dt = t[1] - t[0] - rf = int(np.round(target_dt / dt)) - assert rf >= 1, 'Hyper-sampling not supported' - logger.info( - 'Downsampling output arrays by factor %u (Fs = %sHz)', - rf, si_format(1 / (dt * rf), 2) - ) - t = t[::rf] - y = y[::rf, :] - stim = stim[::rf] - return t, y, stim + if DC < 1.0: + return PRF + else: + return {True: 100., False: 1.}[print_progress] / tstim + + def getNPulses(self, tstim, PRF): + ''' Calculate number of pulses from stimulus temporal pattern. ''' + return int(np.round(tstim * PRF)) def compute(self, y0, dt, tstim, toffset, PRF, DC, target_dt=None, print_progress=False): ''' Simulate system for a specific stimulus application pattern. :param y0: 1D vector of initial conditions :param dt: integration time step (s) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :target_dt: target time step after resampling :return: 3-tuple with the time profile, the effective solution matrix and a state vector ''' - # If CW stimulus: change PRF to have exactly one integration interval during stimulus - if DC == 1.0: - if not print_progress: - PRF = 1 / tstim - else: - PRF = 100 / tstim - npulses = int(np.round(tstim * PRF)) + # Adjust PRF and get number of pulses + PRF = self.adjustPRF(tstim, PRF, DC, print_progress) + npulses = self.getNPulses(tstim, PRF) # Get reference time vectors - t_on, t_off, t_offset = self.getTimeVectors(dt, tstim, toffset, PRF, DC) + t_on, t_off, t_offset = self.getTimeReference(dt, tstim, toffset, PRF, DC) # Initialize global arrays t = np.array([0.0]) y = np.atleast_2d(y0) stim = np.array([1]) # Initialize progress bar if print_progress: widgets = ['Running: ', pb.Percentage(), ' ', pb.Bar(), ' ', pb.ETA()] pbar = pb.ProgressBar(widgets=widgets, max_value=int(npulses * (toffset + tstim) / tstim)) pbar.start() # Integrate ON and OFF intervals of each pulse for i in range(npulses): for j, (tref, func) in enumerate(zip([t_on, t_off], [self.derf_on, self.derf_off])): t, y, stim = self.integrate(t, y, stim, tref + i / PRF, func, j == 0) # Update progress bar if print_progress: pbar.update(i) # Integrate offset interval t, y, stim = self.integrate(t, y, stim, t_offset, self.derf_off, False) # Terminate progress bar if print_progress: pbar.finish() # Resample solution if specified if target_dt is not None: t, y, stim = self.resample(t, y, stim, target_dt) # Return output variables return t, y, stim + + +class HybridSimulator(PWSimulator): + + def __init__(self, derf_on, derf_off, derf_sparse, predf_sparse, + is_dense_var=None, ivars_to_check=None): + ''' Initialize simulator with specific derivative functions + + :param derf_on: derivative function for ON periods + :param derf_off: derivative function for OFF periods + ''' + PWSimulator.__init__(self, derf_on, derf_off) + self.sparse_solver = ode(derf_sparse) + self.sparse_solver.set_integrator('dop853', nsteps=SOLVER_NSTEPS, atol=1e-12) + self.predf_sparse = predf_sparse + self.is_dense_var = is_dense_var + self.is_sparse_var = np.invert(is_dense_var) + self.ivars_to_check = ivars_to_check + + def integrate(self, t, y, stim, tnew, func, is_on): + + if tnew.size > 0: + + dt_dense = tnew[1] - tnew[0] + + # Initialize periodic solver + dense_solver = PeriodicSimulator(func, self.ivars_to_check) + npc_dense = dense_solver.getNPerCycle(dt_dense, self.f) + + # Until final integration time is reached + while t[-1] < tnew[-1]: + logger.debug('t = {:.5f} ms: starting new hybrid integration'.format(t[-1] * 1e3)) + + # Integrate dense system until convergence + tdense, ydense, stimdense = dense_solver.compute(y[-1], dt_dense, self.f) + tdense += t[-1] + t = np.concatenate((t, tdense[1:])) + y = np.concatenate((y, ydense[1:]), axis=0) + stim = np.concatenate((stim, np.ones(tdense.size - 1) * is_on)) + + # Resample signals over last acoustic cycle to match sparse time step + tlast, ylast, stimlast = self.resample( + tdense[-npc_dense:], ydense[-npc_dense:], stimdense[-npc_dense:], + self.dt_sparse + ) + npc_sparse = tlast.size + + # Integrate until either the rest of the interval or max update interval is reached + t0 = tdense[-1] + tf = min(tnew[-1], tdense[0] + DT_UPDATE) + nsparse = int(np.round((tf - t0) / self.dt_sparse)) + tsparse = np.linspace(t0, tf, nsparse) + ysparse = np.empty((nsparse, y.shape[1])) + ysparse[0] = y[-1] + self.sparse_solver.set_initial_value(y[-1, self.is_sparse_var], t[-1]) + for j in range(1, tsparse.size): + self.sparse_solver.set_f_params( + self.predf_sparse(ylast[j % npc_sparse])) + self.sparse_solver.integrate(tsparse[j]) + if not self.sparse_solver.successful(): + raise ValueError( + 'integration error at t = {:.5f} ms'.format(tsparse[j] * 1e3)) + ysparse[j, self.is_dense_var] = ylast[j % npc_sparse, self.is_dense_var] + ysparse[j, self.is_sparse_var] = self.sparse_solver.y + t = np.concatenate((t, tsparse[1:])) + y = np.concatenate((y, ysparse[1:]), axis=0) + stim = np.concatenate((stim, np.ones(tsparse.size - 1) * is_on)) + + return t, y, stim + + def compute(self, y0, dt_dense, dt_sparse, f, tstim, toffset, PRF, DC, print_progress=False): + + # Set periodicity and sparse time step + self.f = f + self.dt_sparse = dt_sparse + + # Adjust dense variables + if self.is_dense_var is None: + self.is_dense_var = np.array([True] * y0.size) + + return PWSimulator.compute( + self, y0, dt_dense, tstim, toffset, PRF, DC, + target_dt=None, print_progress=print_progress) diff --git a/tests/test_basic.py b/tests/test_basic.py index 5315f9e..aa14138 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -1,252 +1,252 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-06-14 18:37:45 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-11-21 14:53:06 +# @Last Modified time: 2019-05-29 15:59:16 ''' Test the basic functionalities of the package. ''' import os import sys import logging import time import cProfile import pstats from argparse import ArgumentParser from PySONIC.core import BilayerSonophore, NeuronalBilayerSonophore from PySONIC.utils import logger from PySONIC.neurons import * def test_MECH(is_profiled=False): ''' Mechanical simulation. ''' logger.info('Test: running MECH simulation') # Create BLS instance a = 32e-9 # m Qm0 = -80e-5 # membrane resting charge density (C/m2) Cm0 = 1e-2 # membrane resting capacitance (F/m2) bls = BilayerSonophore(a, Cm0, Qm0) # Stimulation parameters Fdrive = 350e3 # Hz Adrive = 100e3 # Pa Qm = 50e-5 # C/m2 # Run simulation if is_profiled: pfile = 'tmp.stats' cProfile.runctx('bls.run(Fdrive, Adrive, Qm)', globals(), locals(), pfile) stats = pstats.Stats(pfile) os.remove(pfile) stats.strip_dirs() stats.sort_stats('cumulative') stats.print_stats() else: bls.simulate(Fdrive, Adrive, Qm) def test_ESTIM(is_profiled=False): ''' Electrical simulation ''' logger.info('Test: running ESTIM simulation') # Initialize neuron neuron = CorticalRS() # Stimulation parameters Astim = 10.0 # mA/m2 tstim = 100e-3 # s toffset = 50e-3 # s # Run simulation if is_profiled: pfile = 'tmp.stats' cProfile.runctx('solver.run(neuron, Astim, tstim, toffset)', globals(), locals(), pfile) stats = pstats.Stats(pfile) os.remove(pfile) stats.strip_dirs() stats.sort_stats('cumulative') stats.print_stats() else: neuron.simulate(Astim, tstim, toffset) neuron.simulate(Astim, tstim, toffset, PRF=100.0, DC=0.05) def test_ASTIM_sonic(is_profiled=False): ''' Effective acoustic simulation ''' logger.info('Test: ASTIM sonic simulation') # Default parameters a = 32e-9 # m neuron = CorticalRS() nbls = NeuronalBilayerSonophore(a, neuron) Fdrive = 500e3 # Hz Adrive = 100e3 # Pa tstim = 50e-3 # s toffset = 10e-3 # s # Run simulation if is_profiled: nbls = NeuronalBilayerSonophore(a, neuron) pfile = 'tmp.stats' cProfile.runctx("nbls.simulate(Fdrive, Adrive, tstim, toffset, method='sonic')", globals(), locals(), pfile) stats = pstats.Stats(pfile) os.remove(pfile) stats.strip_dirs() stats.sort_stats('cumulative') stats.print_stats() else: # test error 1: sonophore radius outside of lookup range try: nbls = NeuronalBilayerSonophore(100e-9, neuron) nbls.simulate(Fdrive, Adrive, tstim, toffset, method='sonic') except ValueError as err: logger.debug('Out of range radius: OK') # test error 2: frequency outside of lookups range try: nbls = NeuronalBilayerSonophore(a, neuron) nbls.simulate(10e3, Adrive, tstim, toffset, method='sonic') except ValueError as err: logger.debug('Out of range frequency: OK') # test error 3: amplitude outside of lookups range try: nbls = NeuronalBilayerSonophore(a, neuron) nbls.simulate(Fdrive, 1e6, tstim, toffset, method='sonic') except ValueError as err: logger.debug('Out of range amplitude: OK') # test: normal stimulation completion (CW and PW) nbls = NeuronalBilayerSonophore(a, neuron) nbls.simulate(Fdrive, Adrive, tstim, toffset, method='sonic') nbls.simulate(Fdrive, Adrive, tstim, toffset, PRF=100.0, DC=0.05, method='sonic') def test_ASTIM_full(is_profiled=False): ''' Classic acoustic simulation ''' logger.info('Test: running ASTIM classic simulation') # Initialize sonic neuron a = 32e-9 # m neuron = CorticalRS() nbls = NeuronalBilayerSonophore(a, neuron) # Stimulation parameters Fdrive = 500e3 # Hz Adrive = 100e3 # Pa tstim = 1e-6 # s toffset = 1e-6 # s # Run simulation if is_profiled: pfile = 'tmp.stats' cProfile.runctx("nbls.simulate(Fdrive, Adrive, tstim, toffset, method='full')", globals(), locals(), pfile) stats = pstats.Stats(pfile) os.remove(pfile) stats.strip_dirs() stats.sort_stats('cumulative') stats.print_stats() else: nbls.simulate(Fdrive, Adrive, tstim, toffset, method='full') def test_ASTIM_hybrid(is_profiled=False): ''' Hybrid acoustic simulation ''' logger.info('Test: running ASTIM hybrid simulation') # Initialize sonic neuron a = 32e-9 # m neuron = CorticalRS() nbls = NeuronalBilayerSonophore(a, neuron) # Stimulation parameters Fdrive = 350e3 # Hz Adrive = 100e3 # Pa - tstim = 1e-3 # s + tstim = 3e-3 # s toffset = 1e-3 # s # Run simulation if is_profiled: pfile = 'tmp.stats' cProfile.runctx("nbls.simulate(Fdrive, Adrive, tstim, toffset, method='hybrid')", globals(), locals(), pfile) stats = pstats.Stats(pfile) os.remove(pfile) stats.strip_dirs() stats.sort_stats('cumulative') stats.print_stats() else: nbls.simulate(Fdrive, Adrive, tstim, toffset, method='hybrid') def test_all(): t0 = time.time() test_MECH() test_ESTIM() test_ASTIM_sonic() test_ASTIM_full() test_ASTIM_hybrid() tcomp = time.time() - t0 logger.info('All tests completed in %.0f s', tcomp) def main(): # Define valid test sets valid_testsets = [ 'MECH', 'ESTIM', 'ASTIM_sonic', 'ASTIM_full', 'ASTIM_hybrid', 'all' ] # Define argument parser ap = ArgumentParser() ap.add_argument('-t', '--testset', type=str, default='all', choices=valid_testsets, help='Specific test set') ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-p', '--profile', default=False, action='store_true', help='Profile test set') # Parse arguments args = ap.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) if args.profile and args.testset == 'all': logger.error('profiling can only be run on individual tests') sys.exit(2) # Run test if args.testset == 'all': test_all() else: possibles = globals().copy() possibles.update(locals()) method = possibles.get('test_{}'.format(args.testset)) method(args.profile) sys.exit(0) if __name__ == '__main__': main()