diff --git a/.gitignore b/.gitignore index 46f0ba9..f553867 100644 --- a/.gitignore +++ b/.gitignore @@ -1,115 +1,118 @@ # Test lookup files *_test.pkl +# key files +*.key + # Sphinx tools and doc _build/ _static/ _templates/ Makefile *.bat conf.py # Sublime files *.sublime-project *.sublime-workspace # PNG images *.png # Temporary stats file *.stats # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # NEURON object and binary files *.o *.c *.dll # Distribution / packaging .Python env/ build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ *.egg-info/ .installed.cfg *.egg # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *,cover .hypothesis/ # Translations *.mo *.pot # Django stuff: local_settings.py # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder target/ # Jupyter Notebook .ipynb_checkpoints # pyenv .python-version # celery beat schedule file celerybeat-schedule # dotenv .env # virtualenv .venv/ venv/ ENV/ # Spyder project settings .spyderproject # Rope project settings .ropeproject \ No newline at end of file diff --git a/PySONIC/core/lookups.py b/PySONIC/core/lookups.py index e4966c5..b2621e3 100644 --- a/PySONIC/core/lookups.py +++ b/PySONIC/core/lookups.py @@ -1,328 +1,315 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2019-06-27 13:59:02 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-10-07 17:55:42 +# @Last Modified time: 2019-10-10 10:34:55 import re import numpy as np from scipy.interpolate import interp1d from ..utils import isWithin, isIterable class Lookup: ''' Lookup object. ''' def __init__(self, refs, tables): self.refs = refs self.tables = SmartDict(tables) for k, v in self.items(): if v.shape != self.dims(): raise ValueError('{} Table dimensions {} does not match references {}'.format( k, v.shape, self.dims())) def __repr__(self): return 'Lookup{}D({})'.format(self.ndims(), ', '.join(self.inputs())) def __getitem__(self, key): return self.tables[key] def __delitem__(self, key): del self.tables[key] def __setitem__(self, key, value): self.tables[key] = value def keys(self): return self.tables.keys() def values(self): return self.tables.values() def items(self): return self.tables.items() def refitems(self): return self.refs.items() def pop(self, key): x = self.tables[key] del self.tables[key] return x def rename(self, key1, key2): self.tables[key2] = self.tables.pop(key1) def dims(self): return tuple([x.size for x in self.refs.values()]) def ndims(self): return len(self.refs) def inputs(self): return list(self.refs.keys()) def outputs(self): return list(self.keys()) def checkAgainst(self, other): if self.inputs() != other.inputs(): raise ValueError(f'Differing lookups (references names do not match)') if self.dims() != other.dims(): raise ValueError(f'Differing lookup dimensions ({self.dims()} - {other.dims()})') for k, v in self.refitems(): if (other.refs[k] != v).any(): raise ValueError(f'Differing {k} lookup reference') if self.outputs() != other.outputs(): raise ValueError(f'Differing lookups (table names do not match)') def operate(self, other, op): ''' Generic arithmetic operator. ''' if isinstance(other, int): other = float(other) if isinstance(other, self.__class__): self.checkAgainst(other) tables = {k: getattr(v, op)(other[k]) for k, v in self.items()} elif isinstance(other, float): tables = {k: getattr(v, op)(other) for k, v in self.items()} else: raise ValueError(f'Cannot {op} {self.__class__} object with {type(other)} variable') return self.__class__(self.refs, tables) def __add__(self, other): return self.operate(other, '__add__') def __sub__(self, other): return self.operate(other, '__sub__') def __mul__(self, other): return self.operate(other, '__mul__') def __div__(self, other): return self.operate(other, '__div__') def squeeze(self): new_tables = {k: v.squeeze() for k, v in self.items()} new_refs = {} for k, v in self.refitems(): if v.size > 1: new_refs[k] = v return self.__class__(new_refs, new_tables) def getAxisIndex(self, key): assert key in self.inputs(), 'Unkown input dimension: {}'.format(key) return self.inputs().index(key) def project(self, key, value): ''' Interpolate tables at specific value(s) along a given dimension. ''' if not isIterable(value): delete_input_dim = True else: delete_input_dim = False value = np.asarray(value) value = isWithin(key, value, (self.refs[key].min(), self.refs[key].max())) axis = self.getAxisIndex(key) # print('interpolating lookup along {} (axis {}) at {}'.format(key, axis, value)) new_tables = {} if self.refs[key].size == 1: for k, v in self.items(): new_tables[k] = v.mean(axis=axis) else: for k, v in self.items(): new_tables[k] = interp1d(self.refs[key], v, axis=axis)(value) new_refs = self.refs.copy() if delete_input_dim: # print('removing {} input dimension'.format(key)) del new_refs[key] else: # print('updating {} reference values'.format(key)) new_refs[key] = value return self.__class__(new_refs, new_tables) def projectN(self, projections): lkp = self.__class__(self.refs, self.tables) for k, v in projections.items(): lkp = lkp.project(k, v) return lkp def move(self, key, index): if index == -1: index = self.ndims() - 1 iref = self.getAxisIndex(key) for k in self.keys(): self.tables[k] = np.moveaxis(self.tables[k], iref, index) refkeys = list(self.refs.keys()) del refkeys[iref] refkeys = refkeys[:index] + [key] + refkeys[index:] self.refs = {k: self.refs[k] for k in refkeys} def interpVar(self, ref_value, ref_key, var_key): return np.interp( ref_value, self.refs[ref_key], self.tables[var_key], left=np.nan, right=np.nan) def interpolate1D(self, key, value): return SmartDict({k: self.interpVar(value, key, k) for k in self.outputs()}) + def tile(self, ref_name, ref_values): + ''' Tile the lookups along a new dimension. ''' + itiles = range(ref_values.size) + tables = {k: np.array([v for i in itiles]) for k, v in self.items()} + refs = {**{ref_name: ref_values}, **self.refs} + return self.__class__(refs, tables) + class SmartLookup(Lookup): def __repr__(self): return 'Smart' + super().__repr__() def projectOff(self): ''' Project for OFF periods (zero amplitude). ''' # Interpolate at zero amplitude lkp0 = self.project('A', 0.) # Move charge axis to end in all tables Qaxis = lkp0.getAxisIndex('Q') for k, v in lkp0.items(): lkp0.tables[k] = np.moveaxis(v, Qaxis, -1) # Iterate along dimensions and take first value along corresponding axis for i in range(lkp0.ndims() - 1): for k, v in lkp0.items(): lkp0.tables[k] = v[0] # Keep only charge vector in references lkp0.refs = {'Q': lkp0.refs['Q']} return lkp0 - def projectDCs(self, amps=None, DCs=1.): + def projectDC(self, amps=None, DC=1.): + ''' Project lookups at a given duty cycle.''' + # Assign default values if amps is None: amps = self.refs['A'] elif not isIterable(amps): amps = np.array([amps]) - if not isIterable(DCs): - DCs = np.array([DCs]) - # project lookups at zero and defined amps - if amps is None: - amps = self.refs['A'] lkp0 = self.project('A', 0.) - lkps = self.project('A', amps) + lkps_ON = self.project('A', amps) # Retrieve amplitude axis index, and move amplitude to first axis - A_axis = lkps.getAxisIndex('A') - lkps.move('A', 0) - - # Define empty tables dictionary - tables_DCavg = {} + A_axis = lkps_ON.getAxisIndex('A') + lkps_ON.move('A', 0) - # For each variable - for var_key in lkp0.outputs(): + # Tile the zero-amplitude lookup to match the lkps_ON dimensions + lkps_OFF = lkp0.tile('A', lkps_ON.refs['A']) - # Get OFF and ON (for all amps) variable values - x_on, x_off = lkps.tables[var_key], lkp0.tables[var_key] + # Compute a DC averaged lookup + lkp = lkps_ON * DC + lkps_OFF * (1 - DC) - # Initialize empty table to gather DC-averaged variable (DC size + ON table shape) - x_avg = np.empty((DCs.size, *x_on.shape)) - - # Compute the DC-averaged variable for each amplitude-DC combination - for iA, Adrive in enumerate(amps): - for iDC, DC in enumerate(DCs): - x_avg[iDC, iA] = x_on[iA] * DC + x_off * (1 - DC) - - # Assign table in dictionary - tables_DCavg[var_key] = x_avg - - refs_DCavg = {**{'DC': DCs}, **lkps.refs} - lkp = self.__class__(refs_DCavg, tables_DCavg) - - # Move DC ot last axis and amplitude back to its original axis - lkp.move('DC', -1) + # Move amplitude back to its original axis lkp.move('A', A_axis) + return lkp def fixLookup(lkp): if 'fs' in lkp.inputs(): if lkp.refs['fs'].size == 1: if lkp.refs['fs'][0] == 1.: lkp = lkp.project('fs', 1.) return lkp class SmartDict(): # Key patterns suffix_pattern = '[A-Za-z0-9_]+' xinf_pattern = re.compile('^({})inf$'.format(suffix_pattern)) taux_pattern = re.compile('^tau({})$'.format(suffix_pattern)) def __init__(self, d): self.d = d def __repr__(self): return 'SmartDict(' + ', '.join(self.d.keys()) + ')' def items(self): return self.d.items() def keys(self): return self.d.keys() def values(self): return self.d.values() def alphax(self, x): return self.d['alpha{}'.format(x)] def betax(self, x): return self.d['beta{}'.format(x)] def taux(self, x): return 1 / (self.alphax(x) + self.betax(x)) def xinf(self, x): return self.alphax(x) * self.taux(x) def __getitem__(self, key): if key in self.d: return self.d[key] else: m = self.taux_pattern.match(key) if m is not None: return self.taux(m.group(1)) else: m = self.xinf_pattern.match(key) if m is not None: return self.xinf(m.group(1)) else: raise KeyError(key) def __setitem__(self, key, value): self.d[key] = value def pop(self, key): return self.d.pop(key) if __name__ == '__main__': refs = { 'a': np.logspace(np.log10(16), np.log10(64), 5) * 1e-9, 'f': np.array([100, 200, 500, 1e3, 2e3, 3e3, 4e3]) * 1e3, 'A': np.logspace(np.log10(1), np.log10(600), 100) * 1e3, 'Q': np.arange(-80, 50) } dims = [refs[x].shape for x in refs.keys()] tables = { 'alpham': np.ones(dims) * 2, 'betam': np.ones(dims) * 3 } lkp4d = SmartLookup(refs, tables) print(lkp4d, lkp4d.dims()) lkp1d = lkp4d.projectN({'a': 32e-9, 'f': 500e3, 'A': 100e3}) print(lkp1d, lkp1d.dims()) for k in ['alpham', 'betam', 'taum', 'minf']: print(k, lkp1d[k]) diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index 46369d3..3875c7b 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,696 +1,670 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-10-07 19:30:20 +# @Last Modified time: 2019-10-10 16:05:58 from copy import deepcopy import logging import numpy as np import pandas as pd -from scipy import linalg from .simulators import PWSimulator, HybridSimulator, PeriodicSimulator, OnOffSimulator from .bls import BilayerSonophore from .pneuron import PointNeuron from .model import Model from .batches import Batch from ..utils import * from ..constants import * from ..postpro import getFixedPoints from .lookups import SmartLookup, SmartDict, fixLookup NEURONS_LOOKUP_DIR = os.path.abspath(os.path.split(__file__)[0] + "/../neurons/") class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ASTIM' # keyword used to characterize simulations made with this model def __init__(self, a, pneuron, Fdrive=None, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param pneuron: point-neuron model :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Check validity of input parameters if not isinstance(pneuron, PointNeuron): raise ValueError('Invalid neuron type: "{}" (must inherit from PointNeuron class)' .format(pneuron.name)) self.pneuron = pneuron # Initialize BilayerSonophore parent object BilayerSonophore.__init__(self, a, pneuron.Cm0, pneuron.Qm0(), embedding_depth) def __repr__(self): s = '{}({:.1f} nm, {}'.format(self.__class__.__name__, self.a * 1e9, self.pneuron) if self.d > 0.: s += ', d={}m'.format(si_format(self.d, precision=1, space=' ')) return s + ')' def params(self): return {**super().params(), **self.pneuron.params()} def getPltVars(self, wrapleft='df["', wrapright='"]'): return {**super().getPltVars(wrapleft, wrapright), **self.pneuron.getPltVars(wrapleft, wrapright)} def getPltScheme(self): return self.pneuron.getPltScheme() def filecode(self, *args): return Model.filecode(self, *args) @staticmethod def inputs(): # Get parent input vars and supress irrelevant entries bls_vars = BilayerSonophore.inputs() pneuron_vars = PointNeuron.inputs() del bls_vars['Qm'] del pneuron_vars['Astim'] # Fill in current input vars in appropriate order inputvars = bls_vars inputvars.update(pneuron_vars) inputvars['fs'] = { 'desc': 'sonophore membrane coverage fraction', 'label': 'f_s', 'unit': '\%', 'factor': 1e2, 'precision': 0 } inputvars['method'] = None return inputvars def filecodes(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs, method): # Get parent codes and supress irrelevant entries bls_codes = super().filecodes(Fdrive, Adrive, 0.0) pneuron_codes = self.pneuron.filecodes(0.0, tstim, toffset, PRF, DC) for x in [bls_codes, pneuron_codes]: del x['simkey'] del bls_codes['Qm'] del pneuron_codes['Astim'] # Fill in current codes in appropriate order codes = { 'simkey': self.simkey, 'neuron': pneuron_codes.pop('neuron'), 'nature': pneuron_codes.pop('nature') } codes.update(bls_codes) codes.update(pneuron_codes) codes['fs'] = 'fs{:.0f}%'.format(fs * 1e2) if fs < 1 else None codes['method'] = method return codes @staticmethod def interpOnOffVariable(key, Qm, stim, lkp): ''' Interpolate Q-dependent effective variable along ON and OFF periods of a solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkp: dictionary of lookups for ON and OFF states :return: interpolated effective variable vector ''' x = np.zeros(stim.size) x[stim == 0] = lkp['OFF'].interpVar(Qm[stim == 0], 'Q', key) x[stim == 1] = lkp['ON'].interpVar(Qm[stim == 1], 'Q', key) return x @staticmethod def spatialAverage(fs, x, x0): ''' fs-modulated spatial averaging. ''' return fs * x + (1 - fs) * x0 def computeEffVars(self, Fdrive, Adrive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' # Run simulation and retrieve deflection and gas content vectors from last cycle data, meta = BilayerSonophore.simulate(self, Fdrive, Adrive, Qm) Z_last = data.loc[-NPC_DENSE:, 'Z'].values # m Cm_last = self.v_capacitance(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = self.spatialAverage(x, Cm_last, self.Cm0) # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({**{'V': np.mean(Vm)}, **self.pneuron.getEffRates(Vm)}) # Log process log = '{}: lookups @ {}Hz, {}Pa, {:.2f} nC/cm2'.format( self, *si_format([Fdrive, Adrive], precision=1, space=' '), Qm * 1e5) if len(fs) > 1: log += ', fs = {:.0f} - {:.0f}%'.format(fs.min() * 1e2, fs.max() * 1e2) log += ', tcomp = {:.3f} s'.format(meta['tcomp']) logger.info(log) # Return effective coefficients return [meta['tcomp'], effvars] def getLookupFileName(self, a=None, Fdrive=None, Adrive=None, fs=False): fname = '{}_lookups'.format(self.pneuron.name) if a is not None: fname += '_{:.0f}nm'.format(a * 1e9) if Fdrive is not None: fname += '_{:.0f}kHz'.format(Fdrive * 1e-3) if Adrive is not None: fname += '_{:.0f}kPa'.format(Adrive * 1e-3) if fs is True: fname += '_fs' return '{}.pkl'.format(fname) def getLookupFilePath(self, *args, **kwargs): return os.path.join(NEURONS_LOOKUP_DIR, self.getLookupFileName(*args, **kwargs)) def getLookup(self, *args, **kwargs): lookup_path = self.getLookupFilePath(*args, **kwargs) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) with open(lookup_path, 'rb') as fh: frame = pickle.load(fh) if 'ng' in frame['lookup']: del frame['lookup']['ng'] refs = frame['input'] # Move fs to last reference dimension keys = list(refs.keys()) if 'fs' in keys and keys.index('fs') < len(keys) - 1: del keys[keys.index('fs')] keys.append('fs') refs = {k: refs[k] for k in keys} lkp = SmartLookup(refs, frame['lookup']) return fixLookup(lkp) def getLookup2D(self, Fdrive, fs): if fs < 1: lkp2d = self.getLookup(a=self.a, Fdrive=Fdrive, fs=True).project('fs', fs) else: lkp2d = self.getLookup().projectN({'a': self.a, 'f': Fdrive}) return lkp2d def fullDerivatives(self, t, y, Fdrive, Adrive, phi, fs): ''' Compute the full system derivatives. :param t: specific instant in time (s) :param y: vector of state variables :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param phi: acoustic drive phase (rad) :param fs: sonophore membrane coevrage fraction (-) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives( self, t, y[:3], Fdrive, Adrive, y[3], phi) dydt_elec = self.pneuron.derivatives( t, y[3:], Cm=self.spatialAverage(fs, self.capacitance(y[1]), self.Cm0)) return dydt_mech + dydt_elec def effDerivatives(self, t, y, lkp1d): ''' Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t ''' Qm, *states = y states_dict = dict(zip(self.pneuron.statesNames(), states)) lkp0d = lkp1d.interpolate1D('Q', Qm) dQmdt = - self.pneuron.iNet(lkp0d['V'], states_dict) * 1e-3 return [dQmdt, *self.pneuron.getDerEffStates(lkp0d, states_dict)] + def QSSderivatives(self, t, y, lkp1d): + ''' Compute the derivatives of the 1D, charge-casted quasi-steady state system. ''' + Qm = y[0] + lkp0d = lkp1d.interpolate1D('Q', Qm) + QSS0d = {k: v(lkp0d) for k, v in self.pneuron.quasiSteadyStates().items()} + dQmdt = - self.pneuron.iNet(lkp0d['V'], QSS0d) * 1e-3 + return [dQmdt] + def __simFull(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs, phi=np.pi): # Determine time step dt = 1 / (NPC_DENSE * Fdrive) # Compute initial non-zero deflection Z = self.computeInitialDeflection(Adrive, Fdrive, phi, self.Qm0, dt) # Set initial conditions ss0 = self.pneuron.getSteadyStates(self.pneuron.Vm0) y0 = np.concatenate(([0., 0., self.ng0, self.Qm0], ss0)) y1 = np.concatenate(([0., Z, self.ng0, self.Qm0], ss0)) # Initialize simulator and compute solution logger.debug('Computing detailed solution') simulator = PWSimulator( lambda t, y: self.fullDerivatives(t, y, Fdrive, Adrive, phi, fs), lambda t, y: self.fullDerivatives(t, y, 0., 0., 0., fs)) t, y, stim = simulator( y1, dt, tstim, toffset, PRF, DC, target_dt=CLASSIC_TARGET_DT, print_progress=logger.getEffectiveLevel() <= logging.INFO, monitor_func=None) # monitor_func=lambda t, y: f't = {t * 1e3:.5f} ms, Qm = {y[3] * 1e5:.2f} nC/cm2') # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim, y0=y0) # Store output in dataframe and return data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.spatialAverage( fs, self.v_capacitance(data['Z'].values), self.Cm0) * 1e3 # mV for i in range(len(self.pneuron.states)): data[self.pneuron.statesNames()[i]] = y[:, i + 4] return data def __simHybrid(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs, phi=np.pi): # Determine time steps dt_dense, dt_sparse = [1. / (n * Fdrive) for n in [NPC_DENSE, NPC_SPARSE]] # Compute initial non-zero deflection Z = self.computeInitialDeflection(Adrive, Fdrive, phi, self.Qm0, dt_dense) # Set initial conditions ss0 = self.pneuron.getSteadyStates(self.pneuron.Vm0) y0 = np.concatenate(([0., 0., self.ng0, self.Qm0], ss0)) y1 = np.concatenate(([0., Z, self.ng0, self.Qm0], ss0)) # Initialize simulator and compute solution is_dense_var = np.array([True] * 3 + [False] * (len(self.pneuron.states) + 1)) logger.debug('Computing hybrid solution') simulator = HybridSimulator( lambda t, y: self.fullDerivatives(t, y, Fdrive, Adrive, phi, fs), lambda t, y: self.fullDerivatives(t, y, 0., 0., 0., fs), lambda t, y, Cm: self.pneuron.derivatives( t, y, Cm=self.spatialAverage(fs, Cm, self.Cm0)), lambda yref: self.capacitance(yref[1]), is_dense_var, ivars_to_check=[1, 2]) t, y, stim = simulator(y1, dt_dense, dt_sparse, 1. / Fdrive, tstim, toffset, PRF, DC) # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim, y0=y0) # Store output in dataframe and return data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.spatialAverage( fs, self.v_capacitance(data['Z'].values), self.Cm0) * 1e3 # mV for i in range(len(self.pneuron.states)): data[self.pneuron.statesNames()[i]] = y[:, i + 4] return data def __simSonic(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs): # Load appropriate 2D lookups lkp2d = self.getLookup2D(Fdrive, fs) # Interpolate 2D lookups at zero and US amplitude logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', Adrive * 1e-3) lkps1d = {'ON': lkp2d.project('A', Adrive), 'OFF': lkp2d.project('A', 0.)} # Set initial conditions y0 = np.concatenate(([self.Qm0], self.pneuron.getSteadyStates(self.pneuron.Vm0))) # Initialize simulator and compute solution logger.debug('Computing effective solution') simulator = PWSimulator( lambda t, y: self.effDerivatives(t, y, lkps1d['ON']), lambda t, y: self.effDerivatives(t, y, lkps1d['OFF'])) t, y, stim = simulator(y0, self.pneuron.chooseTimeStep(), tstim, toffset, PRF, DC) # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim) # Store output in dataframe and return data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Qm': y[:, 0] }) data['Vm'] = self.interpOnOffVariable('V', data['Qm'].values, stim, lkps1d) for key in ['Z', 'ng']: data[key] = np.full(t.size, np.nan) for i in range(len(self.pneuron.states)): data[self.pneuron.statesNames()[i]] = y[:, i + 1] return data + def __simQSS(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs): + # Load appropriate 2D lookups + lkp2d = self.getLookup2D(Fdrive, fs) + + # Interpolate 2D lookups at zero and US amplitude + logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', Adrive * 1e-3) + lkps1d = {'ON': lkp2d.project('A', Adrive), 'OFF': lkp2d.project('A', 0.)} + + # Compute DC-averaged lookups for stimulus duration + lkps1d['ON'] = lkps1d['ON'] * DC + lkps1d['OFF'] * (1 - DC) + + # Create 1D QSS lookup with reference charge vector + QSS_1D_lkp = { + key: SmartLookup( + lkps1d['ON'].refs, + {k: v(val) for k, v in self.pneuron.quasiSteadyStates().items()}) + for key, val in lkps1d.items()} + + # Set initial conditions + y0 = np.array([self.Qm0]) + + # Adapt tstim and toffset to pulsing protocol to correctly capture ON-OFF transition + tstim_new = (int(tstim * PRF) - 1 + DC) / PRF + toffset_new = tstim + toffset - tstim_new + + # Initialize simulator and compute solution + logger.debug('Computing QSS solution') + simulator = OnOffSimulator( + lambda t, y: self.QSSderivatives(t, y, lkps1d['ON']), + lambda t, y: self.QSSderivatives(t, y, lkps1d['OFF'])) + t, y, stim = simulator(y0, self.pneuron.chooseTimeStep(), tstim_new, toffset_new) + + # Prepend initial conditions (prior to stimulation) + t, y, stim = simulator.prependSolution(t, y, stim) + Qm = y[:, 0] + + # Store output in dataframe and return + data = pd.DataFrame({ + 't': t, + 'stimstate': stim, + 'Qm': Qm + }) + data['Vm'] = self.interpOnOffVariable('V', data['Qm'].values, stim, lkps1d) + for key in ['Z', 'ng']: + data[key] = np.full(t.size, np.nan) + + # Interpolate QSS lookup for other variables + for k, v in self.pneuron.quasiSteadyStates().items(): + data[k] = self.interpOnOffVariable(k, data['Qm'].values, stim, QSS_1D_lkp) + + return data + @classmethod @Model.checkOutputDir def simQueue(cls, freqs, amps, durations, offsets, PRFs, DCs, fs, methods, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :param fs: sonophore membrane coverage fractions (-) :params methods: integration methods :return: list of parameters (list) for each simulation ''' method_ids = list(range(len(methods))) if ('full' in methods or 'hybrid' in methods) and kwargs['outputdir'] is None: logger.warning('Running cumbersome simulation(s) without file saving') if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += Batch.createQueue( freqs, amps, durations, offsets, min(PRFs), 1.0, fs, method_ids) if np.any(DCs != 1.0): queue += Batch.createQueue( freqs, amps, durations, offsets, PRFs, DCs[DCs != 1.0], fs, method_ids) for item in queue: if np.isnan(item[1]): item[1] = None item[-1] = methods[int(item[-1])] return queue @Model.logNSpikes @Model.checkTitrate('Adrive') @Model.addMeta def simulate(self, Fdrive, Adrive, tstim, toffset, PRF=100., DC=1., fs=1., method='sonic'): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param fs: sonophore membrane coverage fraction (-) :param method: selected integration method :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: %s simulation @ f = %sHz, A = %sPa, t = %ss (%ss offset)%s%s', self, method, si_format(Fdrive, 0, space=' '), si_format(Adrive, 2, space=' '), *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else ''), ', fs = {:.2f}%'.format(fs * 1e2) if fs < 1.0 else '') # Check validity of stimulation parameters BilayerSonophore.checkInputs(Fdrive, Adrive, 0.0, 0.0) PointNeuron.checkInputs(Adrive, tstim, toffset, PRF, DC) # Call appropriate simulation function and return try: simfunc = { 'full': self.__simFull, 'hybrid': self.__simHybrid, 'sonic': self.__simSonic, 'qss': self.__simQSS }[method] except KeyError: raise ValueError('Invalid integration method: "{}"'.format(method)) return simfunc(Fdrive, Adrive, tstim, toffset, PRF, DC, fs) def meta(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs, method): return { 'simkey': self.simkey, 'neuron': self.pneuron.name, 'a': self.a, 'd': self.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'fs': fs, 'method': method } @staticmethod def getNSpikes(data): return PointNeuron.getNSpikes(data) @logCache(os.path.join(os.path.split(__file__)[0], 'astim_titrations.log')) def titrate(self, Fdrive, tstim, toffset, PRF=100., DC=1., fs=1., method='sonic', xfunc=None, Arange=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency, duration, PRF and duty cycle. :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param fs: sonophore membrane coverage fraction (-) :param method: integration method :param xfunc: function determining whether condition is reached from simulation output :param Arange: search interval for Adrive, iteratively refined :return: determined threshold amplitude (Pa) ''' # Default output function if xfunc is None: xfunc = self.pneuron.titrationFunc # Default amplitude interval if Arange is None: Arange = [0., self.getLookup().refs['A'].max()] return binarySearch( lambda x: xfunc(self.simulate(*x)[0]), [Fdrive, tstim, toffset, PRF, DC, fs, method], 1, Arange, THRESHOLD_CONV_RANGE_ASTIM ) - def getQuasiSteadyStates(self, Fdrive, amps=None, charges=None, DCs=1.0, squeeze_output=False): + def getQuasiSteadyStates(self, Fdrive, amps=None, charges=None, DC=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables - for a combination of US amplitudes, charge densities and duty cycles, - at a specific US frequency. + for a combination of US amplitudes, charge densities, + at a specific US frequency and duty cycle. :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) - :param DCs: duty cycle value(s) + :param DC: duty cycle :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' - # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges - lkp = self.getLookup().projectDCs(amps=amps, DCs=DCs).projectN({'a': self.a, 'f': Fdrive}) + lkp = self.getLookup().projectDC(amps=amps, DC=DC).projectN({'a': self.a, 'f': Fdrive}) if charges is not None: lkp = lkp.project('Q', charges) - # Specify dimensions with A and DC as the first two axes + # Specify dimensions with A as the first axis A_axis = lkp.getAxisIndex('A') lkp.move('A', 0) - lkp.move('DC', 1) - nA, nDC = lkp.dims()[:2] + nA = lkp.dims()[0] # Compute QSS states using these lookups - QSS = {k: np.empty(lkp.dims()) for k in self.pneuron.statesNames()} - for iA in range(nA): - for iDC in range(nDC): - lkp1d = SmartDict({k: v[iA, iDC] for k, v in lkp.items()}) - QSS_1D = {k: v(lkp1d) for k, v in self.pneuron.quasiSteadyStates().items()} - for k in QSS.keys(): - QSS[k][iA, iDC] = QSS_1D[k] - QSS = SmartLookup(lkp.refs, QSS) - - for item in [lkp, QSS]: - item.move('A', A_axis) - item.move('DC', -1) + QSS = SmartLookup( + lkp.refs, + {k: v(lkp) for k, v in self.pneuron.quasiSteadyStates().items()}) # Compress outputs if needed if squeeze_output: QSS = QSS.squeeze() lkp = lkp.squeeze() return lkp, QSS - def QSSderivatives(self, t, y, lkp1d): - ''' Compute the derivatives of the 1D, charge-casted quasi-steady state system. ''' - Qm = y[0] - lkp0d = lkp1d.interpolate1D('Q', Qm) - QSS0d = {k: v(lkp0d) for k, v in self.pneuron.quasiSteadyStates().items()} - dQmdt = - self.pneuron.iNet(lkp0d['V'], QSS0d) * 1e-3 - return [dQmdt] - - def __simQSS(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs): - # Load appropriate 2D lookups - lkp2d = self.getLookup2D(Fdrive, fs) - - # Interpolate 2D lookups at zero and US amplitude - logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', Adrive * 1e-3) - lkps1d = {'ON': lkp2d.project('A', Adrive), 'OFF': lkp2d.project('A', 0.)} - - # Compute DC-averaged lookups for stimulus duration - lkps1d['ON'] = lkps1d['ON'] * DC + lkps1d['OFF'] * (1 - DC) - - # Create 1D QSS lookup with reference charge vector - QSS_1D_lkp = { - key: SmartLookup( - lkps1d['ON'].refs, - {k: v(val) for k, v in self.pneuron.quasiSteadyStates().items()}) - for key, val in lkps1d.items()} - - # Set initial conditions - y0 = np.array([self.Qm0]) - - # Adapt tstim and toffset to pulsing protocol to correctly capture ON-OFF transition - tstim_new = (int(tstim * PRF) - 1 + DC) / PRF - toffset_new = tstim + toffset - tstim_new - - # Initialize simulator and compute solution - logger.debug('Computing QSS solution') - simulator = OnOffSimulator( - lambda t, y: self.QSSderivatives(t, y, lkps1d['ON']), - lambda t, y: self.QSSderivatives(t, y, lkps1d['OFF'])) - t, y, stim = simulator(y0, self.pneuron.chooseTimeStep(), tstim_new, toffset_new) - - # Prepend initial conditions (prior to stimulation) - t, y, stim = simulator.prependSolution(t, y, stim) - Qm = y[:, 0] - - # Store output in dataframe and return - data = pd.DataFrame({ - 't': t, - 'stimstate': stim, - 'Qm': Qm - }) - data['Vm'] = self.interpOnOffVariable('V', data['Qm'].values, stim, lkps1d) - for key in ['Z', 'ng']: - data[key] = np.full(t.size, np.nan) - - # Interpolate QSS lookup for other variables - for k, v in self.pneuron.quasiSteadyStates().items(): - data[k] = self.interpOnOffVariable(k, data['Qm'].values, stim, QSS_1D_lkp) - - return data - def iNetQSS(self, Qm, Fdrive, Adrive, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' lkp, QSS = self.getQuasiSteadyStates( - Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) + Fdrive, amps=Adrive, charges=Qm, DC=DC, squeeze_output=True) return self.pneuron.iNet(lkp['V'], QSS) # mA/m2 def fixedPointsQSS(self, Fdrive, Adrive, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' pltvars = self.getPltVars() logger.debug('A = {:.2f} kPa, DC = {:.0f}%'.format(Adrive * 1e-3, DC * 1e2)) # Extract fixed points from QSS charge variation profile def dfunc(Qm): return - self.iNetQSS(Qm, Fdrive, Adrive, DC) fixed_points = getFixedPoints( lkp.refs['Q'], dQdt, filter='both', der_func=dfunc).tolist() dfunc = lambda x: np.array(self.effDerivatives(_, x, lkp)) - classified_fixed_points = {'stable': [], 'unstable': [], 'saddle': []} + + # classified_fixed_points = {'stable': [], 'unstable': [], 'saddle': []} + classified_fixed_points = [] np.set_printoptions(precision=2) # For each fixed point for i, Qm in enumerate(fixed_points): # Re-compute QSS at fixed point - *_, QSS = self.getQuasiSteadyStates(Fdrive, amps=Adrive, charges=Qm, DCs=DC, + *_, QSS = self.getQuasiSteadyStates(Fdrive, amps=Adrive, charges=Qm, DC=DC, squeeze_output=True) - # Approximate the system's Jacobian matrix at the fixed-point and compute its eigenvalues + # Classify fixed point stability by numerically evaluating its Jacobian and + # computing its eigenvalues x = np.array([Qm, *QSS.tables.values()]) - print(f'x = {x}, dfunx(x) = {dfunc(x)}') - eps_machine = np.sqrt(np.finfo(float).eps) - J = jacobian(dfunc, x, rel_eps=eps_machine, method='forward') - # import numdifftools as nd - # Jfunc = nd.Jacobian(dfunc, order=3) - # J = Jfunc(x) - # print('------------------ Jacobian ------------------') - # names = ['Q'] + self.pneuron.statesNames() - # for name, Jline in zip(names, J): - # print(f'd(d{name}dt) = {Jline}') - - # Determine fixed point stability based on eigenvalues - eigvals, eigvecs = linalg.eig(J) - s = ['({0.real:.2e} + {0.imag:.2e}j)'.format(x) for x in eigvals] - print(f'eigenvalues = {s}') - is_real_eigvals = np.isreal(eigvals) - print(is_real_eigvals) - is_neg_eigvals = eigvals.real < 0 - if np.all(is_neg_eigvals): - key = 'stable' - elif np.any(is_neg_eigvals): - key = 'saddle' - else: - key = 'unstable' - classified_fixed_points[key].append(Qm) - logger.debug(f'{key} fixed point @ Q = {(Qm * 1e5):.1f} nC/cm2') + eigvals, key = classifyFixedPoint(x, dfunc) + # classified_fixed_points[key].append(Qm) + + classified_fixed_points.append((x, eigvals, key)) + # eigenvalues.append(eigvals) + logger.debug(f'{key} point @ Q = {(Qm * 1e5):.1f} nC/cm2') + + # eigenvalues = np.array(eigenvalues).T + # print(eigenvalues.shape) return classified_fixed_points def isStableQSS(self, Fdrive, Adrive, DC): lookups, QSS = self.getQuasiSteadyStates( Fdrive, amps=Adrive, DCs=DC, squeeze_output=True) dQdt = -self.pneuron.iNet(lookups['V'], QSS.tables) # mA/m2 classified_fixed_points = self.fixedPointsQSS(Fdrive, Adrive, DC, lookups, dQdt) return len(classified_fixed_points['stable']) > 0 def titrateQSS(self, Fdrive, DC=1., Arange=None): # Default amplitude interval if Arange is None: Arange = [0., self.getLookup().refs['A'].max()] # Titration function def xfunc(x): if self.pneuron.name == 'STN': return self.isStableQSS(*x) else: return not self.isStableQSS(*x) return binarySearch( xfunc, [Fdrive, DC], 1, Arange, THRESHOLD_CONV_RANGE_ASTIM) diff --git a/PySONIC/neurons/sundt.py b/PySONIC/neurons/sundt.py new file mode 100644 index 0000000..d942c65 --- /dev/null +++ b/PySONIC/neurons/sundt.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +# @Author: Mariia Popova +# @Email: theo.lemaire@epfl.ch +# @Date: 2019-10-03 15:58:38 +# @Last Modified by: Theo Lemaire +# @Last Modified time: 2019-10-10 15:28:25 + +import numpy as np + +from ..core import PointNeuron + + +class Sundt(PointNeuron): + ''' Sundt neuron only sodium and delayed-rectifier potassium currents ''' + + # Neuron name + name = 'Sundt' + + # ------------------------------ Biophysical parameters ------------------------------ + + # Resting parameters + Cm0 = 1e-6 # Membrane capacitance (F/m2) + Vm0 = -60 # Membrane potential (mV) + + # Reversal potentials (mV) + ENa = 55.0 # Sodium + EK = -90.0 # Potassium + ELeak = -110 # Non-specific leakage ??? + + # Maximal channel conductances (S/m2) + gNabar = 400.0 # Sodium + gKdbar = 400.0 # Delayed-rectifier Potassium + gLeak = 200.0 # Non-specific leakage ????? + + # Additional parameters + #VT = -56.2 # Spike threshold adjustment parameter (mV) ???? + + # ------------------------------ States names & descriptions ------------------------------ + states = { + 'm': 'iNa activation gate', + 'h': 'iNa inactivation gate', + 'n': 'iKdr gate', + 'l': 'iKdr Borg-Graham formalism gate' + } + + # ------------------------------ Gating states kinetics ------------------------------ + + @classmethod + def alpham(cls, Vm): + return 0.55 * (7.1 - Vm) / np.exp((7.1-Vm) / 4) * 1e3 # s-1 ?? do i need 1e3 + + @classmethod + def betam(cls, Vm): + return 0.48 * cls.vtrap((Vm - 46.1), 5) * 1e3 # s-1 ?? do i need 1e3 + + @classmethod + def alphah(cls, Vm): + return 0.22 * np.exp((23 - Vm) / 18) * 1e3 # s-1 ?? do i need 1e3 + + @classmethod + def betah(cls, Vm): + return 6.92 / (1 + np.exp((46 - Vm) / 5)) * 1e3 # s-1 ?? do i need 1e3 + + @classmethod + def alphan(cls, Vm): + return np.exp((-5e-3 * (Vm + 32) * 9.648e4) / 2562.35) * 1e3 # s-1 ?? do i need 1e3 + + @classmethod + def betan(cls, Vm): + return np.exp((-2e-3 * (Vm + 32) * 9.648e4) / 2562.35) * 1e3 # s-1 ?? do i need 1e3 + + @classmethod + def alphal(cls, Vm): + return np.exp((2e-3 * (Vm + 61) * 9.648e4) / 2562.35) * 1e3 # s-1 ?? do i need 1e3 + + @classmethod + def betal(cls, Vm): + return np.exp((-2e-3 * (Vm + 32) * 9.648e4) / 2562.35) * 1e3 # s-1 ?? do i need 1e3 + + # ------------------------------ States derivatives ------------------------------ + + @classmethod + def derStates(cls): + return { + 'm': lambda Vm, x: cls.alpham(Vm) * (1 - x['m']) - cls.betam(Vm) * x['m'], #? + 'h': lambda Vm, x: cls.alphah(Vm) * (1 - x['h']) - cls.betah(Vm) * x['h'], #? + 'n': lambda Vm, x: cls.alphan(Vm) * (1 - x['n']) - cls.betan(Vm) * x['n'], #? + 'l': lambda Vm, x: cls.alphal(Vm) * (1 - x['l']) - cls.betal(Vm) * x['l'] #? + } + + # ------------------------------ Steady states ------------------------------ + + @classmethod + def steadyStates(cls): + return { + 'm': lambda Vm: cls.alpham(Vm) / (cls.alpham(Vm) + cls.betam(Vm)), + 'h': lambda Vm: cls.alphah(Vm) / (cls.alphah(Vm) + cls.betah(Vm)), + 'n': lambda Vm: 1 / (cls.alphan(Vm) + 1), + 'l': lambda Vm: 1 / (cls.alphal(Vm) + 1) + } + + # ------------------------------ Membrane currents ------------------------------ + + @classmethod + def iNa(cls, m, h, Vm): + ''' Sodium current ''' + return cls.gNabar * m**3 * h * (Vm - cls.ENa) # mA/m2 + + @classmethod + def iKdr(cls, n, l, Vm): + ''' delayed-rectifier Potassium current ''' + return cls.gKdbar * n**3 * l * (Vm - cls.EK) # mA/m2 + + @classmethod + def iLeak(cls, Vm): + ''' non-specific leakage current ''' + return cls.gLeak * (Vm - cls.ELeak) # mA/m2 + + @classmethod + def currents(cls): + return { + 'iNa': lambda Vm, x: cls.iNa(x['m'], x['h'], Vm), + 'iKdr': lambda Vm, x: cls.iKdr(x['n'], x['l'], Vm), + 'iLeak': lambda Vm, _: cls.iLeak(Vm) + } diff --git a/PySONIC/plt/QSS.py b/PySONIC/plt/QSS.py index 2591ea9..9ed47b2 100644 --- a/PySONIC/plt/QSS.py +++ b/PySONIC/plt/QSS.py @@ -1,500 +1,519 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2019-06-04 18:24:29 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-07-18 21:12:46 +# @Last Modified time: 2019-10-10 16:29:34 import inspect import logging import pandas as pd import numpy as np import matplotlib.pyplot as plt from ..core import NeuronalBilayerSonophore, Batch from .pltutils import * -from ..utils import logger, fileCache +from ..utils import logger, fileCache, alert root = '../../../QSS analysis/data' FP_colors = { 'stable': 'tab:green', 'saddle': 'tab:orange', 'unstable': 'tab:red' } def plotVarQSSDynamics(pneuron, a, Fdrive, Adrive, charges, varname, varrange, fs=12): ''' Plot the QSS-approximated derivative of a specific variable as function of the variable itself, as well as equilibrium values, for various membrane charge densities at a given acoustic amplitude. :param pneuron: point-neuron model :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param charges: charge density vector (C/m2) :param varname: name of variable to plot :param varrange: range over which to compute the derivative :return: figure handle ''' # Extract information about variable to plot pltvar = pneuron.getPltVars()[varname] # Get methods to compute derivative and steady-state of variable of interest derX_func = getattr(pneuron, 'der{}{}'.format(varname[0].upper(), varname[1:])) Xinf_func = getattr(pneuron, '{}inf'.format(varname)) derX_args = inspect.getargspec(derX_func)[0][1:] Xinf_args = inspect.getargspec(Xinf_func)[0][1:] # Get dictionary of charge and amplitude dependent QSS variables nbls = NeuronalBilayerSonophore(a, pneuron, Fdrive) _, Qref, lookups, QSS = nbls.getQuasiSteadyStates( Fdrive, amps=Adrive, charges=charges, squeeze_output=True) df = QSS df['Vm'] = lookups['V'] # Create figure fig, ax = plt.subplots(figsize=(6, 4)) ax.set_title('{} neuron - QSS {} dynamics @ {:.2f} kPa'.format( pneuron.name, pltvar['desc'], Adrive * 1e-3), fontsize=fs) ax.set_xscale('log') for key in ['top', 'right']: ax.spines[key].set_visible(False) ax.set_xlabel('$\\rm {}\ ({})$'.format(pltvar['label'], pltvar.get('unit', '')), fontsize=fs) ax.set_ylabel('$\\rm QSS\ d{}/dt\ ({}/s)$'.format(pltvar['label'], pltvar.get('unit', '1')), fontsize=fs) ax.set_ylim(-40, 40) ax.axhline(0, c='k', linewidth=0.5) y0_str = '{}0'.format(varname) if hasattr(pneuron, y0_str): ax.axvline(getattr(pneuron, y0_str) * pltvar.get('factor', 1), label=y0_str, c='k', linewidth=0.5) # For each charge value icolor = 0 for j, Qm in enumerate(charges): lbl = 'Q = {:.0f} nC/cm2'.format(Qm * 1e5) # Compute variable derivative as a function of its value, as well as equilibrium value, # keeping other variables at quasi steady-state derX_inputs = [varrange if arg == varname else df[arg][j] for arg in derX_args] Xinf_inputs = [df[arg][j] for arg in Xinf_args] dX_QSS = pneuron.derCai(*derX_inputs) Xeq_QSS = pneuron.Caiinf(*Xinf_inputs) # Plot variable derivative and its root as a function of the variable itself c = 'C{}'.format(icolor) ax.plot(varrange * pltvar.get('factor', 1), dX_QSS * pltvar.get('factor', 1), c=c, label=lbl) ax.axvline(Xeq_QSS * pltvar.get('factor', 1), linestyle='--', c=c) icolor += 1 ax.legend(frameon=False, fontsize=fs - 3) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) fig.tight_layout() fig.canvas.set_window_title('{}_QSS_{}_dynamics_{:.2f}kPa'.format( pneuron.name, varname, Adrive * 1e-3)) return fig def plotQSSdynamics(pneuron, a, Fdrive, Adrive, DC=1., fs=12): ''' Plot effective membrane potential, quasi-steady states and resulting membrane currents as a function of membrane charge density, for a given acoustic amplitude. :param pneuron: point-neuron model :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :return: figure handle ''' # Get neuron-specific pltvars pltvars = pneuron.getPltVars() # Compute neuron-specific charge and amplitude dependent QS states at this amplitude nbls = NeuronalBilayerSonophore(a, pneuron, Fdrive) lookups, QSS = nbls.getQuasiSteadyStates(Fdrive, amps=Adrive, DCs=DC, squeeze_output=True) Qref = lookups.refs['Q'] Vmeff = lookups['V'] # Compute QSS currents and 1D charge variation array states = {k: QSS[k] for k in pneuron.states} currents = {name: cfunc(Vmeff, states) for name, cfunc in pneuron.currents().items()} iNet = sum(currents.values()) dQdt = -iNet # Compute stable and unstable fixed points classified_FPs = nbls.fixedPointsQSS(Fdrive, Adrive, DC, lookups, dQdt) # Extract dimensionless states norm_QSS = {} for x in pneuron.states: if 'unit' not in pltvars[x]: norm_QSS[x] = QSS[x] # Create figure fig, axes = plt.subplots(3, 1, figsize=(7, 9)) axes[-1].set_xlabel('$\\rm Q_m\ (nC/cm^2)$', fontsize=fs) for ax in axes: for skey in ['top', 'right']: ax.spines[skey].set_visible(False) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) for item in ax.get_xticklabels(minor=True): item.set_visible(False) fig.suptitle('{} neuron - QSS dynamics @ {:.2f} kPa, {:.0f}%DC'.format( pneuron.name, Adrive * 1e-3, DC * 1e2), fontsize=fs) # Subplot: Vmeff ax = axes[0] ax.set_ylabel('$V_m^*$ (mV)', fontsize=fs) ax.plot(Qref * 1e5, Vmeff, color='k') ax.axhline(pneuron.Vm0, linewidth=0.5, color='k') # Subplot: dimensionless quasi-steady states cset = plt.get_cmap('Dark2').colors + plt.get_cmap('tab10').colors ax = axes[1] ax.set_ylabel('QSS gating variables (-)', fontsize=fs) ax.set_yticks([0, 0.5, 1]) ax.set_ylim([-0.05, 1.05]) for i, (label, QS_state) in enumerate(norm_QSS.items()): ax.plot(Qref * 1e5, QS_state, label=label, c=cset[i]) # Subplot: currents ax = axes[2] cset = plt.get_cmap('tab10').colors ax.set_ylabel('QSS currents ($\\rm A/m^2$)', fontsize=fs) for i, (k, I) in enumerate(currents.items()): ax.plot(Qref * 1e5, -I * 1e-3, '--', c=cset[i], label='$\\rm -{}$'.format(pneuron.getPltVars()[k]['label'])) ax.plot(Qref * 1e5, -iNet * 1e-3, color='k', label='$\\rm -I_{Net}$') ax.axhline(0, color='k', linewidth=0.5) for k, v in classified_FPs.items(): if len(v) > 0: ax.scatter(np.array(v) * 1e5, np.zeros(len(v)), marker='.', s=200, facecolors=FP_colors[k], edgecolors='none', label=f'{k} fixed points', zorder=3) fig.tight_layout() fig.subplots_adjust(right=0.8) for ax in axes[1:]: ax.legend(loc='center right', fontsize=fs, frameon=False, bbox_to_anchor=(1.3, 0.5)) for ax in axes[:-1]: ax.set_xticklabels([]) fig.canvas.set_window_title( '{}_QSS_dynamics_vs_Qm_{:.2f}kPa_DC{:.0f}%'.format(pneuron.name, Adrive * 1e-3, DC * 1e2)) return fig def plotQSSVarVsQm(pneuron, a, Fdrive, varname, amps=None, DC=1., fs=12, cmap='viridis', yscale='lin', zscale='lin', mpi=False, loglevel=logging.INFO): ''' Plot a specific QSS variable (state or current) as a function of membrane charge density, for various acoustic amplitudes. :param pneuron: point-neuron model :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param DC: duty cycle (-) :param varname: extraction key for variable to plot :return: figure handle ''' # Extract information about variable to plot pltvar = pneuron.getPltVars()[varname] Qvar = pneuron.getPltVars()['Qm'] Afactor = 1e-3 logger.info('plotting %s neuron QSS %s vs. Qm for various amplitudes @ %.0f%% DC', pneuron.name, pltvar['desc'], DC * 1e2) nbls = NeuronalBilayerSonophore(a, pneuron, Fdrive) # Get reference dictionaries for zero amplitude lookups0, QSS0 = nbls.getQuasiSteadyStates(Fdrive, amps=0., squeeze_output=True) Vmeff0 = lookups0['V'] Qref = lookups0.refs['Q'] df0 = QSS0.tables df0['Vm'] = Vmeff0 # Create figure fig, ax = plt.subplots(figsize=(6, 4)) title = '{} neuron - QSS {} vs. Qm - {:.0f}% DC'.format(pneuron.name, varname, DC * 1e2) ax.set_title(title, fontsize=fs) ax.set_xlabel('$\\rm {}\ ({})$'.format(Qvar['label'], Qvar['unit']), fontsize=fs) ax.set_ylabel('$\\rm QSS\ {}\ ({})$'.format(pltvar['label'], pltvar.get('unit', '')), fontsize=fs) if yscale == 'log': ax.set_yscale('log') for key in ['top', 'right']: ax.spines[key].set_visible(False) # Plot y-variable reference line, if any y0 = None y0_str = '{}0'.format(varname) if hasattr(pneuron, y0_str): y0 = getattr(pneuron, y0_str) * pltvar.get('factor', 1) elif varname in pneuron.getCurrentsNames() + ['iNet', 'dQdt']: y0 = 0. y0_str = '' if y0 is not None: ax.axhline(y0, label=y0_str, c='k', linewidth=0.5) # Plot reference QSS profile of variable as a function of charge density var0 = extractPltVar( pneuron, pltvar, pd.DataFrame({k: df0[k] for k in df0.keys()}), name=varname) ax.plot(Qref * Qvar['factor'], var0, '--', c='k', zorder=1, label='A = 0') if varname == 'dQdt': # Plot charge fixed points for each acoustic amplitude classified_FPs = getQSSFixedPointsvsAdrive( nbls, Fdrive, amps, DC, mpi=mpi, loglevel=loglevel) for k, v in classified_FPs.items(): if len(v) > 0: _, Q_FPs = np.array(v).T ax.scatter(np.array(Q_FPs) * 1e5, np.zeros(len(v)), marker='.', s=100, facecolors=FP_colors[k], edgecolors='none', label=f'{k} fixed points') # Define color code mymap = plt.get_cmap(cmap) zref = amps * Afactor norm, sm = setNormalizer(mymap, (zref.min(), zref.max()), zscale) # Get amplitude-dependent QSS dictionary lookups, QSS = nbls.getQuasiSteadyStates( Fdrive, amps=amps, DCs=DC, squeeze_output=True) df = QSS.tables df['Vm'] = lookups['V'] # Plot QSS profiles for various amplitudes for i, A in enumerate(amps): var = extractPltVar( pneuron, pltvar, pd.DataFrame({k: df[k][i] for k in df.keys()}), name=varname) ax.plot(Qref * Qvar['factor'], var, c=sm.to_rgba(A * Afactor), zorder=0) # Add legend and adjust layout ax.legend(frameon=False, fontsize=fs) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) fig.tight_layout() fig.subplots_adjust(bottom=0.15, top=0.9, right=0.80, hspace=0.5) # Plot amplitude colorbar if amps is not None: cbarax = fig.add_axes([0.85, 0.15, 0.03, 0.75]) fig.colorbar(sm, cax=cbarax) cbarax.set_ylabel('Amplitude (kPa)', fontsize=fs) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) fig.canvas.set_window_title('{}_QSS_{}_vs_Qm_{}A_{:.2f}-{:.2f}kPa_DC{:.0f}%'.format( pneuron.name, varname, zscale, amps.min() * 1e-3, amps.max() * 1e-3, DC * 1e2)) return fig -@fileCache( - root, - lambda nbls, Fdrive, amps, DC: - '{}_QSS_FPs_{:.0f}kHz_{:.2f}-{:.2f}kPa_DC{:.0f}%'.format( - nbls.pneuron.name, Fdrive * 1e-3, amps.min() * 1e-3, amps.max() * 1e-3, DC * 1e2) -) +# @fileCache( +# root, +# lambda nbls, Fdrive, amps, DC: +# '{}_QSS_FPs_{:.0f}kHz_{:.2f}-{:.2f}kPa_DC{:.0f}%'.format( +# nbls.pneuron.name, Fdrive * 1e-3, amps.min() * 1e-3, amps.max() * 1e-3, DC * 1e2) +# ) +# @alert def getQSSFixedPointsvsAdrive(nbls, Fdrive, amps, DC, mpi=False, loglevel=logging.INFO): # Compute 2D QSS charge variation array lkp2d, QSS = nbls.getQuasiSteadyStates( - Fdrive, amps=amps, DCs=DC, squeeze_output=True) + Fdrive, amps=amps, DC=DC, squeeze_output=True) dQdt = -nbls.pneuron.iNet(lkp2d['V'], QSS.tables) # mA/m2 # Generate batch queue queue = [] for iA, Adrive in enumerate(amps): - lkp1d = lkp2d.project('A', Adrive) - queue.append([Fdrive, Adrive, DC, lkp1d, dQdt[iA, :]]) + queue.append([Fdrive, Adrive, DC, lkp2d.project('A', Adrive), dQdt[iA, :]]) # Run batch to find stable and unstable fixed points at each amplitude batch = Batch(nbls.fixedPointsQSS, queue) output = batch(mpi=mpi, loglevel=loglevel) - # Sort points by amplitude - classified_FPs = {k: [] for k in output[0].keys()} - for i, Adrive in enumerate(amps): - for key in classified_FPs.keys(): - classified_FPs[key] += [(Adrive, Qm) for Qm in output[i][key]] + classified_FPs = {} + eigenvalues = [] + for A, out in zip(amps, output): + for item in out: + x, eigvals, prop = item + Qm = x[0] + if prop not in classified_FPs: + classified_FPs[prop] = [] + classified_FPs[prop] += [(A, Qm)] + eigenvalues.append(eigvals) + eigenvalues = np.array(eigenvalues).T + + # Plot root locus diagram + fig, ax = plt.subplots() + ax.set_xlabel('$Re(\lambda)$') + ax.set_ylabel('$Im(\lambda)$') + ax.axhline(0, color='k') + ax.axvline(0, color='k') + ax.set_title('root locus diagram') + states = ['Qm'] + nbls.pneuron.statesNames() + for state, eigvals in zip(states, eigenvalues): + ax.scatter(eigvals.real, eigvals.imag, label=f'$\lambda ({state})$') + ax.legend() + return classified_FPs def runAndGetStab(nbls, *args): args = list(args[:-1]) + [1., args[-1]] # hacking coverage fraction into args return nbls.pneuron.getStabilizationValue(nbls.getOutput(*args)[0]) @fileCache( root, lambda nbls, Fdrive, amps, tstim, toffset, PRF, DC: '{}_sim_FPs_{:.0f}kHz_{:.0f}ms_offset{:.0f}ms_PRF{:.0f}Hz_{:.2f}-{:.2f}kPa_DC{:.0f}%'.format( nbls.pneuron.name, Fdrive * 1e-3, tstim * 1e3, toffset * 1e3, PRF, amps.min() * 1e-3, amps.max() * 1e-3, DC * 1e2) ) def getSimFixedPointsvsAdrive(nbls, Fdrive, amps, tstim, toffset, PRF, DC, outputdir=None, mpi=False, loglevel=logging.INFO): # Run batch to find stabilization point from simulations (if any) at each amplitude queue = [[nbls, outputdir, Fdrive, Adrive, tstim, toffset, PRF, DC, 'sonic'] for Adrive in amps] batch = Batch(runAndGetStab, queue) output = batch(mpi=mpi, loglevel=loglevel) return list(zip(amps, output)) def plotEqChargeVsAmp(pneuron, a, Fdrive, amps=None, tstim=None, toffset=None, PRF=None, DC=1., fs=12, xscale='lin', compdir=None, mpi=False, loglevel=logging.INFO): ''' Plot the equilibrium membrane charge density as a function of acoustic amplitude, given an initial value of membrane charge density. :param pneuron: point-neuron model :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :return: figure handle ''' logger.info('plotting equilibrium charges for various amplitudes') # Create figure fig, ax = plt.subplots(figsize=(6, 4)) figname = '{} neuron - charge stability vs. amplitude @ {:.0f}%DC'.format( pneuron.name, DC * 1e2) ax.set_title(figname) ax.set_xlabel('Amplitude (kPa)', fontsize=fs) ax.set_ylabel('$\\rm Q_m\ (nC/cm^2)$', fontsize=fs) if xscale == 'log': ax.set_xscale('log') for skey in ['top', 'right']: ax.spines[skey].set_visible(False) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) nbls = NeuronalBilayerSonophore(a, pneuron, Fdrive) Afactor = 1e-3 # Plot charge fixed points for each acoustic amplitude classified_FPs = getQSSFixedPointsvsAdrive( nbls, Fdrive, amps, DC, mpi=mpi, loglevel=loglevel) for k, v in classified_FPs.items(): if len(v) > 0: A_FPs, Q_FPs = np.array(v).T ax.scatter(np.array(A_FPs) * Afactor, np.array(Q_FPs) * 1e5, marker='.', s=20, facecolors=FP_colors[k], edgecolors='none', label=f'{k} fixed points') # Plot charge asymptotic stabilization points from simulations for each acoustic amplitude if compdir is not None: stab_points = getSimFixedPointsvsAdrive( nbls, Fdrive, amps, tstim, toffset, PRF, DC, outputdir=compdir, mpi=mpi, loglevel=loglevel) if len(stab_points) > 0: A_stab, Q_stab = np.array(stab_points).T ax.scatter(np.array(A_stab) * Afactor, np.array(Q_stab) * 1e5, marker='o', s=20, facecolors='none', edgecolors='k', label='stabilization points from simulations') # Post-process figure ax.set_ylim(np.array([pneuron.Qm0() - 10e-5, 0]) * 1e5) ax.legend(frameon=False, fontsize=fs) fig.tight_layout() fig.canvas.set_window_title('{}_QSS_Qstab_vs_{}A_{:.0f}%DC{}'.format( pneuron.name, xscale, DC * 1e2, '_with_comp' if compdir is not None else '' )) return fig @fileCache( root, lambda nbls, Fdrive, DCs: '{}_QSS_threshold_curve_{:.0f}kHz_DC{:.2f}-{:.2f}%'.format( nbls.pneuron.name, Fdrive * 1e-3, DCs.min() * 1e2, DCs.max() * 1e2), ext='csv' ) def getQSSThresholdAmps(nbls, Fdrive, DCs, mpi=False, loglevel=logging.INFO): queue = [[Fdrive, DC] for DC in DCs] batch = Batch(nbls.titrateQSS, queue) return batch(mpi=mpi, loglevel=loglevel) @fileCache( root, lambda nbls, Fdrive, tstim, toffset, PRF, DCs: '{}_sim_threshold_curve_{:.0f}kHz_{:.0f}ms_offset{:.0f}ms_PRF{:.0f}Hz_DC{:.2f}-{:.2f}%'.format( nbls.pneuron.name, Fdrive * 1e-3, tstim * 1e3, toffset * 1e3, PRF, DCs.min() * 1e2, DCs.max() * 1e2), ext='csv' ) def getSimThresholdAmps(nbls, Fdrive, tstim, toffset, PRF, DCs, mpi=False, loglevel=logging.INFO): # Run batch to find threshold amplitude from titrations at each DC queue = [[Fdrive, tstim, toffset, PRF, DC, 1. , 'sonic'] for DC in DCs] batch = Batch(nbls.titrate, queue) return batch(mpi=mpi, loglevel=loglevel) def plotQSSThresholdCurve(pneuron, a, Fdrive, tstim=None, toffset=None, PRF=None, DCs=None, fs=12, Ascale='lin', comp=False, mpi=False, loglevel=logging.INFO): logger.info('plotting %s neuron threshold curve', pneuron.name) if pneuron.name == 'STN': raise ValueError('cannot compute threshold curve for STN neuron') # Create figure fig, ax = plt.subplots(figsize=(6, 4)) figname = '{} neuron - threshold amplitude vs. duty cycle'.format(pneuron.name) ax.set_title(figname) ax.set_xlabel('Duty cycle (%)', fontsize=fs) ax.set_ylabel('Amplitude (kPa)', fontsize=fs) if Ascale == 'log': ax.set_yscale('log') for skey in ['top', 'right']: ax.spines[skey].set_visible(False) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) nbls = NeuronalBilayerSonophore(a, pneuron, Fdrive) Athrs_QSS = np.array(getQSSThresholdAmps(nbls, Fdrive, DCs, mpi=mpi, loglevel=loglevel)) ax.plot(DCs * 1e2, Athrs_QSS * 1e-3, '-', c='k', label='QSS curve') if comp: Athrs_sim = np.array(getSimThresholdAmps( nbls, Fdrive, tstim, toffset, PRF, DCs, mpi=mpi, loglevel=loglevel)) ax.plot(DCs * 1e2, Athrs_sim * 1e-3, '--', c='k', label='sim curve') # Post-process figure ax.set_xlim([0, 100]) ax.set_ylim([10, 600]) ax.legend(frameon=False, fontsize=fs) fig.tight_layout() fig.canvas.set_window_title('{}_QSS_threhold_curve_{:.0f}-{:.0f}%DC_{}A{}'.format( pneuron.name, DCs.min() * 1e2, DCs.max() * 1e2, Ascale, '_with_comp' if comp else '' )) return fig diff --git a/PySONIC/utils.py b/PySONIC/utils.py index 9b6e695..001f10a 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,724 +1,798 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-19 22:30:46 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-09-20 23:03:39 +# @Last Modified time: 2019-10-10 15:36:34 ''' Definition of generic utility functions used in other modules ''' import csv from functools import wraps import operator import time import os from shutil import get_terminal_size import lockfile import math import pickle import json from tqdm import tqdm import logging import tkinter as tk from tkinter import filedialog +import base64 +import datetime import numpy as np from scipy.optimize import brentq +from scipy import linalg import colorlog +from pushbullet import Pushbullet # Package logger my_log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%') def setHandler(logger, handler): for h in logger.handlers: logger.removeHandler(h) logger.addHandler(handler) return logger def setLogger(name, formatter): handler = colorlog.StreamHandler() handler.setFormatter(formatter) logger = colorlog.getLogger(name) logger.addHandler(handler) return logger class TqdmHandler(logging.StreamHandler): def __init__(self, formatter): logging.StreamHandler.__init__(self) self.setFormatter(formatter) def emit(self, record): msg = self.format(record) tqdm.write(msg) logger = setLogger('PySONIC', my_log_formatter) def fillLine(text, char='-', totlength=None): ''' Surround a text with repetitions of a specific character in order to fill a line to a given total length. :param text: text to be surrounded :param char: surrounding character :param totlength: target number of characters in filled text line :return: filled text line ''' if totlength is None: totlength = get_terminal_size().columns - 1 ndashes = totlength - len(text) - 2 if ndashes < 2: return text else: nside = ndashes // 2 nleft, nright = nside, nside if ndashes % 2 == 1: nright += 1 return f'{char * nleft} {text} {char * nright}' # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): if x == 0: factor = 1e0 prefix = '' else: sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) vals = [tmp[1] for tmp in sorted_si_prefixes] # vals = list(si_prefixes.values()) ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 factor = vals[ix] prefix = sorted_si_prefixes[ix][0] # prefix = list(si_prefixes.keys())[ix] return '{{:.{}f}}{}{}'.format(precision, space, prefix).format(x / factor) elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: print(type(x)) def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' ret_string = '{0:.{1:d}e}'.format(number, precision) a, b = ret_string.split("e") a = float(a) b = int(b) return '{}10^{{{}}}'.format('{} * '.format(a) if a != 1. else '', b) def plural(n): if n < 0: raise ValueError('Cannot format negative integer (n = {})'.format(n)) if n == 0: return '' else: return 's' def rmse(x1, x2): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean()) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def convertPKL2JSON(): for pkl_filepath in OpenFilesDialog('pkl')[0]: logger.info('Processing {} ...'.format(pkl_filepath)) json_filepath = '{}.json'.format(os.path.splitext(pkl_filepath)[0]) with open(pkl_filepath, 'rb') as fpkl, open(json_filepath, 'w') as fjson: data = pickle.load(fpkl) json.dump(data, fjson, ensure_ascii=False, sort_keys=True, indent=4) logger.info('All done!') def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames( filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname ) if len(filenames) == 0: raise ValueError('no input file selected') par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) return filenames, par_dir def selectDirDialog(title='Select directory'): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() directory = filedialog.askdirectory(title=title) if directory == '': raise ValueError('no directory selected') return directory def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. :param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) if len(filename_out) == 0: raise ValueError('no output filepath selected') return filename_out def loadData(fpath, frequency=1): ''' Load dataframe and metadata dictionary from pickle file. ''' logger.info('Loading data from "%s"', os.path.basename(fpath)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'].iloc[::frequency] meta = frame['meta'] return df, meta def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def expandRange(xmin, xmax, exp_factor=2): if xmin > xmax: raise ValueError('values must be provided in (min, max) order') xptp = xmax - xmin xmid = (xmin + xmax) / 2 xdev = xptp * exp_factor / 2 return (xmid - xdev, xmin + xdev) def isIterable(x): for t in [list, tuple, np.ndarray]: if isinstance(x, t): return True return False def isWithin(name, val, bounds, rel_tol=1e-9): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. :param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' if isIterable(val): return np.array([isWithin(name, v, bounds, rel_tol) for v in val]) if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError('{} value ({}) out of [{}, {}] interval'.format( name, val, bounds[0], bounds[1])) def getDistribution(xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return getDistribution(xmin, xmax, nx, scale=scale) def getIndex(container, value): ''' Return the index of a float / string value in a list / array :param container: list / 1D-array of elements :param value: value to search for :return: index of value (if found) ''' if isinstance(value, float): container = np.array(container) imatches = np.where(np.isclose(container, value, rtol=1e-9, atol=1e-16))[0] if len(imatches) == 0: raise ValueError('{} not found in {}'.format(value, container)) return imatches[0] elif isinstance(value, str): return container.index(value) def debug(func): ''' Print the function signature and return value. ''' @wraps(func) def wrapper_debug(*args, **kwargs): args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr)) print('Calling {}'.format(signature)) value = func(*args, **kwargs) print(f"{func.__name__!r} returned {value!r}") return value return wrapper_debug def timer(func): ''' Monitor and return the runtime of the decorated function. ''' @wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() value = func(*args, **kwargs) end_time = time.perf_counter() run_time = end_time - start_time return value, run_time return wrapper def logCache(fpath, delimiter='\t', out_type=float): ''' Add an extra IO memoization functionality to a function using file caching, to avoid repetitions of tedious computations with identical inputs. ''' def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # If function has history -> do not log if 'history' in kwargs: return func(*args, **kwargs) # Translate function arguments into string signature args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr)) # If entry present in log, return corresponding output if os.path.isfile(fpath): with open(fpath, 'r', newline='') as f: reader = csv.reader(f, delimiter=delimiter) for row in reader: if row[0] == signature: logger.debug('entry found in "{}"'.format(os.path.basename(fpath))) return out_type(row[1]) # Otherwise, compute output and log it into file before returning out = func(*args, **kwargs) lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=delimiter) writer.writerow([signature, str(out)]) lock.release() return out return wrapper return wrapper_with_args def fileCache(root, fcode_func, ext='json'): def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # Get load and dump functions from file extension try: load_func = { 'json': json.load, 'pkl': pickle.load, 'csv': lambda f: np.loadtxt(f, delimiter=',') }[ext] dump_func = { 'json': json.dump, 'pkl': pickle.dump, 'csv': lambda x, f: np.savetxt(f, x, delimiter=',') }[ext] except KeyError: raise ValueError('Unknown file extension') # Get read and write mode (text or binary) from file extension mode = 'b' if ext == 'pkl' else '' # Get file path from root and function arguments, using fcode function - fpath = os.path.join(os.path.abspath(root), '{}.{}'.format(fcode_func(*args), ext)) + if callable(fcode_func): + fcode = fcode_func(*args) + else: + fcode = fcode_func + fpath = os.path.join(os.path.abspath(root), '{}.{}'.format(fcode, ext)) # If file exists, load output from it if os.path.isfile(fpath): logger.info('loading data from "{}"'.format(fpath)) with open(fpath, 'r' + mode) as f: out = load_func(f) # Otherwise, execute function and create the file to dump the output else: logger.warning('reference data file not found: "{}"'.format(fpath)) out = func(*args, **kwargs) logger.info('dumping data in "{}"'.format(fpath)) lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'w' + mode) as f: dump_func(out, f) lock.release() return out return wrapper return wrapper_with_args def binarySearch(bool_func, args, ix, xbounds, dx_thr, history=None): ''' Use a binary search to determine the threshold satisfying a given condition within a continuous search interval. :param bool_func: boolean function returning whether condition is satisfied :param args: list of function arguments other than refined value :param xbounds: search interval for threshold (progressively refined) :param dx_thr: accuracy criterion for threshold :return: excitation threshold ''' # If first function call: check that condition changes within the interval if history is None: sim_args = args[:] # If condition not satisfied even for upper bound -> return nan sim_args.insert(ix, xbounds[1]) if not bool_func(sim_args): logger.warning('titration error: condition not satisfied for upper bound') return np.nan # If condition satisfied even for lower bound -> return nan sim_args[ix] = xbounds[0] if bool_func(sim_args): logger.warning('titration error: condition satisfied even for lower bound') return np.nan # Assign empty history history = [] # Compute function output at interval mid-point x = (xbounds[0] + xbounds[1]) / 2 sim_args = args[:] sim_args.insert(ix, x) history.append(bool_func(sim_args)) # If titration interval is small enough conv = False if (xbounds[1] - xbounds[0]) <= dx_thr: logger.debug('titration interval smaller than defined threshold') # If both conditions have been encountered during titration process, # we're going towards convergence if (0 in history and 1 in history): logger.debug('converging around threshold') # If current value satisfies condition, convergence is achieved # -> return threshold if history[-1]: logger.debug('currently satisfying condition -> convergence') return x # If only one condition has been encountered during titration process, # then no titration is impossible within the defined interval -> return NaN else: logger.warning('titration does not converge within this interval') return np.nan # Return threshold if convergence is reached, otherwise refine interval and iterate if conv: return x else: if x > 0.: xbounds = (xbounds[0], x) if history[-1] else (x, xbounds[1]) else: xbounds = (x, xbounds[1]) if history[-1] else (xbounds[0], x) return binarySearch(bool_func, args, ix, xbounds, dx_thr, history=history) def pow2Search(bool_func, args, ix, x, xmax, icall=0): ''' Use an incremental power of 2 search to determine the threshold satisfying a given condition within a continuous search interval. :param bool_func: boolean function returning whether condition is satisfied :param args: list of function arguments other than refined value :param x: value to be tested (updated at each recursion step) :param xmax: upper bound of search interval for x :return: first (i.e. minimal) x value for which the condition is satisfied ''' # Compute function output sim_args = args[:] sim_args.insert(ix, x) res = bool_func(sim_args) # If condition is satisfied -> return if res is True: # If first call -> return nan if icall == 0: logger.warning('titration error: condition satisfied for initial x value') return np.nan else: logger.debug(f'condition satisfied on call {icall} -> returning') return x # Otherwise, double x value and call function recursively else: if x > xmax: logger.error('titration error: condition not satisfied for max x value') return np.nan else: return pow2Search(bool_func, args, ix, 2 * x, xmax, icall=icall + 1) def derivative(f, x, eps, method='central'): ''' Compute the difference formula for f'(x) with perturbation size eps. :param dfunc: derivatives function, taking an array of states and returning an array of derivatives :param x: states vector :param method: difference formula: 'forward', 'backward' or 'central' :param eps: perturbation vector (same size as states vector) :return: numerical approximation of the derivative around the fixed point ''' if isIterable(x): if not isIterable(eps) or len(eps) != len(x): raise ValueError('eps must be the same size as x') elif np.sum(eps != 0.) != 1: raise ValueError('eps must be zero-valued across all but one dimensions') eps_val = np.sum(eps) else: eps_val = eps if method == 'central': df = (f(x + eps) - f(x - eps)) / 2 elif method == 'forward': df = f(x + eps) - f(x) elif method == 'backward': df = f(x) - f(x - eps) else: raise ValueError("Method must be 'central', 'forward' or 'backward'.") return df / eps_val def jacobian(dfunc, x, rel_eps=None, abs_eps=None, method='central'): ''' Evaluate the Jacobian maatrix of a (time-invariant) system, given a states vector and derivatives function. :param dfunc: derivatives function, taking an array of n states and returning an array of n derivatives :param x: n-states vector :return: n-by-n square Jacobian matrix ''' if sum(e is not None for e in [abs_eps, rel_eps]) != 1: raise ValueError('one (and only one) of "rel_eps" or "abs_eps" parameters must be provided') # Determine vector size x = np.asarray(x) n = x.size # Initialize Jacobian matrix J = np.empty((n, n)) # Create epsilon vector if rel_eps is not None: mode = 'relative' eps_vec = rel_eps else: mode = 'absolute' eps_vec = abs_eps if not isIterable(eps_vec): eps_vec = np.array([eps_vec] * n) if mode == 'relative': eps = x * eps_vec else: eps = eps_vec # Perturb each state by epsilon on both sides, re-evaluate derivatives # and assemble Jacobian matrix ei = np.zeros(n) for i in range(n): ei[i] = 1 J[:, i] = derivative(dfunc, x, eps * ei, method=method) ei[i] = 0 return J +def classifyFixedPoint(x, dfunc): + ''' Characterize the stability of a fixed point by numerically evaluating its Jacobian + matrix and evaluating the sign of the real part of its associated eigenvalues. + + :param x: n-states vector + :param dfunc: derivatives function, taking an array of n states and returning + an array of n derivatives + ''' + # Compute Jacobian numerically + # print(f'x = {x}, dfunx(x) = {dfunc(x)}') + eps_machine = np.sqrt(np.finfo(float).eps) + J = jacobian(dfunc, x, rel_eps=eps_machine, method='forward') + + # Compute eigenvalues and eigenvectors + eigvals, eigvecs = linalg.eig(J) + logger.debug(f"eigenvalues = {['({0.real:.2e} + {0.imag:.2e}j)'.format(x) for x in eigvals]}") + + # Determine fixed point stability based on eigenvalues + is_neg_eigvals = eigvals.real < 0 + if is_neg_eigvals.all(): # all real parts negative -> stable + key = 'stable' + elif is_neg_eigvals.any(): # both posivie and negative real parts -> saddle + key = 'saddle' + else: # all real parts positive -> unstable + key = 'unstable' + + return eigvals, key + + def findModifiedEq(x0, dfunc, *args): ''' Find an equilibrium variable in a modified system by searching for its derivative root within an interval around its original equilibrium. :param x0: equilibrium value in original system. :param func: derivative function, taking the variable as first parameter. :param *args: remaining arguments needed for the derivative function. :return: variable equilibrium value in modified system. ''' is_iterable = [isIterable(arg) for arg in args] if any(is_iterable): if not all(is_iterable): raise ValueError('mix of iterables and non-iterables') lengths = [len(arg) for arg in args] if not all(n == lengths[0] for n in lengths): raise ValueError(f'inputs are not of the same size: {lengths}') n = lengths[0] res = [] for i in range(n): x = [arg[i] for arg in args] res.append(findModifiedEq(x0, dfunc, *x)) return np.array(res) else: return brentq(lambda x: dfunc(x, *args), x0 * 1e-4, x0 * 1e3, xtol=1e-16) def swapFirstLetterCase(s): if s[0].islower(): return s.capitalize() else: return s[0].lower() + s[1:] def getPow10(x, direction='up'): ''' Get the power of 10 that is closest to a number, in either direction("down" or "up"). ''' round_method = {'up': np.ceil, 'down': np.floor}[direction] return np.power(10, round_method(np.log10(x))) def rotAroundPoint2D(x, theta, p): ''' Rotate a 2D vector around a center point by a given angle. :param x: 2D coordinates vector :param theta: rotation angle (in rad) :param p: 2D center point coordinates :return: 2D rotated coordinates vector ''' n1, n2 = x.shape if n1 != 2: if n2 == 2: x = x.T else: raise ValueError('x should be a 2-by-n vector') # Rotation matrix R = np.array([ [np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)], ]) # Broadcast center point to input vector ptile = np.tile(p, (x.shape[1], 1)).T # Subtract, rotate and add return R.dot(x - ptile) + ptile + + +def getKey(keyfile='pushbullet.key'): + dir_path = os.path.dirname(os.path.realpath(__file__)) + fpath = os.path.join(dir_path, keyfile) + if not os.path.isfile(fpath): + raise FileNotFoundError('pushbullet API key file not found') + with open(fpath) as f: + encoded_key = f.readlines()[0] + return base64.b64decode(str.encode(encoded_key)).decode() + + +def sendPushNotification(msg): + try: + key = getKey() + pb = Pushbullet(key) + dt = datetime.datetime.now() + s = dt.strftime('%Y-%m-%d %H:%M:%S') + push = pb.push_note('Code Messenger', f'{s}\n{msg}') + except FileNotFoundError as e: + logger.error(f'Could not send push notification: "{msg}"') + + +def alert(func): + ''' Run a function, and send a push notification upon completion, + or if an error is raised during its execution. + ''' + @wraps(func) + def wrapper(*args, **kwargs): + try: + out = func(*args, **kwargs) + sendPushNotification(f'completed "{func.__name__}" execution successfully') + return out + except BaseException as e: + sendPushNotification(f'error during "{func.__name__}" execution: {e}') + raise e + return wrapper \ No newline at end of file diff --git a/scripts/plot_QSS.py b/scripts/plot_QSS.py index a512318..2b5adf4 100644 --- a/scripts/plot_QSS.py +++ b/scripts/plot_QSS.py @@ -1,75 +1,75 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2018-09-28 16:13:34 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-10-03 20:49:47 +# @Last Modified time: 2019-10-09 15:18:55 ''' Phase-plane analysis of neuron behavior under quasi-steady state approximation. ''' import os import numpy as np import matplotlib.pyplot as plt from PySONIC.utils import logger from PySONIC.plt import plotQSSdynamics, plotQSSVarVsQm, plotEqChargeVsAmp, plotQSSThresholdCurve from PySONIC.parsers import AStimParser def main(): # Parse command line arguments parser = AStimParser() # parser.addCmap(default='viridis') parser.addAscale() parser.outputdir_dep_key = 'save' parser.addInputDir(dep_key='compare') parser.defaults['amp'] = np.logspace(np.log10(1), np.log10(600), 100) # kPa parser.defaults['tstim'] = 1000. # ms parser.defaults['toffset'] = 0. # ms args = parser.parse() logger.setLevel(args['loglevel']) if args['plot'] is None: args['plot'] = ['dQdt'] a, Fdrive, tstim, toffset, PRF = [ args[k][0] for k in ['radius', 'freq', 'tstim', 'toffset', 'PRF']] figs = [] for i, pneuron in enumerate(args['neuron']): if args['DC'].size == 1: DC = args['DC'][0] if args['amp'].size == 1: Adrive = args['amp'][0] figs.append(plotQSSdynamics(pneuron, a, Fdrive, Adrive, DC)) else: # Plot evolution of QSS vars vs Q for different amplitudes - for pvar in args['plot']: - figs.append(plotQSSVarVsQm( - pneuron, a, Fdrive, pvar, amps=args['amp'], DC=DC, - cmap=args['cmap'], zscale=args['Ascale'], mpi=args['mpi'], - loglevel=args['loglevel'])) + # for pvar in args['plot']: + # figs.append(plotQSSVarVsQm( + # pneuron, a, Fdrive, pvar, amps=args['amp'], DC=DC, + # cmap=args['cmap'], zscale=args['Ascale'], mpi=args['mpi'], + # loglevel=args['loglevel'])) # Plot equilibrium charge as a function of amplitude if 'dQdt' in args['plot']: figs.append(plotEqChargeVsAmp( pneuron, a, Fdrive, amps=args['amp'], tstim=tstim, toffset=toffset, PRF=PRF, DC=DC, xscale=args['Ascale'], compdir=args['inputdir'], mpi=args['mpi'], loglevel=args['loglevel'])) else: figs.append(plotQSSThresholdCurve( pneuron, a, Fdrive, tstim=tstim, toffset=toffset, PRF=PRF, DCs=args['DC'], Ascale=args['Ascale'], comp=args['compare'], mpi=args['mpi'], loglevel=args['loglevel'])) if args['save']: for fig in figs: s = fig.canvas.get_window_title() s = s.replace('(', '- ').replace('/', '_').replace(')', '') figname = '{}.png'.format(s) fig.savefig(os.path.join(args['outputdir'], figname), transparent=True) else: plt.show() if __name__ == '__main__': main() diff --git a/setup.py b/setup.py index f3e8591..5f9d9a8 100644 --- a/setup.py +++ b/setup.py @@ -1,49 +1,50 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-06-13 09:40:02 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-06-26 13:50:32 +# @Last Modified time: 2019-10-10 11:58:14 import os from setuptools import setup def readme(): with open('README.md', encoding="utf8") as f: return f.read() setup( name='PySONIC', version='1.0', description='Python implementation of the **multi-Scale Optimized Neuronal Intramembrane \ Cavitation** (SONIC) model to compute individual neural responses to acoustic \ stimuli, as predicted by the *intramembrane cavitation* hypothesis.', long_description=readme(), url='https://iopscience.iop.org/article/10.1088/1741-2552/ab1685', classifiers=[ 'Development Status :: 4 - Beta', 'Intended Audience :: Science/Research', 'Programming Language :: Python :: 3', 'Topic :: Scientific/Engineering :: Physics' ], keywords=('SONIC NICE acoustic ultrasound ultrasonic neuromodulation neurostimulation excitation\ computational model intramembrane cavitation'), author='Théo Lemaire', author_email='theo.lemaire@epfl.ch', license='MIT', packages=['PySONIC'], scripts=['scripts/{}'.format(x) for x in os.listdir('scripts')], install_requires=[ 'numpy>=1.10', 'scipy>=0.17', 'matplotlib>=2' 'pandas>=0.22.0', 'colorlog>=3.0.1', 'tqdm>=4.3', 'lockfile>=0.1.2', - 'multiprocess>=0.70' + 'multiprocess>=0.70', + 'pushbullet>=0.11.0' ], zip_safe=False )