diff --git a/PySONIC/batches.py b/PySONIC/batches.py index e4f420a..396be48 100644 --- a/PySONIC/batches.py +++ b/PySONIC/batches.py @@ -1,179 +1,180 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-31 15:09:27 +# @Last Modified time: 2019-05-31 15:19:22 ''' Utility functions used in simulations ''' import os import lockfile +import pickle import logging import multiprocessing as mp import numpy as np import pandas as pd from .utils import logger class Consumer(mp.Process): ''' Generic consumer process, taking tasks from a queue and outputing results in another queue. ''' def __init__(self, queue_in, queue_out): mp.Process.__init__(self) self.queue_in = queue_in self.queue_out = queue_out logger.debug('Starting %s', self.name) def run(self): while True: nextTask = self.queue_in.get() if nextTask is None: logger.debug('Exiting %s', self.name) self.queue_in.task_done() break answer = nextTask() self.queue_in.task_done() self.queue_out.put(answer) return class Worker(): ''' Generic worker class calling a specific object's method with a given set of parameters. ''' def __init__(self, wid, obj, method_str, params, loglevel): ''' Worker constructor. :param wid: worker ID :param obj: object containing the method to call :param method_str: name of the method to call :param params: list of method parameters :param loglevel: logging level ''' self.id = wid self.obj = obj self.method_str = method_str self.params = params self.loglevel = loglevel def __call__(self): ''' Caller to the specific object method. ''' logger.setLevel(self.loglevel) return self.id, getattr(self.obj, self.method_str)(*self.params) def createQueue(dims): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. :param dims: list of lists (or 1D arrays) of input parameters :return: list of parameters (list) for each simulation ''' ndims = len(dims) dims_in = [dims[1], dims[0]] inds_out = [1, 0] if ndims > 2: dims_in += dims[2:] inds_out += list(range(2, ndims)) queue = np.stack(np.meshgrid(*dims_in), -1).reshape(-1, ndims) queue = queue[:, inds_out] return queue.tolist() -# def runAndSave(self, outdir, Fdrive, Adrive, Qm): -# data, tcomp = self.simulate(Fdrive, Adrive, Qm) -# meta = self.meta(Fdrive, Adrive, Qm) +# def runAndSave(self, outdir, obj, args): +# data, tcomp = obj.simulate(*args) +# meta = obj.meta(*args) # meta['tcomp'] = tcomp -# simcode = self.filecode(Fdrive, Adrive, Qm) +# simcode = obj.filecode(*args) # outpath = '{}/{}.pkl'.format(outdir, simcode) # with open(outpath, 'wb') as fh: # pickle.dump({'meta': meta, 'data': data}, fh) # logger.debug('simulation data exported to "%s"', outpath) # return outpath def runBatch(obj, method_str, queue, extra_params=[], mpi=False, loglevel=logging.INFO): ''' Run batch of simulations of a given object for various combinations of stimulation parameters. :param queue: array of all stimulation parameters combinations :param mpi: boolean stating whether or not to use multiprocessing ''' nsims = len(queue) if mpi: mp.freeze_support() tasks = mp.JoinableQueue() results = mp.Queue() nconsumers = min(mp.cpu_count(), nsims) consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Run simulations outputs = [] for i, stim_params in enumerate(queue): params = extra_params + stim_params if mpi: worker = Worker(i, obj, method_str, params, loglevel) tasks.put(worker, block=False) else: outputs.append(getattr(obj, method_str)(*params)) if mpi: for i in range(nconsumers): tasks.put(None, block=False) tasks.join() idxs = [] for i in range(nsims): wid, out = results.get() outputs.append(out) idxs.append(wid) outputs = [x for _, x in sorted(zip(idxs, outputs))] # Close tasks and results queues tasks.close() results.close() return outputs def xlslog(filepath, logentry, sheetname='Data'): ''' Append log data on a new row to specific sheet of excel workbook, using a lockfile to avoid read/write errors between concurrent processes. :param filepath: absolute or relative path to the Excel workbook :param logentry: log entry (dictionary) to add to log file :param sheetname: name of the Excel spreadsheet to which data is appended :return: boolean indicating success (1) or failure (0) of operation ''' # Parse log dataframe from Excel file if it exists, otherwise create new one if not os.path.isfile(filepath): df = pd.DataFrame(columns=logentry.keys()) else: df = pd.read_excel(filepath, sheet_name=sheetname) # Add log entry to log dataframe df = df.append(pd.Series(logentry), ignore_index=True) # Write log dataframe to Excel file try: lock = lockfile.FileLock(filepath) lock.acquire() writer = pd.ExcelWriter(filepath) df.to_excel(writer, sheet_name=sheetname, index=False) writer.save() lock.release() return 1 except PermissionError: # If file cannot be accessed for writing because already opened logger.warning('Cannot write to "%s". Close the file and type "Y"', filepath) user_str = input() if user_str in ['y', 'Y']: return xlslog(filepath, logentry, sheetname) else: return 0 diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index 5677004..63917bd 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,728 +1,728 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-31 15:12:41 +# @Last Modified time: 2019-05-31 15:24:12 from copy import deepcopy import logging import pickle import numpy as np import pandas as pd from scipy.integrate import solve_ivp from scipy.interpolate import interp1d from .simulators import PWSimulator, HybridSimulator from .bls import BilayerSonophore from .pneuron import PointNeuron from ..batches import createQueue from ..utils import * from ..constants import * from ..postpro import getFixedPoints class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model defvar = 'Q' # default plot variable def __init__(self, a, neuron, Fdrive=None, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param neuron: neuron object :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Check validity of input parameters if not isinstance(neuron, PointNeuron): raise ValueError('Invalid neuron type: "{}" (must inherit from PointNeuron class)' .format(neuron.name)) self.neuron = neuron # Initialize BilayerSonophore parent object BilayerSonophore.__init__(self, a, neuron.Cm0, neuron.Cm0 * neuron.Vm0 * 1e-3, embedding_depth) def __repr__(self): return 'NeuronalBilayerSonophore({}m, {})'.format( si_format(self.a, precision=1, space=' '), self.neuron) def pprint(self): return '{}m radius NBLS - {} neuron'.format( si_format(self.a, precision=0, space=' '), self.neuron.name) def getPltVars(self, wrapleft='df["', wrapright='"]'): pltvars = super().getPltVars(wrapleft, wrapright) pltvars.update(self.neuron.getPltVars(wrapleft, wrapright)) return pltvars def getPltScheme(self): return self.neuron.getPltScheme() - def filecode(self, Fdrive, Adrive, tstim, PRF, DC, method): + def filecode(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method): return 'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.2f}kPa_{:.0f}ms_{}{}'.format( self.neuron.name, 'CW' if DC == 1 else 'PW', self.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(PRF, DC * 1e2) if DC < 1. else '', method) def fullDerivatives(self, y, t, Adrive, Fdrive, phi): ''' Compute the derivatives of the (n+3) ODE full NBLS system variables. :param y: vector of state variables :param t: specific instant in time (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives(self, y[:3], t, Adrive, Fdrive, y[3], phi) dydt_elec = self.neuron.Qderivatives(y[3:], t, self.Capct(y[1])) return dydt_mech + dydt_elec def effDerivatives(self, y, t, lkp): ''' Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t ''' # Split input vector explicitly Qm, *states = y # Compute charge and channel states variation Vmeff = self.neuron.interpVmeff(Qm, lkp) dQmdt = - self.neuron.iNet(Vmeff, states) * 1e-3 dstates = self.neuron.derEffStates(Qm, states, lkp) # Return derivatives vector return [dQmdt, *[dstates[k] for k in self.neuron.states]] def interpEffVariable(self, key, Qm, stim, lkp_on, lkp_off): ''' Interpolate Q-dependent effective variable along solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkp_on: lookups for ON states :param lkp_off: lookups for OFF states :return: interpolated effective variable vector ''' x = np.zeros(stim.size) x[stim == 0] = np.interp( Qm[stim == 0], lkp_on['Q'], lkp_on[key], left=np.nan, right=np.nan) x[stim == 1] = np.interp( Qm[stim == 1], lkp_off['Q'], lkp_off[key], left=np.nan, right=np.nan) return x def runFull(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the full electro-mechanical system for a specific set of US stimulation parameters, using a classic integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the ODE system is solved iteratively until completion. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param phi: acoustic drive phase (rad) :return: 2-tuple with the output dataframe and computation time. ''' # Determine time step dt = 1 / (NPC_FULL * Fdrive) # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) Pac = self.Pacoustic(dt, Adrive, Fdrive, phi) Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.concatenate(( [0., Z0, self.ng0, self.Qm0], [steady_states[k] for k in self.neuron.states])) # Initialize simulator and compute solution logger.debug('Computing detailed solution') simulator = PWSimulator( lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.)) (t, y, stim), tcomp = simulator( y0, dt, tstim, toffset, PRF, DC, print_progress=logger.getEffectiveLevel() <= logging.INFO, target_dt=CLASSIC_TARGET_DT, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 4] # Return dataframe and computation time return data, tcomp def runSONIC(self, Fdrive, Adrive, tstim, toffset, PRF, DC, dt=DT_EFF): ''' Compute solutions of the system for a specific set of US stimulation parameters, using charge-predicted "effective" coefficients to solve the HH equations at each step. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param dt: integration time step (s) :return: 3-tuple with the time profile, the effective solution matrix and a state vector ''' # Load appropriate 2D lookups Aref, Qref, lookups2D, _ = getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive) # Check that acoustic amplitude is within lookup range Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) # Interpolate 2D lookups at zero and US amplitude logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', Adrive * 1e-3) lookups_on = {key: interp1d(Aref, y2D, axis=0)(Adrive) for key, y2D in lookups2D.items()} lookups_off = {key: interp1d(Aref, y2D, axis=0)(0.0) for key, y2D in lookups2D.items()} # Add reference charge vector to 1D lookup dictionaries lookups_on['Q'] = Qref lookups_off['Q'] = Qref # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.insert( np.array([steady_states[k] for k in self.neuron.states]), 0, self.Qm0) # Initialize simulator and compute solution logger.debug('Computing effective solution') simulator = PWSimulator( lambda y, t: self.effDerivatives(y, t, lookups_on), lambda y, t: self.effDerivatives(y, t, lookups_off)) (t, y, stim), tcomp = simulator(y0, dt, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Qm': y[:, 0] }) for key in ['ng', 'V']: data[key] = self.interpEffVariable( key, data['Qm'].values, stim, lookups_on, lookups_off) data['Z'] = np.array([self.balancedefQS(ng, Qm) for ng, Qm in zip( data['ng'].values, data['Qm'].values)]) # m data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 1] # Return dataframe and computation time return data, tcomp def runHybrid(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the system for a specific set of US stimulation parameters, using a hybrid integration scheme. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the solution matrix and a state vector ''' # Determine time step dt_dense = 1 / (NPC_FULL * Fdrive) dt_sparse = 1 / (NPC_HH * Fdrive) # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) Pac = self.Pacoustic(dt_dense, Adrive, Fdrive, phi) Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.concatenate(( [0., Z0, self.ng0, self.Qm0], [steady_states[k] for k in self.neuron.states], )) is_dense_var = np.array([True] * 3 + [False] * (len(self.neuron.states) + 1)) # Initialize simulator and compute solution logger.debug('Computing hybrid solution') simulator = HybridSimulator( lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.), lambda t, y, Cm: self.neuron.Qderivatives(y, t, Cm), lambda yref: self.Capct(yref[1]), is_dense_var, ivars_to_check=[1, 2]) (t, y, stim), tcomp = simulator( y0, dt_dense, dt_sparse, Fdrive, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 4] # Return dataframe and computation time return data, tcomp def simulate(self, Fdrive, Adrive, tstim, toffset, PRF=None, DC=1.0, method='sonic'): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param method: selected integration method :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: %s @ f = %sHz, %st = %ss (%ss offset)%s', self, 'titration' if Adrive is None else 'simulation', si_format(Fdrive, 0, space=' '), 'A = {}Pa, '.format(si_format(Adrive, 2, space=' ')) if Adrive is not None else '', *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # TODO: If no amplitude provided, perform titration if Adrive is None: Adrive = self.titrate(Fdrive, tstim, toffset, PRF, DC, method=method) if np.isnan(Adrive): logger.error('Could not find threshold excitation amplitude') return None # Check validity of stimulation parameters BilayerSonophore.checkInputs(self, Fdrive, Adrive, 0.0, 0.0) self.neuron.checkInputs(Adrive, tstim, toffset, PRF, DC) # Call appropriate simulation function try: simfunc = { 'full': self.runFull, 'hybrid': self.runHybrid, 'sonic': self.runSONIC }[method] except KeyError: raise ValueError('Invalid integration method: "{}"'.format(method)) data, tcomp = simfunc(Fdrive, Adrive, tstim, toffset, PRF, DC) # Log number of detected spikes nspikes = self.neuron.getNSpikes(data) logger.debug('{} spike{} detected'.format(nspikes, plural(nspikes))) # Return dataframe and computation time return data, tcomp def meta(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method): ''' Return information about object and simulation parameters. :param Fdrive: US frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param method: integration method :return: meta-data dictionary ''' return { 'neuron': self.neuron.name, 'a': self.a, 'd': self.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'method': method } def titrate(self, Fdrive, tstim, toffset, PRF=None, DC=1.0, Arange=None, method='sonic'): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency, duration, PRF and duty cycle. :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param Arange: search interval for Adrive, iteratively refined :return: determined threshold amplitude (Pa) ''' # Determine amplitude interval if needed if Arange is None: Arange = (0, getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive)[0].max()) # Determine output function if self.neuron.isTitratable(): xfunc = self.isExcited else: xfunc = self.isSilenced # Titrate return titrate(xfunc, (Fdrive, tstim, toffset, PRF, DC, method), Arange, TITRATION_ASTIM_DA_MAX) def createQueue(self, freqs, amps, durations, offsets, PRFs, DCs, method): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :params method: integration method :return: list of parameters (list) for each simulation ''' if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += createQueue((freqs, amps, durations, offsets, min(PRFs), 1.0)) if np.any(DCs != 1.0): queue += createQueue((freqs, amps, durations, offsets, PRFs, DCs[DCs != 1.0])) for item in queue: if np.isnan(item[1]): item[1] = None item.append(method) return queue def runAndSave(self, outdir, Fdrive, Adrive, tstim, toffset, PRF=None, DC=1.0, method='sonic'): ''' Run a simulation of the full electro-mechanical system for a given neuron type with specific parameters, and save the results in a PKL file. :param outdir: full path to output directory :param Adrive: acoustic pressure amplitude (Pa) :param Fdrive: US frequency (Hz) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param method: integration method ''' data, tcomp = self.simulate(Fdrive, Adrive, tstim, toffset, PRF, DC, method) meta = self.meta(Fdrive, Adrive, tstim, toffset, PRF, DC, method) meta['tcomp'] = tcomp - simcode = self.filecode(Fdrive, Adrive, tstim, PRF, DC, method) + simcode = self.filecode(Fdrive, Adrive, tstim, toffset, PRF, DC, method) outpath = '{}/{}.pkl'.format(outdir, simcode) with open(outpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) logger.debug('simulation data exported to "%s"', outpath) return outpath def quasiSteadyStates(self, Fdrive, amps=None, charges=None, DCs=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities and duty cycles, at a specific US frequency. :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DCs: duty cycle value(s) :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges amps, charges, lookups = getLookupsDCavg( self.neuron.name, self.a, Fdrive, amps, charges, DCs) # Compute QSS states using these lookups nA, nQ, nDC = lookups['V'].shape QSS = {k: np.empty((nA, nQ, nDC)) for k in self.neuron.states} for iA in range(nA): for iDC in range(nDC): QSS_1D = self.neuron.quasiSteadyStates( {k: v[iA, :, iDC] for k, v in lookups.items()}) for k in QSS.keys(): QSS[k][iA, :, iDC] = QSS_1D[k] # Compress outputs if needed if squeeze_output: QSS = {k: v.squeeze() for k, v in QSS.items()} lookups = {k: v.squeeze() for k, v in lookups.items()} # Return reference inputs and outputs return amps, charges, lookups, QSS def quasiSteadyStateiNet(self, Qm, Fdrive, Adrive, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' _, _, lookups, QSS = self.quasiSteadyStates( Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) return self.neuron.iNet(lookups['V'], np.array(list(QSS.values()))) # mA/m2 def evaluateStability(self, Qm0, states0, lkp): ''' Integrate the effective differential system from a given starting point, until clear convergence or clear divergence is found. :param Qm0: initial membrane charge density (C/m2) :param states0: dictionary of initial states values :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: boolean indicating convergence state ''' # Initialize y0 vector t0 = 0. y0 = np.array([Qm0] + list(states0.values())) # Initializing empty list to record evolution of charge deviation n = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) # size of history dQ = [] # As long as there is no clear charge convergence or divergence conv, div = False, False tf, yf = t0, y0 while not conv and not div: # Integrate system for small interval and retrieve final charge deviation t0, y0 = tf, yf sol = solve_ivp( lambda t, y: self.effDerivatives(y, t, lkp), [t0, t0 + QSS_INTEGRATION_INTERVAL], y0, method='LSODA' ) tf, yf = sol.t[-1], sol.y[:, -1] dQ.append(yf[0] - Qm0) # logger.debug('{:.0f} ms: dQ = {:.5f} nC/cm2, avg dQ = {:.5f} nC/cm2'.format( # tf * 1e3, dQ[-1] * 1e5, np.mean(dQ[-n:]) * 1e5)) # If last charge deviation is too large -> divergence if np.abs(dQ[-1]) > QSS_Q_DIV_THR: div = True # If last charge deviation or average deviation in recent history # is small enough -> convergence for x in [dQ[-1], np.mean(dQ[-n:])]: if np.abs(x) < QSS_Q_CONV_THR: conv = True # If max integration duration is been reached -> error if tf > QSS_MAX_INTEGRATION_DURATION: raise ValueError('too many iterations') logger.debug('{}vergence after {:.0f} ms: dQ = {:.5f} nC/cm2'.format( {True: 'con', False: 'di'}[conv], tf * 1e3, dQ[-1] * 1e5)) return conv def quasiSteadyStateFixedPoints(self, Fdrive, Adrive, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' logger.debug('A = {:.2f} kPa, DC = {:.0f}%'.format(Adrive * 1e-3, DC * 1e2)) # Extract stable and unstable fixed points from QSS charge variation profile dfunc = lambda Qm: - self.quasiSteadyStateiNet(Qm, Fdrive, Adrive, DC) SFP_candidates = getFixedPoints(lkp['Q'], dQdt, filter='stable', der_func=dfunc).tolist() UFPs = getFixedPoints(lkp['Q'], dQdt, filter='unstable', der_func=dfunc).tolist() SFPs = [] pltvars = self.getPltVars() # For each candidate SFP for i, Qm in enumerate(SFP_candidates): logger.debug('Q-SFP = {:.2f} nC/cm2'.format(Qm * 1e5)) # Re-compute QSS *_, QSS_FP = self.quasiSteadyStates(Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) # Simulate from unperturbed QSS and evaluate stability if not self.evaluateStability(Qm, QSS_FP, lkp): logger.warning('diverging system at ({:.2f} kPa, {:.2f} nC/cm2)'.format( Adrive * 1e-3, Qm * 1e5)) UFPs.append(Qm) else: # For each state unstable_states = [] for x in self.neuron.states: pltvar = pltvars[x] unit_str = pltvar.get('unit', '') factor = pltvar.get('factor', 1) is_stable_direction = [] for sign in [-1, +1]: # Perturb state with small offset QSS_perturbed = deepcopy(QSS_FP) QSS_perturbed[x] *= (1 + sign * QSS_REL_OFFSET) # If gating state, bound within [0., 1.] if self.neuron.isVoltageGated(x): QSS_perturbed[x] = np.clip(QSS_perturbed[x], 0., 1.) logger.debug('{}: {:.5f} -> {:.5f} {}'.format( x, QSS_FP[x] * factor, QSS_perturbed[x] * factor, unit_str)) # Simulate from perturbed QSS and evaluate stability is_stable_direction.append( self.evaluateStability(Qm, QSS_perturbed, lkp)) # Check if system shows stability upon x-state perturbation # in both directions if not np.all(is_stable_direction): unstable_states.append(x) # Classify fixed point as stable only if all states show stability is_stable_FP = len(unstable_states) == 0 {True: SFPs, False: UFPs}[is_stable_FP].append(Qm) logger.info('{}stable fixed-point at ({:.2f} kPa, {:.2f} nC/cm2){}'.format( '' if is_stable_FP else 'un', Adrive * 1e-3, Qm * 1e5, '' if is_stable_FP else ', caused by {} states'.format(unstable_states))) return SFPs, UFPs def findRheobaseAmps(self, DCs, Fdrive, Vthr): ''' Find the rheobase amplitudes (i.e. threshold acoustic amplitudes of infinite duration that would result in excitation) of a specific neuron for various duty cycles. :param DCs: duty cycles vector (-) :param Fdrive: acoustic drive frequency (Hz) :param Vthr: threshold membrane potential above which the neuron necessarily fires (mV) :return: rheobase amplitudes vector (Pa) ''' # Get threshold charge from neuron's spike threshold parameter Qthr = self.neuron.Cm0 * Vthr * 1e-3 # C/m2 # Get QSS variables for each amplitude at threshold charge Aref, _, Vmeff, QS_states = self.quasiSteadyStates(Fdrive, charges=Qthr, DCs=DCs) if DCs.size == 1: QS_states = QS_states.reshape((*QS_states.shape, 1)) Vmeff = Vmeff.reshape((*Vmeff.shape, 1)) # Compute 2D QSS charge variation array at Qthr dQdt = -self.neuron.iNet(Vmeff, QS_states) # Find the threshold amplitude that cancels dQdt for each duty cycle Arheobase = np.array([np.interp(0, dQdt[:, i], Aref, left=0., right=np.nan) for i in range(DCs.size)]) # Check if threshold amplitude is found for all DCs inan = np.where(np.isnan(Arheobase))[0] if inan.size > 0: if inan.size == Arheobase.size: logger.error( 'No rheobase amplitudes within [%s - %sPa] for the provided duty cycles', *si_format((Aref.min(), Aref.max()))) else: minDC = DCs[inan.max() + 1] logger.warning( 'No rheobase amplitudes within [%s - %sPa] below %.1f%% duty cycle', *si_format((Aref.min(), Aref.max())), minDC * 1e2) return Arheobase, Aref def computeEffVars(self, Fdrive, Adrive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' # Run simulation and retrieve deflection and gas content vectors from last cycle data, tcomp = BilayerSonophore.simulate(self, Fdrive, Adrive, Qm) Z_last = data.loc[-NPC_FULL:, 'Z'].values # m Cm_last = self.v_Capct(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = x * Cm_last + (1 - x) * self.Cm0 # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({'V': np.mean(Vm)}) effvars[-1].update(self.neuron.computeEffRates(Vm)) # Log process log = '{}: lookups @ {}Hz, {}Pa, {:.2f} nC/cm2'.format( self, *si_format([Fdrive, Adrive], precision=1, space=' '), Qm * 1e5) if len(fs) > 1: log += ', fs = {:.0f} - {:.0f}%'.format(fs.min() * 1e2, fs.max() * 1e2) log += ', tcomp = {:.3f} s'.format(tcomp) logger.info(log) # Return effective coefficients return [tcomp, effvars] diff --git a/PySONIC/core/pneuron.py b/PySONIC/core/pneuron.py index c933fb1..9f26521 100644 --- a/PySONIC/core/pneuron.py +++ b/PySONIC/core/pneuron.py @@ -1,630 +1,630 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-03 11:53:04 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-31 15:08:49 +# @Last Modified time: 2019-05-31 15:24:22 import pickle import abc import inspect import re import numpy as np import pandas as pd from ..postpro import findPeaks from ..constants import * from ..batches import createQueue from ..utils import si_format, logger, titrate, plural from .simulators import PWSimulator class PointNeuron(metaclass=abc.ABCMeta): ''' Abstract class defining the common API (i.e. mandatory attributes and methods) of all subclasses implementing the channels mechanisms of specific point neurons. ''' tscale = 'ms' # relevant temporal scale of the model defvar = 'V' # default plot variable def __repr__(self): return self.__class__.__name__ def pprint(self): return '{} neuron'.format(self.__class__.__name__) def filecode(self, Astim, tstim, PRF, DC): ''' File naming convention. ''' return 'ESTIM_{}_{}_{:.1f}mA_per_m2_{:.0f}ms{}'.format( self.name, 'CW' if DC == 1 else 'PW', Astim, tstim * 1e3, '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1. else '') @property @abc.abstractmethod def name(self): return 'Should never reach here' @property @abc.abstractmethod def Cm0(self): return 'Should never reach here' @property @abc.abstractmethod def Vm0(self): return 'Should never reach here' @abc.abstractmethod def currents(self, Vm, states): ''' Compute all ionic currents per unit area. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: dictionary of ionic currents per unit area (mA/m2) ''' def iNet(self, Vm, states): ''' net membrane current :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: current per unit area (mA/m2) ''' return sum(self.currents(Vm, states).values()) def dQdt(self, Vm, states): ''' membrane charge density variation rate :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: variation rate (mA/m2) ''' return -self.iNet(Vm, states) def isTitratable(self): ''' Simple method returning whether the neuron can be titrated (defaults to True). ''' return True def currentToConcentrationRate(self, z_ion, depth): ''' Compute the conversion factor from a specific ionic current (in mA/m2) into a variation rate of submembrane ion concentration (in M/s). :param: z_ion: ion valence :param depth: submembrane depth (m) :return: conversion factor (Mmol.m-1.C-1) ''' return 1e-6 / (z_ion * depth * FARADAY) def nernst(self, z_ion, Cion_in, Cion_out, T): ''' Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 def vtrap(self, x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) def efun(self, x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) def ghkDrive(self, Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * self.efun(-x) # M eCout = Cion_out * self.efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 def getDesc(self): return inspect.getdoc(self).splitlines()[0] def getCurrentsNames(self): return list(self.currents(np.nan, [np.nan] * len(self.states)).keys()) def getPltScheme(self): pltscheme = { 'Q_m': ['Qm'], 'V_m': ['Vm'] } pltscheme['I'] = self.getCurrentsNames() + ['iNet'] for cname in self.getCurrentsNames(): if 'Leak' not in cname: key = 'i_{{{}}}\ kin.'.format(cname[1:]) cargs = inspect.getargspec(getattr(self, cname))[0][1:] pltscheme[key] = [var for var in cargs if var not in ['Vm', 'Cai']] return pltscheme def getPltVars(self, wrapleft='df["', wrapright='"]'): ''' Return a dictionary with information about all plot variables related to the neuron. ''' pltvars = { 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'bounds': (-100, 50) }, 'Vm': { 'desc': 'membrane potential', 'label': 'V_m', 'unit': 'mV', 'y0': self.Vm0, 'bounds': (-150, 70) }, 'ELeak': { 'constant': 'obj.ELeak', 'desc': 'non-specific leakage current resting potential', 'label': 'V_{leak}', 'unit': 'mV', 'ls': '--', 'color': 'k' } } for cname in self.getCurrentsNames(): cfunc = getattr(self, cname) cargs = inspect.getargspec(cfunc)[0][1:] pltvars[cname] = { 'desc': inspect.getdoc(cfunc).splitlines()[0], 'label': 'I_{{{}}}'.format(cname[1:]), 'unit': 'A/m^2', 'factor': 1e-3, 'func': '{}({})'.format(cname, ', '.join(['{}{}{}'.format(wrapleft, a, wrapright) for a in cargs])) } for var in cargs: if var not in ['Vm', 'Cai']: vfunc = getattr(self, 'der{}{}'.format(var[0].upper(), var[1:])) desc = cname + re.sub( '^Evolution of', '', inspect.getdoc(vfunc).splitlines()[0]) pltvars[var] = { 'desc': desc, 'label': var, 'bounds': (-0.1, 1.1) } pltvars['iNet'] = { 'desc': inspect.getdoc(getattr(self, 'iNet')).splitlines()[0], 'label': 'I_{net}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'iNet({0}Vm{1}, {2}{3}{4}.values.T)'.format( wrapleft, wrapright, wrapleft[:-1], self.states, wrapright[1:]), 'ls': '--', 'color': 'black' } pltvars['dQdt'] = { 'desc': inspect.getdoc(getattr(self, 'dQdt')).splitlines()[0], 'label': 'dQ_m/dt', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'dQdt({0}Vm{1}, {2}{3}{4}.values.T)'.format( wrapleft, wrapright, wrapleft[:-1], self.states, wrapright[1:]), 'ls': '--', 'color': 'black' } for x in self.getGates(): for rate in ['alpha', 'beta']: pltvars['{}{}'.format(rate, x)] = { 'label': '\\{}_{{{}}}'.format(rate, x), 'unit': 'ms^{-1}', 'factor': 1e-3 } return pltvars def getRatesNames(self, states): ''' Return a list of names of the alpha and beta rates of the neuron. ''' return list(sum( [['alpha{}'.format(x.lower()), 'beta{}'.format(x.lower())] for x in states], [] )) def Qm0(self): ''' Return the resting charge density (in C/m2). ''' return self.Cm0 * self.Vm0 * 1e-3 # C/cm2 @abc.abstractmethod def steadyStates(self, Vm): ''' Compute the steady-state values for a specific membrane potential value. :param Vm: membrane potential (mV) :return: dictionary of steady-states ''' @abc.abstractmethod def derStates(self, Vm, states): ''' Compute the derivatives of channel states. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: current per unit area (mA/m2) ''' @abc.abstractmethod def computeEffRates(self, Vm): ''' Get the effective rate constants of ion channels, averaged along an acoustic cycle, for future use in effective simulations. :param Vm: array of membrane potential values for an acoustic cycle (mV) :return: a dictionary of rate average constants (s-1) ''' def interpEffRates(self, Qm, lkp, keys=None): ''' Interpolate effective rate constants for a given charge density using reference lookup vectors. :param Qm: membrane charge density (C/m2) :states: state probabilities of the ion channels :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of interpolated rate constants ''' if keys is None: keys = self.rates return {k: np.interp(Qm, lkp['Q'], lkp[k], left=np.nan, right=np.nan) for k in keys} def interpVmeff(self, Qm, lkp): ''' Interpolate the effective membrane potential for a given charge density using reference lookup vectors. :param Qm: membrane charge density (C/m2) :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of interpolated rate constants ''' return np.interp(Qm, lkp['Q'], lkp['V'], left=np.nan, right=np.nan) @abc.abstractmethod def derEffStates(self, Qm, states, lkp): ''' Compute the effective derivatives of channel states, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param Qm: membrane charge density (C/m2) :states: state probabilities of the ion channels :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. ''' def Qbounds(self): ''' Determine bounds of membrane charge physiological range for a given neuron. ''' return np.array([np.round(self.Vm0 - 25.0), 50.0]) * self.Cm0 * 1e-3 # C/m2 def isVoltageGated(self, state): ''' Determine whether a given state is purely voltage-gated or not.''' return 'alpha{}'.format(state.lower()) in self.rates def getGates(self): ''' Retrieve the names of the neuron's states that match an ion channel gating. ''' gates = [] for x in self.states: if self.isVoltageGated(x): gates.append(x) return gates def qsStates(self, lkp, states): ''' Compute a collection of quasi steady states using the standard xinf = ax / (ax + Bx) equation. :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of quasi-steady states ''' return { x: lkp['alpha{}'.format(x)] / (lkp['alpha{}'.format(x)] + lkp['beta{}'.format(x)]) for x in states } @abc.abstractmethod def quasiSteadyStates(self, lkp): ''' Compute the quasi-steady states of a neuron for a range of membrane charge densities, based on 1-dimensional lookups interpolated at a given sonophore diameter, US frequency, US amplitude and duty cycle. :param lkp: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: dictionary of quasi-steady states ''' def getRates(self, Vm): ''' Compute the ion channels rate constants for a given membrane potential. :param Vm: membrane potential (mV) :return: a dictionary of rate constants and their values at the given potential. ''' rates = {} for x in self.getGates(): x = x.lower() alpha_str, beta_str = ['{}{}'.format(s, x.lower()) for s in ['alpha', 'beta']] inf_str, tau_str = ['{}inf'.format(x.lower()), 'tau{}'.format(x.lower())] if hasattr(self, 'alpha{}'.format(x)): alphax = getattr(self, alpha_str)(Vm) betax = getattr(self, beta_str)(Vm) elif hasattr(self, '{}inf'.format(x)): xinf = getattr(self, inf_str)(Vm) taux = getattr(self, tau_str)(Vm) alphax = xinf / taux betax = 1 / taux - alphax rates[alpha_str] = alphax rates[beta_str] = betax return rates def Vderivatives(self, y, t, Iinj): ''' Compute the derivatives of a V-cast HH system for a specific value of injected current. :param y: vector of HH system variables at time t :param t: time value (s, unused) :param Iinj: injected current (mA/m2) :return: vector of HH system derivatives at time t ''' Vm, *states = y Iionic = self.iNet(Vm, states) # mA/m2 dVmdt = (- Iionic + Iinj) / self.Cm0 # mV/s dstates = self.derStates(Vm, states) return [dVmdt, *[dstates[k] for k in self.states]] def Qderivatives(self, y, t, Cm=None): ''' Compute the derivatives of the n-ODE HH system variables, based on a value of membrane capacitance. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param Cm: membrane capacitance (F/m2) :return: vector of HH system derivatives at time t ''' if Cm is None: Cm = self.Cm0 Qm, *states = y Vm = Qm / Cm * 1e3 # mV dQmdt = - self.iNet(Vm, states) * 1e-3 # A/m2 dstates = self.derStates(Vm, states) return [dQmdt, *[dstates[k] for k in self.states]] def checkInputs(self, Astim, tstim, toffset, PRF, DC): ''' Check validity of electrical stimulation parameters. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) ''' # Check validity of stimulation parameters if not all(isinstance(param, float) for param in [Astim, tstim, toffset, DC]): raise TypeError('Invalid stimulation parameters (must be float typed)') if tstim <= 0: raise ValueError('Invalid stimulus duration: {} ms (must be strictly positive)' .format(tstim * 1e3)) if toffset < 0: raise ValueError('Invalid stimulus offset: {} ms (must be positive or null)' .format(toffset * 1e3)) if DC <= 0.0 or DC > 1.0: raise ValueError('Invalid duty cycle: {} (must be within ]0; 1])'.format(DC)) if DC < 1.0: if not isinstance(PRF, float): raise TypeError('Invalid PRF value (must be float typed)') if PRF is None: raise AttributeError('Missing PRF value (must be provided when DC < 1)') if PRF < 1 / tstim: raise ValueError('Invalid PRF: {} Hz (PR interval exceeds stimulus duration)' .format(PRF)) def simulate(self, Astim, tstim, toffset, PRF=None, DC=1.0, dt=DT_ESTIM): ''' Simulate a specific neuron model for a specific set of electrical parameters, and return output data in a dataframe. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param dt: integration time step (s) :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: %s @ %st = %ss (%ss offset)%s', self, 'titration' if Astim is None else 'simulation', 'A = {}A/m2, '.format(si_format(Astim, 2, space=' ')) if Astim is not None else '', *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # TODO: If no amplitude provided, perform titration if Astim is None: Astim = self.titrate(tstim, toffset, PRF, DC) if np.isnan(Astim): logger.error('Could not find threshold excitation amplitude') return None # Check validity of stimulation parameters self.checkInputs(Astim, tstim, toffset, PRF, DC) # Set initial conditions steady_states = self.steadyStates(self.Vm0) y0 = np.array([self.Vm0, *[steady_states[k] for k in self.states]]) # Initialize simulator and compute solution logger.debug('Computing solution') simulator = PWSimulator( lambda y, t: self.Vderivatives(y, t, Astim), lambda y, t: self.Vderivatives(y, t, 0.)) (t, y, stim), tcomp = simulator(y0, dt, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Vm': y[:, 0], 'Qm': y[:, 0] * self.Cm0 * 1e-3 }) data['Qm'] = data['Vm'].values * self.Cm0 * 1e-3 for i in range(len(self.states)): data[self.states[i]] = y[:, i + 1] # Log number of detected spikes nspikes = self.getNSpikes(data) logger.debug('{} spike{} detected'.format(nspikes, plural(nspikes))) # Return dataframe and computation time return data, tcomp def meta(self, Astim, tstim, toffset, PRF, DC): ''' Return information about object and simulation parameters. :param Astim: stimulus amplitude (mA/m2) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :return: meta-data dictionary ''' return { 'neuron': self.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC } def createQueue(self, amps, durations, offsets, PRFs, DCs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: list of parameters (list) for each simulation ''' if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += createQueue((amps, durations, offsets, min(PRFs), 1.0)) if np.any(DCs != 1.0): queue += createQueue((amps, durations, offsets, PRFs, DCs[DCs != 1.0])) for item in queue: if np.isnan(item[0]): item[0] = None return queue def runAndSave(self, outdir, Astim, tstim, toffset, PRF=None, DC=1.0): ''' Run a simulation of the point-neuron Hodgkin-Huxley system with specific parameters, and save the results in a PKL file. :param outdir: full path to output directory :param Astim: stimulus amplitude (mA/m2) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) ''' data, tcomp = self.simulate(Astim, tstim, toffset, PRF, DC) meta = self.meta(Astim, tstim, toffset, PRF, DC) meta['tcomp'] = tcomp - simcode = self.filecode(Astim, tstim, PRF, DC) + simcode = self.filecode(Astim, tstim, toffset, PRF, DC) outpath = '{}/{}.pkl'.format(outdir, simcode) with open(outpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) logger.debug('simulation data exported to "%s"', outpath) return outpath def getNSpikes(self, data): ''' Compute number of spikes in charge profile of simulation output. :param data: dataframe containing output time series :return: number of detected spikes ''' dt = np.diff(data.ix[:1, 't'].values)[0] ipeaks, *_ = findPeaks( data['Qm'].values, SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM ) return ipeaks.size def getStabilizationValue(self, data): ''' Determine stabilization value from the charge profile of a simulation output. :param data: dataframe containing output time series :return: charge stabilization value (or np.nan if no stabilization detected) ''' # Extract charge signal posterior to observation window t, Qm = [data[key].values for key in ['t', 'Qm']] Qm = y[2, t > TMIN_STABILIZATION] # Compute variation range Qm_range = np.ptp(Qm) logger.debug('%.2f nC/cm2 variation range over the last %.0f ms', Qm_range * 1e5, TMIN_STABILIZATION * 1e3) # Return final value only if stabilization is detected if np.ptp(Qm) < QSS_Q_DIV_THR: return Qm[-1] else: return np.nan def isExcited(self, data): ''' Determine if neuron is excited from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is excited or not ''' return self.getNSpikes(data) > 0 def isSilenced(self, data): ''' Determine if neuron is silenced from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is silenced or not ''' return not np.isinan(self.getStabilizationValue(data)) def titrate(self, tstim, toffset, PRF=None, DC=1.0, Arange=(0., 2 * TITRATION_ESTIM_A_MAX), xfunc=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given duration, PRF and duty cycle. :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param Arange: search interval for Astim, iteratively refined :return: excitation threshold amplitude (mA/m2) ''' # Determine output function if xfunc is None: xfunc = self.isExcited return titrate(xfunc, (tstim, toffset, PRF, DC), Arange, TITRATION_ESTIM_DA_MAX) diff --git a/PySONIC/plt/actmap.py b/PySONIC/plt/actmap.py index 128336a..9dc242a 100644 --- a/PySONIC/plt/actmap.py +++ b/PySONIC/plt/actmap.py @@ -1,632 +1,632 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-09-26 16:47:18 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-27 15:44:30 +# @Last Modified time: 2019-05-31 15:26:49 import os import ntpath import pickle import numpy as np import pandas as pd import matplotlib.pyplot as plt import matplotlib from matplotlib.ticker import FormatStrFormatter from ..core import NeuronalBilayerSonophore from ..utils import logger, si_format from ..postpro import findPeaks from ..constants import * from ..neurons import getNeuronsDict from .pltutils import cm2inch, computeMeshEdges class ActivationMap: def __init__(self, root, neuron, a, Fdrive, tstim, PRF): self.root = root self.neuron = getNeuronsDict()[neuron]() self.a = a self.nbls = NeuronalBilayerSonophore(self.a, self.neuron) self.Fdrive = Fdrive self.tstim = tstim self.PRF = PRF self.out_fname = 'actmap {} {}Hz PRF{}Hz {}s.csv'.format( self.neuron.name, *si_format([self.Fdrive, self.PRF, self.tstim], space='')) self.out_fpath = os.path.join(self.root, self.out_fname) def cacheMap(self): # Load activation map from file if it exists if os.path.isfile(self.out_fpath): logger.info('Loading activation map for %s neuron', self.neuron.name) actmap = np.loadtxt(actmap_filepath, delimiter=',') else: # Save activation map to file self.compute(amps, DCs) np.savetxt(self.out_fpath, actmap, delimiter=',') def compute(self, amps, DCs): logger.info('Generating activation map for %s neuron', self.neuron.name) actmap = np.empty((amps.size, DCs.size)) nfiles = DCs.size * amps.size for i, A in enumerate(amps): for j, DC in enumerate(DCs): - fname = '{}.pkl'.format(nbls.filecode(Fdrive, A, tstim, PRF, DC, 'sonic')) + fname = '{}.pkl'.format(nbls.filecode(Fdrive, A, tstim, 0., PRF, DC, 'sonic')) fpath = os.path.join(root, fname) if not os.path.isfile(fpath): logger.error('"{}" file not found'.format(fname)) actmap[i, j] = np.nan else: # Load data logger.debug('Loading file {}/{}: "{}"'.format( i * amps.size + j + 1, nfiles, fname)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values Qm = df['Qm'].values dt = t[1] - t[0] # Detect spikes on charge profile during stimulus mpd = int(np.ceil(SPIKE_MIN_DT / dt)) ispikes, *_ = findPeaks( Qm[t <= tstim], mph=SPIKE_MIN_QAMP, mpd=mpd, mpp=SPIKE_MIN_QPROM ) # Compute firing metrics if ispikes.size == 0: # if no spike, assign -1 actmap[i, j] = -1 elif ispikes.size == 1: # if only 1 spike, assign 0 actmap[i, j] = 0 else: # if more than 1 spike, assign firing rate FRs = 1 / np.diff(t[ispikes]) actmap[i, j] = np.mean(FRs) return actmap def onClick(self, event, amps, DCs, meshedges, tmax, Vbounds): ''' Retrieve the specific input parameters of the x and y dimensions when the user clicks on a cell in the 2D map, and define filename from it. ''' # Get DC and A from x and y coordinates x, y = event.xdata, event.ydata DC = DCs[np.searchsorted(meshedges[0], x * 1e-2) - 1] Adrive = amps[np.searchsorted(meshedges[1], y * 1e3) - 1] # Define filepath fname = '{}.pkl'.format(self.nbls.filecode( - self.Fdrive, Adrive, self.tstim, self.PRF, DC, 'sonic')) + self.Fdrive, Adrive, self.tstim, 0., self.PRF, DC, 'sonic')) fpath = os.path.join(self.root, fname) # Plot Q-trace try: plotQVeff(fpath, tmax=tmax, ybounds=Vbounds) plotFRspectrum(fpath) plt.show() except FileNotFoundError as err: logger.error(err) def plotQVeff(self, filepath, tonset=10, tmax=None, ybounds=None, fs=8, lw=1): ''' Plot superimposed profiles of membrane charge density and effective membrane potential. :param filepath: full path to the data file :param tonset: pre-stimulus onset to add to profiles (ms) :param tmax: max time value showed on graph (ms) :param ybounds: y-axis bounds (mV / nC/cm2) :return: handle to the generated figure ''' # Check file existence fname = ntpath.basename(filepath) if not os.path.isfile(filepath): raise FileNotFoundError('Error: "{}" file does not exist'.format(fname)) # Load data logger.debug('Loading data from "%s"', fname) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] t = df['t'].values * 1e3 # ms Qm = df['Qm'].values * 1e5 # nC/cm2 Vm = df['Vm'].values # mV # Add onset to profiles t = np.hstack((np.array([-tonset, t[0]]), t)) Vm = np.hstack((np.array([self.neuron.Vm0] * 2), Vm)) Qm = np.hstack((np.array([Qm[0]] * 2), Qm)) # Determine axes bounds if tmax is None: tmax = t.max() if ybounds is None: ybounds = (min(Vm.min(), Qm.min()), max(Vm.max(), Qm.max())) # Create figure fig, ax = plt.subplots(figsize=cm2inch(7, 3)) fig.canvas.set_window_title(fname) plt.subplots_adjust(left=0.2, bottom=0.2, right=0.95, top=0.95) for key in ['top', 'right']: ax.spines[key].set_visible(False) for key in ['bottom', 'left']: ax.spines[key].set_position(('axes', -0.03)) ax.spines[key].set_linewidth(2) ax.yaxis.set_tick_params(width=2) ax.yaxis.set_major_formatter(FormatStrFormatter('%.0f')) ax.set_xlim((-tonset, tmax)) ax.set_xticks([]) ax.set_xlabel('{}s'.format(si_format((tonset + tmax) * 1e-3, space=' ')), fontsize=fs) ax.set_ylabel('mV - $\\rm nC/cm^2$', fontsize=fs, labelpad=-15) ax.set_ylim(ybounds) ax.set_yticks(ybounds) for item in ax.get_yticklabels(): item.set_fontsize(fs) # Plot Qm and Vmeff profiles ax.plot(t, Vm, color='darkgrey', linewidth=lw) ax.plot(t, Qm, color='k', linewidth=lw) # fig.tight_layout() return fig def plotFRspectrum(self, filepath, FRbounds=None, fs=8, lw=1): ''' Plot firing rate specturm. :param filepath: full path to the data file :param FRbounds: firing rate bounds (Hz) :return: handle to the generated figure ''' # Determine FR bounds if FRbounds is None: FRbounds = (1e0, 1e3) # Check file existence fname = ntpath.basename(filepath) if not os.path.isfile(filepath): raise FileNotFoundError('Error: "{}" file does not exist'.format(fname)) # Load data logger.debug('Loading data from "%s"', fname) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values Qm = df['Qm'].values dt = t[1] - t[0] # Detect spikes on charge profile during stimulus mpd = int(np.ceil(SPIKE_MIN_DT / dt)) ispikes, *_ = findPeaks( Qm[t <= tstim], mph=SPIKE_MIN_QAMP, mpd=mpd, mpp=SPIKE_MIN_QPROM ) # Compute FR spectrum if ispikes.size <= MIN_NSPIKES_SPECTRUM: raise ValueError('Number of spikes is to small to form spectrum') FRs = 1 / np.diff(t[ispikes]) logbins = np.logspace(np.log10(FRbounds[0]), np.log10(FRbounds[1]), 30) # Create figure fig, ax = plt.subplots(figsize=cm2inch(7, 3)) fig.canvas.set_window_title(fname) for key in ['top', 'right']: ax.spines[key].set_visible(False) ax.set_xlim(FRbounds) ax.set_xlabel('Firing rate (Hz)', fontsize=fs) ax.set_ylabel('Density', fontsize=fs) for item in ax.get_yticklabels(): item.set_fontsize(fs) ax.hist(FRs, bins=logbins, density=True, color='k') ax.set_xscale('log') fig.tight_layout() return fig def getActivationMap(root, nbls, Fdrive, tstim, PRF, amps, DCs): ''' Compute the activation map of a neuron with specific sonophore radius at a given frequency and PRF, by computing the spiking metrics of simulation results over a 2D space (amplitude x duty cycle). :param root: directory containing the input data files :param neuron: neuron name :param a: sonophore radius :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param PRF: pulse repetition frequency (Hz) :param amps: vector of acoustic amplitudes (Pa) :param DCs: vector of duty cycles (-) :return the activation matrix ''' # Load activation map from file if it exists actmap_filename = 'actmap {} {}Hz PRF{}Hz {}s.csv'.format( nbls.neuron.name, *si_format([Fdrive, PRF, tstim], space='')) actmap_filepath = os.path.join(root, actmap_filename) if os.path.isfile(actmap_filepath): logger.info('Loading activation map for %s neuron', nbls.neuron.name) return np.loadtxt(actmap_filepath, delimiter=',') # Otherwise generate it logger.info('Generating activation map for %s neuron', nbls.neuron.name) actmap = np.empty((amps.size, DCs.size)) nfiles = DCs.size * amps.size for i, A in enumerate(amps): for j, DC in enumerate(DCs): - fname = '{}.pkl'.format(nbls.filecode(Fdrive, A, tstim, PRF, DC, 'sonic')) + fname = '{}.pkl'.format(nbls.filecode(Fdrive, A, tstim, 0., PRF, DC, 'sonic')) fpath = os.path.join(root, fname) if not os.path.isfile(fpath): logger.error('"{}" file not found'.format(fname)) actmap[i, j] = np.nan else: # Load data logger.debug('Loading file {}/{}: "{}"'.format(i * amps.size + j + 1, nfiles, fname)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values Qm = df['Qm'].values dt = t[1] - t[0] # Detect spikes on charge profile during stimulus mpd = int(np.ceil(SPIKE_MIN_DT / dt)) ispikes, *_ = findPeaks( Qm[t <= tstim], mph=SPIKE_MIN_QAMP, mpd=mpd, mpp=SPIKE_MIN_QPROM ) # Compute firing metrics if ispikes.size == 0: # if no spike, assign -1 actmap[i, j] = -1 elif ispikes.size == 1: # if only 1 spike, assign 0 actmap[i, j] = 0 else: # if more than 1 spike, assign firing rate FRs = 1 / np.diff(t[ispikes]) actmap[i, j] = np.mean(FRs) # Save activation map to file np.savetxt(actmap_filepath, actmap, delimiter=',') return actmap def onClick(event, root, nbls, Fdrive, tstim, PRF, amps, DCs, meshedges, tmax, Vbounds): ''' Retrieve the specific input parameters of the x and y dimensions when the user clicks on a cell in the 2D map, and define filename from it. ''' # Get DC and A from x and y coordinates x, y = event.xdata, event.ydata DC = DCs[np.searchsorted(meshedges[0], x * 1e-2) - 1] Adrive = amps[np.searchsorted(meshedges[1], y * 1e3) - 1] # Define filepath - fname = '{}.pkl'.format(nbls.filecode(Fdrive, Adrive, tstim, PRF, DC, 'sonic')) + fname = '{}.pkl'.format(nbls.filecode(Fdrive, Adrive, tstim, 0., PRF, DC, 'sonic')) filepath = os.path.join(root, fname) # Plot Q-trace try: plotQVeff(filepath, tmax=tmax, ybounds=Vbounds) plotFRspectrum(filepath) plt.show() except FileNotFoundError as err: logger.error(err) def plotQVeff(filepath, tonset=10, tmax=None, ybounds=None, fs=8, lw=1): ''' Plot superimposed profiles of membrane charge density and effective membrane potential. :param filepath: full path to the data file :param tonset: pre-stimulus onset to add to profiles (ms) :param tmax: max time value showed on graph (ms) :param ybounds: y-axis bounds (mV / nC/cm2) :return: handle to the generated figure ''' # Check file existence fname = ntpath.basename(filepath) if not os.path.isfile(filepath): raise FileNotFoundError('Error: "{}" file does not exist'.format(fname)) # Load data logger.debug('Loading data from "%s"', fname) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] t = df['t'].values * 1e3 # ms Qm = df['Qm'].values * 1e5 # nC/cm2 Vm = df['Vm'].values # mV # Add onset to profiles t = np.hstack((np.array([-tonset, t[0]]), t)) Vm = np.hstack((np.array([getNeuronsDict()[meta['neuron']]().Vm0] * 2), Vm)) Qm = np.hstack((np.array([Qm[0]] * 2), Qm)) # Determine axes bounds if tmax is None: tmax = t.max() if ybounds is None: ybounds = (min(Vm.min(), Qm.min()), max(Vm.max(), Qm.max())) # Create figure fig, ax = plt.subplots(figsize=cm2inch(7, 3)) fig.canvas.set_window_title(fname) plt.subplots_adjust(left=0.2, bottom=0.2, right=0.95, top=0.95) for key in ['top', 'right']: ax.spines[key].set_visible(False) for key in ['bottom', 'left']: ax.spines[key].set_position(('axes', -0.03)) ax.spines[key].set_linewidth(2) ax.yaxis.set_tick_params(width=2) ax.yaxis.set_major_formatter(FormatStrFormatter('%.0f')) ax.set_xlim((-tonset, tmax)) ax.set_xticks([]) ax.set_xlabel('{}s'.format(si_format((tonset + tmax) * 1e-3, space=' ')), fontsize=fs) ax.set_ylabel('mV - $\\rm nC/cm^2$', fontsize=fs, labelpad=-15) ax.set_ylim(ybounds) ax.set_yticks(ybounds) for item in ax.get_yticklabels(): item.set_fontsize(fs) # Plot Qm and Vmeff profiles ax.plot(t, Vm, color='darkgrey', linewidth=lw) ax.plot(t, Qm, color='k', linewidth=lw) # fig.tight_layout() return fig def plotFRspectrum(filepath, FRbounds=None, fs=8, lw=1): ''' Plot firing rate specturm. :param filepath: full path to the data file :param FRbounds: firing rate bounds (Hz) :return: handle to the generated figure ''' # Determine FR bounds if FRbounds is None: FRbounds = (1e0, 1e3) # Check file existence fname = ntpath.basename(filepath) if not os.path.isfile(filepath): raise FileNotFoundError('Error: "{}" file does not exist'.format(fname)) # Load data logger.debug('Loading data from "%s"', fname) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values Qm = df['Qm'].values dt = t[1] - t[0] # Detect spikes on charge profile during stimulus mpd = int(np.ceil(SPIKE_MIN_DT / dt)) ispikes, *_ = findPeaks( Qm[t <= tstim], mph=SPIKE_MIN_QAMP, mpd=mpd, mpp=SPIKE_MIN_QPROM ) # Compute FR spectrum if ispikes.size <= MIN_NSPIKES_SPECTRUM: raise ValueError('Number of spikes is to small to form spectrum') FRs = 1 / np.diff(t[ispikes]) logbins = np.logspace(np.log10(FRbounds[0]), np.log10(FRbounds[1]), 30) # Create figure fig, ax = plt.subplots(figsize=cm2inch(7, 3)) fig.canvas.set_window_title(fname) for key in ['top', 'right']: ax.spines[key].set_visible(False) ax.set_xlim(FRbounds) ax.set_xlabel('Firing rate (Hz)', fontsize=fs) ax.set_ylabel('Density', fontsize=fs) for item in ax.get_yticklabels(): item.set_fontsize(fs) ax.hist(FRs, bins=logbins, density=True, color='k') ax.set_xscale('log') fig.tight_layout() return fig def plotActivationMap(root, neuron, a, Fdrive, tstim, PRF, amps, DCs, Ascale='log', FRscale='log', FRbounds=None, title=None, fs=8, thrs=True, connect=False, tmax=None, Vbounds=None): ''' Plot a neuron's activation map over the amplitude x duty cycle 2D space. :param root: directory containing the input data files :param neuron: neuron name :param a: sonophore radius :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param PRF: pulse repetition frequency (Hz) :param amps: vector of acoustic amplitudes (Pa) :param DCs: vector of duty cycles (-) :param Ascale: scale to use for the amplitude dimension ('lin' or 'log') :param FRscale: scale to use for the firing rate coloring ('lin' or 'log') :param FRbounds: lower and upper bounds of firing rate color-scale :param title: figure title :param fs: fontsize to use for the title and labels :return: 3-tuple with the handle to the generated figure and the mesh x and y coordinates ''' neuronobj = getNeuronsDict()[neuron]() nbls = NeuronalBilayerSonophore(a, neuronobj) # Get activation map actmap = getActivationMap(root, nbls, Fdrive, tstim, PRF, amps, DCs) # Check firing rate bounding minFR, maxFR = (actmap[actmap > 0].min(), actmap.max()) logger.info('FR range: %.0f - %.0f Hz', minFR, maxFR) if FRbounds is None: FRbounds = (minFR, maxFR) else: if minFR < FRbounds[0]: logger.warning('Minimal firing rate (%.0f Hz) is below defined lower bound (%.0f Hz)', minFR, FRbounds[0]) if maxFR > FRbounds[1]: logger.warning('Maximal firing rate (%.0f Hz) is above defined upper bound (%.0f Hz)', maxFR, FRbounds[1]) # Plot activation map if FRscale == 'lin': norm = matplotlib.colors.Normalize(*FRbounds) elif FRscale == 'log': norm = matplotlib.colors.LogNorm(*FRbounds) fig, ax = plt.subplots(figsize=cm2inch(8, 5.8)) fig.subplots_adjust(left=0.15, bottom=0.15, right=0.8, top=0.92) if title is None: title = '{} neuron @ {}Hz, {}Hz PRF ({}m sonophore)'.format( neuron, *si_format([Fdrive, PRF, a])) ax.set_title(title, fontsize=fs) if Ascale == 'log': ax.set_yscale('log') ax.set_xlabel('Duty cycle (%)', fontsize=fs, labelpad=-0.5) ax.set_ylabel('Amplitude (kPa)', fontsize=fs) ax.set_xlim(np.array([DCs.min(), DCs.max()]) * 1e2) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) xedges = computeMeshEdges(DCs) yedges = computeMeshEdges(amps, scale=Ascale) actmap[actmap == -1] = np.nan actmap[actmap == 0] = 1e-3 cmap = plt.get_cmap('viridis') cmap.set_bad('silver') cmap.set_under('k') ax.pcolormesh(xedges * 1e2, yedges * 1e-3, actmap, cmap=cmap, norm=norm) if thrs: Athrs_fname = 'Athrs_{}_{:.0f}nm_{}Hz_PRF{}Hz_{}s.xlsx'.format( neuron, a * 1e9, *si_format([Fdrive, PRF, tstim], 0, space='')) fpath = os.path.join(root, Athrs_fname) if os.path.isfile(fpath): df = pd.read_excel(fpath, sheet_name='Data') DCs = df['Duty factor'].values Athrs = df['Adrive (kPa)'].values iDCs = np.argsort(DCs) DCs = DCs[iDCs] Athrs = Athrs[iDCs] ax.plot(DCs * 1e2, Athrs, '-', color='#F26522', linewidth=2, label='threshold amplitudes') ax.legend(loc='lower center', frameon=False, fontsize=8) else: logger.warning('%s file not found -> cannot draw threshold curve', fpath) # # Plot rheobase amplitudes if specified # if rheobase: # logger.info('Computing rheobase amplitudes') # dDC = 0.01 # DCs_dense = np.arange(dDC, 100 + dDC / 2, dDC) / 1e2 # neuronobj = getNeuronsDict()[neuron]() # nbls = NeuronalBilayerSonophore(a, neuronobj) # Athrs = nbls.findRheobaseAmps(DCs_dense, Fdrive, neuronobj.VT)[0] # ax.plot(DCs_dense * 1e2, Athrs * 1e-3, '-', color='#F26522', linewidth=2, # label='threshold amplitudes') # ax.legend(loc='lower center', frameon=False, fontsize=8) # Plot firing rate colorbar sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm) sm._A = [] pos1 = ax.get_position() # get the map axis position cbarax = fig.add_axes([pos1.x1 + 0.02, pos1.y0, 0.03, pos1.height]) fig.colorbar(sm, cax=cbarax) cbarax.set_ylabel('Firing rate (Hz)', fontsize=fs) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) # Link callback to figure if connect: fig.canvas.mpl_connect( 'button_press_event', lambda event: onClick(event, root, nbls, Fdrive, tstim, PRF, amps, DCs, (xedges, yedges), tmax, Vbounds) ) return fig def plotAstimRheobaseAmps(neuron, radii, freqs, fs=12): ''' Plot threshold excitation amplitudes (determined by quasi-steady approximation) of a specific neuron as a function of duty cycle, for various combinations of sonophore radius and US frequency. :param neuron: neuron object :param radii: list of sonophore radii (m) :param freqs: list US frequencies (Hz) :return: figure handle ''' linestyles = ['-', '--', ':', '-.'] assert len(freqs) <= len(linestyles), 'too many frequencies' fig, ax = plt.subplots() ax.set_title('{} neuron: rheobase amplitude profiles'.format(neuron.name), fontsize=fs) ax.set_xlabel('Duty cycle (%)', fontsize=fs) ax.set_ylabel('Threshold amplitude (kPa)', fontsize=fs) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) ax.set_yscale('log') ax.set_xlim([0, 100]) ax.set_ylim([10, 600]) DCs = np.arange(1, 101) / 1e2 for i, a in enumerate(radii): nbls = NeuronalBilayerSonophore(a, neuron) for j, Fdrive in enumerate(freqs): Athrs, Aref = nbls.findRheobaseAmps(DCs, Fdrive, neuron.VT) color = 'C{}'.format(i) lbl = '{:.0f} nm radius sonophore, {}Hz'.format(a * 1e9, si_format(Fdrive, 1, space=' ')) ax.plot(DCs * 1e2, Athrs * 1e-3, linestyles[j], c=color, label=lbl) ax.legend(fontsize=fs, frameon=False) fig.tight_layout() return fig def plotEstimRheobaseAmps(neurons, fs=15): fig, ax = plt.subplots() ax.set_title('Rheobase amplitudes', fontsize=fs) ax.set_xlabel('Duty cycle (%)', fontsize=fs) ax.set_ylabel('Threshold amplitude (mA/m2)', fontsize=fs) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) ax.set_yscale('log') ax.set_ylim([1e0, 1e3]) DCs = np.arange(1, 101) / 1e2 for neuron in neurons: Athrs = neuron.findRheobaseAmps(DCs, neuron.VT) ax.plot(DCs * 1e2, Athrs, label='{} neuron'.format(neuron.name)) ax.legend(fontsize=fs, frameon=False) fig.tight_layout() return fig diff --git a/paper figures/fig7.py b/paper figures/fig7.py index feeddcc..3c125fa 100644 --- a/paper figures/fig7.py +++ b/paper figures/fig7.py @@ -1,152 +1,152 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-09-26 09:51:43 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-27 13:41:37 +# @Last Modified time: 2019-05-31 15:22:32 ''' Sub-panels of (duty-cycle x amplitude) US activation maps and related Q-V traces. ''' import os import numpy as np import logging import matplotlib import matplotlib.pyplot as plt from argparse import ArgumentParser from PySONIC.core import NeuronalBilayerSonophore from PySONIC.utils import logger, selectDirDialog, si_format from PySONIC.plt import plotActivationMap, plotQVeff from PySONIC.neurons import getNeuronsDict # Plot parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' # Figure basename figbase = os.path.splitext(__file__)[0] def plot_actmap(inputdir, neuron, a, Fdrive, tstim, amps, PRF, DCs, FRbounds, insets, prefix): mapcode = '{} {}Hz PRF{}Hz 1s'.format(neuron, *si_format([Fdrive, PRF, tstim], space='')) subdir = os.path.join(inputdir, mapcode) fig = plotActivationMap( subdir, neuron, a, Fdrive, tstim, PRF, amps, DCs, FRbounds=FRbounds, thrs=True) fig.canvas.set_window_title('{} map {}'.format(prefix, mapcode)) ax = fig.axes[0] DC_insets, A_insets = zip(*insets) ax.scatter(DC_insets, A_insets, s=80, facecolors='none', edgecolors='k', linestyle='--') return fig def plot_traces(inputdir, neuron, a, Fdrive, Adrive, tstim, PRF, DC, tmax, Vbounds, prefix): mapcode = '{} {}Hz PRF{}Hz 1s'.format(neuron, *si_format([Fdrive, PRF, tstim], space='')) subdir = os.path.join(inputdir, mapcode) neuronobj = getNeuronsDict()[neuron]() nbls = NeuronalBilayerSonophore(a, neuronobj) - fname = '{}.pkl'.format(nbls.filecode(Fdrive, Adrive, tstim, PRF, DC, 'sonic')) + fname = '{}.pkl'.format(nbls.filecode(Fdrive, Adrive, tstim, 0., PRF, DC, 'sonic')) fpath = os.path.join(subdir, fname) fig = plotQVeff(fpath, tmax=tmax, ybounds=Vbounds) figcode = '{} VQ trace {} {:.1f}kPa {:.0f}%DC'.format(prefix, neuron, Adrive * 1e-3, DC * 1e2) fig.canvas.set_window_title(figcode) return fig def panel(inputdir, neurons, a, tstim, PRF, amps, DCs, FRbounds, tmax, Vbounds, insets, prefix): mapfigs = [ plot_actmap(inputdir, n, a, 500e3, tstim, amps, PRF, DCs, FRbounds, insets[n], prefix) for n in neurons ] tracefigs = [] for n in neurons: for inset in insets[n]: DC = inset[0] * 1e-2 Adrive = inset[1] * 1e3 tracefigs.append(plot_traces( inputdir, n, a, 500e3, Adrive, tstim, PRF, DC, tmax, Vbounds, prefix)) return mapfigs + tracefigs def main(): ap = ArgumentParser() # Runtime options ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-i', '--inputdir', type=str, help='Input directory') ap.add_argument('-f', '--figset', type=str, nargs='+', help='Figure set', default='all') ap.add_argument('-s', '--save', default=False, action='store_true', help='Save output figures as pdf') args = ap.parse_args() loglevel = logging.DEBUG if args.verbose is True else logging.INFO logger.setLevel(loglevel) inputdir = selectDirDialog() if args.inputdir is None else args.inputdir if inputdir == '': logger.error('No input directory chosen') return figset = args.figset if figset == 'all': figset = ['a', 'b', 'c'] logger.info('Generating panel {} of {}'.format(figset, figbase)) # Parameters neurons = ['RS', 'LTS'] a = 32e-9 # m tstim = 1.0 # s amps = np.logspace(np.log10(10), np.log10(600), num=30) * 1e3 # Pa DCs = np.arange(1, 101) * 1e-2 FRbounds = (1e0, 1e3) # Hz tmax = 240 # ms Vbounds = -150, 50 # mV # Generate figures try: figs = [] if 'a' in figset: PRF = 1e1 insets = { 'RS': [(28, 127.0), (37, 168.4)], 'LTS': [(8, 47.3), (30, 146.2)] } figs += panel(inputdir, neurons, a, tstim, PRF, amps, DCs, FRbounds, tmax, Vbounds, insets, figbase + 'a') if 'b' in figset: PRF = 1e2 insets = { 'RS': [(51, 452.4), (56, 452.4)], 'LTS': [(13, 193.9), (43, 257.2)] } figs += panel(inputdir, neurons, a, tstim, PRF, amps, DCs, FRbounds, tmax, Vbounds, insets, figbase + 'b') if 'c' in figset: PRF = 1e3 insets = { 'RS': [(40, 110.2), (64, 193.9)], 'LTS': [(10, 47.3), (53, 168.4)] } figs += panel(inputdir, neurons, a, tstim, PRF, amps, DCs, FRbounds, tmax, Vbounds, insets, figbase + 'c') except Exception as e: logger.error(e) quit() if args.save: for fig in figs: figname = '{}.pdf'.format(fig.canvas.get_window_title()) fig.savefig(os.path.join(inputdir, figname), transparent=True) else: plt.show() if __name__ == '__main__': main() diff --git a/paper figures/fig9.py b/paper figures/fig9.py index ca12da3..9bbcd8f 100644 --- a/paper figures/fig9.py +++ b/paper figures/fig9.py @@ -1,105 +1,106 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-12-09 12:06:01 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-27 13:34:46 +# @Last Modified time: 2019-05-31 15:23:40 ''' Sub-panels of SONIC model validation on an STN neuron (response to CW sonication). ''' import os import logging import numpy as np import matplotlib import matplotlib.pyplot as plt from argparse import ArgumentParser from PySONIC.core import NeuronalBilayerSonophore from PySONIC.neurons import OtsukaSTN from PySONIC.utils import logger, selectDirDialog, getLowIntensitiesSTN, Intensity2Pressure from PySONIC.plt import plotFRProfile, plotBatch # Plot parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' # Figure basename figbase = os.path.splitext(__file__)[0] def main(): ap = ArgumentParser() # Runtime options ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-i', '--inputdir', type=str, help='Input directory') ap.add_argument('-f', '--figset', type=str, nargs='+', help='Figure set', default='all') ap.add_argument('-s', '--save', default=False, action='store_true', help='Save output figures as pdf') args = ap.parse_args() loglevel = logging.DEBUG if args.verbose is True else logging.INFO logger.setLevel(loglevel) inputdir = selectDirDialog() if args.inputdir is None else args.inputdir if inputdir == '': logger.error('No input directory chosen') return figset = args.figset if figset is 'all': figset = ['a', 'b'] logger.info('Generating panels {} of {}'.format(figset, figbase)) # Parameters neuron = OtsukaSTN() a = 32e-9 # m Fdrive = 500e3 # Hz tstim = 1 # s + toffset = 0. # s PRF = 1e2 DC = 1. nbls = NeuronalBilayerSonophore(a, neuron) # Range of intensities intensities = getLowIntensitiesSTN() # W/m2 # Levels depicted with individual traces subset_intensities = [112, 114, 123] # W/m2 # convert to amplitudes and get filepaths amplitudes = Intensity2Pressure(intensities) # Pa - fnames = ['{}.pkl'.format(nbls.filecode(Fdrive, A, tstim, PRF, DC, 'sonic')) + fnames = ['{}.pkl'.format(nbls.filecode(Fdrive, A, tstim, toffset, PRF, DC, 'sonic')) for A in amplitudes] fpaths = [os.path.join(inputdir, 'STN', fn) for fn in fnames] # Generate figures figs = [] if 'a' in figset: fig = plotFRProfile(fpaths, 'Qm', no_offset=True, no_first=False, zref='A', zscale='lin', cmap='Oranges') fig.canvas.set_window_title(figbase + 'a') figs.append(fig) if 'b' in figset: isubset = [np.argwhere(intensities == x)[0][0] for x in subset_intensities] subset_amplitudes = amplitudes[isubset] titles = ['{:.2f} kPa ({:.0f} W/m2)'.format(A * 1e-3, I) for A, I in zip(subset_amplitudes, subset_intensities)] print(titles) figtraces = plotBatch([fpaths[i] for i in isubset], pltscheme={'Q_m': ['Qm']}) for fig, title in zip(figtraces, titles): fig.axes[0].set_title(title) fig.canvas.set_window_title(figbase + 'b {}'.format(title)) figs.append(fig) if args.save: for fig in figs: s = fig.canvas.get_window_title() s = s.replace('(', '- ').replace('/', '_').replace(')', '') figname = '{}.pdf'.format(s) fig.savefig(os.path.join(inputdir, figname), transparent=True) else: plt.show() if __name__ == '__main__': main() diff --git a/paper figures/utils.py b/paper figures/utils.py index 3bb5930..aaf9eb1 100644 --- a/paper figures/utils.py +++ b/paper figures/utils.py @@ -1,119 +1,119 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-10-01 20:45:29 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-27 13:34:25 +# @Last Modified time: 2019-05-31 15:23:57 import os import numpy as np import pandas as pd from PySONIC.utils import * from PySONIC.core import NeuronalBilayerSonophore from PySONIC.neurons import * from PySONIC.batches import runBatch from PySONIC.postpro import computeSpikingMetrics def getCWtitrations_vs_Fdrive(neurons, a, freqs, tstim, toffset, fpath): fkey = 'Fdrive (kHz)' freqs = np.array(freqs) if os.path.isfile(fpath): df = pd.read_csv(fpath, sep=',', index_col=fkey) else: df = pd.DataFrame(index=freqs * 1e-3) for neuron in neurons: if neuron not in df and neuron.isTitratable(): neuronobj = getNeuronsDict()[neuron]() nbls = NeuronalBilayerSonophore(a, neuronobj) for i, Fdrive in enumerate(freqs): logger.info('Running CW titration for %s neuron @ %sHz', neuron, si_format(Fdrive)) Athr = nbls.titrate(Fdrive, tstim, toffset) # Pa df.loc[Fdrive * 1e-3, neuron] = np.ceil(Athr * 1e-2) / 10 df.sort_index(inplace=True) df.to_csv(fpath, sep=',', index_label=fkey) return df def getCWtitrations_vs_radius(neurons, radii, Fdrive, tstim, toffset, fpath): akey = 'radius (nm)' radii = np.array(radii) if os.path.isfile(fpath): df = pd.read_csv(fpath, sep=',', index_col=akey) else: df = pd.DataFrame(index=radii * 1e9) for neuron in neurons: if neuron not in df and neuron.isTitratable(): neuronobj = getNeuronsDict()[neuron]() for a in radii: nbls = NeuronalBilayerSonophore(a, neuronobj) logger.info( 'Running CW titration for %s neuron @ %sHz (%.2f nm sonophore radius)', neuron, si_format(Fdrive), a * 1e9) Athr = nbls.titrate(Fdrive, tstim, toffset) # Pa df.loc[a * 1e9, neuron] = np.ceil(Athr * 1e-2) / 10 df.sort_index(inplace=True) df.to_csv(fpath, sep=',', index_label=akey) return df def getSims(outdir, neuron, a, queue): fpaths = [] updated_queue = [] neuronobj = getNeuronsDict()[neuron]() nbls = NeuronalBilayerSonophore(a, neuronobj) for i, item in enumerate(queue): - Fdrive, tstim, _, PRF, DC, Adrive, method = item - fcode = nbls.filecode(Fdrive, Adrive, tstim, PRF, DC, method) + Fdrive, tstim, toffset, PRF, DC, Adrive, method = item + fcode = nbls.filecode(Fdrive, Adrive, tstim, toffset, PRF, DC, method) fpath = os.path.join(outdir, '{}.pkl'.format(fcode)) if not os.path.isfile(fpath): print(fpath, 'does not exist') updated_queue.append(item) fpaths.append(fpath) if len(updated_queue) > 0: print(updated_queue) # neuron = getNeuronsDict()[neuron]() # nbls = NeuronalBilayerSonophore(a, neuron) # runBatch(nbls, 'runAndSave', updated_queue, extra_params=[outdir], mpi=True) return fpaths def getSpikingMetrics(outdir, neuron, xvar, xkey, data_fpaths, metrics_fpaths): metrics = {} for stype in data_fpaths.keys(): if os.path.isfile(metrics_fpaths[stype]): logger.info('loading spiking metrics from file: "%s"', metrics_fpaths[stype]) metrics[stype] = pd.read_csv(metrics_fpaths[stype], sep=',') else: logger.warning('computing %s spiking metrics vs. %s for %s neuron', stype, xkey, neuron) metrics[stype] = computeSpikingMetrics(data_fpaths[stype]) metrics[stype][xkey] = pd.Series(xvar, index=metrics[stype].index) metrics[stype].to_csv(metrics_fpaths[stype], sep=',', index=False) return metrics def extractCompTimes(filenames): ''' Extract computation times from a list of simulation files. ''' tcomps = np.empty(len(filenames)) for i, fn in enumerate(filenames): logger.info('Loading data from "%s"', fn) with open(fn, 'rb') as fh: frame = pickle.load(fh) meta = frame['meta'] tcomps[i] = meta['tcomp'] return tcomps def getCompTimesQuant(outdir, neuron, xvars, xkey, data_fpaths, comptimes_fpath): if os.path.isfile(comptimes_fpath): logger.info('reading computation times from file: "%s"', comptimes_fpath) comptimes = pd.read_csv(comptimes_fpath, sep=',', index_col=xkey) else: logger.warning('extracting computation times for %s neuron', neuron) comptimes = pd.DataFrame(index=xvars) for stype in data_fpaths.keys(): for i, xvar in enumerate(xvars): comptimes.loc[xvar, stype] = extractCompTimes([data_fpaths[stype][i]]) comptimes.to_csv(comptimes_fpath, sep=',', index_label=xkey) return comptimes diff --git a/scripts/STN_regime_transition.py b/scripts/STN_regime_transition.py index 14cfa56..968ade9 100644 --- a/scripts/STN_regime_transition.py +++ b/scripts/STN_regime_transition.py @@ -1,193 +1,193 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-09-28 16:13:34 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-05-27 13:45:49 +# @Last Modified time: 2019-05-31 15:25:24 ''' Script to study STN transitions between different behavioral regimesl. ''' import os import pickle import numpy as np import matplotlib.pyplot as plt import matplotlib from argparse import ArgumentParser import logging from PySONIC.core import NeuronalBilayerSonophore from PySONIC.utils import * from PySONIC.postpro import getFixedPoints from PySONIC.neurons import getNeuronsDict from PySONIC.plt import plotVarsQSS, plotQSSVarVsAmp, plotVarDynamics # Plot parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' # Set logging level logger.setLevel(logging.INFO) def getChargeStabilizationFromSims(inputdir, nbls, Fdrive, amps, tstim, PRF=100, DC=1.0): # Get filenames - fnames = ['{}.pkl'.format(nbls.filecode(Fdrive, A, tstim, PRF, DC, 'sonic')) + fnames = ['{}.pkl'.format(nbls.filecode(Fdrive, A, tstim, 0., PRF, DC, 'sonic')) for A in amps] # Initialize output arrays t_stab = np.empty(amps.size) Q_stab = np.empty(amps.size) Ca_stab = np.empty(amps.size) # For each file for i, fn in enumerate(fnames): # Extract charge temporal profile during stimulus fp = os.path.join(inputdir, fn) # logger.info('loading data from file "{}"'.format(fn)) with open(fp, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] t = df['t'].values Qm = df['Qm'].values Ca = df['Cai'].values Qm = Qm[t < tstim] Ca = Ca[t < tstim] t = t[t < tstim] dt = np.diff(t) # If charge signal is stable during last 100 ms of stimulus if np.ptp(Qm[-int(100e-3 // dt[0]):]) < 5e-5: # Compute instant of stabilization by iNet thresholding iNet_abs = np.abs(np.diff(Qm)) / dt t_stab[i] = t[np.where(iNet_abs > 1e-3)[0][-1] + 2] # Get steady-state charge and Calcium concentration values Q_stab[i] = Qm[-1] Ca_stab[i] = Ca[-1] logger.debug('A = %.2f kPa: Qm stabilization around %.2f nC/cm2 from t = %.0f ms onward', amps[i] * 1e-3, Q_stab[i] * 1e5, t_stab[i] * 1e3) # Otherwise, populate arrays with NaN else: t_stab[i] = np.nan Q_stab[i] = np.nan Ca_stab[i] = np.nan logger.debug('A = %.2f kPa: no Qm stabilization', amps[i] * 1e-3) return t_stab, Q_stab, Ca_stab def compareEqChargesQSSvsSim(inputdir, neuron, a, Fdrive, amps, tstim, fs=12): # Get charge value that cancels out net current in QSS approx. nbls = NeuronalBilayerSonophore(a, neuron, Fdrive) _, Qref, Vmeff, QS_states = nbls.quasiSteadyStates(Fdrive, amps=amps) iNet = neuron.iNet(Vmeff, QS_states) # For each amplitude, take the max of detected stable fixed points # (since stabilization occurs during repolarization phase) Qeq_QSS = np.empty(amps.size) for i, Adrive in enumerate(amps): SFPs = getFixedPoints(Qref, -iNet[i, :]) Qeq_QSS[i] = SFPs.max() if SFPs is not None else np.nan # Get sabilization charge value in simulations _, Qeq_sim, _ = getChargeStabilizationFromSims(inputdir, nbls, Fdrive, amps, tstim) Q_rmse = np.sqrt(np.nanmean((Qeq_sim - Qeq_QSS)**2)) logger.info('RMSE Q = %.3f nC/cm2', Q_rmse * 1e5) # Plot Qm balancing net current as function of amplitude fig, ax = plt.subplots(figsize=(6, 4)) figname = '{} neuron - equilibrium charge vs. amplitude'.format(neuron.name) ax.set_title(figname) ax.set_xlabel('Amplitude (kPa)', fontsize=fs) ax.set_ylabel('$\\rm Q_{eq}\ (nC/cm^2)$', fontsize=fs) for skey in ['top', 'right']: ax.spines[skey].set_visible(False) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) ax.plot(amps * 1e-3, Qeq_QSS * 1e5, '.', c='C0', label='QSS approximation') ax.plot(amps * 1e-3, Qeq_sim * 1e5, '.', c='C1', label='end of {:.2f} s stimulus (simulation)'.format(tstim)) ax.legend(frameon=False, fontsize=fs) fig.tight_layout() fig.canvas.set_window_title( '{}_Qeq_vs_amp_{:.0f}s'.format(neuron.name, tstim)) return fig def main(): ap = ArgumentParser() # Stimulation parameters ap.add_argument('-i', '--inputdir', type=str, default=None, help='Input directory') ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') ap.add_argument('-f', '--figset', type=str, nargs='+', help='Figure set', default='all') ap.add_argument('-c', '--cmap', type=str, default='viridis', help='Colormap name') ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-s', '--save', default=False, action='store_true', help='Save output figures as png') # Parse arguments args = ap.parse_args() logger.setLevel(logging.DEBUG if args.verbose else logging.INFO) figset = args.figset if figset == 'all': figset = ['a', 'b', 'c'] neuron = OtsukaSTN() a = 32e-9 # m Fdrive = 500e3 # Hz intensities = getLowIntensitiesSTN() # W/m2 amps = Intensity2Pressure(intensities) # Pa amps_regimes = np.array([5.7, 18.6, 19.9, 21.4]) * 1e3 # Pa tstim = 3.0 # s Cai_range = np.logspace(np.log10(neuron.Cai0 * 1e-4), np.log10(neuron.Cai0 * 1e3), 100) charges = np.array([neuron.Qbounds()[1], neuron.Vm0 * neuron.Cm0 * 1e-3]) figs = [] if 'a' in figset: for Adrive in amps_regimes: figs += [ plotVarsQSS(neuron, a, Fdrive, Adrive), plotVarDynamics(neuron, a, Fdrive, Adrive, charges, 'Cai', Cai_range) ] if 'b' in figset: figs += [ plotQSSVarVsAmp(neuron, a, Fdrive, 'Cai', amps=amps, yscale='log'), plotQSSVarVsAmp(neuron, a, Fdrive, 'iNet', amps=amps) ] if 'c' in figset: inputdir = args.inputdir if args.inputdir is not None else selectDirDialog() if inputdir == '': logger.error('no input directory') else: figs.append(compareEqChargesQSSvsSim(inputdir, neuron, a, Fdrive, amps, tstim)) if args.save: outputdir = args.outputdir if args.outputdir is not None else selectDirDialog() if outputdir == '': logger.error('no output directory') else: for fig in figs: s = fig.canvas.get_window_title() s = s.replace('(', '- ').replace('/', '_').replace(')', '') figname = '{}.png'.format(s) fig.savefig(os.path.join(outputdir, figname), transparent=True) else: plt.show() if __name__ == '__main__': main()