diff --git a/PySONIC.sublime-project b/PySONIC.sublime-project index 9f884ae..4c82066 100644 --- a/PySONIC.sublime-project +++ b/PySONIC.sublime-project @@ -1,53 +1,53 @@ { "build_systems": [ { "file_regex": "^[ ]*File \"(...*?)\", line ([0-9]*)", "name": "Anaconda Python Builder", "selector": "source.python", - "shell_cmd": "\"C:\\Users\\lemaire\\Anaconda3\\python.exe\" -u \"$file\"" + "shell_cmd": "\"python\" -u \"$file\"" } ], "folders": [ { "file_exclude_patterns": [ "*.sublime-workspace", "MANIFEST.in", "LICENSE", "conf.py", "index.rst", "*.gitignore", "__init__.py", "*.c", "*.sh", "*.bat", "Makefile", "*.pkl" ], "folder_exclude_patterns": [ "docs", "*.egg-info", ".ipynb_checkpoints", "_build", "_static", "_templates", "__pycache__" ], "path": "." } ], "settings": { "anaconda_linting": true, "anaconda_linting_behaviour": "always", "pep257": false, "python_interpreter": "C:\\Users\\lemaire\\Anaconda3\\python.exe", "test_command": "python -m unittest discover", "use_pylint": false, "validate_imports": true }, "translate_tabs_to_spaces": true } diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index a913683..c3c5962 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,649 +1,649 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 21:22:13 +# @Last Modified time: 2019-06-07 15:33:36 from copy import deepcopy import logging import numpy as np import pandas as pd from scipy.interpolate import interp1d from scipy.integrate import solve_ivp from .simulators import PWSimulator, HybridSimulator from .bls import BilayerSonophore from .pneuron import PointNeuron from .batches import createQueue from ..neurons import getLookups2D, getLookupsDCavg from ..utils import * from ..constants import * from ..postpro import getFixedPoints class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ASTIM' # keyword used to characterize simulations made with this model def __init__(self, a, neuron, Fdrive=None, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param neuron: neuron object :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Check validity of input parameters if not isinstance(neuron, PointNeuron): raise ValueError('Invalid neuron type: "{}" (must inherit from PointNeuron class)' .format(neuron.name)) self.neuron = neuron # Initialize BilayerSonophore parent object BilayerSonophore.__init__(self, a, neuron.Cm0, neuron.Cm0 * neuron.Vm0 * 1e-3, embedding_depth) def __repr__(self): s = '{}({:.1f} nm, {}'.format(self.__class__.__name__, self.a * 1e9, self.neuron) if self.d > 0.: s += ', d={}m'.format(si_format(self.d, precision=1, space=' ')) return s + ')' def params(self): params = super().params() params.update(self.neuron.params()) return params def getPltVars(self, wrapleft='df["', wrapright='"]'): pltvars = super().getPltVars(wrapleft, wrapright) pltvars.update(self.neuron.getPltVars(wrapleft, wrapright)) return pltvars def getPltScheme(self): return self.neuron.getPltScheme() def filecode(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method='sonic'): return '{}_{}_{}_{:.0f}nm_{:.0f}kHz_{:.2f}kPa_{:.0f}ms_{}{}'.format( self.simkey, self.neuron.name, 'CW' if DC == 1 else 'PW', self.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(PRF, DC * 1e2) if DC < 1. else '', method) def fullDerivatives(self, y, t, Adrive, Fdrive, phi): ''' Compute the derivatives of the (n+3) ODE full NBLS system variables. :param y: vector of state variables :param t: specific instant in time (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives(self, y[:3], t, Adrive, Fdrive, y[3], phi) dydt_elec = self.neuron.Qderivatives(y[3:], t, self.Capct(y[1])) return dydt_mech + dydt_elec def effDerivatives(self, y, t, lkp): ''' Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t ''' # Split input vector explicitly Qm, *states = y # Compute charge and channel states variation Vmeff = self.neuron.interpVmeff(Qm, lkp) dQmdt = - self.neuron.iNet(Vmeff, states) * 1e-3 dstates = self.neuron.derEffStates(Qm, states, lkp) # Return derivatives vector return [dQmdt, *[dstates[k] for k in self.neuron.states]] def interpEffVariable(self, key, Qm, stim, lkps1D): ''' Interpolate Q-dependent effective variable along solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkps1D: dictionary of lookups for ON and OFF states :return: interpolated effective variable vector ''' x = np.zeros(stim.size) x[stim == 0] = np.interp( Qm[stim == 0], lkps1D['ON']['Q'], lkps1D['ON'][key], left=np.nan, right=np.nan) x[stim == 1] = np.interp( Qm[stim == 1], lkps1D['ON']['Q'], lkps1D['OFF'][key], left=np.nan, right=np.nan) return x def runFull(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the full electro-mechanical system for a specific set of US stimulation parameters, using a classic integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the ODE system is solved iteratively until completion. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param phi: acoustic drive phase (rad) :return: 2-tuple with the output dataframe and computation time. ''' # Determine time step dt = 1 / (NPC_FULL * Fdrive) # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) Pac = self.Pacoustic(dt, Adrive, Fdrive, phi) Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.concatenate(( [0., Z0, self.ng0, self.Qm0], [steady_states[k] for k in self.neuron.states])) # Initialize simulator and compute solution logger.debug('Computing detailed solution') simulator = PWSimulator( lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.)) (t, y, stim), tcomp = simulator( y0, dt, tstim, toffset, PRF, DC, print_progress=logger.getEffectiveLevel() <= logging.INFO, target_dt=CLASSIC_TARGET_DT, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 4] # Return dataframe and computation time return data, tcomp def runHybrid(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the system for a specific set of US stimulation parameters, using a hybrid integration scheme. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the solution matrix and a state vector ''' # Determine time steps dt_dense, dt_sparse = [1. / (n * Fdrive) for n in [NPC_FULL, NPC_HH]] # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) Pac = self.Pacoustic(dt_dense, Adrive, Fdrive, phi) Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.concatenate(( [0., Z0, self.ng0, self.Qm0], [steady_states[k] for k in self.neuron.states], )) is_dense_var = np.array([True] * 3 + [False] * (len(self.neuron.states) + 1)) # Initialize simulator and compute solution logger.debug('Computing hybrid solution') simulator = HybridSimulator( lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.), lambda t, y, Cm: self.neuron.Qderivatives(y, t, Cm), lambda yref: self.Capct(yref[1]), is_dense_var, ivars_to_check=[1, 2]) (t, y, stim), tcomp = simulator( y0, dt_dense, dt_sparse, 1. / Fdrive, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 4] # Return dataframe and computation time return data, tcomp def computeEffVars(self, Fdrive, Adrive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' # Run simulation and retrieve deflection and gas content vectors from last cycle data, tcomp = BilayerSonophore.simulate(self, Fdrive, Adrive, Qm) Z_last = data.loc[-NPC_FULL:, 'Z'].values # m Cm_last = self.v_Capct(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = x * Cm_last + (1 - x) * self.Cm0 # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({'V': np.mean(Vm)}) effvars[-1].update(self.neuron.computeEffRates(Vm)) # Log process log = '{}: lookups @ {}Hz, {}Pa, {:.2f} nC/cm2'.format( self, *si_format([Fdrive, Adrive], precision=1, space=' '), Qm * 1e5) if len(fs) > 1: log += ', fs = {:.0f} - {:.0f}%'.format(fs.min() * 1e2, fs.max() * 1e2) log += ', tcomp = {:.3f} s'.format(tcomp) logger.info(log) # Return effective coefficients return [tcomp, effvars] def runSONIC(self, Fdrive, Adrive, tstim, toffset, PRF, DC): ''' Compute solutions of the system for a specific set of US stimulation parameters, using charge-predicted "effective" coefficients to solve the HH equations at each step. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 3-tuple with the time profile, the effective solution matrix and a state vector ''' # Load appropriate 2D lookups Aref, Qref, lkps2D, _ = getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive) # Check that acoustic amplitude is within lookup range Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) # Interpolate 2D lookups at zero and US amplitude logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', Adrive * 1e-3) lkps1D = {state: {key: interp1d(Aref, y2D, axis=0)(val) for key, y2D in lkps2D.items()} for state, val in {'ON': Adrive, 'OFF': 0.}.items()} # Add reference charge vector to 1D lookup dictionaries for state in lkps1D.keys(): lkps1D[state]['Q'] = Qref # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.insert(np.array([steady_states[k] for k in self.neuron.states]), 0, self.Qm0) # Initialize simulator and compute solution logger.debug('Computing effective solution') simulator = PWSimulator( lambda y, t: self.effDerivatives(y, t, lkps1D['ON']), lambda y, t: self.effDerivatives(y, t, lkps1D['OFF'])) (t, y, stim), tcomp = simulator(y0, DT_EFF, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Qm': y[:, 0] }) for key in ['ng', 'V']: data[key] = self.interpEffVariable(key, data['Qm'].values, stim, lkps1D) data['Z'] = np.array([self.balancedefQS(ng, Qm) for ng, Qm in zip( data['ng'].values, data['Qm'].values)]) # m data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 1] # Return dataframe and computation time return data, tcomp def meta(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method): ''' Return information about object and simulation parameters. :param Fdrive: US frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param method: integration method :return: meta-data dictionary ''' return { 'neuron': self.neuron.name, 'a': self.a, 'd': self.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'method': method } def simulate(self, Fdrive, Adrive, tstim, toffset, PRF=100., DC=1.0, method='sonic'): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param method: selected integration method :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: simulation @ f = %sHz, A = %sPa, t = %ss (%ss offset)%s', self, si_format(Fdrive, 0, space=' '), si_format(Adrive, 2, space=' '), *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # Check validity of stimulation parameters BilayerSonophore.checkInputs(self, Fdrive, Adrive, 0.0, 0.0) self.neuron.checkInputs(Adrive, tstim, toffset, PRF, DC) # Call appropriate simulation function try: simfunc = { 'full': self.runFull, 'hybrid': self.runHybrid, 'sonic': self.runSONIC }[method] except KeyError: raise ValueError('Invalid integration method: "{}"'.format(method)) data, tcomp = simfunc(Fdrive, Adrive, tstim, toffset, PRF, DC) # Log number of detected spikes nspikes = self.neuron.getNSpikes(data) logger.debug('{} spike{} detected'.format(nspikes, plural(nspikes))) # Return dataframe and computation time return data, tcomp @logCache(os.path.join(os.path.split(__file__)[0], 'astim_titrations.log')) def titrate(self, Fdrive, tstim, toffset, PRF=100., DC=1., method='sonic', xfunc=None, Arange=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency, duration, PRF and duty cycle. :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param method: integration method :param xfunc: function determining whether condition is reached from simulation output :param Arange: search interval for Adrive, iteratively refined :return: determined threshold amplitude (Pa) ''' # Default output function if xfunc is None: xfunc = self.neuron.titrationFunc # Default amplitude interval if Arange is None: Arange = (0, getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive)[0].max()) return binarySearch( lambda x: xfunc(self.simulate(*x)[0]), [Fdrive, tstim, toffset, PRF, DC, method], 1, Arange, TITRATION_ASTIM_DA_MAX ) def simQueue(self, freqs, amps, durations, offsets, PRFs, DCs, method): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :params method: integration method :return: list of parameters (list) for each simulation ''' if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += createQueue(freqs, amps, durations, offsets, min(PRFs), 1.0) if np.any(DCs != 1.0): queue += createQueue(freqs, amps, durations, offsets, PRFs, DCs[DCs != 1.0]) for item in queue: if np.isnan(item[1]): item[1] = None item.append(method) return queue def quasiSteadyStates(self, Fdrive, amps=None, charges=None, DCs=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities and duty cycles, at a specific US frequency. :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DCs: duty cycle value(s) :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges amps, charges, lookups = getLookupsDCavg( self.neuron.name, self.a, Fdrive, amps, charges, DCs) # Compute QSS states using these lookups nA, nQ, nDC = lookups['V'].shape QSS = {k: np.empty((nA, nQ, nDC)) for k in self.neuron.states} for iA in range(nA): for iDC in range(nDC): QSS_1D = self.neuron.quasiSteadyStates( {k: v[iA, :, iDC] for k, v in lookups.items()}) for k in QSS.keys(): QSS[k][iA, :, iDC] = QSS_1D[k] # Compress outputs if needed if squeeze_output: QSS = {k: v.squeeze() for k, v in QSS.items()} lookups = {k: v.squeeze() for k, v in lookups.items()} # Return reference inputs and outputs return amps, charges, lookups, QSS def iNetQSS(self, Qm, Fdrive, Adrive, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' _, _, lookups, QSS = self.quasiSteadyStates( Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) return self.neuron.iNet(lookups['V'], np.array(list(QSS.values()))) # mA/m2 def evaluateStability(self, Qm0, states0, lkp): ''' Integrate the effective differential system from a given starting point, until clear convergence or clear divergence is found. :param Qm0: initial membrane charge density (C/m2) :param states0: dictionary of initial states values :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: boolean indicating convergence state ''' # Initialize y0 vector t0 = 0. y0 = np.array([Qm0] + list(states0.values())) # # Initialize simulator and compute solution # simulator = PeriodicSimulator( # lambda y, t: self.effDerivatives(y, t, lkp), # ivars_to_check=[0]) # simulator.stopfunc = simulator.isAsymptoticallyStable # nmax = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) # t, y, stim = simulator.compute(y0, DT_EFF, QSS_INTEGRATION_INTERVAL, nmax=nmax) # logger.debug('completed in %ss', si_format(tcomp, 1)) # conv = t[-1] < QSS_HISTORY_INTERVAL # Initializing empty list to record evolution of charge deviation n = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) # size of history dQ = [] # As long as there is no clear charge convergence or divergence conv, div = False, False tf, yf = t0, y0 while not conv and not div: # Integrate system for small interval and retrieve final charge deviation t0, y0 = tf, yf sol = solve_ivp( lambda t, y: self.effDerivatives(y, t, lkp), [t0, t0 + QSS_INTEGRATION_INTERVAL], y0, method='LSODA' ) tf, yf = sol.t[-1], sol.y[:, -1] dQ.append(yf[0] - Qm0) # logger.debug('{:.0f} ms: dQ = {:.5f} nC/cm2, avg dQ = {:.5f} nC/cm2'.format( # tf * 1e3, dQ[-1] * 1e5, np.mean(dQ[-n:]) * 1e5)) # If last charge deviation is too large -> divergence if np.abs(dQ[-1]) > QSS_Q_DIV_THR: div = True # If last charge deviation or average deviation in recent history # is small enough -> convergence for x in [dQ[-1], np.mean(dQ[-n:])]: if np.abs(x) < QSS_Q_CONV_THR: conv = True # If max integration duration is been reached -> error if tf > QSS_MAX_INTEGRATION_DURATION: raise ValueError('too many iterations') logger.debug('{}vergence after {:.0f} ms: dQ = {:.5f} nC/cm2'.format( {True: 'con', False: 'di'}[conv], tf * 1e3, dQ[-1] * 1e5)) return conv def fixedPointsQSS(self, Fdrive, Adrive, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' logger.debug('A = {:.2f} kPa, DC = {:.0f}%'.format(Adrive * 1e-3, DC * 1e2)) # Extract stable and unstable fixed points from QSS charge variation profile dfunc = lambda Qm: - self.iNetQSS(Qm, Fdrive, Adrive, DC) SFP_candidates = getFixedPoints(lkp['Q'], dQdt, filter='stable', der_func=dfunc).tolist() UFPs = getFixedPoints(lkp['Q'], dQdt, filter='unstable', der_func=dfunc).tolist() SFPs = [] pltvars = self.getPltVars() # For each candidate SFP for i, Qm in enumerate(SFP_candidates): logger.debug('Q-SFP = {:.2f} nC/cm2'.format(Qm * 1e5)) # Re-compute QSS *_, QSS_FP = self.quasiSteadyStates(Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) # Simulate from unperturbed QSS and evaluate stability if not self.evaluateStability(Qm, QSS_FP, lkp): logger.warning('diverging system at ({:.2f} kPa, {:.2f} nC/cm2)'.format( Adrive * 1e-3, Qm * 1e5)) UFPs.append(Qm) else: # For each state unstable_states = [] for x in self.neuron.states: pltvar = pltvars[x] unit_str = pltvar.get('unit', '') factor = pltvar.get('factor', 1) is_stable_direction = [] for sign in [-1, +1]: # Perturb state with small offset QSS_perturbed = deepcopy(QSS_FP) QSS_perturbed[x] *= (1 + sign * QSS_REL_OFFSET) # If gating state, bound within [0., 1.] if self.neuron.isVoltageGated(x): QSS_perturbed[x] = np.clip(QSS_perturbed[x], 0., 1.) logger.debug('{}: {:.5f} -> {:.5f} {}'.format( x, QSS_FP[x] * factor, QSS_perturbed[x] * factor, unit_str)) # Simulate from perturbed QSS and evaluate stability is_stable_direction.append( self.evaluateStability(Qm, QSS_perturbed, lkp)) # Check if system shows stability upon x-state perturbation # in both directions if not np.all(is_stable_direction): unstable_states.append(x) # Classify fixed point as stable only if all states show stability is_stable_FP = len(unstable_states) == 0 {True: SFPs, False: UFPs}[is_stable_FP].append(Qm) logger.info('{}stable fixed-point at ({:.2f} kPa, {:.2f} nC/cm2){}'.format( '' if is_stable_FP else 'un', Adrive * 1e-3, Qm * 1e5, '' if is_stable_FP else ', caused by {} states'.format(unstable_states))) return SFPs, UFPs diff --git a/PySONIC/parsers.py b/PySONIC/parsers.py index 1ab5536..7cb489f 100644 --- a/PySONIC/parsers.py +++ b/PySONIC/parsers.py @@ -1,358 +1,387 @@ import logging import numpy as np from argparse import ArgumentParser -from .utils import Intensity2Pressure, selectDirDialog +from .utils import Intensity2Pressure, selectDirDialog, isIterable from .neurons import getPointNeuron, CorticalRS -class SimParser(ArgumentParser): - ''' Generic simulation parser interface. ''' +class Parser(ArgumentParser): + ''' Generic parser interface. ''' dist_str = '[scale min max n]' def __init__(self): super().__init__() self.defaults = {} self.allowed = {} self.factors = {} self.addPlot() - self.addMPI() self.addVerbose() def getDistribution(self, xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(self, xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return self.getDistribution(xmin, xmax, nx, scale=scale) def addVerbose(self): self.add_argument( '-v', '--verbose', default=False, action='store_true', help='Increase verbosity') def addPlot(self): self.add_argument( '-p', '--plot', type=str, nargs='+', help='Variables to plot') def addMPI(self): self.add_argument( '--mpi', default=False, action='store_true', help='Use multiprocessing') def addSave(self): self.add_argument( - '-s', '--save', default=False, action='store_true', help='Save output figures') + '-s', '--save', default=False, action='store_true', help='Save output figure(s)') + + def addCompare(self, desc='Comparative graph'): + self.add_argument( + '--compare', default=False, action='store_true', help=desc) + + def addSamplingRate(self): + self.add_argument( + '--sr', type=int, default=1, help='Sampling rate for plot') + + def addHideOutput(self): + self.add_argument( + '--hide', default=False, action='store_true', help='Hide output') def addCmap(self, default='viridis'): self.add_argument( - '-c', '--cmap', type=str, default=default, help='Colormap name') + '--cmap', type=str, default=default, help='Colormap name') - def addInputDir(self): + def addInputDir(self, dep_key=None): + self.inputdir_dep_key = dep_key self.add_argument( '-i', '--inputdir', type=str, help='Input directory') - def addOutputDir(self): + def addOutputDir(self, dep_key=None): + self.outputdir_dep_key = dep_key self.add_argument( '-o', '--outputdir', type=str, help='Output directory') - def parseDir(self, key, args): - directory = args[key] if args[key] is not None else selectDirDialog() + def parseDir(self, key, args, title, dep_key=None): + if dep_key is not None and args[dep_key] is False: + return None + directory = args[key] if args[key] is not None else selectDirDialog(title=title) if directory == '': raise ValueError('No {} selected'.format(key)) return directory def parseInputDir(self, args): - return self.parseDir('inputdir', args) + return self.parseDir('inputdir', args, 'Select input directory', self.inputdir_dep_key) def parseOutputDir(self, args): - return self.parseDir('outputdir', args) + return self.parseDir('outputdir', args, 'Select output directory', self.outputdir_dep_key) def parseLogLevel(self, args): return logging.DEBUG if args.pop('verbose') else logging.INFO def parsePltScheme(self, args): if args['plot'] == ['all']: return None else: return {x: [x] for x in args['plot']} def restrict(self, args, keys): if sum([args[x] is not None for x in keys]) > 1: raise ValueError( 'You must provide only one of the following arguments: {}'.format(', '.join(keys))) def parse2array(self, args, key, factor=1): return np.array(args[key]) * factor def parse(self): args = vars(super().parse_args()) args['loglevel'] = self.parseLogLevel(args) for k, v in self.defaults.items(): if args[k] is None: - args[k] = [v] + args[k] = v if isIterable(v) else [v] + return args + + +class SimParser(Parser): + ''' Generic simulation parser interface. ''' + + def __init__(self): + super().__init__() + self.addMPI() + self.addOutputDir() + + def parse(self): + args = super().parse() + args['outputdir'] = self.parseOutputDir(args) return args class MechSimParser(SimParser): ''' Parser to run mechanical simulations from the command line. ''' def __init__(self): super().__init__() self.defaults.update({ 'radius': 32.0, # nm 'embedding': 0., # um 'Cm0': CorticalRS().Cm0 * 1e2, # uF/m2 'Qm0': CorticalRS().Qm0 * 1e5, # nC/m2 'freq': 500.0, # kHz 'amp': 100.0, # kPa 'charge': 0. # nC/cm2 }) self.factors.update({ 'radius': 1e-9, 'embedding': 1e-6, 'Cm0': 1e-2, 'Qm0': 1e-5, 'freq': 1e3, 'amp': 1e3, 'charge': 1e-5 }) self.addRadius() self.addEmbedding() self.addCm0() self.addQm0() self.addFdrive() self.addAdrive() self.addCharge() def addRadius(self): self.add_argument( '-a', '--radius', nargs='+', type=float, help='Sonophore radius (nm)') def addEmbedding(self): self.add_argument( '--embedding', nargs='+', type=float, help='Embedding depth (um)') def addCm0(self): self.add_argument( '--Cm0', type=float, help='Resting membrane capacitance (uF/cm2)') def addQm0(self): self.add_argument( '--Qm0', type=float, help='Resting membrane charge density (nC/cm2)') def addFdrive(self): self.add_argument( '-f', '--freq', nargs='+', type=float, help='US frequency (kHz)') def addAdrive(self): self.add_argument( '-A', '--amp', nargs='+', type=float, help='Acoustic pressure amplitude (kPa)') self.add_argument( '--Arange', type=str, nargs='+', help='Amplitude range {} (kPa)'.format(self.dist_str)) self.add_argument( '-I', '--intensity', nargs='+', type=float, help='Acoustic intensity (W/cm2)') self.add_argument( '--Irange', type=str, nargs='+', help='Intensity range {} (W/cm2)'.format(self.dist_str)) + def addAscale(self, default='lin'): + self.add_argument( + '--Ascale', type=str, choices=('lin', 'log'), default=default, + help='Amplitude scale for plot ("lin" or "log")') + def addCharge(self): self.add_argument( '-Q', '--charge', nargs='+', type=float, help='Membrane charge density (nC/cm2)') def parseAmp(self, args): params = ['Irange', 'Arange', 'intensity', 'amp'] self.restrict(args, params[:-1]) Irange, Arange, Int, Adrive = [args.pop(k) for k in params] - Ascale = None if Irange is not None: amps = Intensity2Pressure(self.getDistFromList(Irange) * 1e4) # Pa - Ascale = Irange[0] elif Int is not None: amps = Intensity2Pressure(np.array(Int) * 1e4) # Pa elif Arange is not None: amps = self.getDistFromList(Arange) * self.factors['amp'] # Pa - Ascale = Arange[0] else: amps = np.array(Adrive) * self.factors['amp'] # Pa - Ascale = args.get('Arange', ['lin'])[0] - return amps, Ascale + return amps def parse(self): args = super().parse() - args['amp'], args['Ascale'] = self.parseAmp(args) + args['amp'] = self.parseAmp(args) for key in ['radius', 'embedding', 'Cm0', 'Qm0', 'freq', 'charge']: args[key] = self.parse2array(args, key, factor=self.factors[key]) return args class PWSimParser(SimParser): ''' Generic parser interface to run PW patterned simulations from the command line. ''' def __init__(self): super().__init__() self.defaults.update({ 'neuron': 'RS', 'tstim': 100.0, # ms 'toffset': 50., # ms 'PRF': 100.0, # Hz 'DC': 100.0 # % }) self.factors.update({ 'tstim': 1e-3, 'toffset': 1e-3, 'PRF': 1., 'DC': 1e-2 }) self.allowed.update({ 'DC': range(101) }) self.addNeuron() self.addTstim() self.addToffset() self.addPRF() self.addDC() self.addSpanDC() self.addTitrate() def addNeuron(self): self.add_argument( '-n', '--neuron', type=str, nargs='+', help='Neuron name (string)') def addTstim(self): self.add_argument( '-t', '--tstim', nargs='+', type=float, help='Stimulus duration (ms)') def addToffset(self): self.add_argument( '--toffset', nargs='+', type=float, help='Offset duration (ms)') def addPRF(self): self.add_argument( '--PRF', nargs='+', type=float, help='PRF (Hz)') def addDC(self): self.add_argument( '--DC', nargs='+', type=float, help='Duty cycle (%%)') def addSpanDC(self): self.add_argument( '--spanDC', default=False, action='store_true', help='Span DC from 1 to 100%%') def addTitrate(self): self.add_argument( '--titrate', default=False, action='store_true', help='Perform titration') def parseNeuron(self, args): # for item in args['neuron']: # if item not in self.allowed['neuron']: # raise ValueError('Unknown neuron type: "{}"'.format(item)) return [getPointNeuron(n) for n in args['neuron']] def parseAmp(self, args): return NotImplementedError def parseDC(self, args): if args.pop('spanDC'): return np.arange(1, 101) * self.factors['DC'] # (-) else: return np.array(args['DC']) * self.factors['DC'] # (-) def parse(self, args=None): if args is None: args = super().parse() args['neuron'] = self.parseNeuron(args) args['DC'] = self.parseDC(args) for key in ['tstim', 'toffset', 'PRF']: args[key] = self.parse2array(args, key, factor=self.factors[key]) return args class EStimParser(PWSimParser): ''' Parser to run E-STIM simulations from the command line. ''' def __init__(self): super().__init__() self.defaults.update({'amp': 10.0}) # mA/m2 self.factors.update({'amp': 1.}) self.addAstim() def addAstim(self): self.add_argument( '-A', '--amp', nargs='+', type=float, help='Amplitude of injected current density (mA/m2)') self.add_argument( '--Arange', type=str, nargs='+', help='Amplitude range {} (mA/m2)'.format(self.dist_str)) def parseAmp(self, args): if args.pop('titrate'): return None Arange, Astim = [args.pop(k) for k in ['Arange', 'amp']] - Ascale = 'lin' if Arange is not None: amps = self.getDistFromList(Arange) * self.factors['amp'] # mA/m2 Ascale = Arange[0] else: amps = np.array(Astim) * self.factors['amp'] # mA/m2 - return amps, Ascale + return amps def parse(self): args = super().parse() - args['amp'], args['Ascale'] = self.parseAmp(args) + args['amp'] = self.parseAmp(args) return args class AStimParser(PWSimParser, MechSimParser): ''' Parser to run A-STIM simulations from the command line. ''' def __init__(self): MechSimParser.__init__(self) PWSimParser.__init__(self) self.defaults.update({'method': 'sonic'}) self.allowed.update({'method': ['classic', 'hybrid', 'sonic']}) self.addMethod() def addMethod(self): self.add_argument( '-m', '--method', nargs='+', type=str, help='Numerical integration method ({})'.format(', '.join(self.allowed['method']))) def parseMethod(self, args): for item in args['method']: if item not in self.allowed['method']: raise ValueError('Unknown neuron type: "{}"'.format(item)) def parseAmp(self, args): if args.pop('titrate'): return None return MechSimParser.parseAmp(self, args) def parse(self): args = PWSimParser.parse(self, args=MechSimParser.parse(self)) for k in ['Cm0', 'Qm0', 'embedding', 'charge']: del args[k] self.parseMethod(args) return args diff --git a/PySONIC/utils.py b/PySONIC/utils.py index 2b45acc..6619cdf 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,450 +1,459 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-19 22:30:46 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 21:28:44 +# @Last Modified time: 2019-06-07 15:42:20 ''' Definition of generic utility functions used in other modules ''' import csv from functools import wraps import operator import time import os import math import pickle from tqdm import tqdm import logging import tkinter as tk from tkinter import filedialog import numpy as np import colorlog # Package logger my_log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%') def setHandler(logger, handler): for h in logger.handlers: logger.removeHandler(h) logger.addHandler(handler) return logger def setLogger(name, formatter): handler = colorlog.StreamHandler() handler.setFormatter(formatter) logger = colorlog.getLogger(name) logger.addHandler(handler) return logger class TqdmHandler(logging.StreamHandler): def __init__(self, formatter): logging.StreamHandler.__init__(self) self.setFormatter(formatter) def emit(self, record): msg = self.format(record) tqdm.write(msg) logger = setLogger('PySONIC', my_log_formatter) # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): if x == 0: factor = 1e0 prefix = '' else: sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) vals = [tmp[1] for tmp in sorted_si_prefixes] # vals = list(si_prefixes.values()) ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 factor = vals[ix] prefix = sorted_si_prefixes[ix][0] # prefix = list(si_prefixes.keys())[ix] return '{{:.{}f}}{}{}'.format(precision, space, prefix).format(x / factor) elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: print(type(x)) def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' ret_string = '{0:.{1:d}e}'.format(number, precision) a, b = ret_string.split("e") a = float(a) b = int(b) return '{}10^{{{}}}'.format('{} * '.format(a) if a != 1. else '', b) def plural(n): if n < 0: raise ValueError('Cannot format negative integer (n = {})'.format(n)) if n == 0: return '' else: return 's' def rmse(x1, x2): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean()) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() - filenames = filedialog.askopenfilenames(filetypes=[(filetype + " files", '.' + filetype)], - initialdir=dirname) + filenames = filedialog.askopenfilenames( + filetypes=[(filetype + " files", '.' + filetype)], + initialdir=dirname + ) if filenames: par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) else: par_dir = None return (filenames, par_dir) -def selectDirDialog(): +def selectDirDialog(title='Select directory'): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() - return filedialog.askdirectory() + return filedialog.askdirectory(title=title) def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. :param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) return filename_out def loadData(fpath, frequency=1): ''' Load dataframe and metadata dictionary from pickle file. ''' logger.info('Loading data from "%s"', os.path.basename(fpath)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'].iloc[::frequency] meta = frame['meta'] return df, meta def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new +def isIterable(x): + for t in [list, tuple, np.ndarray]: + if isinstance(x, t): + return True + return False + + def isWithin(name, val, bounds, rel_tol=1e-9): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. :param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' - if isinstance(val, list) or isinstance(val, np.ndarray) or isinstance(val, tuple): + if isIterable(val): return [isWithin(name, v, bounds, rel_tol) for v in val] if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError('{} value ({}) out of [{}, {}] interval'.format( name, val, bounds[0], bounds[1])) def getDistribution(xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return getDistribution(xmin, xmax, nx, scale=scale) def getIndex(container, value): ''' Return the index of a float / string value in a list / array :param container: list / 1D-array of elements :param value: value to search for :return: index of value (if found) ''' if isinstance(value, float): container = np.array(container) imatches = np.where(np.isclose(container, value, rtol=1e-9, atol=1e-16))[0] if len(imatches) == 0: raise ValueError('{} not found in {}'.format(value, container)) return imatches[0] elif isinstance(value, str): return container.index(value) def debug(func): ''' Print the function signature and return value. ''' @wraps(func) def wrapper_debug(*args, **kwargs): args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr)) print('Calling {}'.format(signature)) value = func(*args, **kwargs) print(f"{func.__name__!r} returned {value!r}") return value return wrapper_debug def timer(func): ''' Monitor and return the runtime of the decorated function. ''' @wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() value = func(*args, **kwargs) end_time = time.perf_counter() run_time = end_time - start_time return value, run_time return wrapper def logCache(fpath, delimiter='\t', out_type=float): ''' Add an extra IO memoization functionality to a function using file caching, to avoid repetitions of tedious computations with identical inputs. ''' def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # If function has history -> do not log if 'history' in kwargs: return func(*args, **kwargs) # Translate function arguments into string signature args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr)) # If entry present in log, return corresponding output if os.path.isfile(fpath): with open(fpath, 'r', newline='') as f: reader = csv.reader(f, delimiter=delimiter) for row in reader: if row[0] == signature: logger.info('entry found in "{}"'.format(os.path.basename(fpath))) return out_type(row[1]) # Otherwise, compute output and log it into file before returning out = func(*args, **kwargs) with open(fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=delimiter) writer.writerow([signature, str(out)]) return out return wrapper return wrapper_with_args def fileCache(root, fcode_func, load_func=pickle.load, dump_func=pickle.dump): def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # Get file path from root and function arguments, using fcode function fpath = os.path.join(os.path.abspath(root), '{}.pkl'.format(fcode_func(*args))) # If file exists, load output from it if os.path.isfile(fpath): print('loading data from "{}"'.format(fpath)) with open(fpath, 'rb') as f: out = load_func(f) # Otherwise, execute function and create the file to dump the output else: out = func(*args, **kwargs) print('dumping data in "{}"'.format(fpath)) with open(fpath, 'wb') as f: dump_func(out, f) return out return wrapper return wrapper_with_args def binarySearch(bool_func, args, ix, xbounds, dx_thr, history=None): ''' Use a binary search to determine the threshold satisfying a given condition within a continuous search interval. :param bool_func: boolean function returning whether condition is satisfied :param args: list of function arguments other than refined value :param xbounds: search interval for threshold (progressively refined) :param dx_thr: accuracy criterion for threshold :return: excitation threshold ''' # Assign empty history if first function call if history is None: history = [] # Compute function output at interval mid-point x = (xbounds[0] + xbounds[1]) / 2 sim_args = args[:] sim_args.insert(ix, x) history.append(bool_func(sim_args)) # If titration interval is small enough conv = False if (xbounds[1] - xbounds[0]) <= dx_thr: logger.debug('titration interval smaller than defined threshold') # If both conditions have been encountered during titration process, # we're going towards convergence if (0 in history and 1 in history): logger.debug('converging around threshold') # If current value satisfies condition, convergence is achieved # -> return threshold if history[-1]: logger.debug('currently satisfying condition -> convergence') return x # If only one condition has been encountered during titration process, # then no titration is impossible within the defined interval -> return NaN else: logger.warning('titration does not converge within this interval') return np.nan # Return threshold if convergence is reached, otherwise refine interval and iterate if conv: return x else: if x > 0.: xbounds = (xbounds[0], x) if history[-1] else (x, xbounds[1]) else: xbounds = (x, xbounds[1]) if history[-1] else (xbounds[0], x) return binarySearch(bool_func, args, ix, xbounds, dx_thr, history=history) diff --git a/paper figures/fig2.py b/paper figures/fig2.py index ce3234d..60b7d90 100644 --- a/paper figures/fig2.py +++ b/paper figures/fig2.py @@ -1,329 +1,329 @@ # -*- coding: utf-8 -*- # @Author: Theo # @Date: 2018-06-06 18:38:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 18:26:24 +# @Last Modified time: 2019-06-07 13:54:14 ''' Sub-panels of the model optimization figure. ''' import os import logging import numpy as np import matplotlib import matplotlib.pyplot as plt from matplotlib.ticker import FormatStrFormatter from matplotlib.patches import Rectangle from argparse import ArgumentParser from PySONIC.utils import logger, rescale, si_format, selectDirDialog from PySONIC.plt import SchemePlot, cm2inch from PySONIC.constants import NPC_FULL from PySONIC.neurons import CorticalRS from PySONIC.core import BilayerSonophore, NeuronalBilayerSonophore # Plot parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' # Figure basename figbase = os.path.splitext(__file__)[0] def PmApprox(bls, Z, fs=12, lw=2): fig, ax = plt.subplots(figsize=cm2inch(7, 7)) for key in ['right', 'top']: ax.spines[key].set_visible(False) for key in ['bottom', 'left']: ax.spines[key].set_linewidth(2) ax.spines['bottom'].set_position('zero') ax.set_xlabel('Z (nm)', fontsize=fs) ax.set_ylabel('Pressure (kPa)', fontsize=fs, labelpad=-10) ax.set_xticks([0, bls.a * 1e9]) ax.set_xticklabels(['0', 'a']) ax.tick_params(axis='x', which='major', length=25, pad=5) ax.set_yticks([0]) ax.set_ylim([-10, 50]) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) ax.plot(Z * 1e9, bls.v_PMavg(Z, bls.v_curvrad(Z), bls.surface(Z)) * 1e-3, c='g', label='$P_m$') ax.plot(Z * 1e9, bls.PMavgpred(Z) * 1e-3, '--', c='r', label='$\~P_m$') ax.axhline(y=0, color='k') ax.legend(fontsize=fs, frameon=False) fig.tight_layout() fig.canvas.set_window_title(figbase + 'a') return fig def recasting(nbls, Fdrive, Adrive, fs=12, lw=2, ps=15): # Run effective simulation data, _ = nbls.simulate(Fdrive, Adrive, 5 / Fdrive, 0., method='full') t, Qm, Vm = [data[key].values for key in ['t', 'Qm', 'Vm']] t *= 1e6 # us Qm *= 1e5 # nC/cm2 Qrange = (Qm.min(), Qm.max()) dQ = Qrange[1] - Qrange[0] # Create figure and axes fig, axes = plt.subplots(1, 2, figsize=cm2inch(17, 5)) for ax in axes: ax.set_xticks([]) ax.set_yticks([]) # Plot Q-trace and V-trace ax = axes[0] for key in ['top', 'right']: ax.spines[key].set_visible(False) for key in ['bottom', 'left']: ax.spines[key].set_position(('axes', -0.03)) ax.spines[key].set_linewidth(2) ax.plot(t, Vm, label='Vm', c='dimgrey', linewidth=lw) ax.plot(t, Qm, label='Qm', c='k', linewidth=lw) ax.add_patch(Rectangle( (t[0], Qrange[0] - 5), t[-1], dQ + 10, fill=False, edgecolor='k', linestyle='--', linewidth=1 )) ax.yaxis.set_tick_params(width=2) ax.yaxis.set_major_formatter(FormatStrFormatter('%.0f')) # ax.set_xlim((t.min(), t.max())) ax.set_xticks([]) ax.set_xlabel('{}s'.format(si_format((t.max()), space=' ')), fontsize=fs) ax.set_ylabel('$\\rm nC/cm^2$ - mV', fontsize=fs, labelpad=-15) ax.set_yticks(ax.get_ylim()) for item in ax.get_yticklabels(): item.set_fontsize(fs) # Plot inset on Q-trace ax = axes[1] for key in ['top', 'right', 'bottom', 'left']: ax.spines[key].set_linewidth(1) ax.spines[key].set_linestyle('--') ax.plot(t, Vm, label='Vm', c='dimgrey', linewidth=lw) ax.plot(t, Qm, label='Qm', c='k', linewidth=lw) ax.set_xlim((t.min(), t.max())) ax.set_xticks([]) ax.set_yticks([]) delta = 0.05 ax.set_ylim(Qrange[0] - delta * dQ, Qrange[1] + delta * dQ) fig.canvas.set_window_title(figbase + 'b') return fig def mechSim(bls, Fdrive, Adrive, Qm, fs=12, lw=2, ps=15): # Run mechanical simulation data, _ = bls.simulate(Fdrive, Adrive, Qm) t, Z, ng = [data[key].values for key in ['t', 'Z', 'ng']] # Create figure fig, ax = plt.subplots(figsize=cm2inch(7, 7)) fig.suptitle('Mechanical simulation', fontsize=12) for skey in ['bottom', 'left', 'right', 'top']: ax.spines[skey].set_visible(False) ax.set_xticks([]) ax.set_yticks([]) # Plot variables and labels t_plot = np.insert(t, 0, -1e-6) * 1e6 Pac = Adrive * np.sin(2 * np.pi * Fdrive * t + np.pi) # Pa yvars = {'P_A': Pac * 1e-3, 'Z': Z * 1e9, 'n_g': ng * 1e22} colors = {'P_A': 'k', 'Z': 'C0', 'n_g': 'C5'} dy = 1.2 for i, ykey in enumerate(yvars.keys()): y = yvars[ykey] y_plot = rescale(np.insert(y, 0, y[0])) - dy * i ax.plot(t_plot, y_plot, color=colors[ykey], linewidth=lw) ax.text(t_plot[0] - 0.1, y_plot[0], '$\mathregular{{{}}}$'.format(ykey), fontsize=fs, horizontalalignment='right', verticalalignment='center', color=colors[ykey]) # Acoustic pressure annotations ax.annotate(s='', xy=(1.5, 1.1), xytext=(3.5, 1.1), arrowprops=dict(arrowstyle='<|-|>', color='k')) ax.text(2.5, 1.12, '1/f', fontsize=fs, color='k', horizontalalignment='center', verticalalignment='bottom') ax.annotate(s='', xy=(1.5, -0.1), xytext=(1.5, 1), arrowprops=dict(arrowstyle='<|-|>', color='k')) ax.text(1.55, 0.4, '2A', fontsize=fs, color='k', horizontalalignment='left', verticalalignment='center') # Periodic stabilization patch ax.add_patch(Rectangle((2, -2 * dy - 0.1), 2, 2 * dy, color='dimgrey', alpha=0.3)) ax.text(3, -2 * dy - 0.2, 'limit cycle', fontsize=fs, color='dimgrey', horizontalalignment='center', verticalalignment='top') # Z_last patch ax.add_patch(Rectangle((2, -dy - 0.1), 2, dy, edgecolor='k', facecolor='none', linestyle='--')) # ngeff annotations c = plt.get_cmap('tab20').colors[11] ax.text(t_plot[-1] + 0.1, y_plot[-1], '$\mathregular{n_{g,eff}}$', fontsize=fs, color=c, horizontalalignment='left', verticalalignment='center') ax.scatter([t_plot[-1]], [y_plot[-1]], color=c, s=ps) fig.canvas.set_window_title(figbase + 'c mechsim') return fig def cycleAveraging(bls, neuron, Fdrive, Adrive, Qm, fs=12, lw=2, ps=15): # Run mechanical simulation data, _ = bls.simulate(Fdrive, Adrive, Qm) t, Z, ng = [data[key].values for key in ['t', 'Z', 'ng']] # Compute variables evolution over last acoustic cycle t_last = t[-NPC_FULL:] * 1e6 # us Z_last = Z[-NPC_FULL:] # m Cm = bls.v_Capct(Z_last) * 1e2 # uF/m2 Vm = Qm / Cm * 1e5 # mV yvars = { 'C_m': Cm, # uF/cm2 'V_m': Vm, # mV '\\alpha_m': neuron.alpham(Vm) * 1e3, # ms-1 '\\beta_m': neuron.betam(Vm) * 1e3, # ms-1 'p_\\infty / \\tau_p': neuron.pinf(Vm) / neuron.taup(Vm) * 1e3, # ms-1 '(1-p_\\infty) / \\tau_p': (1 - neuron.pinf(Vm)) / neuron.taup(Vm) * 1e3 # ms-1 } # Determine colors violets = plt.get_cmap('Paired').colors[8:10][::-1] oranges = plt.get_cmap('Paired').colors[6:8][::-1] colors = { 'C_m': ['k', 'dimgrey'], 'V_m': plt.get_cmap('tab20').colors[14:16], '\\alpha_m': violets, '\\beta_m': oranges, 'p_\\infty / \\tau_p': violets, '(1-p_\\infty) / \\tau_p': oranges } # Create figure and axes fig, axes = plt.subplots(6, 1, figsize=cm2inch(4, 15)) fig.suptitle('Cycle-averaging', fontsize=fs) for ax in axes: ax.set_xticks([]) ax.set_yticks([]) for skey in ['bottom', 'left', 'right', 'top']: ax.spines[skey].set_visible(False) # Plot variables for ax, ykey in zip(axes, yvars.keys()): ax.set_xticks([]) ax.set_yticks([]) for skey in ['bottom', 'left', 'right', 'top']: ax.spines[skey].set_visible(False) y = yvars[ykey] ax.plot(t_last, y, color=colors[ykey][0], linewidth=lw) ax.plot([t_last[0], t_last[-1]], [np.mean(y)] * 2, '--', color=colors[ykey][1]) ax.scatter([t_last[-1]], [np.mean(y)], s=ps, color=colors[ykey][1]) ax.text(t_last[0] - 0.1, y[0], '$\mathregular{{{}}}$'.format(ykey), fontsize=fs, horizontalalignment='right', verticalalignment='center', color=colors[ykey][0]) fig.canvas.set_window_title(figbase + 'c cycleavg') return fig def Qsolution(nbls, Fdrive, Adrive, tstim, toffset, PRF, DC, fs=12, lw=2, ps=15): # Run effective simulation data, _ = nbls.simulate(Fdrive, Adrive, tstim, toffset, PRF, DC, method='sonic') t, Qm, states = [data[key].values for key in ['t', 'Qm', 'stimstate']] t *= 1e3 # ms Qm *= 1e5 # nC/cm2 - _, tpulse_on, tpulse_off = SchemePlot.getStimPulses(t, states) + _, tpulse_on, tpulse_off = SchemePlot.getStimPulses(_, t, states) # Add small onset t = np.insert(t, 0, -5.0) Qm = np.insert(Qm, 0, Qm[0]) # Create figure and axes fig, ax = plt.subplots(figsize=cm2inch(12, 6)) ax.set_xticks([]) ax.set_yticks([]) for key in ['top', 'right']: ax.spines[key].set_visible(False) for key in ['bottom', 'left']: ax.spines[key].set_position(('axes', -0.03)) ax.spines[key].set_linewidth(2) # Plot Q-trace and stimulation pulses ax.plot(t, Qm, label='Qm', c='k', linewidth=lw) for ton, toff in zip(tpulse_on, tpulse_off): ax.axvspan(ton, toff, edgecolor='none', facecolor='#8A8A8A', alpha=0.2) ax.yaxis.set_tick_params(width=2) ax.yaxis.set_major_formatter(FormatStrFormatter('%.0f')) ax.set_xlim((t.min(), t.max())) ax.set_xticks([]) ax.set_xlabel('{}s'.format(si_format((t.max()) * 1e-3, space=' ')), fontsize=fs) ax.set_ylabel('$\\rm nC/cm^2$', fontsize=fs, labelpad=-15) ax.set_yticks(ax.get_ylim()) for item in ax.get_yticklabels(): item.set_fontsize(fs) ax.legend(fontsize=fs, frameon=False) fig.canvas.set_window_title(figbase + 'e Qtrace') return fig def main(): ap = ArgumentParser() # Runtime options ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-o', '--outdir', type=str, help='Output directory') ap.add_argument('-f', '--figset', type=str, nargs='+', help='Figure set', default='all') ap.add_argument('-s', '--save', default=False, action='store_true', help='Save output figures as pdf') args = ap.parse_args() loglevel = logging.DEBUG if args.verbose is True else logging.INFO logger.setLevel(loglevel) figset = args.figset if figset == 'all': figset = ['a', 'b', 'c', 'e'] logger.info('Generating panels {} of {}'.format(figset, figbase)) # Parameters neuron = CorticalRS() a = 32e-9 # m Fdrive = 500e3 # Hz Adrive = 100e3 # Pa PRF = 100. # Hz DC = 0.5 tstim = 150e-3 # s toffset = 100e-3 # s Qm = -71.9e-5 # C/cm2 bls = BilayerSonophore(a, neuron.Cm0, neuron.Cm0 * neuron.Vm0 * 1e-3) nbls = NeuronalBilayerSonophore(a, neuron) # Figures figs = [] if 'a' in figset: figs.append(PmApprox(bls, np.linspace(-0.4 * bls.Delta_, bls.a, 1000))) if 'b' in figset: figs.append(recasting(nbls, Fdrive, Adrive)) if 'c' in figset: figs += [ mechSim(bls, Fdrive, Adrive, Qm), cycleAveraging(bls, neuron, Fdrive, Adrive, Qm) ] if 'e' in figset: figs.append(Qsolution(nbls, Fdrive, Adrive, tstim, toffset, PRF, DC)) if args.save: outdir = selectDirDialog() if args.outdir is None else args.outdir if outdir == '': logger.error('No input directory chosen') return for fig in figs: figname = '{}.pdf'.format(fig.canvas.get_window_title()) fig.savefig(os.path.join(outdir, figname), transparent=True) else: plt.show() if __name__ == '__main__': main() diff --git a/scripts/plot_QSS.py b/scripts/plot_QSS.py index c240645..05b104b 100644 --- a/scripts/plot_QSS.py +++ b/scripts/plot_QSS.py @@ -1,68 +1,70 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-09-28 16:13:34 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-03 20:10:53 +# @Last Modified time: 2019-06-07 15:55:50 ''' Phase-plane analysis of neuron behavior under quasi-steady state approximation. ''' import os +import numpy as np import matplotlib.pyplot as plt from PySONIC.utils import logger from PySONIC.plt import plotQSSvars, plotQSSVarVsAmp, plotEqChargeVsAmp from PySONIC.parsers import AStimParser def main(): # Parse command line arguments parser = AStimParser() parser.addCmap(default='viridis') + parser.addAscale() parser.addSave() - parser.addInputDir() - parser.addOutputDir() - parser.add_argument( - '--comp', default=False, action='store_true', help='Compare with simulations') + parser.outputdir_dep_key = 'save' + parser.addCompare(desc='Compare with simulations') + parser.addInputDir(dep_key='compare') + parser.defaults['amp'] = np.logspace(np.log10(1), np.log10(600), 100) # kPa + parser.defaults['tstim'] = 1000. # ms + parser.defaults['toffset'] = 0. # ms args = parser.parse() + args['inputdir'] = parser.parseInputDir(args) logger.setLevel(args['loglevel']) - args['inputdir'] = parser.parseInputDir(args) if args['comp'] else None - args['outputdir'] = parser.parseOutputDir(args) if args['save'] else None if args['plot'] is None: args['plot'] = ['dQdt'] - a, Fdrive, tstim, toffset, PRF = [ args[k][0] for k in ['radius', 'freq', 'tstim', 'toffset', 'PRF']] figs = [] for i, neuron in enumerate(args['neuron']): for DC in args['DC']: if args['amp'].size == 1: figs.append( plotQSSvars(neuron, a, Fdrive, args['amp'][0])) else: # Plot evolution of QSS vars vs Q for different amplitudes for pvar in args['plot']: figs.append(plotQSSVarVsAmp( neuron, a, Fdrive, pvar, amps=args['amp'], DC=DC, cmap=args['cmap'], zscale=args['Ascale'])) # Plot equilibrium charge as a function of amplitude if 'dQdt' in args['plot']: figs.append(plotEqChargeVsAmp( neuron, a, Fdrive, amps=args['amp'], tstim=tstim, toffset=toffset, PRF=PRF, DC=DC, xscale=args['Ascale'], compdir=args['inputdir'], mpi=args['mpi'], loglevel=args['loglevel'])) if args['save']: for fig in figs: s = fig.canvas.get_window_title() s = s.replace('(', '- ').replace('/', '_').replace(')', '') figname = '{}.png'.format(s) fig.savefig(os.path.join(args['outputdir'], figname), transparent=True) else: plt.show() if __name__ == '__main__': main() diff --git a/scripts/plot_timeseries.py b/scripts/plot_timeseries.py index 6610010..e38a758 100644 --- a/scripts/plot_timeseries.py +++ b/scripts/plot_timeseries.py @@ -1,71 +1,63 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 12:41:26 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 18:13:12 +# @Last Modified time: 2019-06-07 15:12:51 ''' Plot temporal profiles of specific simulation output variables. ''' import logging -from argparse import ArgumentParser import matplotlib.pyplot as plt from PySONIC.utils import logger, OpenFilesDialog, selectDirDialog from PySONIC.plt import ComparativePlot, SchemePlot +from PySONIC.parsers import Parser # Set logging level logger.setLevel(logging.INFO) def main(): - ap = ArgumentParser() - - # Runtime options - ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') - ap.add_argument('--hide', default=False, action='store_true', help='Hide output') - ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') - ap.add_argument('-c', '--compare', default=False, action='store_true', help='Comparative graph') - ap.add_argument('-s', '--save', default=False, action='store_true', help='Save output') - ap.add_argument('-p', '--plot', type=str, nargs='+', default=None, help='Variables to plot') - ap.add_argument('-f', '--frequency', type=int, default=1, help='Sampling frequency for plot') - - # Parse arguments - args = {key: value for key, value in vars(ap.parse_args()).items() if value is not None} - logger.setLevel(logging.DEBUG if args['verbose'] else logging.INFO) + # Parse command line arguments + parser = Parser() + parser.addHideOutput() + parser.addOutputDir(dep_key='save') + parser.addCompare() + parser.addSave() + parser.addSamplingRate() + args = parser.parse() # Select data files pkl_filepaths, _ = OpenFilesDialog('pkl') if not pkl_filepaths: logger.error('No input file') return # Plot appropriate graph if args['compare']: - if 'plot' not in args: - logger.error('Plot variable must be specified') - quit() - try: - comp_plot = ComparativePlot(pkl_filepaths, args['plot'][0]) - comp_plot.render() - except KeyError as e: - logger.error(e) + if args['plot'] == ['all']: + logger.error('Specific variables must be specified for comparative plots') quit() + for pltvar in args['plot']: + try: + comp_plot = ComparativePlot(pkl_filepaths, pltvar) + comp_plot.render() + except KeyError as e: + logger.error(e) + quit() else: - pltscheme = {key: [key] for key in args['plot']} if 'plot' in args else None - if 'outputdir' not in args: - args['outputdir'] = selectDirDialog() if args['save'] else None - scheme_plot = SchemePlot(pkl_filepaths, pltscheme=pltscheme) + scheme_plot = SchemePlot(pkl_filepaths, pltscheme=parser.parsePltScheme(args)) scheme_plot.render( title=True, save=args['save'], ask_before_save=not args['save'], directory=args['outputdir'] ) if not args['hide']: plt.show() if __name__ == '__main__': main() diff --git a/scripts/run_astim.py b/scripts/run_astim.py index 652e267..e24d024 100644 --- a/scripts/run_astim.py +++ b/scripts/run_astim.py @@ -1,54 +1,52 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 18:20:17 +# @Last Modified time: 2019-06-07 14:59:17 ''' Run A-STIM simulations of a specific point-neuron. ''' import matplotlib.pyplot as plt from PySONIC.core import NeuronalBilayerSonophore, Batch from PySONIC.utils import logger from PySONIC.plt import SchemePlot from PySONIC.parsers import AStimParser def main(): # Parse command line arguments parser = AStimParser() - parser.addOutputDir() args = parser.parse() logger.setLevel(args['loglevel']) - args['outputdir'] = parser.parseOutputDir(args) # Run A-STIM batch logger.info("Starting A-STIM simulation batch") pkl_filepaths = [] for a in args['radius']: for neuron in args['neuron']: nbls = NeuronalBilayerSonophore(a, neuron) queue = nbls.simQueue( args['freq'], args['amp'], args['tstim'], args['toffset'], args['PRF'], args['DC'], args['method'][0] ) for item in queue: item.insert(0, args['outputdir']) batch = Batch(nbls.runAndSave, queue) pkl_filepaths += batch(mpi=args['mpi'], loglevel=args['loglevel']) # Plot resulting profiles if args['plot'] is not None: SchemePlot(pkl_filepaths, pltscheme=parser.parsePltScheme(args))() plt.show() if __name__ == '__main__': main() diff --git a/scripts/run_estim.py b/scripts/run_estim.py index bc4cbbd..f05a61b 100644 --- a/scripts/run_estim.py +++ b/scripts/run_estim.py @@ -1,48 +1,46 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-24 11:55:07 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 18:20:45 +# @Last Modified time: 2019-06-07 14:57:30 ''' Run E-STIM simulations of a specific point-neuron. ''' import matplotlib.pyplot as plt from PySONIC.core import Batch from PySONIC.utils import logger from PySONIC.plt import SchemePlot from PySONIC.parsers import EStimParser def main(): # Parse command line arguments parser = EStimParser() - parser.addOutputDir() args = parser.parse() logger.setLevel(args['loglevel']) - args['outputdir'] = parser.parseOutputDir(args) # Run E-STIM batch logger.info("Starting E-STIM simulation batch") pkl_filepaths = [] for neuron in args['neuron']: queue = neuron.simQueue( args['amp'], args['tstim'], args['toffset'], args['PRF'], args['DC'], ) for item in queue: item.insert(0, args['outputdir']) batch = Batch(neuron.runAndSave, queue) pkl_filepaths += batch(mpi=args['mpi'], loglevel=args['loglevel']) # Plot resulting profiles if args['plot'] is not None: SchemePlot(pkl_filepaths, pltscheme=parser.parsePltScheme(args))() plt.show() if __name__ == '__main__': main() diff --git a/scripts/run_lookups.py b/scripts/run_lookups.py index 8ca5812..eaf1b75 100644 --- a/scripts/run_lookups.py +++ b/scripts/run_lookups.py @@ -1,218 +1,218 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-06-02 17:50:10 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 15:05:34 +# @Last Modified time: 2019-06-07 15:41:10 ''' Create lookup table for specific neuron. ''' import os import itertools import pickle import logging import numpy as np from argparse import ArgumentParser -from PySONIC.utils import logger, getNeuronLookupsFile +from PySONIC.utils import logger, getNeuronLookupsFile, isIterable from PySONIC.neurons import getPointNeuron from PySONIC.core import NeuronalBilayerSonophore, createQueue, Batch # Default parameters defaults = dict( neuron='RS', radius=np.array([16.0, 32.0, 64.0]), # nm freq=np.array([20., 100., 500., 1e3, 2e3, 3e3, 4e3]), # kHz amp=np.insert(np.logspace(np.log10(0.1), np.log10(600), num=50), 0, 0.0), # kPa ) def computeAStimLookups(neuron, aref, fref, Aref, Qref, fsref=None, mpi=False, loglevel=logging.INFO): ''' Run simulations of the mechanical system for a multiple combinations of imposed sonophore radius, US frequencies, acoustic amplitudes charge densities and (spatially-averaged) sonophore membrane coverage fractions, compute effective coefficients and store them in a dictionary of n-dimensional arrays. :param neuron: neuron object :param aref: array of sonophore radii (m) :param fref: array of acoustic drive frequencies (Hz) :param Aref: array of acoustic drive amplitudes (Pa) :param Qref: array of membrane charge densities (C/m2) :param fsref: acoustic drive phase (rad) :param mpi: boolean statting wether or not to use multiprocessing :param loglevel: logging level :return: lookups dictionary ''' descs = { 'a': 'sonophore radii', 'f': 'US frequencies', 'A': 'US amplitudes', 'fs': 'sonophore membrane coverage fractions' } # Populate inputs dictionary inputs = { 'a': aref, # nm 'f': fref, # Hz 'A': Aref, # Pa 'Q': Qref # C/m2 } # Add fs to inputs if provided, otherwise add default value (1) err_fs = 'cannot span {} for more than 1 {}' if fsref is not None: for x in ['a', 'f']: assert inputs[x].size == 1, err_fs.format(descs['fs'], descs[x]) inputs['fs'] = fsref else: inputs['fs'] = np.array([1.]) # Check validity of input parameters for key, values in inputs.items(): - if not (isinstance(values, list) or isinstance(values, np.ndarray)): + if not isIterable(values): raise TypeError( 'Invalid {} (must be provided as list or numpy array)'.format(descs[key])) if not all(isinstance(x, float) for x in values): raise TypeError('Invalid {} (must all be float typed)'.format(descs[key])) if len(values) == 0: raise ValueError('Empty {} array'.format(key)) if key in ('a', 'f') and min(values) <= 0: raise ValueError('Invalid {} (must all be strictly positive)'.format(descs[key])) if key in ('A', 'fs') and min(values) < 0: raise ValueError('Invalid {} (must all be positive or null)'.format(descs[key])) # Get dimensions of inputs that have more than one value dims = np.array([x.size for x in inputs.values()]) dims = dims[dims > 1] ncombs = dims.prod() # Create simulation queue per radius queue = createQueue(fref, Aref, Qref) for i in range(len(queue)): queue[i].append(inputs['fs']) # Run simulations and populate outputs (list of lists) logger.info('Starting simulation batch for %s neuron', neuron.name) outputs = [] for a in aref: nbls = NeuronalBilayerSonophore(a, neuron) batch = Batch(nbls.computeEffVars, queue) outputs += batch(mpi=mpi, loglevel=loglevel) # Split comp times and effvars from outputs tcomps, effvars = [list(x) for x in zip(*outputs)] effvars = list(itertools.chain.from_iterable(effvars)) # Reshape effvars into nD arrays and add them to lookups dictionary logger.info('Reshaping output into lookup tables') varkeys = list(effvars[0].keys()) nout = len(effvars) assert nout == ncombs, 'number of outputs does not match number of combinations' lookups = {} for key in varkeys: effvar = [effvars[i][key] for i in range(nout)] lookups[key] = np.array(effvar).reshape(dims) # Reshape comp times into nD array (minus fs dimension) if fsref is not None: dims = dims[:-1] tcomps = np.array(tcomps).reshape(dims) # Store inputs, lookup data and comp times in dictionary df = { 'input': inputs, 'lookup': lookups, 'tcomp': tcomps } return df def main(): ap = ArgumentParser() # Runtime options ap.add_argument('--mpi', default=False, action='store_true', help='Use multiprocessing') ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-t', '--test', default=False, action='store_true', help='Test configuration') # Stimulation parameters ap.add_argument('-n', '--neuron', type=str, default=defaults['neuron'], help='Neuron name (string)') ap.add_argument('-a', '--radius', nargs='+', type=float, help='Sonophore radius (nm)') ap.add_argument('-f', '--freq', nargs='+', type=float, help='US frequency (kHz)') ap.add_argument('-A', '--amp', nargs='+', type=float, help='Acoustic pressure amplitude (kPa)') ap.add_argument('-Q', '--charge', nargs='+', type=float, help='Membrane charge density (nC/cm2)') ap.add_argument('--spanFs', default=False, action='store_true', help='Span sonophore coverage fraction') # Parse arguments args = {key: value for key, value in vars(ap.parse_args()).items() if value is not None} loglevel = logging.DEBUG if args['verbose'] is True else logging.INFO logger.setLevel(loglevel) mpi = args['mpi'] neuron_str = args['neuron'] radii = np.array(args.get('radius', defaults['radius'])) * 1e-9 # m freqs = np.array(args.get('freq', defaults['freq'])) * 1e3 # Hz amps = np.array(args.get('amp', defaults['amp'])) * 1e3 # Pa # Check neuron name validity try: neuron = getPointNeuron(neuron_str) except ValueError as err: logger.error(err) return # Determine charge vector if 'charge' in args: charges = np.array(args['charge']) * 1e-5 # C/m2 else: charges = np.arange(neuron.Qbounds()[0], neuron.Qbounds()[1] + 1e-5, 1e-5) # C/m2 # Determine fs vector fs = None if args['spanFs']: fs = np.linspace(0, 100, 101) * 1e-2 # (-) # Determine output filename lookup_path = { True: getNeuronLookupsFile(neuron.name), False: getNeuronLookupsFile(neuron.name, a=radii[0], Fdrive=freqs[0], fs=True) }[fs is None] # Combine inputs into single list inputs = [radii, freqs, amps, charges, fs] # Adapt inputs and output filename if test case if args['test']: for i, x in enumerate(inputs): if x is not None and x.size > 1: inputs[i] = np.array([x.min(), x.max()]) lookup_path = '{}_test{}'.format(*os.path.splitext(lookup_path)) # Check if lookup file already exists if os.path.isfile(lookup_path): logger.warning('"%s" file already exists and will be overwritten. ' + 'Continue? (y/n)', lookup_path) user_str = input() if user_str not in ['y', 'Y']: logger.error('%s Lookup creation canceled', neuron.name) return # Compute lookups df = computeAStimLookups(neuron, *inputs, mpi=mpi, loglevel=loglevel) # Save dictionary in lookup file logger.info('Saving %s neuron lookup table in file: "%s"', neuron.name, lookup_path) with open(lookup_path, 'wb') as fh: pickle.dump(df, fh) if __name__ == '__main__': main() diff --git a/scripts/run_mech.py b/scripts/run_mech.py index 2d16650..ce079d7 100644 --- a/scripts/run_mech.py +++ b/scripts/run_mech.py @@ -1,48 +1,46 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-11-21 10:46:56 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 18:21:13 +# @Last Modified time: 2019-06-07 14:58:39 ''' Run simulations of the NICE mechanical model. ''' import matplotlib.pyplot as plt from PySONIC.core import BilayerSonophore, Batch from PySONIC.utils import logger from PySONIC.plt import SchemePlot from PySONIC.parsers import MechSimParser def main(): # Parse command line arguments parser = MechSimParser() - parser.addOutputDir() args = parser.parse() logger.setLevel(args['loglevel']) - args['outputdir'] = parser.parseOutputDir(args) # Run MECH batch logger.info("Starting mechanical simulation batch") pkl_filepaths = [] for a in args['radius']: for d in args['embedding']: for Cm0 in args['Cm0']: for Qm0 in args['Qm0']: bls = BilayerSonophore(a, Cm0, Qm0, embedding_depth=d) queue = bls.simQueue(args['freq'], args['amp'], args['charge']) for item in queue: item.insert(0, args['outputdir']) batch = Batch(bls.runAndSave, queue) pkl_filepaths += batch(mpi=args['mpi'], loglevel=args['loglevel']) # Plot resulting profiles if args['plot'] is not None: SchemePlot(pkl_filepaths, pltscheme=parser.parsePltScheme(args))() plt.show() if __name__ == '__main__': main()