diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index b9e6ecc..a913683 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,648 +1,649 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 18:53:20 +# @Last Modified time: 2019-06-06 21:22:13 from copy import deepcopy import logging import numpy as np import pandas as pd from scipy.interpolate import interp1d from scipy.integrate import solve_ivp from .simulators import PWSimulator, HybridSimulator from .bls import BilayerSonophore from .pneuron import PointNeuron from .batches import createQueue +from ..neurons import getLookups2D, getLookupsDCavg from ..utils import * from ..constants import * from ..postpro import getFixedPoints class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ASTIM' # keyword used to characterize simulations made with this model def __init__(self, a, neuron, Fdrive=None, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param neuron: neuron object :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Check validity of input parameters if not isinstance(neuron, PointNeuron): raise ValueError('Invalid neuron type: "{}" (must inherit from PointNeuron class)' .format(neuron.name)) self.neuron = neuron # Initialize BilayerSonophore parent object BilayerSonophore.__init__(self, a, neuron.Cm0, neuron.Cm0 * neuron.Vm0 * 1e-3, embedding_depth) def __repr__(self): s = '{}({:.1f} nm, {}'.format(self.__class__.__name__, self.a * 1e9, self.neuron) if self.d > 0.: s += ', d={}m'.format(si_format(self.d, precision=1, space=' ')) return s + ')' def params(self): params = super().params() params.update(self.neuron.params()) return params def getPltVars(self, wrapleft='df["', wrapright='"]'): pltvars = super().getPltVars(wrapleft, wrapright) pltvars.update(self.neuron.getPltVars(wrapleft, wrapright)) return pltvars def getPltScheme(self): return self.neuron.getPltScheme() def filecode(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method='sonic'): return '{}_{}_{}_{:.0f}nm_{:.0f}kHz_{:.2f}kPa_{:.0f}ms_{}{}'.format( self.simkey, self.neuron.name, 'CW' if DC == 1 else 'PW', self.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(PRF, DC * 1e2) if DC < 1. else '', method) def fullDerivatives(self, y, t, Adrive, Fdrive, phi): ''' Compute the derivatives of the (n+3) ODE full NBLS system variables. 
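# Illustration of the resting-charge initialization performed in the constructor above:
# the parent BilayerSonophore is seeded with the neuron's resting capacitance and the
# corresponding resting charge density Qm0 = Cm0 * Vm0 * 1e-3 (RS-like values assumed here,
# purely for illustration).
Cm0 = 1e-2               # resting membrane capacitance (F/m2)
Vm0 = -71.9              # resting membrane potential (mV)
Qm0 = Cm0 * Vm0 * 1e-3   # resting charge density (C/m2) -> about -7.19e-4 C/m2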
:param y: vector of state variables :param t: specific instant in time (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives(self, y[:3], t, Adrive, Fdrive, y[3], phi) dydt_elec = self.neuron.Qderivatives(y[3:], t, self.Capct(y[1])) return dydt_mech + dydt_elec def effDerivatives(self, y, t, lkp): ''' Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t ''' # Split input vector explicitly Qm, *states = y # Compute charge and channel states variation Vmeff = self.neuron.interpVmeff(Qm, lkp) dQmdt = - self.neuron.iNet(Vmeff, states) * 1e-3 dstates = self.neuron.derEffStates(Qm, states, lkp) # Return derivatives vector return [dQmdt, *[dstates[k] for k in self.neuron.states]] def interpEffVariable(self, key, Qm, stim, lkps1D): ''' Interpolate Q-dependent effective variable along solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkps1D: dictionary of lookups for ON and OFF states :return: interpolated effective variable vector ''' x = np.zeros(stim.size) x[stim == 0] = np.interp( Qm[stim == 0], lkps1D['ON']['Q'], lkps1D['ON'][key], left=np.nan, right=np.nan) x[stim == 1] = np.interp( Qm[stim == 1], lkps1D['ON']['Q'], lkps1D['OFF'][key], left=np.nan, right=np.nan) return x def runFull(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the full electro-mechanical system for a specific set of US stimulation parameters, using a classic integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the ODE system is solved iteratively until completion. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param phi: acoustic drive phase (rad) :return: 2-tuple with the output dataframe and computation time. 
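# A minimal, self-contained sketch of the per-state interpolation performed by
# interpEffVariable above, with synthetic lookup data (the real lookups are loaded from
# pre-computed tables). Samples flagged stim == 1 are assumed here to correspond to the
# US-ON lookup, stim == 0 to the US-OFF lookup.
import numpy as np

Qref = np.linspace(-80e-5, 50e-5, 100)                    # reference charge vector (C/m2)
lkps1D = {
    'ON': {'Q': Qref, 'V': np.linspace(-80., 50., 100)},  # effective Vm with US ON (mV)
    'OFF': {'Q': Qref, 'V': np.linspace(-75., 45., 100)}  # effective Vm with US OFF (mV)
}
Qm = np.array([-71.9e-5, -60.0e-5, -45.3e-5])             # charge solution samples (C/m2)
stim = np.array([1, 1, 0])                                # stimulation state per sample

x = np.zeros(stim.size)
x[stim == 1] = np.interp(Qm[stim == 1], lkps1D['ON']['Q'], lkps1D['ON']['V'])
x[stim == 0] = np.interp(Qm[stim == 0], lkps1D['OFF']['Q'], lkps1D['OFF']['V'])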
''' # Determine time step dt = 1 / (NPC_FULL * Fdrive) # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) Pac = self.Pacoustic(dt, Adrive, Fdrive, phi) Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.concatenate(( [0., Z0, self.ng0, self.Qm0], [steady_states[k] for k in self.neuron.states])) # Initialize simulator and compute solution logger.debug('Computing detailed solution') simulator = PWSimulator( lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.)) (t, y, stim), tcomp = simulator( y0, dt, tstim, toffset, PRF, DC, print_progress=logger.getEffectiveLevel() <= logging.INFO, target_dt=CLASSIC_TARGET_DT, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 4] # Return dataframe and computation time return data, tcomp def runHybrid(self, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): ''' Compute solutions of the system for a specific set of US stimulation parameters, using a hybrid integration scheme. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the solution matrix and a state vector ''' # Determine time steps dt_dense, dt_sparse = [1. / (n * Fdrive) for n in [NPC_FULL, NPC_HH]] # Compute non-zero deflection value for a small perturbation (solving quasi-steady equation) Pac = self.Pacoustic(dt_dense, Adrive, Fdrive, phi) Z0 = self.balancedefQS(self.ng0, self.Qm0, Pac) # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.concatenate(( [0., Z0, self.ng0, self.Qm0], [steady_states[k] for k in self.neuron.states], )) is_dense_var = np.array([True] * 3 + [False] * (len(self.neuron.states) + 1)) # Initialize simulator and compute solution logger.debug('Computing hybrid solution') simulator = HybridSimulator( lambda y, t: self.fullDerivatives(y, t, Adrive, Fdrive, phi), lambda y, t: self.fullDerivatives(y, t, 0., 0., 0.), lambda t, y, Cm: self.neuron.Qderivatives(y, t, Cm), lambda yref: self.Capct(yref[1]), is_dense_var, ivars_to_check=[1, 2]) (t, y, stim), tcomp = simulator( y0, dt_dense, dt_sparse, 1. / Fdrive, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 4] # Return dataframe and computation time return data, tcomp def computeEffVars(self, Fdrive, Adrive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. 
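# Illustrative sketch of the cycle-averaging principle described above: an imposed charge
# density combined with the capacitance waveform of the last acoustic cycle yields a Vm
# waveform, whose cycle average becomes the "effective" V lookup value (the waveform below
# is synthetic, not a simulation output).
import numpy as np

npc = 1000                                           # samples per acoustic cycle (assumed)
Qm = -71.9e-5                                        # imposed charge density (C/m2)
Cm0 = 1e-2                                           # resting capacitance (F/m2)
Cm_last = Cm0 * (1 + 0.3 * np.sin(2 * np.pi * np.arange(npc) / npc))  # capacitance cycle (F/m2)
Vm_cycle = Qm / Cm_last * 1e3                        # membrane potential waveform (mV)
Veff = np.mean(Vm_cycle)                             # effective (cycle-averaged) potential (mV)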
:param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' # Run simulation and retrieve deflection and gas content vectors from last cycle data, tcomp = BilayerSonophore.simulate(self, Fdrive, Adrive, Qm) Z_last = data.loc[-NPC_FULL:, 'Z'].values # m Cm_last = self.v_Capct(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = x * Cm_last + (1 - x) * self.Cm0 # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({'V': np.mean(Vm)}) effvars[-1].update(self.neuron.computeEffRates(Vm)) # Log process log = '{}: lookups @ {}Hz, {}Pa, {:.2f} nC/cm2'.format( self, *si_format([Fdrive, Adrive], precision=1, space=' '), Qm * 1e5) if len(fs) > 1: log += ', fs = {:.0f} - {:.0f}%'.format(fs.min() * 1e2, fs.max() * 1e2) log += ', tcomp = {:.3f} s'.format(tcomp) logger.info(log) # Return effective coefficients return [tcomp, effvars] def runSONIC(self, Fdrive, Adrive, tstim, toffset, PRF, DC): ''' Compute solutions of the system for a specific set of US stimulation parameters, using charge-predicted "effective" coefficients to solve the HH equations at each step. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 3-tuple with the time profile, the effective solution matrix and a state vector ''' # Load appropriate 2D lookups Aref, Qref, lkps2D, _ = getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive) # Check that acoustic amplitude is within lookup range Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) # Interpolate 2D lookups at zero and US amplitude logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', Adrive * 1e-3) lkps1D = {state: {key: interp1d(Aref, y2D, axis=0)(val) for key, y2D in lkps2D.items()} for state, val in {'ON': Adrive, 'OFF': 0.}.items()} # Add reference charge vector to 1D lookup dictionaries for state in lkps1D.keys(): lkps1D[state]['Q'] = Qref # Set initial conditions steady_states = self.neuron.steadyStates(self.neuron.Vm0) y0 = np.insert(np.array([steady_states[k] for k in self.neuron.states]), 0, self.Qm0) # Initialize simulator and compute solution logger.debug('Computing effective solution') simulator = PWSimulator( lambda y, t: self.effDerivatives(y, t, lkps1D['ON']), lambda y, t: self.effDerivatives(y, t, lkps1D['OFF'])) (t, y, stim), tcomp = simulator(y0, DT_EFF, tstim, toffset, PRF, DC, monitor_time=True) logger.debug('completed in %ss', si_format(tcomp, 1)) # Store output in dataframe data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Qm': y[:, 0] }) for key in ['ng', 'V']: data[key] = self.interpEffVariable(key, data['Qm'].values, stim, lkps1D) data['Z'] = np.array([self.balancedefQS(ng, Qm) for ng, Qm in zip( data['ng'].values, data['Qm'].values)]) # m data['Vm'] = data['Qm'].values / self.v_Capct(data['Z'].values) * 1e3 # mV for i in range(len(self.neuron.states)): data[self.neuron.states[i]] = y[:, i + 1] # Return dataframe and computation time return data, tcomp def meta(self, Fdrive, Adrive, tstim, toffset, PRF, DC, method): ''' Return information about object and 
simulation parameters. :param Fdrive: US frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param method: integration method :return: meta-data dictionary ''' return { 'neuron': self.neuron.name, 'a': self.a, 'd': self.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'method': method } def simulate(self, Fdrive, Adrive, tstim, toffset, PRF=100., DC=1.0, method='sonic'): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param method: selected integration method :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: simulation @ f = %sHz, A = %sPa, t = %ss (%ss offset)%s', self, si_format(Fdrive, 0, space=' '), si_format(Adrive, 2, space=' '), *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # Check validity of stimulation parameters BilayerSonophore.checkInputs(self, Fdrive, Adrive, 0.0, 0.0) self.neuron.checkInputs(Adrive, tstim, toffset, PRF, DC) # Call appropriate simulation function try: simfunc = { 'full': self.runFull, 'hybrid': self.runHybrid, 'sonic': self.runSONIC }[method] except KeyError: raise ValueError('Invalid integration method: "{}"'.format(method)) data, tcomp = simfunc(Fdrive, Adrive, tstim, toffset, PRF, DC) # Log number of detected spikes nspikes = self.neuron.getNSpikes(data) logger.debug('{} spike{} detected'.format(nspikes, plural(nspikes))) # Return dataframe and computation time return data, tcomp @logCache(os.path.join(os.path.split(__file__)[0], 'astim_titrations.log')) def titrate(self, Fdrive, tstim, toffset, PRF=100., DC=1., method='sonic', xfunc=None, Arange=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency, duration, PRF and duty cycle. :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param method: integration method :param xfunc: function determining whether condition is reached from simulation output :param Arange: search interval for Adrive, iteratively refined :return: determined threshold amplitude (Pa) ''' # Default output function if xfunc is None: xfunc = self.neuron.titrationFunc # Default amplitude interval if Arange is None: Arange = (0, getLookups2D(self.neuron.name, a=self.a, Fdrive=Fdrive)[0].max()) return binarySearch( lambda x: xfunc(self.simulate(*x)[0]), [Fdrive, tstim, toffset, PRF, DC, method], 1, Arange, TITRATION_ASTIM_DA_MAX ) def simQueue(self, freqs, amps, durations, offsets, PRFs, DCs, method): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. 
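# Hedged usage sketch of the titration procedure described above, assuming the pre-computed
# lookup tables for the chosen neuron and sonophore radius exist on disk (otherwise
# getLookups2D raises FileNotFoundError). Stimulation parameters are illustrative.
from PySONIC.neurons import CorticalRS
from PySONIC.core import NeuronalBilayerSonophore

nbls = NeuronalBilayerSonophore(32e-9, CorticalRS())          # 32 nm sonophore, RS neuron
Athr = nbls.titrate(500e3, 100e-3, 50e-3, PRF=100., DC=1.)    # threshold amplitude (Pa)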
:param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :params method: integration method :return: list of parameters (list) for each simulation ''' if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += createQueue(freqs, amps, durations, offsets, min(PRFs), 1.0) if np.any(DCs != 1.0): queue += createQueue(freqs, amps, durations, offsets, PRFs, DCs[DCs != 1.0]) for item in queue: if np.isnan(item[1]): item[1] = None item.append(method) return queue def quasiSteadyStates(self, Fdrive, amps=None, charges=None, DCs=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities and duty cycles, at a specific US frequency. :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DCs: duty cycle value(s) :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges amps, charges, lookups = getLookupsDCavg( self.neuron.name, self.a, Fdrive, amps, charges, DCs) # Compute QSS states using these lookups nA, nQ, nDC = lookups['V'].shape QSS = {k: np.empty((nA, nQ, nDC)) for k in self.neuron.states} for iA in range(nA): for iDC in range(nDC): QSS_1D = self.neuron.quasiSteadyStates( {k: v[iA, :, iDC] for k, v in lookups.items()}) for k in QSS.keys(): QSS[k][iA, :, iDC] = QSS_1D[k] # Compress outputs if needed if squeeze_output: QSS = {k: v.squeeze() for k, v in QSS.items()} lookups = {k: v.squeeze() for k, v in lookups.items()} # Return reference inputs and outputs return amps, charges, lookups, QSS def iNetQSS(self, Qm, Fdrive, Adrive, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' _, _, lookups, QSS = self.quasiSteadyStates( Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) return self.neuron.iNet(lookups['V'], np.array(list(QSS.values()))) # mA/m2 def evaluateStability(self, Qm0, states0, lkp): ''' Integrate the effective differential system from a given starting point, until clear convergence or clear divergence is found. :param Qm0: initial membrane charge density (C/m2) :param states0: dictionary of initial states values :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: boolean indicating convergence state ''' # Initialize y0 vector t0 = 0. 
y0 = np.array([Qm0] + list(states0.values())) # # Initialize simulator and compute solution # simulator = PeriodicSimulator( # lambda y, t: self.effDerivatives(y, t, lkp), # ivars_to_check=[0]) # simulator.stopfunc = simulator.isAsymptoticallyStable # nmax = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) # t, y, stim = simulator.compute(y0, DT_EFF, QSS_INTEGRATION_INTERVAL, nmax=nmax) # logger.debug('completed in %ss', si_format(tcomp, 1)) # conv = t[-1] < QSS_HISTORY_INTERVAL # Initializing empty list to record evolution of charge deviation n = int(QSS_HISTORY_INTERVAL // QSS_INTEGRATION_INTERVAL) # size of history dQ = [] # As long as there is no clear charge convergence or divergence conv, div = False, False tf, yf = t0, y0 while not conv and not div: # Integrate system for small interval and retrieve final charge deviation t0, y0 = tf, yf sol = solve_ivp( lambda t, y: self.effDerivatives(y, t, lkp), [t0, t0 + QSS_INTEGRATION_INTERVAL], y0, method='LSODA' ) tf, yf = sol.t[-1], sol.y[:, -1] dQ.append(yf[0] - Qm0) # logger.debug('{:.0f} ms: dQ = {:.5f} nC/cm2, avg dQ = {:.5f} nC/cm2'.format( # tf * 1e3, dQ[-1] * 1e5, np.mean(dQ[-n:]) * 1e5)) # If last charge deviation is too large -> divergence if np.abs(dQ[-1]) > QSS_Q_DIV_THR: div = True # If last charge deviation or average deviation in recent history # is small enough -> convergence for x in [dQ[-1], np.mean(dQ[-n:])]: if np.abs(x) < QSS_Q_CONV_THR: conv = True # If max integration duration is been reached -> error if tf > QSS_MAX_INTEGRATION_DURATION: raise ValueError('too many iterations') logger.debug('{}vergence after {:.0f} ms: dQ = {:.5f} nC/cm2'.format( {True: 'con', False: 'di'}[conv], tf * 1e3, dQ[-1] * 1e5)) return conv def fixedPointsQSS(self, Fdrive, Adrive, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' logger.debug('A = {:.2f} kPa, DC = {:.0f}%'.format(Adrive * 1e-3, DC * 1e2)) # Extract stable and unstable fixed points from QSS charge variation profile dfunc = lambda Qm: - self.iNetQSS(Qm, Fdrive, Adrive, DC) SFP_candidates = getFixedPoints(lkp['Q'], dQdt, filter='stable', der_func=dfunc).tolist() UFPs = getFixedPoints(lkp['Q'], dQdt, filter='unstable', der_func=dfunc).tolist() SFPs = [] pltvars = self.getPltVars() # For each candidate SFP for i, Qm in enumerate(SFP_candidates): logger.debug('Q-SFP = {:.2f} nC/cm2'.format(Qm * 1e5)) # Re-compute QSS *_, QSS_FP = self.quasiSteadyStates(Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) # Simulate from unperturbed QSS and evaluate stability if not self.evaluateStability(Qm, QSS_FP, lkp): logger.warning('diverging system at ({:.2f} kPa, {:.2f} nC/cm2)'.format( Adrive * 1e-3, Qm * 1e5)) UFPs.append(Qm) else: # For each state unstable_states = [] for x in self.neuron.states: pltvar = pltvars[x] unit_str = pltvar.get('unit', '') factor = pltvar.get('factor', 1) is_stable_direction = [] for sign in [-1, +1]: # Perturb state with small offset QSS_perturbed = deepcopy(QSS_FP) QSS_perturbed[x] *= (1 + sign * QSS_REL_OFFSET) # If gating state, bound within [0., 1.] if self.neuron.isVoltageGated(x): QSS_perturbed[x] = np.clip(QSS_perturbed[x], 0., 1.) 
logger.debug('{}: {:.5f} -> {:.5f} {}'.format( x, QSS_FP[x] * factor, QSS_perturbed[x] * factor, unit_str)) # Simulate from perturbed QSS and evaluate stability is_stable_direction.append( self.evaluateStability(Qm, QSS_perturbed, lkp)) # Check if system shows stability upon x-state perturbation # in both directions if not np.all(is_stable_direction): unstable_states.append(x) # Classify fixed point as stable only if all states show stability is_stable_FP = len(unstable_states) == 0 {True: SFPs, False: UFPs}[is_stable_FP].append(Qm) logger.info('{}stable fixed-point at ({:.2f} kPa, {:.2f} nC/cm2){}'.format( '' if is_stable_FP else 'un', Adrive * 1e-3, Qm * 1e5, '' if is_stable_FP else ', caused by {} states'.format(unstable_states))) return SFPs, UFPs diff --git a/PySONIC/neurons/__init__.py b/PySONIC/neurons/__init__.py index 10c3f90..51961a5 100644 --- a/PySONIC/neurons/__init__.py +++ b/PySONIC/neurons/__init__.py @@ -1,36 +1,37 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-06-06 13:36:00 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 15:02:07 +# @Last Modified time: 2019-06-06 21:23:54 import inspect import sys +from .lookups import * from .cortical import CorticalRS, CorticalFS, CorticalLTS, CorticalIB from .thalamic import ThalamicRE, ThalamoCortical from .leech import LeechTouch, LeechPressure, LeechRetzius from .stn import OtsukaSTN from .fh import FrankenhaeuserHuxley def getNeuronsDict(): ''' Construct a dictionary of all the implemented point neuron classes, indexed by name. ''' current_module = sys.modules[__name__] neurons_dict = {} for _, obj in inspect.getmembers(current_module): - if inspect.isclass(obj) and isinstance(obj.name, str): + if inspect.isclass(obj) and hasattr(obj, 'name') and isinstance(obj.name, str): neurons_dict[obj.name] = obj return neurons_dict def getPointNeuron(name): ''' Return a point-neuron instance corresponding to a given name. ''' neuron_classes = getNeuronsDict() try: return neuron_classes[name]() except KeyError: raise ValueError('"{}" neuron not found. Implemented neurons are: {}'.format( name, ', '.join(list(neuron_classes.keys())))) diff --git a/PySONIC/neurons/lookups.py b/PySONIC/neurons/lookups.py new file mode 100644 index 0000000..cd2db3f --- /dev/null +++ b/PySONIC/neurons/lookups.py @@ -0,0 +1,240 @@ + +import os +import pickle +import numpy as np +from scipy.interpolate import interp1d + +from ..utils import isWithin, logger, si_format + + +def getNeuronLookupsFile(mechname, a=None, Fdrive=None, Adrive=None, fs=False): + fpath = os.path.join(os.path.split(__file__)[0], '{}_lookups'.format(mechname)) + if a is not None: + fpath += '_{:.0f}nm'.format(a * 1e9) + if Fdrive is not None: + fpath += '_{:.0f}kHz'.format(Fdrive * 1e-3) + if Adrive is not None: + fpath += '_{:.0f}kPa'.format(Adrive * 1e-3) + if fs is True: + fpath += '_fs' + return '{}.pkl'.format(fpath) + + +def getLookups4D(mechname): + ''' Retrieve 4D lookup tables and reference vectors for a given membrane mechanism. + + :param mechname: name of membrane density mechanism + :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and + one other variable), a dictionary of associated 2D lookup numpy arrays, and + a dictionary with information about the other variable. 
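# Resulting file names for the naming scheme implemented by getNeuronLookupsFile above,
# assuming a mechanism named 'RS' (paths are relative to the neurons package directory):
#   getNeuronLookupsFile('RS')                        -> '.../RS_lookups.pkl'
#   getNeuronLookupsFile('RS', a=32e-9, Fdrive=500e3) -> '.../RS_lookups_32nm_500kHz.pkl'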
+ ''' + + # Check lookup file existence + lookup_path = getNeuronLookupsFile(mechname) + if not os.path.isfile(lookup_path): + raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) + + # Load lookups dictionary + # logger.debug('Loading %s lookup table', mechname) + with open(lookup_path, 'rb') as fh: + df = pickle.load(fh) + inputs = df['input'] + lookups4D = df['lookup'] + + # Retrieve 1D inputs from lookups dictionary + aref = inputs['a'] + Fref = inputs['f'] + Aref = inputs['A'] + Qref = inputs['Q'] + + return aref, Fref, Aref, Qref, lookups4D + + +def getLookupsOff(mechname): + ''' Retrieve appropriate US-OFF lookup tables and reference vectors + for a given membrane mechanism. + + :param mechname: name of membrane density mechanism + :return: 2-tuple with 1D numpy array of reference charge density + and dictionary of associated 1D lookup numpy arrays. + ''' + + # Get 4D lookups and input vectors + aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) + + # Perform 2D projection in appropriate dimensions + logger.debug('Interpolating lookups at A = 0') + lookups_off = {key: y4D[0, 0, 0, :] for key, y4D in lookups4D.items()} + + return Qref, lookups_off + + +def getLookups2D(mechname, a=None, Fdrive=None, Adrive=None): + ''' Retrieve appropriate 2D lookup tables and reference vectors + for a given membrane mechanism, projected at a specific combination + of sonophore radius, US frequency and/or acoustic pressure amplitude. + + :param mechname: name of membrane density mechanism + :param a: sonophore radius (m) + :param Fdrive: US frequency (Hz) + :param Adrive: Acoustic peak pressure amplitude (Hz) + :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and + one other variable), a dictionary of associated 2D lookup numpy arrays, and + a dictionary with information about the other variable. 
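# Standalone sketch of the dimensional projection described above: a 4D lookup indexed by
# (a, f, A, Q) is collapsed to 2D (A, Q) by successive 1D interpolations along the radius
# and frequency axes (synthetic data, for illustration only).
import numpy as np
from scipy.interpolate import interp1d

aref = np.linspace(16e-9, 64e-9, 5)          # sonophore radii (m)
Fref = np.linspace(100e3, 1e6, 4)            # US frequencies (Hz)
Aref = np.linspace(0., 100e3, 10)            # US amplitudes (Pa)
Qref = np.linspace(-80e-5, 50e-5, 20)        # charge densities (C/m2)
y4D = np.random.rand(aref.size, Fref.size, Aref.size, Qref.size)

y3D = interp1d(aref, y4D, axis=0)(32e-9)     # fix radius -> (f, A, Q)
y2D = interp1d(Fref, y3D, axis=0)(500e3)     # fix frequency -> (A, Q)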
+ ''' + + # Get 4D lookups and input vectors + aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) + + # Check that inputs are within lookup range + if a is not None: + a = isWithin('radius', a, (aref.min(), aref.max())) + if Fdrive is not None: + Fdrive = isWithin('frequency', Fdrive, (Fref.min(), Fref.max())) + if Adrive is not None: + Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) + + # Determine projection dimensions based on inputs + var_a = {'name': 'a', 'label': 'sonophore radius', 'val': a, 'unit': 'm', 'factor': 1e9, + 'ref': aref, 'axis': 0} + var_Fdrive = {'name': 'f', 'label': 'frequency', 'val': Fdrive, 'unit': 'Hz', 'factor': 1e-3, + 'ref': Fref, 'axis': 1} + var_Adrive = {'name': 'A', 'label': 'amplitude', 'val': Adrive, 'unit': 'Pa', 'factor': 1e-3, + 'ref': Aref, 'axis': 2} + if not isinstance(Adrive, float): + var1 = var_a + var2 = var_Fdrive + var3 = var_Adrive + elif not isinstance(Fdrive, float): + var1 = var_a + var2 = var_Adrive + var3 = var_Fdrive + elif not isinstance(a, float): + var1 = var_Fdrive + var2 = var_Adrive + var3 = var_a + + # Perform 2D projection in appropriate dimensions + # logger.debug('Interpolating lookups at (%s = %s%s, %s = %s%s)', + # var1['name'], si_format(var1['val'], space=' '), var1['unit'], + # var2['name'], si_format(var2['val'], space=' '), var2['unit']) + lookups3D = {key: interp1d(var1['ref'], y4D, axis=var1['axis'])(var1['val']) + for key, y4D in lookups4D.items()} + if var2['axis'] > var1['axis']: + var2['axis'] -= 1 + lookups2D = {key: interp1d(var2['ref'], y3D, axis=var2['axis'])(var2['val']) + for key, y3D in lookups3D.items()} + + if var3['val'] is not None: + logger.debug('Interpolating lookups at %d new %s values between %s%s and %s%s', + len(var3['val']), var3['name'], + si_format(min(var3['val']), space=' '), var3['unit'], + si_format(max(var3['val']), space=' '), var3['unit']) + lookups2D = {key: interp1d(var3['ref'], y2D, axis=0)(var3['val']) + for key, y2D in lookups2D.items()} + var3['ref'] = np.array(var3['val']) + + return var3['ref'], Qref, lookups2D, var3 + + +def getLookups2Dfs(mechname, a, Fdrive, fs): + + # Check lookup file existence + lookup_path = getNeuronLookupsFile(mechname, a=a, Fdrive=Fdrive, fs=True) + if not os.path.isfile(lookup_path): + raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) + + # Load lookups dictionary + logger.debug('Loading %s lookup table with fs = %.0f%%', mechname, fs * 1e2) + with open(lookup_path, 'rb') as fh: + df = pickle.load(fh) + inputs = df['input'] + lookups3D = df['lookup'] + + # Retrieve 1D inputs from lookups dictionary + fsref = inputs['fs'] + Aref = inputs['A'] + Qref = inputs['Q'] + + # Check that fs is within lookup range + fs = isWithin('coverage', fs, (fsref.min(), fsref.max())) + + # Perform projection at fs + logger.debug('Interpolating lookups at fs = %s%%', fs * 1e2) + lookups2D = {key: interp1d(fsref, y3D, axis=2)(fs) for key, y3D in lookups3D.items()} + + return Aref, Qref, lookups2D + + +def getLookupsDCavg(mechname, a, Fdrive, amps=None, charges=None, DCs=1.0): + ''' Get the DC-averaged lookups of a specific neuron for a combination of US amplitudes, + charge densities and duty cycles, at a specific US frequency. 
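# Minimal numerical illustration of the DC-averaging rule used below: for each duty cycle,
# the effective lookup is a weighted mean of its US-ON and US-OFF projections (scalar
# values here are arbitrary).
V_on, V_off = 10.0, -70.0     # effective Vm with US ON / OFF (mV)
for DC in (0.05, 0.5, 1.0):
    V_avg = V_on * DC + V_off * (1 - DC)
    print('DC = {:.0f}%: effective Vm = {:.1f} mV'.format(DC * 1e2, V_avg))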
+ + :param mechname: name of membrane density mechanism + :param a: sonophore radius (m) + :param Fdrive: US frequency (Hz) + :param amps: US amplitudes (Pa) + :param charges: membrane charge densities (C/m2) + :param DCs: duty cycle value(s) + :return: 4-tuple with reference values of US amplitude and charge density, + as well as interpolated Vmeff and QSS gating variables + ''' + + # Get lookups for specific (a, f, A) combination + Aref, Qref, lookups2D, _ = getLookups2D(mechname, a=a, Fdrive=Fdrive) + if 'ng' in lookups2D: + lookups2D.pop('ng') + + # Derive inputs from lookups reference if not provided + if amps is None: + amps = Aref + if charges is None: + charges = Qref + + # Transform inputs into arrays if single value provided + if isinstance(amps, float): + amps = np.array([amps]) + if isinstance(charges, float): + charges = np.array([charges]) + if isinstance(DCs, float): + DCs = np.array([DCs]) + nA, nQ, nDC = amps.size, charges.size, DCs.size + # cs = {True: 's', False: ''} + # logger.debug('%u amplitude%s, %u charge%s, %u DC%s', + # nA, cs[nA > 1], nQ, cs[nQ > 1], nDC, cs[nDC > 1]) + + # Re-interpolate lookups at input charges + lookups2D = {key: interp1d(Qref, y2D, axis=1)(charges) for key, y2D in lookups2D.items()} + + # Interpolate US-ON (for each input amplitude) and US-OFF (A = 0) lookups + amps = isWithin('amplitude', amps, (Aref.min(), Aref.max())) + lookups_on = {key: interp1d(Aref, y2D, axis=0)(amps) for key, y2D in lookups2D.items()} + lookups_off = {key: interp1d(Aref, y2D, axis=0)(0.0) for key, y2D in lookups2D.items()} + + # Compute DC-averaged lookups + lookups_DCavg = {} + for key in lookups2D.keys(): + x_on, x_off = lookups_on[key], lookups_off[key] + x_avg = np.empty((nA, nQ, nDC)) + for iA, Adrive in enumerate(amps): + for iDC, DC in enumerate(DCs): + x_avg[iA, :, iDC] = x_on[iA, :] * DC + x_off * (1 - DC) + lookups_DCavg[key] = x_avg + + return amps, charges, lookups_DCavg + + +def getLookupsCompTime(mechname): + + # Check lookup file existence + lookup_path = getNeuronLookupsFile(mechname) + if not os.path.isfile(lookup_path): + raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) + + # Load lookups dictionary + logger.debug('Loading comp times') + with open(lookup_path, 'rb') as fh: + df = pickle.load(fh) + tcomps4D = df['tcomp'] + + return np.sum(tcomps4D) diff --git a/PySONIC/plt/effvars.py b/PySONIC/plt/effvars.py index 38a24f8..e89a440 100644 --- a/PySONIC/plt/effvars.py +++ b/PySONIC/plt/effvars.py @@ -1,160 +1,161 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-10-02 01:44:59 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-03-15 01:21:22 +# @Last Modified time: 2019-06-06 21:23:03 import numpy as np from scipy.interpolate import interp1d import matplotlib.pyplot as plt import matplotlib.cm as cm import matplotlib -from PySONIC.utils import logger, si_prefixes, isWithin, getLookups2D, getLookupsOff +from PySONIC.utils import logger, si_prefixes, isWithin +from PySONIC.neurons import getLookups2D, getLookupsOff from PySONIC.core import NeuronalBilayerSonophore def setGrid(n, ncolmax=3): ''' Determine number of rows and columns in figure grid, based on number of variables to plot. 
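# Two illustrative cases for the grid-sizing rule described above (assuming setGrid is in scope):
#   setGrid(2, ncolmax=3) -> (1, 2)    # fits on a single row
#   setGrid(7, ncolmax=3) -> (3, 3)    # seven panels spread over 3 rows of 3 columns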
''' if n <= ncolmax: return (1, n) else: return ((n - 1) // ncolmax + 1, ncolmax) def plotEffectiveVariables(neuron, a=None, Fdrive=None, Adrive=None, nlevels=10, zscale='lin', cmap=None, fs=12, ncolmax=1): ''' Plot the profiles of effective variables of a specific neuron as a function of charge density and another reference variable (z-variable). For each effective variable, one charge-profile per z-value is plotted, with a color code based on the z-variable value. :param neuron: channel mechanism object :param a: sonophore radius (m) :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic pressure amplitude (Pa) :param nlevels: number of levels for the z-variable :param zscale: scale type for the z-variable ('lin' or 'log') :param cmap: colormap name :param fs: figure fontsize :param ncolmax: max number of columns on the figure :return: handle to the created figure ''' if sum(isinstance(x, float) for x in [a, Fdrive, Adrive]) < 2: raise ValueError('at least 2 parameters in (a, Fdrive, Adrive) must be fixed') if cmap is None: cmap = 'viridis' # Get reference US-OFF lookups (1D) _, lookupsoff = getLookupsOff(neuron.name) nbls = NeuronalBilayerSonophore(32e-9, neuron) pltvars = nbls.getPltVars() # Get 2D lookups at specific combination zref, Qref, lookups2D, zvar = getLookups2D(neuron.name, a=a, Fdrive=Fdrive, Adrive=Adrive) _, lookupsoff = getLookupsOff(neuron.name) for lookups in [lookups2D, lookupsoff]: lookups.pop('ng') lookups['Cm'] = Qref / lookups['V'] * 1e5 # uF/cm2 zref *= zvar['factor'] prefix = {value: key for key, value in si_prefixes.items()}[1 / zvar['factor']] # Optional: interpolate along z dimension if nlevels specified if zscale is 'log': znew = np.logspace(np.log10(zref.min()), np.log10(zref.max()), nlevels) elif zscale is 'lin': znew = np.linspace(zref.min(), zref.max(), nlevels) else: raise ValueError('unknown scale type (should be "lin" or "log")') znew = np.array([isWithin(zvar['label'], z, (zref.min(), zref.max())) for z in znew]) lookups2D = {key: interp1d(zref, y2D, axis=0)(znew) for key, y2D in lookups2D.items()} zref = znew for lookups in [lookups2D, lookupsoff]: lookups['Vm'] = lookups.pop('V') # mV lookups['Cm'] = Qref / lookups['Vm'] * 1e3 # uF/cm2 keys = ['Cm', 'Vm'] + list(lookups2D.keys())[:-2] # Define color code mymap = cm.get_cmap(cmap) if zscale == 'lin': norm = matplotlib.colors.Normalize(zref.min(), zref.max()) elif zscale == 'log': norm = matplotlib.colors.LogNorm(zref.min(), zref.max()) sm = cm.ScalarMappable(norm=norm, cmap=mymap) sm._A = [] # Plot logger.info('plotting') nrows, ncols = setGrid(len(lookups2D), ncolmax=ncolmax) xvar = pltvars['Qm'] Qbounds = np.array([Qref.min(), Qref.max()]) * xvar['factor'] fig, _ = plt.subplots(figsize=(3.5 * ncols, 1 * nrows), squeeze=False) for j, key in enumerate(keys): ax = plt.subplot2grid((nrows, ncols), (j // ncols, j % ncols)) for s in ['right', 'top']: ax.spines[s].set_visible(False) yvar = pltvars[key] if j // ncols == nrows - 1: ax.set_xlabel('$\\rm {}\ ({})$'.format(xvar['label'], xvar['unit']), fontsize=fs) ax.set_xticks(Qbounds) else: ax.set_xticks([]) ax.spines['bottom'].set_visible(False) ax.xaxis.set_label_coords(0.5, -0.1) ax.yaxis.set_label_coords(-0.02, 0.5) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) ymin = np.inf ymax = -np.inf # Plot effective variable for each selected z-value y0 = lookupsoff[key] for i, z in enumerate(zref): y = lookups2D[key][i] if 'alpha' in key or 'beta' in key: y[y > y0.max() * 2] = np.nan ax.plot(Qref * 
xvar.get('factor', 1), y * yvar.get('factor', 1), c=sm.to_rgba(z)) ymin = min(ymin, y.min()) ymax = max(ymax, y.max()) # Plot reference variable ax.plot(Qref * xvar.get('factor', 1), y0 * yvar.get('factor', 1), '--', c='k') ymax = max(ymax, y0.max()) ymin = min(ymin, y0.min()) # Set axis y-limits if 'alpha' in key or 'beta' in key: ymax = y0.max() * 2 ylim = [ymin * yvar.get('factor', 1), ymax * yvar.get('factor', 1)] if key == 'Cm': factor = 1e1 ylim = [np.floor(ylim[0] * factor) / factor, np.ceil(ylim[1] * factor) / factor] else: factor = 1 / np.power(10, np.floor(np.log10(ylim[1]))) ylim = [np.floor(ylim[0] * factor) / factor, np.ceil(ylim[1] * factor) / factor] ax.set_yticks(ylim) ax.set_ylim(ylim) ax.set_ylabel('$\\rm {}\ ({})$'.format(yvar['label'], yvar['unit']), fontsize=fs, rotation=0, ha='right', va='center') fig.suptitle('{} neuron: {} \n modulated effective variables'.format(neuron.name, zvar['label'])) # Plot colorbar fig.subplots_adjust(left=0.20, bottom=0.05, top=0.8, right=0.80, hspace=0.5) cbarax = fig.add_axes([0.10, 0.90, 0.80, 0.02]) fig.colorbar(sm, cax=cbarax, orientation='horizontal') cbarax.set_xlabel('{} ({}{})'.format( zvar['label'], prefix, zvar['unit']), fontsize=fs) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) return fig diff --git a/PySONIC/utils.py b/PySONIC/utils.py index 62fc9dd..33f5635 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,835 +1,462 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-19 22:30:46 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-06 19:02:42 +# @Last Modified time: 2019-06-06 21:20:41 ''' Definition of generic utility functions used in other modules ''' import csv from functools import wraps import operator import time import os import math import pickle from tqdm import tqdm import logging import tkinter as tk from tkinter import filedialog import numpy as np import colorlog -from scipy.interpolate import interp1d # Package logger my_log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%') def setHandler(logger, handler): for h in logger.handlers: logger.removeHandler(h) logger.addHandler(handler) return logger def setLogger(name, formatter): handler = colorlog.StreamHandler() handler.setFormatter(formatter) logger = colorlog.getLogger(name) logger.addHandler(handler) return logger class TqdmHandler(logging.StreamHandler): def __init__(self, formatter): logging.StreamHandler.__init__(self) self.setFormatter(formatter) def emit(self, record): msg = self.format(record) tqdm.write(msg) logger = setLogger('PySONIC', my_log_formatter) # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } - - def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. 
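# A few representative conversions for si_format, as described above:
#   si_format(500e3, 0)            -> '500 k'
#   si_format(1.5e-3, 1)           -> '1.5 m'
#   si_format(32e-9, 0, space='')  -> '32n'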
''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): if x == 0: factor = 1e0 prefix = '' else: sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) vals = [tmp[1] for tmp in sorted_si_prefixes] # vals = list(si_prefixes.values()) ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 factor = vals[ix] prefix = sorted_si_prefixes[ix][0] # prefix = list(si_prefixes.keys())[ix] return '{{:.{}f}}{}{}'.format(precision, space, prefix).format(x / factor) elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: print(type(x)) def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' ret_string = '{0:.{1:d}e}'.format(number, precision) a, b = ret_string.split("e") a = float(a) b = int(b) return '{}10^{{{}}}'.format('{} * '.format(a) if a != 1. else '', b) +def plural(n): + if n < 0: + raise ValueError('Cannot format negative integer (n = {})'.format(n)) + if n == 0: + return '' + else: + return 's' + + def rmse(x1, x2): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean()) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) -def getInDict(d, key, func): - ''' Return value of specific dictionary key, or function return alias if not there. ''' - if key in d: - return d[key] - else: - return func() - - def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames(filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname) if filenames: par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) else: par_dir = None return (filenames, par_dir) def selectDirDialog(): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() return filedialog.askdirectory() def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. 
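# Round-trip sanity check for the acoustic conversions defined above
# (default medium: rho = 1075 kg/m3, c = 1515 m/s):
#   Pressure2Intensity(100e3)                      -> ~3.07e3 W/m2
#   Intensity2Pressure(Pressure2Intensity(100e3))  -> 100e3 Pa (exact round trip)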
:param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) return filename_out def loadData(fpath, frequency=1): ''' Load dataframe and metadata dictionary from pickle file. ''' logger.info('Loading data from "%s"', os.path.basename(fpath)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'].iloc[::frequency] meta = frame['meta'] return df, meta -def downsample(t_dense, y, nsparse): - ''' Decimate periodic signals to a specified number of samples.''' - - if(y.ndim) > 1: - nsignals = y.shape[0] - else: - nsignals = 1 - y = np.array([y]) - - # determine time step and period of input signal - T = t_dense[-1] - t_dense[0] - dt_dense = t_dense[1] - t_dense[0] - - # resample time vector linearly - t_ds = np.linspace(t_dense[0], t_dense[-1], nsparse) - - # create MAV window - nmav = int(0.03 * T / dt_dense) - if nmav % 2 == 0: - nmav += 1 - mav = np.ones(nmav) / nmav - - # determine signals padding - npad = int((nmav - 1) / 2) - - # determine indexes of sampling on convolved signals - ids = np.round(np.linspace(0, t_dense.size - 1, nsparse)).astype(int) - - y_ds = np.empty((nsignals, nsparse)) - - # loop through signals - for i in range(nsignals): - # pad, convolve and resample - pad_left = y[i, -(npad + 2):-2] - pad_right = y[i, 1:npad + 1] - y_ext = np.concatenate((pad_left, y[i, :], pad_right), axis=0) - y_mav = np.convolve(y_ext, mav, mode='valid') - y_ds[i, :] = y_mav[ids] - - if nsignals == 1: - y_ds = y_ds[0, :] - - return (t_ds, y_ds) - - def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new -def getNeuronLookupsFile(mechname, a=None, Fdrive=None, Adrive=None, fs=False): - fpath = os.path.join( - os.path.split(__file__)[0], - 'neurons', - '{}_lookups'.format(mechname) - ) - if a is not None: - fpath += '_{:.0f}nm'.format(a * 1e9) - if Fdrive is not None: - fpath += '_{:.0f}kHz'.format(Fdrive * 1e-3) - if Adrive is not None: - fpath += '_{:.0f}kPa'.format(Adrive * 1e-3) - if fs is True: - fpath += '_fs' - return '{}.pkl'.format(fpath) - - -def getLookups4D(mechname): - ''' Retrieve 4D lookup tables and reference vectors for a given membrane mechanism. - - :param mechname: name of membrane density mechanism - :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and - one other variable), a dictionary of associated 2D lookup numpy arrays, and - a dictionary with information about the other variable. - ''' - - # Check lookup file existence - lookup_path = getNeuronLookupsFile(mechname) - if not os.path.isfile(lookup_path): - raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) - - # Load lookups dictionary - # logger.debug('Loading %s lookup table', mechname) - with open(lookup_path, 'rb') as fh: - df = pickle.load(fh) - inputs = df['input'] - lookups4D = df['lookup'] - - # Retrieve 1D inputs from lookups dictionary - aref = inputs['a'] - Fref = inputs['f'] - Aref = inputs['A'] - Qref = inputs['Q'] - - return aref, Fref, Aref, Qref, lookups4D - - -def getLookupsOff(mechname): - ''' Retrieve appropriate US-OFF lookup tables and reference vectors - for a given membrane mechanism. 
- - :param mechname: name of membrane density mechanism - :return: 2-tuple with 1D numpy array of reference charge density - and dictionary of associated 1D lookup numpy arrays. - ''' - - # Get 4D lookups and input vectors - aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) - - # Perform 2D projection in appropriate dimensions - logger.debug('Interpolating lookups at A = 0') - lookups_off = {key: y4D[0, 0, 0, :] for key, y4D in lookups4D.items()} - - return Qref, lookups_off - - -def getLookups2D(mechname, a=None, Fdrive=None, Adrive=None): - ''' Retrieve appropriate 2D lookup tables and reference vectors - for a given membrane mechanism, projected at a specific combination - of sonophore radius, US frequency and/or acoustic pressure amplitude. - - :param mechname: name of membrane density mechanism - :param a: sonophore radius (m) - :param Fdrive: US frequency (Hz) - :param Adrive: Acoustic peak pressure amplitude (Hz) - :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and - one other variable), a dictionary of associated 2D lookup numpy arrays, and - a dictionary with information about the other variable. - ''' - - # Get 4D lookups and input vectors - aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) - - # Check that inputs are within lookup range - if a is not None: - a = isWithin('radius', a, (aref.min(), aref.max())) - if Fdrive is not None: - Fdrive = isWithin('frequency', Fdrive, (Fref.min(), Fref.max())) - if Adrive is not None: - Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) - - # Determine projection dimensions based on inputs - var_a = {'name': 'a', 'label': 'sonophore radius', 'val': a, 'unit': 'm', 'factor': 1e9, - 'ref': aref, 'axis': 0} - var_Fdrive = {'name': 'f', 'label': 'frequency', 'val': Fdrive, 'unit': 'Hz', 'factor': 1e-3, - 'ref': Fref, 'axis': 1} - var_Adrive = {'name': 'A', 'label': 'amplitude', 'val': Adrive, 'unit': 'Pa', 'factor': 1e-3, - 'ref': Aref, 'axis': 2} - if not isinstance(Adrive, float): - var1 = var_a - var2 = var_Fdrive - var3 = var_Adrive - elif not isinstance(Fdrive, float): - var1 = var_a - var2 = var_Adrive - var3 = var_Fdrive - elif not isinstance(a, float): - var1 = var_Fdrive - var2 = var_Adrive - var3 = var_a - - # Perform 2D projection in appropriate dimensions - # logger.debug('Interpolating lookups at (%s = %s%s, %s = %s%s)', - # var1['name'], si_format(var1['val'], space=' '), var1['unit'], - # var2['name'], si_format(var2['val'], space=' '), var2['unit']) - lookups3D = {key: interp1d(var1['ref'], y4D, axis=var1['axis'])(var1['val']) - for key, y4D in lookups4D.items()} - if var2['axis'] > var1['axis']: - var2['axis'] -= 1 - lookups2D = {key: interp1d(var2['ref'], y3D, axis=var2['axis'])(var2['val']) - for key, y3D in lookups3D.items()} - - if var3['val'] is not None: - logger.debug('Interpolating lookups at %d new %s values between %s%s and %s%s', - len(var3['val']), var3['name'], - si_format(min(var3['val']), space=' '), var3['unit'], - si_format(max(var3['val']), space=' '), var3['unit']) - lookups2D = {key: interp1d(var3['ref'], y2D, axis=0)(var3['val']) - for key, y2D in lookups2D.items()} - var3['ref'] = np.array(var3['val']) - - return var3['ref'], Qref, lookups2D, var3 - - -def getLookups2Dfs(mechname, a, Fdrive, fs): - - # Check lookup file existence - lookup_path = getNeuronLookupsFile(mechname, a=a, Fdrive=Fdrive, fs=True) - if not os.path.isfile(lookup_path): - raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) - - # Load lookups 
dictionary - logger.debug('Loading %s lookup table with fs = %.0f%%', mechname, fs * 1e2) - with open(lookup_path, 'rb') as fh: - df = pickle.load(fh) - inputs = df['input'] - lookups3D = df['lookup'] - - # Retrieve 1D inputs from lookups dictionary - fsref = inputs['fs'] - Aref = inputs['A'] - Qref = inputs['Q'] - - # Check that fs is within lookup range - fs = isWithin('coverage', fs, (fsref.min(), fsref.max())) - - # Perform projection at fs - logger.debug('Interpolating lookups at fs = %s%%', fs * 1e2) - lookups2D = {key: interp1d(fsref, y3D, axis=2)(fs) for key, y3D in lookups3D.items()} - - return Aref, Qref, lookups2D - - -def getLookupsDCavg(mechname, a, Fdrive, amps=None, charges=None, DCs=1.0): - ''' Get the DC-averaged lookups of a specific neuron for a combination of US amplitudes, - charge densities and duty cycles, at a specific US frequency. - - :param mechname: name of membrane density mechanism - :param a: sonophore radius (m) - :param Fdrive: US frequency (Hz) - :param amps: US amplitudes (Pa) - :param charges: membrane charge densities (C/m2) - :param DCs: duty cycle value(s) - :return: 4-tuple with reference values of US amplitude and charge density, - as well as interpolated Vmeff and QSS gating variables - ''' - - # Get lookups for specific (a, f, A) combination - Aref, Qref, lookups2D, _ = getLookups2D(mechname, a=a, Fdrive=Fdrive) - if 'ng' in lookups2D: - lookups2D.pop('ng') - - # Derive inputs from lookups reference if not provided - if amps is None: - amps = Aref - if charges is None: - charges = Qref - - # Transform inputs into arrays if single value provided - if isinstance(amps, float): - amps = np.array([amps]) - if isinstance(charges, float): - charges = np.array([charges]) - if isinstance(DCs, float): - DCs = np.array([DCs]) - nA, nQ, nDC = amps.size, charges.size, DCs.size - cs = {True: 's', False: ''} - # logger.debug('%u amplitude%s, %u charge%s, %u DC%s', - # nA, cs[nA > 1], nQ, cs[nQ > 1], nDC, cs[nDC > 1]) - - # Re-interpolate lookups at input charges - lookups2D = {key: interp1d(Qref, y2D, axis=1)(charges) for key, y2D in lookups2D.items()} - - # Interpolate US-ON (for each input amplitude) and US-OFF (A = 0) lookups - amps = isWithin('amplitude', amps, (Aref.min(), Aref.max())) - lookups_on = {key: interp1d(Aref, y2D, axis=0)(amps) for key, y2D in lookups2D.items()} - lookups_off = {key: interp1d(Aref, y2D, axis=0)(0.0) for key, y2D in lookups2D.items()} - - # Compute DC-averaged lookups - lookups_DCavg = {} - for key in lookups2D.keys(): - x_on, x_off = lookups_on[key], lookups_off[key] - x_avg = np.empty((nA, nQ, nDC)) - for iA, Adrive in enumerate(amps): - for iDC, DC in enumerate(DCs): - x_avg[iA, :, iDC] = x_on[iA, :] * DC + x_off * (1 - DC) - lookups_DCavg[key] = x_avg - - return amps, charges, lookups_DCavg - - def isWithin(name, val, bounds, rel_tol=1e-9): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. 
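# Expected behaviour of isWithin, as described above:
#   isWithin('amplitude', 50e3, (0., 100e3))                  -> 50e3 (unchanged)
#   isWithin('amplitude', 100e3 * (1 + 1e-12), (0., 100e3))   -> 100e3 (rounded to bound, with warning)
#   isWithin('amplitude', 150e3, (0., 100e3))                 -> raises ValueError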
:param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' if isinstance(val, list) or isinstance(val, np.ndarray) or isinstance(val, tuple): return [isWithin(name, v, bounds, rel_tol) for v in val] if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError('{} value ({}) out of [{}, {}] interval'.format( name, val, bounds[0], bounds[1])) -def getLookupsCompTime(mechname): - - # Check lookup file existence - lookup_path = getNeuronLookupsFile(mechname) - if not os.path.isfile(lookup_path): - raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) - - # Load lookups dictionary - logger.debug('Loading comp times') - with open(lookup_path, 'rb') as fh: - df = pickle.load(fh) - tcomps4D = df['tcomp'] - - return np.sum(tcomps4D) - - def getLowIntensitiesSTN(): ''' Return an array of acoustic intensities (W/m2) used to study the STN neuron in Tarnaud, T., Joseph, W., Martens, L., and Tanghe, E. (2018). Computational Modeling of Ultrasonic Subthalamic Nucleus Stimulation. IEEE Trans Biomed Eng. ''' return np.hstack(( np.arange(10, 101, 10), np.arange(101, 131, 1), np.array([140]) )) # W/m2 def getDistribution(xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return getDistribution(xmin, xmax, nx, scale=scale) -def parseUSAmps(args, defaults): - - # Check if several mutually exclusive arguments were provided - Aparams = ['Arange', 'Irange', 'amp', 'intensity'] - if sum([x in args for x in Aparams]) > 1: - raise ValueError('You must provide only one of the following arguments: {}'.format( - ', '.join(Aparams))) - - if 'Arange' in args: - return getDistFromList(args['Arange']) * 1e3 # Pa - elif 'Irange' in args: - return Intensity2Pressure(getDistFromList(args['Irange']) * 1e4) # Pa - elif 'amp' in args: - return np.array(args['amp']) * 1e3 # Pa - elif 'intensity' in args: - return Intensity2Pressure(np.array(args['intensity']) * 1e4) # Pa - return np.array(defaults['amp']) * 1e3 # Pa - - -def addUSAmps(ap): - ap.add_argument('-A', '--amp', nargs='+', type=float, help='Acoustic pressure amplitude (kPa)') - ap.add_argument('--Arange', type=str, nargs='+', help='Amplitude range [scale min max n] (kPa)') - ap.add_argument('-I', '--intensity', nargs='+', type=float, help='Acoustic intensity (W/cm2)') - ap.add_argument('--Irange', type=str, nargs='+', - help='Intensity range [scale min max n] (W/cm2)') - -def parseElecAmps(args, defaults): - - # Check if several mutually exclusive arguments were provided - Aparams = 
-def parseElecAmps(args, defaults):
-
-    # Check if several mutually exclusive arguments were provided
-    Aparams = ['Arange', 'amp']
-    if sum([x in args for x in Aparams]) > 1:
-        raise ValueError('You must provide only one of the following arguments: {}'.format(
-            ', '.join(Aparams)))
-
-    if 'Arange' in args:
-        return getDistFromList(args['Arange'])  # mA/m2
-    elif 'amp' in args:
-        return np.array(args['amp'])  # mA/m2
-    return np.array(defaults['amp'])  # mA/m2
-
-
def getIndex(container, value):
    ''' Return the index of a float / string value in a list / array

        :param container: list / 1D-array of elements
        :param value: value to search for
        :return: index of value (if found)
    '''
    if isinstance(value, float):
        container = np.array(container)
        imatches = np.where(np.isclose(container, value, rtol=1e-9, atol=1e-16))[0]
        if len(imatches) == 0:
            raise ValueError('{} not found in {}'.format(value, container))
        return imatches[0]
    elif isinstance(value, str):
        return container.index(value)


def debug(func):
    ''' Print the function signature and return value. '''
    @wraps(func)
    def wrapper_debug(*args, **kwargs):
        args_repr = [repr(a) for a in args]
        kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
        signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr))
        print('Calling {}'.format(signature))
        value = func(*args, **kwargs)
        print(f"{func.__name__!r} returned {value!r}")
        return value
    return wrapper_debug


def timer(func):
    ''' Monitor and return the runtime of the decorated function. '''
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        end_time = time.perf_counter()
        run_time = end_time - start_time
        return value, run_time
    return wrapper
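# Illustrative sketch (editorial addition, not part of the patch): the timer decorator
# above makes the decorated function return a (value, runtime) tuple. The function name
# and workload below are hypothetical.
@timer
def slow_sum(n):
    # dummy workload, purely illustrative
    return sum(range(n))

total, t_run = slow_sum(1_000_000)   # t_run: elapsed wall-clock time in seconds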
def logCache(fpath, delimiter='\t', out_type=float):
    ''' Add an extra IO memoization functionality to a function using file caching,
        to avoid repetitions of tedious computations with identical inputs.
    '''
    def wrapper_with_args(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # If function has history -> do not log
            if 'history' in kwargs:
                return func(*args, **kwargs)

            # Translate function arguments into string signature
            args_repr = [repr(a) for a in args]
            kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
            signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr))

            # If entry present in log, return corresponding output
            if os.path.isfile(fpath):
                with open(fpath, 'r', newline='') as f:
                    reader = csv.reader(f, delimiter=delimiter)
                    for row in reader:
                        if row[0] == signature:
                            logger.info('entry found in "{}"'.format(os.path.basename(fpath)))
                            return out_type(row[1])

            # Otherwise, compute output and log it into file before returning
            out = func(*args, **kwargs)
            with open(fpath, 'a', newline='') as csvfile:
                writer = csv.writer(csvfile, delimiter=delimiter)
                writer.writerow([signature, str(out)])

            return out

        return wrapper

    return wrapper_with_args


def fileCache(root, fcode_func, load_func=pickle.load, dump_func=pickle.dump):

    def wrapper_with_args(func):

        @wraps(func)
        def wrapper(*args, **kwargs):

            # Get file path from root and function arguments, using fcode function
            fpath = os.path.join(os.path.abspath(root), '{}.pkl'.format(fcode_func(*args)))

            # If file exists, load output from it
            if os.path.isfile(fpath):
                print('loading data from "{}"'.format(fpath))
                with open(fpath, 'rb') as f:
                    out = load_func(f)

            # Otherwise, execute function and create the file to dump the output
            else:
                out = func(*args, **kwargs)
                print('dumping data in "{}"'.format(fpath))
                with open(fpath, 'wb') as f:
                    dump_func(out, f)

            return out

        return wrapper

    return wrapper_with_args
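# Illustrative sketch (editorial addition, not part of the patch): applying the logCache
# decorator above. The cache file path and decorated function are hypothetical; a second
# call with identical arguments is served from the log file instead of being recomputed.
@logCache('/tmp/threshold_log.csv', out_type=float)
def expensive_threshold(a, b):
    # stand-in for a costly titration / simulation
    return (a + b) / 2

x1 = expensive_threshold(0., 100.)   # computed, then appended to the log file
x2 = expensive_threshold(0., 100.)   # identical signature -> read back from the log file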
def binarySearch(bool_func, args, ix, xbounds, dx_thr, history=None):
    ''' Use a binary search to determine the threshold satisfying a given condition
        within a continuous search interval.

        :param bool_func: boolean function returning whether condition is satisfied
        :param args: list of function arguments other than refined value
        :param ix: index at which the refined value is inserted into the arguments list
        :param xbounds: search interval for threshold (progressively refined)
        :param dx_thr: accuracy criterion for threshold
        :param history: list of previous evaluation outcomes (used in recursive calls)
        :return: excitation threshold
    '''

    # Assign empty history if first function call
    if history is None:
        history = []

    # Compute function output at interval mid-point
    x = (xbounds[0] + xbounds[1]) / 2
    sim_args = args[:]
    sim_args.insert(ix, x)
    history.append(bool_func(sim_args))

    # If titration interval is small enough
    conv = False
    if (xbounds[1] - xbounds[0]) <= dx_thr:
        logger.debug('titration interval smaller than defined threshold')

        # If both conditions have been encountered during titration process,
        # we're going towards convergence
        if (0 in history and 1 in history):
            logger.debug('converging around threshold')

            # If current value satisfies condition, convergence is achieved
            # -> return threshold
            if history[-1]:
                logger.debug('currently satisfying condition -> convergence')
                return x

        # If only one condition has been encountered during titration process,
        # then titration cannot converge within the defined interval -> return NaN
        else:
            logger.warning('titration does not converge within this interval')
            return np.nan

    # Return threshold if convergence is reached, otherwise refine interval and iterate
    if conv:
        return x
    else:
        if x > 0.:
            xbounds = (xbounds[0], x) if history[-1] else (x, xbounds[1])
        else:
            xbounds = (x, xbounds[1]) if history[-1] else (xbounds[0], x)
        return binarySearch(bool_func, args, ix, xbounds, dx_thr, history=history)
-
-
-def resolveDependencies(deps, join_items=True):
-    ''' Resolve a dictionary of dependencies.
-
-        :param deps: dependency dictionary in which the values are the dependencies
-            of their respective keys.
-        :param join_items: boolean specifying whether or not to serialize output
-        :return: list of inter-dependent elements in resolved order
-    '''
-
-    # Transform input dictionary of lists into dictionary of sets,
-    # while removing circular (auto) dependencies
-    deps = dict((k, set([x for x in deps[k] if x != k])) for k in deps)
-
-    # Initialize empty list of resolved dependencies
-    resolved_deps = []
-
-    # Iterate while dependencies not entirely resolved
-    while deps:
-        # Extract latest items without dependencies (values that are not in keys
-        # and keys without value) into a set
-        nd_items = set(i for v in deps.values() for i in v) - set(deps.keys())
-        nd_items.update(k for k, v in deps.items() if not v)
-
-        # Append new set of non-dependent items to output list
-        resolved_deps.append(nd_items)
-
-        # Remove those items from remaining dependencies in input dictionary
-        deps = dict(((k, v - nd_items) for k, v in deps.items() if v))
-
-    # If specified, merge list of sets into a unique list (while preserving order)
-    if join_items:
-        tmp = []
-        for item in resolved_deps:
-            tmp += list(item)
-        resolved_deps = tmp
-
-    return resolved_deps
-
-
-def plural(n):
-    if n < 0:
-        raise ValueError('Cannot format negative integer (n = {})'.format(n))
-    if n == 0:
-        return ''
-    else:
-        return 's'
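# Illustrative sketch (editorial addition, not part of the patch): using binarySearch
# to titrate a toy excitation threshold. The argument layout and the 53.7 kPa criterion
# are hypothetical; bool_func receives the argument list with the refined value at index ix.
def is_excited(sim_args):
    Fdrive, Adrive, tstim = sim_args     # toy argument layout: (Hz, Pa, s)
    return Adrive >= 53.7e3              # toy excitation criterion

Athr = binarySearch(is_excited, [500e3, 100e-3], 1, (0., 600e3), 1e2)
# Athr ends up within ~100 Pa of the 53.7 kPa toy threshold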