diff --git a/PySONIC/core/batches.py b/PySONIC/core/batches.py index f90ee90..3fc69e5 100644 --- a/PySONIC/core/batches.py +++ b/PySONIC/core/batches.py @@ -1,354 +1,352 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-07-06 18:15:57 +# @Last Modified time: 2020-08-25 12:08:48 ''' Utility functions used in simulations ''' import os import abc import csv import logging import numpy as np import pandas as pd import multiprocess as mp from ..utils import logger, si_format, isIterable class Consumer(mp.Process): ''' Generic consumer process, taking tasks from a queue and outputing results in another queue. ''' def __init__(self, queue_in, queue_out): mp.Process.__init__(self) self.queue_in = queue_in self.queue_out = queue_out logger.debug('Starting %s', self.name) def run(self): while True: nextTask = self.queue_in.get() if nextTask is None: logger.debug('Exiting %s', self.name) self.queue_in.task_done() break answer = nextTask() self.queue_in.task_done() self.queue_out.put(answer) return class Worker: ''' Generic worker class calling a specific function with a given set of parameters. ''' def __init__(self, wid, func, args, kwargs, loglevel): ''' Worker constructor. :param wid: worker ID :param func: function object :param args: list of method arguments :param kwargs: dictionary of optional method arguments :param loglevel: logging level ''' self.id = wid self.func = func self.args = args self.kwargs = kwargs self.loglevel = loglevel def __call__(self): ''' Caller to the function with specific parameters. ''' logger.setLevel(self.loglevel) return self.id, self.func(*self.args, **self.kwargs) class Batch: ''' Generic interface to run batches of function calls. ''' def __init__(self, func, queue): ''' Batch constructor. :param func: function object :param queue: list of list of function parameters ''' self.func = func self.queue = queue def __call__(self, *args, **kwargs): ''' Call the internal run method. ''' return self.run(*args, **kwargs) def getNConsumers(self): ''' Determine number of consumers based on queue length and number of available CPUs. ''' return min(mp.cpu_count(), len(self.queue)) def start(self): ''' Create tasks and results queues, and start consumers. ''' mp.freeze_support() self.tasks = mp.JoinableQueue() self.results = mp.Queue() self.consumers = [Consumer(self.tasks, self.results) for i in range(self.getNConsumers())] for c in self.consumers: c.start() @staticmethod def resolve(params): if isinstance(params, list): args = params kwargs = {} elif isinstance(params, tuple): args, kwargs = params return args, kwargs def assign(self, loglevel): ''' Assign tasks to workers. ''' for i, params in enumerate(self.queue): args, kwargs = self.resolve(params) worker = Worker(i, self.func, args, kwargs, loglevel) self.tasks.put(worker, block=False) def join(self): ''' Put all tasks to None and join the queue. ''' for i in range(len(self.consumers)): self.tasks.put(None, block=False) self.tasks.join() def get(self): ''' Extract and re-order results. ''' outputs, idxs = [], [] for i in range(len(self.queue)): wid, out = self.results.get() outputs.append(out) idxs.append(wid) return [x for _, x in sorted(zip(idxs, outputs))] def stop(self): ''' Close tasks and results queues. ''' self.tasks.close() self.results.close() def run(self, mpi=False, loglevel=logging.INFO): ''' Run batch with or without multiprocessing. 
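# A minimal usage sketch of the run() entry point, assuming a toy cost
# function (the function name, arguments and values below are illustrative
# only); each queue item is a list of positional arguments, or an
# (args, kwargs) tuple:
#
#   >>> import logging
#   >>> from PySONIC.core.batches import Batch
#   >>> def cost(x, y):
#   ...     return x**2 + y
#   >>> batch = Batch(cost, [[1., 2.], [3., 4.], [5., 6.]])
#   >>> batch(mpi=True, loglevel=logging.INFO)  # parallel run, outputs re-ordered by task index
#   [3.0, 13.0, 31.0]
#   >>> batch(mpi=False)                        # serial fallback
#   [3.0, 13.0, 31.0]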
''' if mpi: self.start() self.assign(loglevel) self.join() outputs = self.get() self.stop() else: outputs = [] for params in self.queue: args, kwargs = self.resolve(params) outputs.append(self.func(*args, **kwargs)) return outputs @staticmethod def createQueue(*dims): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. :param dims: list of lists (or 1D arrays) of input parameters :return: list of parameters (list) for each simulation ''' ndims = len(dims) dims_in = [dims[1], dims[0]] inds_out = [1, 0] if ndims > 2: dims_in += dims[2:] inds_out += list(range(2, ndims)) queue = np.stack(np.meshgrid(*dims_in), -1).reshape(-1, ndims) queue = queue[:, inds_out] return queue.tolist() class LogBatch(metaclass=abc.ABCMeta): ''' Generic interface to a simulation batch in with real-time input:output caching in a specific log file. ''' delimiter = '\t' # csv delimiter rtol = 1e-9 atol = 1e-16 def __init__(self, inputs, root='.'): ''' Construtor. :param inputs: array of batch inputs :param root: root for IO operations ''' self.inputs = inputs self.root = root self.fpath = self.filepath() @property def root(self): return self._root @root.setter def root(self, value): if not os.path.isdir(value): raise ValueError(f'{value} is not a valid directory') self._root = value @property @abc.abstractmethod def in_key(self): ''' Input key. ''' raise NotImplementedError @property @abc.abstractmethod def out_keys(self): ''' Output keys. ''' raise NotImplementedError @property @abc.abstractmethod def suffix(self): ''' filename suffix ''' raise NotImplementedError @property @abc.abstractmethod def unit(self): ''' Input unit. ''' raise NotImplementedError @property def in_label(self): ''' Input label. ''' return f'{self.in_key} ({self.unit})' def rangecode(self, x, label, unit): ''' String describing a batch input range. ''' bounds_str = si_format([x.min(), x.max()], 1, space="") return '{0}{1}{3}-{2}{3}_{4}'.format(label.replace(' ', '_'), *bounds_str, unit, x.size) @property def inputscode(self): ''' String describing the batch inputs. ''' return self.rangecode(self.inputs, self.in_key, self.unit) @abc.abstractmethod def corecode(self): ''' String describing the batch core components. ''' raise NotImplementedError def filecode(self): ''' String fully describing the batch. ''' return f'{self.corecode()}_{self.inputscode}_{self.suffix}_results' def filename(self): ''' Batch associated filename. ''' return f'{self.filecode()}.csv' def filepath(self): ''' Batch associated filepath. ''' return os.path.join(self.root, self.filename()) def createLogFile(self): ''' Create batch log file if it does not exist. ''' if not os.path.isfile(self.fpath): logger.debug(f'creating batch log file: "{self.fpath}"') self.writeLabels() else: logger.debug(f'existing batch log file: "{self.fpath}"') def writeLabels(self): ''' Write the column labels of the batch log file. ''' with open(self.fpath, 'w') as csvfile: writer = csv.writer(csvfile, delimiter=self.delimiter) writer.writerow([self.in_label, *self.out_keys]) def writeEntry(self, entry): ''' Write a new input(s):ouput(s) entry in the batch log file. ''' with open(self.fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=self.delimiter) writer.writerow(entry) def getLogData(self): ''' Retrieve the batch log file data (inputs and outputs) as a dataframe. ''' return pd.read_csv(self.fpath, sep=self.delimiter).sort_values(self.in_label) def getInput(self): ''' Retrieve the logged batch inputs as an array. 
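# A short sketch of createQueue, assuming three independent parameter sweeps
# (the frequency, amplitude and charge names and values are illustrative);
# the result is a flat list of [freq, amp, charge] combinations that can be
# passed directly as a Batch queue:
#
#   >>> from PySONIC.core.batches import Batch
#   >>> freqs = [500e3, 1e6]          # Hz
#   >>> amps = [50e3, 100e3, 150e3]   # Pa
#   >>> charges = [0.0]               # C/m2
#   >>> queue = Batch.createQueue(freqs, amps, charges)
#   >>> len(queue)                    # 2 x 3 x 1 combinations
#   6
#   >>> queue[0]
#   [500000.0, 50000.0, 0.0]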
''' return self.getLogData()[self.in_label].values def getSerializedOutput(self): ''' Retrieve the logged batch outputs as an array (if 1 key) or dataframe (if several). ''' if len(self.out_keys) == 1: return self.getLogData()[self.out_keys[0]].values else: return pd.DataFrame({k: self.getLogData()[k].values for k in self.out_keys}) def getOutput(self): return self.getSerializedOutput() def getEntryIndex(self, entry): ''' Get the index corresponding to a given entry. ''' inputs = self.getInput() if len(inputs) == 0: raise ValueError(f'no entries in batch') - entry = np.asarray(entry) - close_per_col = np.isclose(inputs, entry, rtol=self.rtol, atol=self.atol) - close = close_per_col.all(axis=1) + close = np.isclose(inputs, entry, rtol=self.rtol, atol=self.atol) imatches = np.where(close)[0] if len(imatches) == 0: raise ValueError(f'{entry} entry not found in batch log') elif len(imatches) > 1: raise ValueError(f'duplicate {entry} entry found in batch log') return imatches[0] def getEntryOutput(self, entry): imatch = self.getEntryIndex(entry) return self.getSerializedOutput()[imatch] def isEntry(self, value): ''' Check if a given input is logged in the batch log file. ''' try: self.getEntryIndex(value) return True except ValueError: return False @abc.abstractmethod def compute(self, x): ''' Compute the necessary output(s) for a given input. ''' raise NotImplementedError def computeAndLog(self, x): ''' Compute output(s) and log new entry only if input is not already in the log file. ''' if not self.isEntry(x): logger.debug(f'entry not found: "{x}"') out = self.compute(x) if not isIterable(x): x = [x] if not isIterable(out): out = [out] entry = [*x, *out] if not self.mpi: self.writeEntry(entry) return entry else: logger.debug(f'existing entry: "{x}"') return None def run(self, mpi=False): ''' Run the batch and return the output(s). ''' self.createLogFile() if len(self.getLogData()) < len(self.inputs): batch = Batch(self.computeAndLog, [[x] for x in self.inputs]) self.mpi = mpi outputs = batch.run(mpi=mpi, loglevel=logger.level) outputs = filter(lambda x: x is not None, outputs) if mpi: for out in outputs: self.writeEntry(out) self.mpi = False else: logger.debug('all entries already present') return self.getOutput() diff --git a/PySONIC/core/bls.py b/PySONIC/core/bls.py index 19791e8..413226c 100644 --- a/PySONIC/core/bls.py +++ b/PySONIC/core/bls.py @@ -1,797 +1,799 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-08-08 16:29:29 +# @Last Modified time: 2020-08-17 15:33:38 from enum import Enum import os import json import numpy as np -import pandas as pd import scipy.integrate as integrate from scipy.optimize import brentq, curve_fit from .model import Model from .lookups import EffectiveVariablesLookup from .solvers import PeriodicSolver from .drives import Drive, AcousticDrive from ..utils import logger, si_format, LOOKUP_DIR from ..constants import * class PmCompMethod(Enum): ''' Enum: types of computation method for the intermolecular pressure ''' direct = 1 predict = 2 def LennardJones(x, beta, alpha, C, m, n): ''' Generic expression of a Lennard-Jones function, adapted for the context of symmetric deflection (distance = 2x). :param x: deflection (i.e. 
half-distance) :param beta: x-shifting factor :param alpha: x-scaling factor :param C: y-scaling factor :param m: exponent of the repulsion term :param n: exponent of the attraction term :return: Lennard-Jones potential at given distance (2x) ''' return C * (np.power((alpha / (2 * x + beta)), m) - np.power((alpha / (2 * x + beta)), n)) def lookup(func): ''' Load parameters from lookup file, or compute them and store them in lookup file. ''' lookup_path = os.path.join(os.path.split(__file__)[0], 'bls_lookups.json') def wrapper(obj): akey = f'{obj.a * 1e9:.1f}' Qkey = f'{obj.Qm0 * 1e5:.2f}' # Open lookup files try: with open(lookup_path, 'r') as fh: lookups = json.load(fh) except FileNotFoundError: lookups = {} # If info not in lookups, compute parameters and add them if akey not in lookups or Qkey not in lookups[akey]: func(obj) if akey not in lookups: lookups[akey] = {Qkey: {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta}} else: lookups[akey][Qkey] = {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta} logger.debug('Saving BLS derived parameters to lookup file') with open(lookup_path, 'w') as fh: json.dump(lookups, fh, indent=2) # If lookup exists, load parameters from it else: logger.debug('Loading BLS derived parameters from lookup file') obj.LJ_approx = lookups[akey][Qkey]['LJ_approx'] obj.Delta = lookups[akey][Qkey]['Delta_eq'] return wrapper class BilayerSonophore(Model): ''' Definition of the Bilayer Sonophore Model - geometry - pressure terms - cavitation dynamics ''' # BIOMECHANICAL PARAMETERS T = 309.15 # Temperature (K) delta0 = 2.0e-9 # Thickness of the leaflet (m) Delta_ = 1.4e-9 # Initial gap between the two leaflets on a non-charged membrane at equil. (m) pDelta = 1.0e5 # Attraction/repulsion pressure coefficient (Pa) m = 5.0 # Exponent in the repulsion term (dimensionless) n = 3.3 # Exponent in the attraction term (dimensionless) rhoL = 1075.0 # Density of the surrounding fluid (kg/m^3) muL = 7.0e-4 # Dynamic viscosity of the surrounding fluid (Pa.s) muS = 0.035 # Dynamic viscosity of the leaflet (Pa.s) kA = 0.24 # Area compression modulus of the leaflet (N/m) alpha = 7.56 # Tissue shear loss modulus frequency coefficient (Pa.s) C0 = 0.62 # Initial gas molar concentration in the surrounding fluid (mol/m^3) kH = 1.613e5 # Henry's constant (Pa.m^3/mol) P0 = 1.0e5 # Static pressure in the surrounding fluid (Pa) Dgl = 3.68e-9 # Diffusion coefficient of gas in the fluid (m^2/s) xi = 0.5e-9 # Boundary layer thickness for gas transport across leaflet (m) c = 1515.0 # Speed of sound in medium (m/s) # BIOPHYSICAL PARAMETERS epsilon0 = 8.854e-12 # Vacuum permittivity (F/m) epsilonR = 1.0 # Relative permittivity of intramembrane cavity (dimensionless) rel_Zmin = -0.49 # relative deflection range lower bound (in multiples of Delta) tscale = 'us' # relevant temporal scale of the model simkey = 'MECH' # keyword used to characterize simulations made with this model def __init__(self, a, Cm0, Qm0, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Extract resting constants and geometry self.Cm0 = Cm0 self.Qm0 = Qm0 self.a = a self.d = embedding_depth self.S0 = np.pi * self.a**2 # Initialize null elastic modulus for tissue self.kA_tissue = 0. 
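# The computePMparams() call below either loads the Lennard-Jones approximation
# of the average intermolecular pressure and the equilibrium gap Delta_eq from
# bls_lookups.json (via the @lookup decorator), or fits and caches them for this
# (a, Qm0) pair. The initial volume then follows from V0 = pi * a^2 * Delta and
# the initial gas content from the ideal gas law (gasPa2mol).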
# Compute Pm params self.computePMparams() # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def copy(self): return self.__class__(self.a, self.Cm0, self.Qm0, embedding_depth=self.d) @property def a(self): return self._a @a.setter def a(self, value): if value <= 0.: raise ValueError('Sonophore radius must be positive') self._a = value @property def Cm0(self): return self._Cm0 @Cm0.setter def Cm0(self, value): if value <= 0.: raise ValueError('Resting membrane capacitance must be positive') self._Cm0 = value @property def d(self): return self._d @d.setter def d(self, value): if value < 0.: raise ValueError('Embedding depth cannot be negative') self._d = value def __repr__(self): s = f'{self.__class__.__name__}({self.a * 1e9:.1f} nm' if self.d > 0.: s += f', d={si_format(self.d, precision=1)}m' return f'{s})' @property def meta(self): return { 'a': self.a, 'd': self.d, 'Cm0': self.Cm0, 'Qm0': self.Qm0, } @classmethod def initFromMeta(cls, d): return cls(d['a'], d['Cm0'], d['Qm0']) @staticmethod def inputs(): return { 'a': { 'desc': 'sonophore radius', 'label': 'a', 'unit': 'm', 'precision': 0 }, 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'precision': 1 }, **AcousticDrive.inputs() } def filecodes(self, drive, Qm, PmCompMethod='predict'): return { 'simkey': self.simkey, 'a': f'{self.a * 1e9:.0f}nm', **drive.filecodes, 'Qm': f'{Qm * 1e5:.1f}nCcm2' } @staticmethod def getPltVars(wl='df["', wr='"]'): return { 'Pac': { 'desc': 'acoustic pressure', 'label': 'P_{AC}', 'unit': 'kPa', 'factor': 1e-3, 'func': f'meta["drive"].compute({wl}t{wr})' }, 'Z': { 'desc': 'leaflets deflection', 'label': 'Z', 'unit': 'nm', 'factor': 1e9, 'bounds': (-1.0, 10.0) }, 'ng': { 'desc': 'gas content', 'label': 'n_g', 'unit': '10^{-22}\ mol', 'factor': 1e22, 'bounds': (1.0, 15.0) }, 'Pmavg': { 'desc': 'average intermolecular pressure', 'label': 'P_M', 'unit': 'kPa', 'factor': 1e-3, 'func': f'PMavgpred({wl}Z{wr})' }, 'Telastic': { 'desc': 'leaflet elastic tension', 'label': 'T_E', 'unit': 'mN/m', 'factor': 1e3, 'func': f'TEleaflet({wl}Z{wr})' }, 'Cm': { 'desc': 'membrane capacitance', 'label': 'C_m', 'unit': 'uF/cm^2', 'factor': 1e2, 'bounds': (0.0, 1.5), 'func': f'v_capacitance({wl}Z{wr})' } } @property def pltScheme(self): return { 'P_{AC}': ['Pac'], 'Z': ['Z'], 'n_g': ['ng'] } @property def Zmin(self): return self.rel_Zmin * self.Delta def curvrad(self, Z): ''' Leaflet curvature radius (signed variable) :param Z: leaflet apex deflection (m) :return: leaflet curvature radius (m) ''' if Z == 0.0: return np.inf else: return (self.a**2 + Z**2) / (2 * Z) def v_curvrad(self, Z): ''' Vectorized curvrad function ''' return np.array(list(map(self.curvrad, Z))) def surface(self, Z): ''' Surface area of the stretched leaflet (spherical cap formula) :param Z: leaflet apex deflection (m) :return: stretched leaflet surface (m^2) ''' return np.pi * (self.a**2 + Z**2) def volume(self, Z): ''' Volume of the inter-leaflet space (cylinder +/- 2 spherical caps) :param Z: leaflet apex deflection (m) :return: bilayer sonophore inner volume (m^3) ''' return np.pi * self.a**2 * self.Delta\ * (1 + (Z / (3 * self.Delta) * (3 + Z**2 / self.a**2))) def arealStrain(self, Z): ''' Areal strain of the stretched leaflet epsilon = (S - S0)/S0 = (Z/a)^2 :param Z: leaflet apex deflection (m) :return: areal strain (dimensionless) ''' return (Z / self.a)**2 def logRelGap(self, Z): ''' Logarithm of relative sonophore deflection for a 
given deflection Z. ''' return np.log((2 * Z + self.Delta) / self.Delta) def capacitance(self, Z): ''' Membrane capacitance (parallel-plate capacitor evaluated at average inter-layer distance) :param Z: leaflet apex deflection (m) :return: capacitance per unit area (F/m2) ''' if Z == 0.0: return self.Cm0 else: Z2 = (self.a**2 - Z**2 - Z * self.Delta) / (2 * Z) return self.Cm0 * self.Delta / self.a**2 * (Z + Z2 * self.logRelGap(Z)) def v_capacitance(self, Z): ''' Vectorized capacitance function ''' return np.array(list(map(self.capacitance, Z))) def derCapacitance(self, Z, U): ''' Evolution of membrane capacitance :param Z: leaflet apex deflection (m) :param U: leaflet apex deflection velocity (m/s) :return: time derivative of capacitance per unit area (F/m2.s) ''' ratio1 = (Z**2 + self.a**2) / (Z * (2 * Z + self.Delta)) ratio2 = (Z**2 + self.a**2) / (2 * Z**2) * self.logRelGap(Z) dCmdZ = self.Cm0 * self.Delta / self.a**2 * (ratio1 - ratio2) return dCmdZ * U @staticmethod def localDeflection(r, Z, R): ''' Local leaflet deflection at specific radial distance (signed) :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local transverse leaflet deviation (m) ''' if np.abs(Z) == 0.0: return 0.0 else: return np.sign(Z) * (np.sqrt(R**2 - r**2) - np.abs(R) + np.abs(Z)) def PMlocal(self, r, Z, R): ''' Local intermolecular pressure :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local intermolecular pressure (Pa) ''' z = self.localDeflection(r, Z, R) relgap = (2 * z + self.Delta) / self.Delta_ return self.pDelta * ((1 / relgap)**self.m - (1 / relgap)**self.n) def PMavg(self, Z, R, S): ''' Average intermolecular pressure across the leaflet (computed by quadratic integration) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :param S: surface of the stretched leaflet (m^2) :return: averaged intermolecular resultant pressure (Pa) .. warning:: quadratic integration is computationally expensive. ''' # Integrate intermolecular force over an infinitely thin ring of radius r from 0 to a fTotal, _ = integrate.quad(lambda r, Z, R: 2 * np.pi * r * self.PMlocal(r, Z, R), 0, self.a, args=(Z, R)) return fTotal / S def v_PMavg(self, Z, R, S): ''' Vectorized PMavg function ''' return np.array(list(map(self.PMavg, Z, R, S))) def LJfitPMavg(self): ''' Determine optimal parameters of a Lennard-Jones expression approximating the average intermolecular pressure. These parameters are obtained by a nonlinear fit of the Lennard-Jones function for a range of deflection values between predetermined Zmin and Zmax. 
:return: 3-tuple with optimized LJ parameters for PmAvg prediction (Map) and the standard and max errors of the prediction in the fitting range (in Pascals) ''' # Determine lower bound of deflection range: when Pm = Pmmax PMmax = LJFIT_PM_MAX # Pa Zlb_range = (self.Zmin, 0.0) Zlb = brentq(lambda Z, Pmmax: self.PMavg(Z, self.curvrad(Z), self.surface(Z)) - PMmax, *Zlb_range, args=(PMmax), xtol=1e-16) # Create vectors for geometric variables Zub = 2 * self.a Z = np.arange(Zlb, Zub, 1e-11) Pmavg = self.v_PMavg(Z, self.v_curvrad(Z), self.surface(Z)) # Compute optimal nonlinear fit of custom LJ function with initial guess x0_guess = self.delta0 C_guess = 0.1 * self.pDelta nrep_guess = self.m nattr_guess = self.n pguess = (x0_guess, C_guess, nrep_guess, nattr_guess) popt, _ = curve_fit(lambda x, x0, C, nrep, nattr: LennardJones(x, self.Delta, x0, C, nrep, nattr), Z, Pmavg, p0=pguess, maxfev=100000) (x0_opt, C_opt, nrep_opt, nattr_opt) = popt Pmavg_fit = LennardJones(Z, self.Delta, x0_opt, C_opt, nrep_opt, nattr_opt) # Compute prediction error residuals = Pmavg - Pmavg_fit ss_res = np.sum(residuals**2) N = residuals.size std_err = np.sqrt(ss_res / N) max_err = max(np.abs(residuals)) logger.debug('LJ approx: x0 = %.2f nm, C = %.2f kPa, m = %.2f, n = %.2f', x0_opt * 1e9, C_opt * 1e-3, nrep_opt, nattr_opt) LJ_approx = {"x0": x0_opt, "C": C_opt, "nrep": nrep_opt, "nattr": nattr_opt} return (LJ_approx, std_err, max_err) @lookup def computePMparams(self): # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: D_eq = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Find optimal Lennard-Jones parameters to approximate PMavg (self.LJ_approx, std_err, _) = self.LJfitPMavg() assert std_err < PMAVG_STD_ERR_MAX, 'High error in PmAvg nonlinear fit:'\ ' std_err = %.2f Pa' % std_err def PMavgpred(self, Z): ''' Approximated average intermolecular pressure (using nonlinearly fitted Lennard-Jones function) :param Z: leaflet apex deflection (m) :return: predicted average intermolecular pressure (Pa) ''' return LennardJones(Z, self.Delta, self.LJ_approx['x0'], self.LJ_approx['C'], self.LJ_approx['nrep'], self.LJ_approx['nattr']) def Pelec(self, Z, Qm): ''' Electrical pressure term :param Z: leaflet apex deflection (m) :param Qm: membrane charge density (C/m2) :return: electrical pressure (Pa) ''' relS = self.S0 / self.surface(Z) abs_perm = self.epsilon0 * self.epsilonR # F/m return - relS * Qm**2 / (2 * abs_perm) # Pa def findDeltaEq(self, Qm): ''' Compute the Delta that cancels out the (Pm + Pec) equation at Z = 0 for a given membrane charge density, using the Brent method to refine the pressure root iteratively. 
:param Qm: membrane charge density (C/m2) :return: equilibrium value (m) and associated pressure (Pa) ''' def dualPressure(Delta): x = (self.Delta_ / Delta) return (self.pDelta * (x**self.m - x**self.n) + self.Pelec(0.0, Qm)) Delta_eq = brentq(dualPressure, 0.1 * self.Delta_, 2.0 * self.Delta_, xtol=1e-16) logger.debug('∆eq = %.2f nm', Delta_eq * 1e9) return (Delta_eq, dualPressure(Delta_eq)) def gasFlux(self, Z, P): ''' Gas molar flux through the sonophore boundary layers :param Z: leaflet apex deflection (m) :param P: internal gas pressure (Pa) :return: gas molar flux (mol/s) ''' dC = self.C0 - P / self.kH return 2 * self.surface(Z) * self.Dgl * dC / self.xi @classmethod def gasmol2Pa(cls, ng, V): ''' Internal gas pressure for a given molar content :param ng: internal molar content (mol) :param V: sonophore inner volume (m^3) :return: internal gas pressure (Pa) ''' return ng * Rg * cls.T / V @classmethod def gasPa2mol(cls, P, V): ''' Internal gas molar content for a given pressure :param P: internal gas pressure (Pa) :param V: sonophore inner volume (m^3) :return: internal gas molar content (mol) ''' return P * V / (Rg * cls.T) def PtotQS(self, Z, ng, Qm, Pac, Pm_comp_method): ''' Net quasi-steady pressure for a given acoustic pressure (Ptot = Pm + Pg + Pec - P0 - Pac) :param Z: leaflet apex deflection (m) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: acoustic pressure (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: total balance pressure (Pa) ''' if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) return Pm + self.gasmol2Pa(ng, self.volume(Z)) - self.P0 - Pac + self.Pelec(Z, Qm) def balancedefQS(self, ng, Qm, Pac=0.0, Pm_comp_method=PmCompMethod.predict): ''' Quasi-steady equilibrium deflection for a given acoustic pressure (computed by approximating the root of quasi-steady pressure) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: leaflet deflection canceling quasi-steady pressure (m) ''' Zbounds = (self.Zmin, self.a) Plb, Pub = [self.PtotQS(x, ng, Qm, Pac, Pm_comp_method) for x in Zbounds] assert (Plb > 0 > Pub), '[{}, {}] is not a sign changing interval for PtotQS'.format(*Zbounds) return brentq(self.PtotQS, *Zbounds, args=(ng, Qm, Pac, Pm_comp_method), xtol=1e-16) def TEleaflet(self, Z): ''' Elastic tension in leaflet :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA * self.arealStrain(Z) def setTissueModulus(self, drive): ''' Set the frequency-dependent elastic modulus of the surrounding tissue. 
''' G_tissue = self.alpha * drive.modulationFrequency # G'' (Pa) self.kA_tissue = 2 * G_tissue * self.d # kA of the tissue layer (N/m) def TEtissue(self, Z): ''' Elastic tension in surrounding viscoelastic layer :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA_tissue * self.arealStrain(Z) def TEtot(self, Z): ''' Total elastic tension (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.TEleaflet(Z) + self.TEtissue(Z) def PEtot(self, Z, R): ''' Total elastic tension pressure (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: elastic tension pressure (Pa) ''' return - self.TEtot(Z) / R @classmethod def PVleaflet(cls, U, R): ''' Viscous stress pressure in leaflet :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: leaflet viscous stress pressure (Pa) ''' return - 12 * U * cls.delta0 * cls.muS / R**2 @classmethod def PVfluid(cls, U, R): ''' Viscous stress pressure in surrounding medium :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: fluid viscous stress pressure (Pa) ''' return - 4 * U * cls.muL / np.abs(R) @classmethod def accP(cls, Ptot, R): ''' Leaflet transverse acceleration resulting from pressure imbalance :param Ptot: net pressure (Pa) :param R: leaflet curvature radius (m) :return: pressure-driven acceleration (m/s^2) ''' return Ptot / (cls.rhoL * np.abs(R)) @staticmethod def accNL(U, R): ''' Leaflet transverse nonlinear acceleration :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: nonlinear acceleration term (m/s^2) .. note:: A simplified version of nonlinear acceleration (neglecting dR/dH) is used here. 
''' # return - (3/2 - 2*R/H) * U**2 / R return -(3 * U**2) / (2 * R) @staticmethod def checkInputs(drive, Qm, Pm_comp_method): ''' Check validity of stimulation parameters :param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :param Pm_comp_method: type of method used to compute average intermolecular pressure ''' if not isinstance(drive, Drive): raise TypeError(f'Invalid "drive" parameter (must be an "Drive" object)') if not isinstance(Qm, float): raise TypeError(f'Invalid "Qm" parameter (must be float typed)') Qmin, Qmax = CHARGE_RANGE if Qm < Qmin or Qm > Qmax: raise ValueError( f'Invalid applied charge: {Qm * 1e5} nC/cm2 (must be within [{Qmin * 1e5}, {Qmax * 1e5}] interval') if not isinstance(Pm_comp_method, PmCompMethod): raise TypeError('Invalid Pm computation method (must be "PmCompmethod" type)') def derivatives(self, t, y, drive, Qm, Pm_comp_method=PmCompMethod.predict): ''' Evolution of the mechanical system :param t: time instant (s) :param y: vector of HH system variables at time t :param drive: acoustic drive object :param Qm: membrane charge density (F/m2) :param Pm_comp_method: computation method for average intermolecular pressure :return: vector of mechanical system derivatives at time t ''' # Split input vector explicitly U, Z, ng = y # Correct deflection value is below critical compression if Z < self.Zmin: logger.warning('Deflection out of range: Z = %.2f nm', Z * 1e9) Z = self.Zmin # Compute curvature radius R = self.curvrad(Z) # Compute total pressure Pg = self.gasmol2Pa(ng, self.volume(Z)) if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) Pac = drive.compute(t) Pv = self.PVleaflet(U, R) + self.PVfluid(U, R) Ptot = Pm + Pg - self.P0 - Pac + self.PEtot(Z, R) + Pv + self.Pelec(Z, Qm) # Compute derivatives dUdt = self.accP(Ptot, R) + self.accNL(U, R) dZdt = U dngdt = self.gasFlux(Z, Pg) # Return derivatives vector return [dUdt, dZdt, dngdt] def computeInitialDeflection(self, drive, Qm, dt, Pm_comp_method=PmCompMethod.predict): ''' Compute non-zero deflection value for a small perturbation (solving quasi-steady equation). ''' Pac = drive.compute(dt) return self.balancedefQS(self.ng0, Qm, Pac, Pm_comp_method) @classmethod @Model.checkOutputDir def simQueue(cls, freqs, amps, charges, **kwargs): drives = AcousticDrive.createQueue(freqs, amps) queue = [] for drive in drives: for Qm in charges: queue.append([drive, Qm]) return queue def initialConditions(self, *args, **kwargs): ''' Compute simulation initial conditions. ''' # Compute initial non-zero deflection Z = self.computeInitialDeflection(*args, **kwargs) # Return initial conditions dictionary return { 'U': [0.] * 2, 'Z': [0., Z], 'ng': [self.ng0] * 2, } def simCycles(self, drive, Qm, nmax=None, nmin=None, Pm_comp_method=PmCompMethod.predict): ''' Simulate for a specific number of cycles or until periodic stabilization, for a specific set of ultrasound parameters, and return output data in a dataframe. 
:param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :param n: number of cycles (optional) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: output dataframe ''' # Set the tissue elastic modulus self.setTissueModulus(drive) # Compute initial conditions y0 = self.initialConditions(drive, Qm, drive.dt, Pm_comp_method=Pm_comp_method) # Initialize solver and compute solution solver = PeriodicSolver( drive.periodicity, # periodicty y0.keys(), # variables list lambda t, y: self.derivatives(t, y, drive, Qm, Pm_comp_method), # dfunc primary_vars=['Z', 'ng'], # primary variables dt=drive.dt # time step ) data = solver(y0, nmax=nmax, nmin=nmin) # Remove velocity timeries from solution del data['U'] # Return solution dataframe return data @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, Qm, Pm_comp_method=PmCompMethod.predict): ''' Wrapper around the simUntilConvergence method, with decorators. ''' return self.simCycles(drive, Qm, Pm_comp_method=Pm_comp_method) def desc(self, meta): return f'{self}: simulation @ {meta["drive"].desc}, Q = {si_format(meta["Qm"] * 1e-4, 2)}C/cm2' @Model.logDesc - def getRelCmCycle(self, drive, Qm): + def getZlast(self, drive, Qm): + ''' Run simulation and extract deflection vector from last cycle. ''' + return self.simCycles(drive, Qm).tail(NPC_DENSE)['Z'].values # m + + def getRelCmCycle(self, *args, **kwargs): ''' Run simulation and extract relative capacitance vector from last cycle. ''' - Z_last = self.simCycles(drive, Qm).tail(NPC_DENSE)['Z'].values # m - return self.v_capacitance(Z_last) / self.Cm0 + return self.v_capacitance(self.getZlast(*args, **kwargs)) / self.Cm0 @property def Cm_lkp_filename(self): return f'Cm_lkp_{self.a * 1e9:.0f}nm.lkp' @property def Cm_lkp_filepath(self): return os.path.join(LOOKUP_DIR, self.Cm_lkp_filename) @property def Cm_lkp(self): return EffectiveVariablesLookup.fromPickle(self.Cm_lkp_filepath) def getGammaLookup(self): return self.Cm_lkp.reduce(lambda x, **kwargs: np.ptp(x, **kwargs) / 2, 't') diff --git a/PySONIC/core/bls_lookups.json b/PySONIC/core/bls_lookups.json index 738f3d3..8a1d5bc 100644 --- a/PySONIC/core/bls_lookups.json +++ b/PySONIC/core/bls_lookups.json @@ -1,1062 +1,1178 @@ { "32.0": { "-80.00": { "LJ_approx": { "x0": 1.7875580514692446e-09, "C": 14506.791031634148, "nrep": 3.911252335063797, "nattr": 0.9495868868453603 }, "Delta_eq": 1.2344323203763867e-09 }, "-71.40": { "LJ_approx": { "x0": 1.710159362626304e-09, "C": 16757.44053535206, "nrep": 3.9149844779001572, "nattr": 0.9876139143736086 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.588820457014353e-09, "C": 21124.28839722447, "nrep": 3.9219530533405726, "nattr": 1.0531179666960837 }, "Delta_eq": 1.302942739961778e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7142977983903395e-09, "C": 16627.43538695451, "nrep": 3.9147721975981384, "nattr": 0.9855168537576823 }, "Delta_eq": 1.2553492695740507e-09 }, "-89.50": { "LJ_approx": { "x0": 1.8913883171160228e-09, "C": 12016.525797229067, "nrep": 3.9069373029335464, "nattr": 0.9021994595277029 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6390264131559902e-09, "C": 19180.97634755811, "nrep": 3.9188840789597705, "nattr": 1.0250251620607604 }, "Delta_eq": 1.281743450351987e-09 }, "-53.00": { "LJ_approx": { "x0": 1.5830321174402216e-09, "C": 21361.655211483354, "nrep": 3.9223254792281588, "nattr": 1.0564645995714745 }, "Delta_eq": 1.305609024046854e-09 }, 
"-53.58": { "LJ_approx": { "x0": 1.5863754403940291e-09, "C": 21224.206759769622, "nrep": 3.922109852077156, "nattr": 1.0545313303221624 }, "Delta_eq": 1.3040630712174578e-09 }, "-48.87": { "LJ_approx": { "x0": 1.5603070731170595e-09, "C": 22321.280954434333, "nrep": 3.9238276518118833, "nattr": 1.0698008224359472 }, "Delta_eq": 1.3165739825437056e-09 }, "0.00": { "LJ_approx": { "x0": 1.429523524073023e-09, "C": 28748.036227122713, "nrep": 3.9338919786768276, "nattr": 1.1551044201542804 }, "Delta_eq": 1.4e-09 }, "-70.00": { "LJ_approx": { "x0": 1.698788510560293e-09, "C": 17120.631318195712, "nrep": 3.915575488491436, "nattr": 0.9934238714780391 }, "Delta_eq": 1.2603339470322538e-09 }, "-58.00": { "LJ_approx": { "x0": 1.6131662659976035e-09, "C": 20156.47605325608, "nrep": 3.9204295233162925, "nattr": 1.0393049952285334 }, "Delta_eq": 1.2922508866011204e-09 }, "-140.00": { "LJ_approx": { "x0": 3.5396589580589484e-09, "C": 1255.8321160636908, "nrep": 3.879907809497444, "nattr": 0.4190657482583384 }, "Delta_eq": 1.1019265101358682e-09 }, "-200.00": { "LJ_approx": { "x0": 5.467498689940948e-09, "C": 298.4575230665454, "nrep": 3.8382806376855165, "nattr": 0.0014805372073950717 }, "Delta_eq": 1.0050623818936587e-09 }, "-60.00": { "LJ_approx": { "x0": 1.6260780786690872e-09, "C": 19662.79538138057, "nrep": 3.9196487714944097, "nattr": 1.0321270951610706 }, "Delta_eq": 1.2869009570089227e-09 }, "-160.00": { "LJ_approx": { "x0": 5.77921444789264e-09, "C": 204.15927774362973, "nrep": 3.866389064592209, "nattr": 0.03773979793348341 }, "Delta_eq": 1.0662975449878705e-09 }, "10.00": { "LJ_approx": { "x0": 1.4352403492312405e-09, "C": 28434.36006445571, "nrep": 3.9333944930338163, "nattr": 1.1510029210218344 }, "Delta_eq": 1.3954197265639115e-09 }, "20.00": { "LJ_approx": { "x0": 1.4521735572363723e-09, "C": 27522.575366234458, "nrep": 3.93195491401856, "nattr": 1.1390890902608632 }, "Delta_eq": 1.3824571964886577e-09 }, "-50.00": { "LJ_approx": { "x0": 1.5663532171069699e-09, "C": 22061.611190291416, "nrep": 3.9234216613472546, "nattr": 1.0662169394628622 }, "Delta_eq": 1.313576328857944e-09 } }, "64.0": { "-80.00": { "LJ_approx": { "x0": 1.783357531675752e-09, "C": 14639.319598806138, "nrep": 3.9113027551187414, "nattr": 0.9404151935643594 }, "Delta_eq": 1.2344323203763867e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7103254451796522e-09, "C": 16775.90747591089, "nrep": 3.9148582072320104, "nattr": 0.9747613204242506 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7061996856525807e-09, "C": 16906.878806702443, "nrep": 3.9150725853841957, "nattr": 0.976778398349503 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.585264156646392e-09, "C": 21303.176047683613, "nrep": 3.92211004079812, "nattr": 1.0402641595550777 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.886870747500225e-09, "C": 12129.260307725155, "nrep": 3.906945602966425, "nattr": 0.895598309376088 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.635297932833928e-09, "C": 19347.359224637305, "nrep": 3.9190113341505843, "nattr": 1.0129039638328117 }, "Delta_eq": 1.281743450351987e-09 }, "-58.00": { "LJ_approx": { "x0": 1.6095250707781465e-09, "C": 20329.27848623273, "nrep": 3.92057191136993, "nattr": 1.0267862446034561 }, "Delta_eq": 1.2922508866011204e-09 }, "-140.00": { "LJ_approx": { "x0": 3.5398097121754107e-09, "C": 1255.8690004347025, "nrep": 3.8798490490747404, "nattr": 0.4604604439108636 }, "Delta_eq": 1.1019265101358682e-09 }, 
"-200.00": { "LJ_approx": { "x0": 6.6005456899992844e-09, "C": 144.85407475420143, "nrep": 3.838295127819493, "nattr": 0.00992375961984237 }, "Delta_eq": 1.0050623818936587e-09 }, "-60.00": { "LJ_approx": { "x0": 1.6223930024686038e-09, "C": 19832.380892638503, "nrep": 3.9197835635468294, "nattr": 1.019801114668753 }, "Delta_eq": 1.2869009570089227e-09 }, "-160.00": { "LJ_approx": { "x0": 6.33567252974036e-09, "C": 143.09307106178315, "nrep": 3.866387420765763, "nattr": 0.08081951236969916 }, "Delta_eq": 1.0662975449878705e-09 } }, "50.0": { "-71.90": { "LJ_approx": { "x0": 1.7114794411958874e-09, "C": 16732.841575829825, "nrep": 3.914827883392826, "nattr": 0.9781401389550711 }, "Delta_eq": 1.2553492695740507e-09 } }, "10.0": { "0.00": { "LJ_approx": { "x0": 1.4403460578039628e-09, "C": 27932.27792195569, "nrep": 3.9334138654752686, "nattr": 1.19526523864855 }, "Delta_eq": 1.4e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7286986021591825e-09, "C": 16087.514816365254, "nrep": 3.9147885683678543, "nattr": 1.012616990226475 }, "Delta_eq": 1.2553492695740507e-09 } }, "100.0": { "0.00": { "LJ_approx": { "x0": 1.4254455131143225e-09, "C": 29048.417918044444, "nrep": 3.9342659189249254, "nattr": 1.1351227816904121 }, "Delta_eq": 1.4e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7087681652667724e-09, "C": 16833.83962398515, "nrep": 3.914908533680663, "nattr": 0.9697102045586926 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7046480280451768e-09, "C": 16965.15489682674, "nrep": 3.9151238997284845, "nattr": 0.9716928395857687 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5838767580748841e-09, "C": 21372.412565003375, "nrep": 3.922191259312523, "nattr": 1.0344167918733638 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.8850831984948754e-09, "C": 12173.857567383837, "nrep": 3.906959887367545, "nattr": 0.8923154523792497 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6338405482612552e-09, "C": 19411.96069791924, "nrep": 3.9190798895919405, "nattr": 1.0073171165886772 }, "Delta_eq": 1.281743450351987e-09 } }, "500.0": { "0.00": { "LJ_approx": { "x0": 1.4236928207491738e-09, "C": 29174.423851140436, "nrep": 3.934489087598531, "nattr": 1.1244706663694597 }, "Delta_eq": 1.4e-09 } }, "15.0": { "-71.90": { "LJ_approx": { "x0": 1.722207527516432e-09, "C": 16331.124295102756, "nrep": 3.914721476976037, "nattr": 1.0021304453280393 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7180439454408102e-09, "C": 16459.15520614834, "nrep": 3.9149304238974905, "nattr": 1.0043742068193204 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5959361190370466e-09, "C": 20763.823187183425, "nrep": 3.921785737782814, "nattr": 1.0737976563473925 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.9002701433896942e-09, "C": 11795.576706463997, "nrep": 3.9070059150621814, "nattr": 0.9114123498082551 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.646473044478369e-09, "C": 18846.9803104403, "nrep": 3.918766624746869, "nattr": 1.044220870098494 }, "Delta_eq": 1.281743450351987e-09 } }, "70.0": { "-61.93": { "LJ_approx": { "x0": 1.6349587829329923e-09, "C": 19362.426097728276, "nrep": 3.9190260877993097, "nattr": 1.0116609492531905 }, "Delta_eq": 1.281743450351987e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7099628637265945e-09, "C": 16789.420291577666, "nrep": 3.9148688185890483, "nattr": 0.9736446481917082 }, "Delta_eq": 
1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7058388214243274e-09, "C": 16920.455606556396, "nrep": 3.9150834388621805, "nattr": 0.9756530742858303 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.584940743805642e-09, "C": 21319.35643207058, "nrep": 3.9221277053335264, "nattr": 1.0389567310289958 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.886457143504779e-09, "C": 12139.584658299475, "nrep": 3.9069481854501382, "nattr": 0.8948872453823127 }, "Delta_eq": 1.2107230911508513e-09 } }, "150.0": { "-71.90": { "LJ_approx": { "x0": 1.707781542586275e-09, "C": 16870.340248462282, "nrep": 3.914948339378328, "nattr": 0.9660711186661354 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7036651779798684e-09, "C": 17001.86272239105, "nrep": 3.9151643485118117, "nattr": 0.9680342060844059 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5830041112065094e-09, "C": 21415.64649760579, "nrep": 3.9222512220871013, "nattr": 1.0303387653647968 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.883936689519767e-09, "C": 12202.390459945682, "nrep": 3.906974649816042, "nattr": 0.8898102498996743 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6329212539031057e-09, "C": 19452.44141782297, "nrep": 3.9191317215599946, "nattr": 1.0033715654432673 }, "Delta_eq": 1.281743450351987e-09 } }, "16.0": { "-71.90": { "LJ_approx": { "x0": 1.721337196662225e-09, "C": 16363.738563915058, "nrep": 3.9147202446411637, "nattr": 1.0005179992350384 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.717176984027311e-09, "C": 16491.97282711717, "nrep": 3.9149293522242252, "nattr": 1.0027563589061772 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5951507107307484e-09, "C": 20803.713978702526, "nrep": 3.921796023871708, "nattr": 1.0717457519944718 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.899300769652515e-09, "C": 11819.650350288994, "nrep": 3.906993085458305, "nattr": 0.9105930969008011 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6456524054221337e-09, "C": 18883.851022856303, "nrep": 3.918771854603072, "nattr": 1.042336408142498 }, "Delta_eq": 1.281743450351987e-09 }, "-58.00": { "LJ_approx": { "x0": 1.6196423302303851e-09, "C": 19847.346684716063, "nrep": 3.920294617611314, "nattr": 1.0573210567487794 }, "Delta_eq": 1.2922508866011204e-09 }, "-140.00": { "LJ_approx": { "x0": 3.536300349187651e-09, "C": 1259.968827699725, "nrep": 3.8800144975132485, "nattr": 0.35722933384459793 }, "Delta_eq": 1.1019265101358682e-09 }, "-200.00": { "LJ_approx": { "x0": 4.448077576688761e-09, "C": 658.9502615105886, "nrep": 3.8382596040618573, "nattr": -0.0007412836249291593 }, "Delta_eq": 1.0050623818936587e-09 }, "-60.00": { "LJ_approx": { "x0": 1.6326306076113028e-09, "C": 19359.641562109642, "nrep": 3.9195252093199606, "nattr": 1.0498037816897123 }, "Delta_eq": 1.2869009570089227e-09 }, "-160.00": { "LJ_approx": { "x0": 4.9755404141516864e-09, "C": 364.2006750890508, "nrep": 3.866403743827545, "nattr": 0.007733297873632889 }, "Delta_eq": 1.0662975449878705e-09 } }, "40.0": { "-71.90": { "LJ_approx": { "x0": 1.7127556130633909e-09, "C": 16685.139617894773, "nrep": 3.9147997833590855, "nattr": 0.9816110605284186 }, "Delta_eq": 1.2553492695740507e-09 }, "-140.00": { "LJ_approx": { "x0": 3.5400899114625196e-09, "C": 1255.3328685760946, "nrep": 3.8798858604365565, 
"nattr": 0.4340973480775988 }, "Delta_eq": 1.1019265101358682e-09 } }, "30.0": { "-71.90": { "LJ_approx": { "x0": 1.7147994934556977e-09, "C": 16608.65261608246, "nrep": 3.914764637335829, "nattr": 0.9867268079339511 }, "Delta_eq": 1.2553492695740507e-09 } }, "25.4": { "-71.90": { "LJ_approx": { "x0": 1.7162265501918752e-09, "C": 16555.217050637588, "nrep": 3.9147464050871856, "nattr": 0.9900398844251959 }, "Delta_eq": 1.2553492695740507e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6408385667642563e-09, "C": 19099.831853142834, "nrep": 3.9188405082474103, "nattr": 1.0301853851264013 }, "Delta_eq": 1.281743450351987e-09 } }, "40.3": { "-71.90": { "LJ_approx": { "x0": 1.7127058661258177e-09, "C": 16686.9995037205, "nrep": 3.914800799246736, "nattr": 0.981479342025535 }, "Delta_eq": 1.2553492695740507e-09 }, "-61.93": { "LJ_approx": { "x0": 1.637531858311385e-09, "C": 19247.790954282773, "nrep": 3.918928365198691, "nattr": 1.020450016688262 }, "Delta_eq": 1.281743450351987e-09 } }, "45.0": { "-71.90": { "LJ_approx": { "x0": 1.712051746586677e-09, "C": 16711.456749369456, "nrep": 3.914814642254888, "nattr": 0.979726839958776 }, "Delta_eq": 1.2553492695740507e-09 } }, "20.0": { "-71.90": { "LJ_approx": { "x0": 1.7186388439609698e-09, "C": 16464.85454836168, "nrep": 3.9147266088229755, "nattr": 0.9952322612710219 }, "Delta_eq": 1.2553492695740507e-09 }, "-140.00": { "LJ_approx": { "x0": 3.5381843063817075e-09, "C": 1257.5756585425543, "nrep": 3.8799713679048966, "nattr": 0.38013951841061877 }, "Delta_eq": 1.1019265101358682e-09 } }, "60.0": { "-71.90": { "LJ_approx": { "x0": 1.710603828012074e-09, "C": 16765.5278159216, "nrep": 3.9148503824940146, "nattr": 0.9756024890579372 }, "Delta_eq": 1.2553492695740507e-09 } }, "34.0": { "-71.90": { "LJ_approx": { "x0": 1.7138494069983225e-09, "C": 16644.21766668786, "nrep": 3.9147795488410644, "nattr": 0.9844103642751335 }, "Delta_eq": 1.2553492695740507e-09 } }, "22.6": { "-71.90": { "LJ_approx": { "x0": 1.717347083819261e-09, "C": 16513.244775760173, "nrep": 3.914735582752492, "nattr": 0.9925083846044295 }, "Delta_eq": 1.2553492695740507e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5915583742563881e-09, "C": 20985.87279990122, "nrep": 3.9218681449692334, "nattr": 1.06169568520175 }, "Delta_eq": 1.302942739961778e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7131877388482784e-09, "C": 16642.92622648039, "nrep": 3.9149463907912256, "nattr": 0.9946488696557423 }, "Delta_eq": 1.2566584760815426e-09 + }, + "-60.00": { + "LJ_approx": { + "x0": 1.628901317783889e-09, + "C": 19532.337508931167, + "nrep": 3.919579518949732, + "nattr": 1.0402552692356268 + }, + "Delta_eq": 1.2869009570089227e-09 + }, + "-140.00": { + "LJ_approx": { + "x0": 3.5390881309033025e-09, + "C": 1256.4204047014866, + "nrep": 3.8799520356860526, + "nattr": 0.39131995763279265 + }, + "Delta_eq": 1.1019265101358682e-09 } }, "45.3": { "-71.90": { "LJ_approx": { "x0": 1.7120143529753677e-09, "C": 16712.8535932463, "nrep": 3.9148154954695285, "nattr": 0.9796234512533043 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7078889917152291e-09, "C": 16843.200319018524, "nrep": 3.9150287560913037, "nattr": 0.9816938577273149 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5867800612156331e-09, "C": 21227.103472649964, "nrep": 3.922035517445887, "nattr": 1.0460446908119716 }, "Delta_eq": 1.302942739961778e-09 + }, + "-60.00": { + "LJ_approx": { + "x0": 1.62396414640836e-09, + "C": 19760.217524700623, + "nrep": 3.919718969584019, + "nattr": 1.025354155943396 
+ }, + "Delta_eq": 1.2869009570089227e-09 + }, + "-140.00": { + "LJ_approx": { + "x0": 3.539937183762741e-09, + "C": 1255.5886167735225, + "nrep": 3.8798748897058664, + "nattr": 0.4417757690630956 + }, + "Delta_eq": 1.1019265101358682e-09 } }, "31.0": { "0.00": { "LJ_approx": { "x0": 1.429704934686545e-09, "C": 28734.55271989346, "nrep": 3.9338783401809607, "nattr": 1.155904475115303 }, "Delta_eq": 1.4e-09 }, "-71.90": { "LJ_approx": { "x0": 1.714533452342314e-09, "C": 16618.61152256, "nrep": 3.9147686086169227, "nattr": 0.9860860917336786 }, "Delta_eq": 1.2553492695740507e-09 } }, "31.1": { "-71.90": { "LJ_approx": { "x0": 1.714515996985078e-09, "C": 16619.26565583038, "nrep": 3.914768856863524, "nattr": 0.986044873313721 }, "Delta_eq": 1.2553492695740507e-09 } }, "45.2": { "-71.90": { "LJ_approx": { "x0": 1.7120203335936594e-09, "C": 16712.630620283762, "nrep": 3.914815347514677, "nattr": 0.9796407085930723 }, "Delta_eq": 1.2553492695740507e-09 } }, "28.0": { "-140.00": { "LJ_approx": { "x0": 3.539662733628618e-09, "C": 1255.7594429416959, "nrep": 3.8799232005747073, "nattr": 0.4091144934965918 }, "Delta_eq": 1.1019265101358682e-09 } }, "52.0": { "-140.00": { "LJ_approx": { "x0": 3.540099825001411e-09, "C": 1255.4092689381769, "nrep": 3.8798640679267495, "nattr": 0.44958440372614994 }, "Delta_eq": 1.1019265101358682e-09 } }, "48.0": { "-140.00": { "LJ_approx": { "x0": 3.540113162797765e-09, "C": 1255.365483546012, "nrep": 3.8798702909870157, "nattr": 0.4451084636623491 }, "Delta_eq": 1.1019265101358682e-09 } }, "25.0": { "-140.00": { "LJ_approx": { "x0": 3.5394591974100957e-09, "C": 1255.9739311868518, "nrep": 3.8799379083844014, "nattr": 0.3998616563003167 }, "Delta_eq": 1.1019265101358682e-09 } }, "24.0": { "-140.00": { "LJ_approx": { "x0": 3.539294542800419e-09, "C": 1256.1755733964683, "nrep": 3.8799434079572483, "nattr": 0.3965246264708772 }, "Delta_eq": 1.1019265101358682e-09 } }, "26.0": { "-71.90": { "LJ_approx": { "x0": 1.716013742853656e-09, "C": 16563.186031095847, "nrep": 3.914748825378261, "nattr": 0.9895571841402089 }, "Delta_eq": 1.2553492695740507e-09 } }, "47.8": { "-71.90": { "LJ_approx": { "x0": 1.7117134574481139e-09, "C": 16724.09882321637, "nrep": 3.914822340960349, "nattr": 0.9787950821308133 }, "Delta_eq": 1.2553492695740507e-09 } }, "45.9": { "-71.90": { "LJ_approx": { "x0": 1.711942249735064e-09, "C": 16715.54886943138, "nrep": 3.9148171027544123, "nattr": 0.9794265342377567 }, "Delta_eq": 1.2553492695740507e-09 } }, "30.3": { "-71.90": { "LJ_approx": { "x0": 1.7147276912050609e-09, "C": 16611.3405217478, "nrep": 3.9147656933151262, "nattr": 0.9865543051062363 }, "Delta_eq": 1.2553492695740507e-09 } }, "43.4": { "-71.90": { "LJ_approx": { "x0": 1.7122593582582404e-09, "C": 16703.696731600092, "nrep": 3.914810082243474, "nattr": 0.9802910111364295 }, "Delta_eq": 1.2553492695740507e-09 } }, "59.7": { "-71.90": { "LJ_approx": { "x0": 1.710624810423359e-09, "C": 16764.744721099407, "nrep": 3.914849818351629, "nattr": 0.9756645210872293 }, "Delta_eq": 1.2553492695740507e-09 } }, "31.6": { "-71.90": { "LJ_approx": { "x0": 1.7144029387800298e-09, "C": 16623.497279426527, "nrep": 3.914770610799719, "nattr": 0.9857697612214258 }, "Delta_eq": 1.2553492695740507e-09 } }, "36.3": { "-71.90": { "LJ_approx": { "x0": 1.7133999698436411e-09, "C": 16661.0351629387, "nrep": 3.9147874692503435, "nattr": 0.9832772435848539 }, "Delta_eq": 1.2553492695740507e-09 } }, "55.7": { "-71.90": { "LJ_approx": { "x0": 1.710943143434582e-09, "C": 16752.866849028967, "nrep": 3.914841318636611, 
"nattr": 0.9766031714646571 }, "Delta_eq": 1.2553492695740507e-09 } }, "48.5": { "-71.90": { "LJ_approx": { "x0": 1.7116397026523852e-09, "C": 16726.853954116246, "nrep": 3.9148240832062964, "nattr": 0.9785886152529253 }, "Delta_eq": 1.2553492695740507e-09 } }, "42.2": { "-71.90": { "LJ_approx": { "x0": 1.712423436257706e-09, "C": 16697.560003586437, "nrep": 3.9148066437354734, "nattr": 0.980728415714794 }, "Delta_eq": 1.2553492695740507e-09 } }, "27.9": { "-71.90": { "LJ_approx": { "x0": 1.7154107379662085e-09, "C": 16585.76539172651, "nrep": 3.914756263081378, "nattr": 0.9881670164800336 }, "Delta_eq": 1.2553492695740507e-09 } }, "36.8": { "-71.90": { "LJ_approx": { "x0": 1.7133062675842822e-09, "C": 16664.541652779935, "nrep": 3.9147891722285952, "nattr": 0.9830389492355903 }, "Delta_eq": 1.2553492695740507e-09 } }, "24.3": { "-71.90": { "LJ_approx": { "x0": 1.7166571039691342e-09, "C": 16539.090593408535, "nrep": 3.91474189560473, "nattr": 0.9910014451873015 }, "Delta_eq": 1.2553492695740507e-09 } }, "80.0": { "-61.93": { "LJ_approx": { "x0": 1.6344991861630834e-09, "C": 19382.81542285848, "nrep": 3.9190471576123405, "nattr": 1.0099259239851026 }, "Delta_eq": 1.281743450351987e-09 } }, "22.0": { "-71.90": { "LJ_approx": { "x0": 1.7176213744119541e-09, "C": 16502.97011663288, "nrep": 3.914733365777537, "nattr": 0.9930970282708068 }, "Delta_eq": 1.2553492695740507e-09 } + }, + "19.0": { + "-140.00": { + "LJ_approx": { + "x0": 3.538195370787953e-09, + "C": 1257.5181416698033, + "nrep": 3.8799803529404473, + "nattr": 0.3751818971097499 + }, + "Delta_eq": 1.1019265101358682e-09 + }, + "-60.00": { + "LJ_approx": { + "x0": 1.6306387251839888e-09, + "C": 19451.924578003734, + "nrep": 3.9195492110397634, + "nattr": 1.044849990047988 + }, + "Delta_eq": 1.2869009570089227e-09 + } + }, + "26.9": { + "-140.00": { + "LJ_approx": { + "x0": 3.539727231107012e-09, + "C": 1255.6480708546228, + "nrep": 3.8799282928053556, + "nattr": 0.4058993120242667 + }, + "Delta_eq": 1.1019265101358682e-09 + }, + "-60.00": { + "LJ_approx": { + "x0": 1.6273891268762361e-09, + "C": 19602.249957838972, + "nrep": 3.919613427792109, + "nattr": 1.036012897728662 + }, + "Delta_eq": 1.2869009570089227e-09 + } + }, + "38.1": { + "-140.00": { + "LJ_approx": { + "x0": 3.5400586489408017e-09, + "C": 1255.3566217126063, + "nrep": 3.879890425738439, + "nattr": 0.430934989812783 + }, + "Delta_eq": 1.1019265101358682e-09 + }, + "-60.00": { + "LJ_approx": { + "x0": 1.6249433421918718e-09, + "C": 19715.129234709653, + "nrep": 3.919684249948236, + "nattr": 1.0285800353668955 + }, + "Delta_eq": 1.2869009570089227e-09 + } + }, + "53.8": { + "-140.00": { + "LJ_approx": { + "x0": 3.5396853039780393e-09, + "C": 1255.9914460682548, + "nrep": 3.879861170245694, + "nattr": 0.45153280631087883 + }, + "Delta_eq": 1.1019265101358682e-09 + }, + "-60.00": { + "LJ_approx": { + "x0": 1.6231198755124936e-09, + "C": 19799.031166909957, + "nrep": 3.919752144403552, + "nattr": 1.02243757862762 + }, + "Delta_eq": 1.2869009570089227e-09 + } } } \ No newline at end of file diff --git a/PySONIC/core/pneuron.py b/PySONIC/core/pneuron.py index 58a1859..23585d4 100644 --- a/PySONIC/core/pneuron.py +++ b/PySONIC/core/pneuron.py @@ -1,595 +1,596 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-03 11:53:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-08-05 14:46:40 +# @Last Modified time: 2020-08-28 17:14:49 import abc import inspect import numpy as np from .protocols import * from .model import 
Model from .lookups import EffectiveVariablesLookup from .solvers import EventDrivenSolver from .drives import Drive, ElectricDrive from ..postpro import detectSpikes, computeFRProfile from ..constants import * from ..utils import * class PointNeuron(Model): ''' Generic point-neuron model interface. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ESTIM' # keyword used to characterize simulations made with this model celsius = 36.0 # Temperature (Celsius) T = celsius + CELSIUS_2_KELVIN def __repr__(self): return self.__class__.__name__ def copy(self): return self.__class__() def __eq__(self, other): if not isinstance(other, PointNeuron): return False return self.name == other.name @property @classmethod @abc.abstractmethod def name(cls): ''' Neuron name. ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Cm0(cls): ''' Neuron's resting capacitance (F/m2). ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Vm0(cls): ''' Neuron's resting membrane potential(mV). ''' raise NotImplementedError @property def Qm0(self): return self.Cm0 * self.Vm0 * 1e-3 # C/m2 @property def tau_pas(self): ''' Passive membrane time constant (s). ''' return self.Cm0 / self.gLeak @property def meta(self): return {'neuron': self.name} @staticmethod def inputs(): return ElectricDrive.inputs() @classmethod def filecodes(cls, drive, pp): return { 'simkey': cls.simkey, 'neuron': cls.name, 'nature': pp.nature, **drive.filecodes, **pp.filecodes } @classmethod def normalizedQm(cls, Qm): ''' Compute membrane charge density normalized by resting capacitance. :param Qm: membrane charge density (Q/m2) :return: normalized charge density (mV) ''' return Qm / cls.Cm0 * 1e3 @classmethod def getPltVars(cls, wl='df["', wr='"]'): pltvars = { 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'bounds': ((cls.Vm0 - 20.0) * cls.Cm0 * 1e2, 60) }, 'Qm/Cm0': { 'desc': 'membrane charge density over resting capacitance', 'label': 'Q_m / C_{m0}', 'unit': 'mV', 'bounds': (-150, 70), 'func': f"normalizedQm({wl}Qm{wr})" }, 'Vm': { 'desc': 'membrane potential', 'label': 'V_m', 'unit': 'mV', 'bounds': (-150, 70) }, 'ELeak': { 'constant': 'obj.ELeak', 'desc': 'non-specific leakage current resting potential', 'label': 'V_{leak}', 'unit': 'mV', 'ls': '--', 'color': 'k' } } for cname in cls.getCurrentsNames(): cfunc = getattr(cls, cname) cargs = inspect.getargspec(cfunc)[0][1:] pltvars[cname] = { 'desc': inspect.getdoc(cfunc).splitlines()[0], 'label': f'I_{{{cname[1:]}}}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f"{cname}({', '.join([f'{wl}{a}{wr}' for a in cargs])})" } for var in cargs: if var != 'Vm': pltvars[var] = { 'desc': cls.states[var], 'label': var, 'bounds': (-0.1, 1.1) } pltvars['iNet'] = { 'desc': inspect.getdoc(getattr(cls, 'iNet')).splitlines()[0], 'label': 'I_{net}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f'iNet({wl}Vm{wr}, {wl[:-1]}{cls.statesNames()}{wr[1:]})', 'ls': '--', 'color': 'black' } pltvars['dQdt'] = { 'desc': inspect.getdoc(getattr(cls, 'dQdt')).splitlines()[0], 'label': 'dQ_m/dt', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f'dQdt({wl}t{wr}, {wl}Qm{wr})', 'ls': '--', 'color': 'black' } pltvars['iax'] = { 'desc': inspect.getdoc(getattr(cls, 'iax')).splitlines()[0], 'label': 'i_{ax}', 'unit': 'A/m^2', 'factor': 1e-3, # 'func': f'iax({wl}t{wr}, {wl}Qm{wr}, {wl}Vm{wr}, {wl[:-1]}{cls.statesNames()}{wr[1:]})', 'ls': '--', 'color': 'black', - 'bounds': (-1e2, 1e2) + # 'bounds': (-1e2, 1e2) } pltvars['iCap'] = { 'desc': 
inspect.getdoc(getattr(cls, 'iCap')).splitlines()[0], 'label': 'I_{cap}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f'iCap({wl}t{wr}, {wl}Vm{wr})' } for rate in cls.rates: if 'alpha' in rate: prefix, suffix = 'alpha', rate[5:] else: prefix, suffix = 'beta', rate[4:] pltvars[rate] = { 'label': '\\{}_{{{}}}'.format(prefix, suffix), 'unit': 'ms^{-1}', 'factor': 1e-3 } pltvars['FR'] = { 'desc': 'riring rate', 'label': 'FR', 'unit': 'Hz', 'factor': 1e0, # 'bounds': (0, 1e3), 'func': f'firingRateProfile({wl[:-2]})' } return pltvars @classmethod def iCap(cls, t, Vm): ''' Capacitive current. ''' dVdt = np.insert(np.diff(Vm) / np.diff(t), 0, 0.) return cls.Cm0 * dVdt @property def pltScheme(self): pltscheme = { 'Q_m': ['Qm'], 'V_m': ['Vm'] } pltscheme['I'] = self.getCurrentsNames() + ['iNet'] for cname in self.getCurrentsNames(): if 'Leak' not in cname: key = f'i_{{{cname[1:]}}}\ kin.' cargs = inspect.getargspec(getattr(self, cname))[0][1:] pltscheme[key] = [var for var in cargs if var not in ['Vm', 'Cai']] return pltscheme @classmethod def statesNames(cls): ''' Return a list of names of all state variables of the model. ''' return list(cls.states.keys()) @classmethod @abc.abstractmethod def derStates(cls): ''' Dictionary of states derivatives functions ''' raise NotImplementedError @classmethod def getDerStates(cls, Vm, states): ''' Compute states derivatives array given a membrane potential and states dictionary ''' return np.array([cls.derStates()[k](Vm, states) for k in cls.statesNames()]) @classmethod @abc.abstractmethod def steadyStates(cls): ''' Return a dictionary of steady-states functions ''' raise NotImplementedError @classmethod def getSteadyStates(cls, Vm): ''' Compute array of steady-states for a given membrane potential ''' return np.array([cls.steadyStates()[k](Vm) for k in cls.statesNames()]) @classmethod def getDerEffStates(cls, lkp, states): ''' Compute effective states derivatives array given lookups and states dictionaries. ''' return np.array([cls.derEffStates()[k](lkp, states) for k in cls.statesNames()]) @classmethod def getEffRates(cls, Vm): ''' Compute array of effective rate constants for a given membrane potential vector. ''' return {k: np.mean(np.vectorize(v)(Vm)) for k, v in cls.effRates().items()} def getLookup(self): ''' Get lookup of membrane potential rate constants interpolated along the neuron's charge physiological range. ''' logger.debug(f'generating {self} baseline lookup') Qmin, Qmax = expandRange(*self.Qbounds, exp_factor=10.) 
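        # Charge bounds expanded (10-fold around their mid-point) to give the lookup a safety margin beyond the physiological range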
Qref = np.arange(Qmin, Qmax, 1e-5) # C/m2 Vref = Qref / self.Cm0 * 1e3 # mV tables = {k: np.vectorize(v)(Vref) for k, v in self.effRates().items()} return EffectiveVariablesLookup({'Q': Qref}, {'V': Vref, **tables}) @classmethod @abc.abstractmethod def currents(cls): ''' Dictionary of ionic currents functions (returning current densities in mA/m2) ''' @classmethod def iNet(cls, Vm, states): ''' net membrane current :param Vm: membrane potential (mV) :param states: states of ion channels gating and related variables :return: current per unit area (mA/m2) ''' return sum([cfunc(Vm, states) for cfunc in cls.currents().values()]) @classmethod - def dQdt(cls, t, Qm): + def dQdt(cls, t, Qm, pad='right'): ''' membrane charge density variation rate :param t: time vector (s) :param Qm: membrane charge density vector (C/m2) :return: variation rate vector (mA/m2) ''' - return padright(np.diff(Qm) / np.diff(t)) * 1e3 # mA/m2 + dQdt = np.diff(Qm) / np.diff(t) * 1e3 # mA/m2 + return {'left': padleft, 'right': padright}[pad](dQdt) @classmethod def iax(cls, t, Qm, Vm, states): ''' axial current density (computed as sum of charge variation and net membrane ionic current) :param t: time vector (s) :param Qm: membrane charge density vector (C/m2) :param Vm: membrane potential (mV) :param states: states of ion channels gating and related variables :return: axial current density (mA/m2) ''' return cls.iNet(Vm, states) + cls.dQdt(t, Qm) @classmethod def titrationFunc(cls, *args, **kwargs): ''' Default titration function. ''' return cls.isExcited(*args, **kwargs) @staticmethod def currentToConcentrationRate(z_ion, depth): ''' Compute the conversion factor from a specific ionic current (in mA/m2) into a variation rate of submembrane ion concentration (in M/s). :param: z_ion: ion valence :param depth: submembrane depth (m) :return: conversion factor (Mmol.m-1.C-1) ''' return 1e-6 / (z_ion * depth * FARADAY) @staticmethod def nernst(z_ion, Cion_in, Cion_out, T): ''' Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 @staticmethod def vtrap(x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) @staticmethod def efun(x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) @classmethod def ghkDrive(cls, Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * cls.efun(-x) # M eCout = Cion_out * cls.efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 @classmethod def xBG(cls, Vref, Vm): ''' Compute dimensionless Borg-Graham ratio for a given voltage. 
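            The ratio is computed as x = (Vm - Vref) * F / (R * T), with a 1e-3 factor converting the mV difference into V.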
:param Vref: reference voltage membrane (mV) :param Vm: membrane potential (mV) :return: dimensionless ratio ''' return (Vm - Vref) * FARADAY / (Rg * cls.T) * 1e-3 # [-] @classmethod def alphaBG(cls, alpha0, zeta, gamma, Vref, Vm): ''' Compute the activation rate constant for a given voltage and temperature, using a Borg-Graham formalism. :param alpha0: pre-exponential multiplying factor :param zeta: effective valence of the gating particle :param gamma: normalized position of the transition state within the membrane :param Vref: membrane voltage at which alpha = alpha0 (mV) :param Vm: membrane potential (mV) :return: rate constant (in alpha0 units) ''' return alpha0 * np.exp(-zeta * gamma * cls.xBG(Vref, Vm)) @classmethod def betaBG(cls, beta0, zeta, gamma, Vref, Vm): ''' Compute the inactivation rate constant for a given voltage and temperature, using a Borg-Graham formalism. :param beta0: pre-exponential multiplying factor :param zeta: effective valence of the gating particle :param gamma: normalized position of the transition state within the membrane :param Vref: membrane voltage at which beta = beta0 (mV) :param Vm: membrane potential (mV) :return: rate constant (in beta0 units) ''' return beta0 * np.exp(zeta * (1 - gamma) * cls.xBG(Vref, Vm)) @classmethod def getCurrentsNames(cls): return list(cls.currents().keys()) @staticmethod def firingRateProfile(*args, **kwargs): return computeFRProfile(*args, **kwargs) @property def Qbounds(self): ''' Determine bounds of membrane charge physiological range for a given neuron. ''' return np.array([np.round(self.Vm0 - 25.0), 50.0]) * self.Cm0 * 1e-3 # C/m2 @classmethod def isVoltageGated(cls, state): ''' Determine whether a given state is purely voltage-gated or not.''' return f'alpha{state.lower()}' in cls.rates @classmethod @Model.checkOutputDir def simQueue(cls, amps, durations, offsets, PRFs, DCs, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: list of parameters (list) for each simulation ''' if amps is None: amps = [None] drives = ElectricDrive.createQueue(amps) protocols = PulsedProtocol.createQueue(durations, offsets, PRFs, DCs) queue = [] for drive in drives: for pp in protocols: queue.append([drive, pp]) return queue @classmethod @Model.checkOutputDir def simQueueBurst(cls, amps, durations, PRFs, DCs, BRFs, nbursts, **kwargs): if amps is None: amps = [None] drives = ElectricDrive.createQueue(amps) protocols = BurstProtocol.createQueue(durations, PRFs, DCs, BRFs, nbursts) queue = [] for drive in drives: for pp in protocols: queue.append([drive, pp]) return queue @staticmethod def checkInputs(drive, pp): ''' Check validity of electrical stimulation parameters. :param drive: electric drive object :param pp: pulse protocol object ''' if not isinstance(drive, Drive): raise TypeError(f'Invalid "drive" parameter (must be an "Drive" object)') if not isinstance(pp, TimeProtocol): raise TypeError('Invalid time protocol (must be "TimeProtocol" instance)') def chooseTimeStep(self): ''' Determine integration time step based on intrinsic temporal properties. 
''' return DT_EFFECTIVE @classmethod def derivatives(cls, t, y, Cm=None, drive=None): ''' Compute system derivatives for a given membrane capacitance and injected current. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param Cm: membrane capacitance (F/m2) :param Iinj: injected current (mA/m2) :return: vector of system derivatives at time t ''' if Cm is None: Cm = cls.Cm0 Qm, *states = y Vm = Qm / Cm * 1e3 # mV states_dict = dict(zip(cls.statesNames(), states)) dQmdt = - cls.iNet(Vm, states_dict) # mA/m2 if drive is not None: dQmdt += drive.compute(t) dQmdt *= 1e-3 # A/m2 # dQmdt = (Iinj - cls.iNet(Vm, states_dict)) * 1e-3 # A/m2 return [dQmdt, *cls.getDerStates(Vm, states_dict)] @Model.logNSpikes @Model.checkTitrate @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, pp): ''' Simulate a specific neuron model for a set of simulation parameters, and return output data in a dataframe. :param drive: electric drive object :param pp: pulse protocol object :return: output DataFrame ''' # Set initial conditions y0 = { 'Qm': self.Qm0, **{k: self.steadyStates()[k](self.Vm0) for k in self.statesNames()} } # Initialize solver and compute solution solver = EventDrivenSolver( lambda x: setattr(solver.drive, 'xvar', drive.xvar * x), # eventfunc y0.keys(), # variables lambda t, y: self.derivatives(t, y, drive=solver.drive), # dfunc event_params={'drive': drive.copy().updatedX(0.)}, # event parameters dt=self.chooseTimeStep()) # time step data = solver(y0, pp.stimEvents(), pp.tstop) # Add Vm timeries to solution data = addColumn(data, 'Vm', data['Qm'].values / self.Cm0 * 1e3, preceding_key='Qm') # Return solution dataframe return data def desc(self, meta): return f'{self}: simulation @ {meta["drive"].desc}, {meta["pp"].desc}' @staticmethod def getNSpikes(data): ''' Compute number of spikes in charge profile of simulation output. :param data: dataframe containing output time series :return: number of detected spikes ''' return detectSpikes(data)[0].size @staticmethod def getStabilizationValue(data): ''' Determine stabilization value from the charge profile of a simulation output. :param data: dataframe containing output time series :return: charge stabilization value (or np.nan if no stabilization detected) ''' # Extract charge signal posterior to observation window t, Qm = [data[key].values for key in ['t', 'Qm']] if t.max() <= TMIN_STABILIZATION: raise ValueError('solution length is too short to assess stabilization') Qm = Qm[t > TMIN_STABILIZATION] # Compute variation range Qm_range = np.ptp(Qm) logger.debug('%.2f nC/cm2 variation range over the last %.0f ms, Qmf = %.2f nC/cm2', Qm_range * 1e5, TMIN_STABILIZATION * 1e3, Qm[-1] * 1e5) # Return final value only if stabilization is detected if np.ptp(Qm) < QSS_Q_DIV_THR: return Qm[-1] else: return np.nan @classmethod def isExcited(cls, data): ''' Determine if neuron is excited from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is excited or not ''' return cls.getNSpikes(data) > 0 @classmethod def isSilenced(cls, data): ''' Determine if neuron is silenced from simulation output. 
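            Silencing is assessed by checking that the charge density profile stabilizes to a finite value beyond the initial observation window (see getStabilizationValue).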
:param data: dataframe containing output time series :return: boolean stating whether neuron is silenced or not ''' return not np.isnan(cls.getStabilizationValue(data)) def getArange(self, drive): return drive.xvar_range diff --git a/PySONIC/plt/pltutils.py b/PySONIC/plt/pltutils.py index 5609251..b7f374c 100644 --- a/PySONIC/plt/pltutils.py +++ b/PySONIC/plt/pltutils.py @@ -1,488 +1,491 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-21 14:33:36 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-08-06 17:49:09 +# @Last Modified time: 2020-08-24 12:47:51 ''' Useful functions to generate plots. ''' import re import numpy as np import pandas as pd import matplotlib from matplotlib.patches import Polygon, Rectangle from matplotlib import cm, colors import matplotlib.pyplot as plt from ..core import getModel from ..utils import * # Matplotlib parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' def getSymmetricCmap(cmap_key): cmap = plt.get_cmap(cmap_key) cl = np.vstack((cmap.colors, cmap.reversed().colors)) return colors.LinearSegmentedColormap.from_list(f'sym_{cmap_key}', cl) for k in ['viridis', 'plasma', 'inferno', 'magma', 'cividis']: for cmap_key in [k, f'{k}_r']: sym_cmap = getSymmetricCmap(cmap_key) plt.register_cmap(name=sym_cmap.name, cmap=sym_cmap) def cm2inch(*tupl): inch = 2.54 if isinstance(tupl[0], tuple): return tuple(i / inch for i in tupl[0]) else: return tuple(i / inch for i in tupl) def extractPltVar(model, pltvar, df, meta=None, nsamples=0, name=''): if 'func' in pltvar: s = pltvar['func'] if not s.startswith('meta'): s = f'model.{s}' try: var = eval(s) except AttributeError as err: if hasattr(model, 'pneuron'): var = eval(s.replace('model', 'model.pneuron')) else: raise err elif 'key' in pltvar: var = df[pltvar['key']] elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = df[name] if isinstance(var, pd.Series): var = var.values var = var.copy() if var.size == nsamples - 1: var = np.insert(var, 0, var[0]) var *= pltvar.get('factor', 1) return var def setGrid(n, ncolmax=3): ''' Determine number of rows and columns in figure grid, based on number of variables to plot. ''' if n <= ncolmax: return (1, n) else: return ((n - 1) // ncolmax + 1, ncolmax) def setNormalizer(cmap, bounds, scale='lin'): norm = { 'lin': colors.Normalize, 'log': colors.LogNorm, 'symlog': colors.SymLogNorm }[scale](*bounds) sm = cm.ScalarMappable(norm=norm, cmap=cmap) sm._A = [] return norm, sm class GenericPlot: def __init__(self, outputs): ''' Constructor. 
:param outputs: list / generator of simulation outputs ''' try: iter(outputs) except TypeError: outputs = [outputs] self.outputs = outputs def __call__(self, *args, **kwargs): return self.render(*args, **kwargs) def figtitle(self, model, meta): return model.desc(meta) @staticmethod def wraptitle(ax, title, maxwidth=120, sep=':', fs=10, y=1.0): if len(title) > maxwidth: title = '\n'.join(title.split(sep)) y = 0.94 h = ax.set_title(title, fontsize=fs) h.set_y(y) @staticmethod def getData(entry, frequency=1, trange=None): if entry is None: raise ValueError('non-existing data') if isinstance(entry, str): data, meta = loadData(entry, frequency) else: data, meta = entry data = data.iloc[::frequency] if trange is not None: tmin, tmax = trange data = data.loc[(data['t'] >= tmin) & (data['t'] <= tmax)] return data, meta def render(self, *args, **kwargs): raise NotImplementedError @staticmethod def getSimType(fname): ''' Get sim type from filename. ''' mo = re.search('(^[A-Z]*)_(.*).pkl', fname) if not mo: raise ValueError(f'Could not find sim-key in filename: "{fname}"') return mo.group(1) @staticmethod def getModel(*args, **kwargs): return getModel(*args, **kwargs) @staticmethod def getTimePltVar(tscale): ''' Return time plot variable for a given temporal scale. ''' return { 'desc': 'time', 'label': 'time', 'unit': tscale, 'factor': {'ms': 1e3, 'us': 1e6}[tscale], 'onset': {'ms': 1e-3, 'us': 1e-6}[tscale] } @staticmethod def createBackBone(*args, **kwargs): raise NotImplementedError @staticmethod def prettify(ax, xticks=None, yticks=None, xfmt='{:.0f}', yfmt='{:+.0f}'): try: ticks = ax.get_ticks() ticks = (min(ticks), max(ticks)) ax.set_ticks(ticks) ax.set_ticklabels([xfmt.format(x) for x in ticks]) except AttributeError: if xticks is None: xticks = ax.get_xticks() xticks = (min(xticks), max(xticks)) if yticks is None: yticks = ax.get_yticks() yticks = (min(yticks), max(yticks)) ax.set_xticks(xticks) ax.set_yticks(yticks) if xfmt is not None: ax.set_xticklabels([xfmt.format(x) for x in xticks]) if yfmt is not None: ax.set_yticklabels([yfmt.format(y) for y in yticks]) @staticmethod def addInset(fig, ax, inset): ''' Create inset axis. ''' inset_ax = fig.add_axes(ax.get_position()) inset_ax.set_zorder(1) inset_ax.set_xlim(inset['xlims'][0], inset['xlims'][1]) inset_ax.set_ylim(inset['ylims'][0], inset['ylims'][1]) inset_ax.set_xticks([]) inset_ax.set_yticks([]) inset_ax.add_patch(Rectangle((inset['xlims'][0], inset['ylims'][0]), inset['xlims'][1] - inset['xlims'][0], inset['ylims'][1] - inset['ylims'][0], color='w')) return inset_ax @staticmethod def materializeInset(ax, inset_ax, inset): ''' Materialize inset with zoom boox. 
''' # Re-position inset axis axpos = ax.get_position() left, right, = rescale(inset['xcoords'], ax.get_xlim()[0], ax.get_xlim()[1], axpos.x0, axpos.x0 + axpos.width) bottom, top, = rescale(inset['ycoords'], ax.get_ylim()[0], ax.get_ylim()[1], axpos.y0, axpos.y0 + axpos.height) inset_ax.set_position([left, bottom, right - left, top - bottom]) for i in inset_ax.spines.values(): i.set_linewidth(2) # Materialize inset target region with contour frame ax.plot(inset['xlims'], [inset['ylims'][0]] * 2, linestyle='-', color='k') ax.plot(inset['xlims'], [inset['ylims'][1]] * 2, linestyle='-', color='k') ax.plot([inset['xlims'][0]] * 2, inset['ylims'], linestyle='-', color='k') ax.plot([inset['xlims'][1]] * 2, inset['ylims'], linestyle='-', color='k') # Link target and inset with dashed lines if possible if inset['xcoords'][1] < inset['xlims'][0]: ax.plot([inset['xcoords'][1], inset['xlims'][0]], [inset['ycoords'][0], inset['ylims'][0]], linestyle='--', color='k') ax.plot([inset['xcoords'][1], inset['xlims'][0]], [inset['ycoords'][1], inset['ylims'][1]], linestyle='--', color='k') elif inset['xcoords'][0] > inset['xlims'][1]: ax.plot([inset['xcoords'][0], inset['xlims'][1]], [inset['ycoords'][0], inset['ylims'][0]], linestyle='--', color='k') ax.plot([inset['xcoords'][0], inset['xlims'][1]], [inset['ycoords'][1], inset['ylims'][1]], linestyle='--', color='k') else: logger.warning('Inset x-coordinates intersect with those of target region') def postProcess(self, *args, **kwargs): raise NotImplementedError @staticmethod def removeSpines(ax): for item in ['top', 'right']: ax.spines[item].set_visible(False) @staticmethod def setXTicks(ax, xticks=None): if xticks is not None: ax.set_xticks(xticks) @staticmethod def setYTicks(ax, yticks=None): if yticks is not None: ax.set_yticks(yticks) @staticmethod def setTickLabelsFontSize(ax, fs): for tick in ax.xaxis.get_major_ticks() + ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) @staticmethod def setXLabel(ax, xplt, fs): ax.set_xlabel('$\\rm {}\ ({})$'.format(xplt["label"], xplt["unit"]), fontsize=fs) @staticmethod def setYLabel(ax, yplt, fs): ax.set_ylabel('$\\rm {}\ ({})$'.format(yplt["label"], yplt.get("unit", "")), fontsize=fs) @classmethod def addCmap(cls, fig, cmap, handles, comp_values, comp_info, fs, prettify, zscale='lin'): if all(isinstance(x, str) for x in comp_values): # If list of strings, assume that index suffixes can be extracted prefix, suffixes = extractCommonPrefix(comp_values) comp_values = [int(s) for s in suffixes] desc_str = f'{prefix}\ index' else: # Rescale comparison values and adjust unit comp_values = np.asarray(comp_values) * comp_info.get('factor', 1.) 
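                # Determine the most readable SI factor and prefix for the comparison values, and fold the prefix into the displayed unit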
comp_factor, comp_prefix = getSIpair(comp_values, scale=zscale) comp_values /= comp_factor comp_info['unit'] = comp_prefix + comp_info['unit'] desc_str = comp_info["desc"].replace(" ", "\ ") if len(comp_info['unit']) > 0: desc_str = f"{desc_str}\ ({comp_info['unit']})" nvalues = len(comp_values) # Create colormap and normalizer try: mymap = plt.get_cmap(cmap) except ValueError: mymap = plt.get_cmap(swapFirstLetterCase(cmap)) norm, sm = setNormalizer(mymap, (min(comp_values), max(comp_values)), zscale) # Extract and adjust line colors zcolors = sm.to_rgba(comp_values) for lh, c in zip(handles, zcolors): if isIterable(lh): for item in lh: item.set_color(c) else: lh.set_color(c) # Add colorbar fig.subplots_adjust(left=0.1, right=0.8, bottom=0.15, top=0.95, hspace=0.5) cbarax = fig.add_axes([0.85, 0.15, 0.03, 0.8]) cbar_kwargs = {} if all(isinstance(x, int) for x in comp_values): - bounds = np.arange(nvalues + 1) + min(comp_values) - 0.5 - ticks = bounds[:-1] + 0.5 - if nvalues > 10: - ticks = [ticks[0], ticks[-1]] - cbar_kwargs.update({'ticks': ticks, 'boundaries': bounds, 'format': '%1i'}) - cbarax.tick_params(axis='both', which='both', length=0) + dx = np.diff(comp_values) + if all(x == dx[0] for x in dx): + dx = dx[0] + ticks = comp_values + bounds = np.hstack((ticks, [max(ticks) + dx])) - dx / 2 + if nvalues > 10: + ticks = [ticks[0], ticks[-1]] + cbar_kwargs.update({'ticks': ticks, 'boundaries': bounds, 'format': '%1i'}) + cbarax.tick_params(axis='both', which='both', length=0) cbar = fig.colorbar(sm, cax=cbarax, **cbar_kwargs) fig.sm = sm # add scalar mappable as figure attribute in case of future need cbarax.set_ylabel(f'$\\rm {desc_str}$', fontsize=fs) if prettify: cls.prettify(cbar) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) class ComparativePlot(GenericPlot): def __init__(self, outputs, varname): ''' Constructor. :param outputs: list /generator of simulation outputs to be compared. :param varname: name of variable to extract and compare. ''' super().__init__(outputs) self.varname = varname self.comp_ref_key = None self.meta_ref = None self.comp_info = None self.is_unique_comp = False def checkLabels(self, labels): if labels is not None: if not isIterable(labels): raise TypeError('Invalid labels: must be an iterable') if not all(isinstance(x, str) for x in labels): raise TypeError('Invalid labels: must be string typed') def checkSimType(self, meta): ''' Check consistency of sim types across files. ''' if meta['simkey'] != self.meta_ref['simkey']: raise ValueError('Invalid comparison: different simulation types') def checkCompValues(self, meta, comp_values): ''' Check consistency of differing values across files. ''' # Get differing values across meta dictionaries diffs = differing(self.meta_ref, meta, subdkey='meta') # Check that only one value differs if len(diffs) > 1: logger.warning('More than one differing inputs') self.comp_ref_key = None return [] # Get the key and differing values zkey, refval, val = diffs[0] # If no comparison key yet, fill it up if self.comp_ref_key is None: self.comp_ref_key = zkey self.is_unique_comp = True comp_values += [refval, val] # Otherwise, check that comparison matches the existing one else: if zkey != self.comp_ref_key: logger.warning('inconsistent differing inputs') self.comp_ref_key = None return [] else: comp_values.append(val) return comp_values def checkConsistency(self, meta, comp_values): ''' Check consistency of sim types and check differing inputs. 
''' if self.meta_ref is None: self.meta_ref = meta else: self.checkSimType(meta) comp_values = self.checkCompValues(meta, comp_values) if self.comp_ref_key is None: self.is_unique_comp = False return comp_values def getCompLabels(self, comp_values): if self.comp_info is not None: comp_values = np.array(comp_values) * self.comp_info.get('factor', 1) if 'unit' in self.comp_info: p = self.comp_info.get('precision', 0) comp_values = [f"{si_format(v, p)}{self.comp_info['unit']}".replace(' ', '\ ') for v in comp_values] comp_labels = ['$\\rm{} = {}$'.format(self.comp_info['label'], x) for x in comp_values] else: comp_labels = comp_values return comp_values, comp_labels def chooseLabels(self, labels, comp_labels, full_labels): if labels is not None: return labels else: if self.is_unique_comp: return comp_labels else: return full_labels @staticmethod def getCommonLabel(lbls, seps='_'): ''' Get a common label from a list of labels, by removing parts that differ across them. ''' # Split every label according to list of separator characters, and save splitters as well splt_lbls = [re.split(f'([{seps}])', x) for x in lbls] pieces = [x[::2] for x in splt_lbls] splitters = [x[1::2] for x in splt_lbls] ncomps = len(pieces[0]) # Assert that splitters are equivalent across all labels, and reduce them to a single array assert (x == x[0] for x in splitters), 'Inconsistent splitters' splitters = np.array(splitters[0]) # Transform pieces into 2D matrix, and evaluate equality of every piece across labels pieces = np.array(pieces).T all_identical = [np.all(x == x[0]) for x in pieces] if np.sum(all_identical) < ncomps - 1: logger.warning('More than one differing inputs') return '' # Discard differing pieces and remove associated splitters pieces = pieces[all_identical, 0] splitters = splitters[all_identical[:-1]] # Remove last splitter if the last pieces were discarded if splitters.size == pieces.size: splitters = splitters[:-1] # Join common pieces and associated splitters into a single label common_lbl = '' for p, s in zip(pieces, splitters): common_lbl += f'{p}{s}' common_lbl += pieces[-1] return common_lbl.replace('( ', '(') def addExcitationInset(ax, is_excited): ''' Add text inset on axis stating excitation status. ''' ax.text( 0.7, 0.7, f'{"" if is_excited else "not "}excited', transform=ax.transAxes, ha='center', va='center', size=30, bbox=dict( boxstyle='round', fc=(0.8, 1.0, 0.8) if is_excited else (1., 0.8, 0.8) )) def mirrorProp(org, new, prop): ''' Mirror an instance property onto another instance of the same class. ''' getattr(new, f'set_{prop}')(getattr(org, f'get_{prop}')()) def mirrorAxis(org_ax, new_ax, p=False): ''' Mirror content of original axis to a new axis. 
That includes: - position on the figure - spines properties - ticks, ticklabels, and labels - vertical spans ''' mirrorProp(org_ax, new_ax, 'position') for sk in ['bottom', 'left', 'right', 'top']: mirrorProp(org_ax.spines[sk], new_ax.spines[sk], 'visible') for prop in ['label', 'ticks', 'ticklabels']: for k in ['x', 'y']: mirrorProp(org_ax, new_ax, f'{k}{prop}') ax_children = org_ax.get_children() vspans = filter(lambda x: isinstance(x, Polygon), ax_children) for vs in vspans: props = vs.properties() xmin, xmax = [props['xy'][i][0] for i in [0, 2]] kwargs = {k: props[k] for k in ['alpha', 'edgecolor', 'facecolor']} if kwargs['edgecolor'] == (0.0, 0.0, 0.0, 0.0): kwargs['edgecolor'] = 'none' new_ax.axvspan(xmin, xmax, **kwargs) diff --git a/PySONIC/utils.py b/PySONIC/utils.py index f7489f8..5e00754 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,1139 +1,1155 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-19 22:30:46 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-08-06 15:51:39 +# @Last Modified time: 2020-09-11 10:41:02 ''' Definition of generic utility functions used in other modules ''' import sys import itertools import csv from functools import wraps import operator import time from inspect import signature import os from shutil import get_terminal_size import lockfile import math import pickle import json from tqdm import tqdm import logging import tkinter as tk from tkinter import filedialog import base64 import datetime import numpy as np import pandas as pd from scipy.optimize import brentq from scipy.interpolate import interp1d from scipy import linalg import colorlog from pushbullet import Pushbullet # Package logger my_log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%') def setHandler(logger, handler): for h in logger.handlers: logger.removeHandler(h) logger.addHandler(handler) return logger def setLogger(name, formatter): handler = colorlog.StreamHandler() handler.setFormatter(formatter) handler.stream = sys.stdout logger = colorlog.getLogger(name) logger.addHandler(handler) return logger class TqdmHandler(logging.StreamHandler): def __init__(self, formatter): logging.StreamHandler.__init__(self) self.setFormatter(formatter) def emit(self, record): msg = self.format(record) tqdm.write(msg) logger = setLogger('PySONIC', my_log_formatter) LOOKUP_DIR = os.path.abspath(os.path.split(__file__)[0] + "/lookups/") def fillLine(text, char='-', totlength=None): ''' Surround a text with repetitions of a specific character in order to fill a line to a given total length. 
:param text: text to be surrounded :param char: surrounding character :param totlength: target number of characters in filled text line :return: filled text line ''' if totlength is None: totlength = get_terminal_size().columns - 1 ndashes = totlength - len(text) - 2 if ndashes < 2: return text else: nside = ndashes // 2 nleft, nright = nside, nside if ndashes % 2 == 1: nright += 1 return f'{char * nleft} {text} {char * nright}' # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) def getSIpair(x, scale='lin'): ''' Get the correct SI factor and prefix for a floating point number. ''' if isIterable(x): # If iterable, get a representative number of the distribution x = np.asarray(x) x = x.prod()**(1.0 / x.size) if scale == 'log' else np.mean(x) if x == 0: return 1e0, '' else: vals = [tmp[1] for tmp in sorted_si_prefixes] ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 return vals[ix], sorted_si_prefixes[ix][0] def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): factor, prefix = getSIpair(x) return f'{x / factor:.{precision}f}{space}{prefix}' elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: raise ValueError(f'cannot si_format {type(x)} objects') def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' sci_string = f'{number:.{precision}e}' value, exponent = sci_string.split("e") value, exponent = float(value), int(exponent) val_str = f'{value} * ' if value != 1. else '' return f'{val_str}10^{{{exponent}}}' def rmse(x1, x2, axis=None): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean(axis=axis)) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). 
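        Computed with the plane-wave relation p = sqrt(2 * rho * c * I).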
:param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def convertPKL2JSON(): for pkl_filepath in OpenFilesDialog('pkl')[0]: logger.info(f'Processing {pkl_filepath} ...') json_filepath = f'{os.path.splitext(pkl_filepath)[0]}.json' with open(pkl_filepath, 'rb') as fpkl, open(json_filepath, 'w') as fjson: data = pickle.load(fpkl) json.dump(data, fjson, ensure_ascii=False, sort_keys=True, indent=4) logger.info('All done!') def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames( filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname ) if len(filenames) == 0: raise ValueError('no input file selected') par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) return filenames, par_dir def selectDirDialog(title='Select directory'): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() directory = filedialog.askdirectory(title=title) if directory == '': raise ValueError('no directory selected') return directory def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. :param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) if len(filename_out) == 0: raise ValueError('no output filepath selected') return filename_out def loadData(fpath, frequency=1): ''' Load dataframe and metadata dictionary from pickle file. ''' logger.info('Loading data from "%s"', os.path.basename(fpath)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'].iloc[::frequency] meta = frame['meta'] return df, meta def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def expandRange(xmin, xmax, exp_factor=2): ''' Expand a range by a specific factor around its mid-point. ''' if xmin > xmax: raise ValueError('values must be provided in (min, max) order') xptp = xmax - xmin xmid = (xmin + xmax) / 2 xdev = xptp * exp_factor / 2 return (xmid - xdev, xmin + xdev) def isIterable(x): for t in [list, tuple, np.ndarray]: if isinstance(x, t): return True return False def isWithin(name, val, bounds, rel_tol=1e-9, raise_warning=True): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. 
:param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' if isIterable(val): return np.array([isWithin(name, v, bounds, rel_tol, raise_warning) for v in val]) if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): if raise_warning: logger.warning( 'Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): if raise_warning: logger.warning( 'Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError(f'{name} value ({val}) out of [{bounds[0]}, {bounds[1]}] interval') def getDistribution(xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return getDistribution(xmin, xmax, nx, scale=scale) def getIndex(container, value): ''' Return the index of a float / string value in a list / array :param container: list / 1D-array of elements :param value: value to search for :return: index of value (if found) ''' if isinstance(value, float): container = np.array(container) imatches = np.where(np.isclose(container, value, rtol=1e-9, atol=1e-16))[0] if len(imatches) == 0: raise ValueError(f'{value} not found in {container}') return imatches[0] elif isinstance(value, str): return container.index(value) def funcSig(func, args, kwargs): args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] return f'{func.__name__}({", ".join(args_repr + kwargs_repr)})' def debug(func): ''' Print the function signature and return value. ''' @wraps(func) def wrapper_debug(*args, **kwargs): print(f'Calling {funcSig(func, args, kwargs)}') value = func(*args, **kwargs) print(f"{func.__name__!r} returned {value!r}") return value return wrapper_debug def timer(func): ''' Monitor and return the runtime of the decorated function. ''' @wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() value = func(*args, **kwargs) end_time = time.perf_counter() run_time = end_time - start_time return value, run_time return wrapper def alignWithFuncDef(func, args, kwargs): ''' Align a set of provided positional and keyword arguments with the arguments signature in a specific function definition. 
:param func: function object :param args: list of provided positional arguments :param kwargs: dictionary of provided keyword arguments :return: 2-tuple with the modified arguments and ''' # Get positional and keyword arguments from function signature sig_params = {k: v for k, v in signature(func).parameters.items()} sig_args = list(filter(lambda x: x.default == x.empty, sig_params.values())) sig_kwargs = {k: v.default for k, v in sig_params.items() if v.default != v.empty} sig_nargs = len(sig_args) kwarg_keys = list(sig_kwargs.keys()) # Restrain provided positional arguments to those that are also positional in signature new_args = args[:sig_nargs] # Construct hybrid keyword arguments dictionary from: # - remaining positional arguments # - provided keyword arguments # - default keyword arguments new_kwargs = sig_kwargs for i, x in enumerate(args[sig_nargs:]): new_kwargs[kwarg_keys[i]] = x for k, v in kwargs.items(): new_kwargs[k] = v return new_args, new_kwargs def alignWithMethodDef(method, args, kwargs): args, kwargs = alignWithFuncDef(method, [None] + list(args), kwargs) return tuple(args[1:]), kwargs def logCache(fpath, delimiter='\t', out_type=float): ''' Add an extra IO memoization functionality to a function using file caching, to avoid repetitions of tedious computations with identical inputs. ''' def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # If function has history -> do not log if 'history' in kwargs: return func(*args, **kwargs) # Modify positional and keyword arguments to match function signature, if needed args, kwargs = alignWithFuncDef(func, args, kwargs) # Translate args and kwargs into string signature fsignature = funcSig(func, args, kwargs) # If entry present in log, return corresponding output if os.path.isfile(fpath): with open(fpath, 'r', newline='') as f: reader = csv.reader(f, delimiter=delimiter) for row in reader: if row[0] == fsignature: logger.debug(f'entry found in "{os.path.basename(fpath)}"') return out_type(row[1]) # Otherwise, compute output and log it into file before returning out = func(*args, **kwargs) lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=delimiter) writer.writerow([fsignature, str(out)]) lock.release() return out return wrapper return wrapper_with_args def fileCache(root, fcode_func, ext='json'): def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # Get load and dump functions from file extension try: load_func = { 'json': json.load, 'pkl': pickle.load, 'csv': lambda f: np.loadtxt(f, delimiter=',') }[ext] dump_func = { 'json': json.dump, 'pkl': pickle.dump, 'csv': lambda x, f: np.savetxt(f, x, delimiter=',') }[ext] except KeyError: raise ValueError('Unknown file extension') # Get read and write mode (text or binary) from file extension mode = 'b' if ext == 'pkl' else '' # Get file path from root and function arguments, using fcode function if callable(fcode_func): fcode = fcode_func(*args) else: fcode = fcode_func fpath = os.path.join(os.path.abspath(root), f'{fcode}.{ext}') # If file exists, load output from it if os.path.isfile(fpath): logger.info(f'loading data from "{fpath}"') with open(fpath, 'r' + mode) as f: out = load_func(f) # Otherwise, execute function and create the file to dump the output else: logger.warning(f'reference data file not found: "{fpath}"') out = func(*args, **kwargs) logger.info(f'dumping data in "{fpath}"') lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'w' + mode) 
as f: dump_func(out, f) lock.release() return out return wrapper return wrapper_with_args def derivative(f, x, eps, method='central'): ''' Compute the difference formula for f'(x) with perturbation size eps. :param dfunc: derivatives function, taking an array of states and returning an array of derivatives :param x: states vector :param method: difference formula: 'forward', 'backward' or 'central' :param eps: perturbation vector (same size as states vector) :return: numerical approximation of the derivative around the fixed point ''' if isIterable(x): if not isIterable(eps) or len(eps) != len(x): raise ValueError('eps must be the same size as x') elif np.sum(eps != 0.) != 1: raise ValueError('eps must be zero-valued across all but one dimensions') eps_val = np.sum(eps) else: eps_val = eps if method == 'central': df = (f(x + eps) - f(x - eps)) / 2 elif method == 'forward': df = f(x + eps) - f(x) elif method == 'backward': df = f(x) - f(x - eps) else: raise ValueError("Method must be 'central', 'forward' or 'backward'.") return df / eps_val def jacobian(dfunc, x, rel_eps=None, abs_eps=None, method='central'): ''' Evaluate the Jacobian maatrix of a (time-invariant) system, given a states vector and derivatives function. :param dfunc: derivatives function, taking an array of n states and returning an array of n derivatives :param x: n-states vector :return: n-by-n square Jacobian matrix ''' if sum(e is not None for e in [abs_eps, rel_eps]) != 1: raise ValueError('one (and only one) of "rel_eps" or "abs_eps" parameters must be provided') # Determine vector size x = np.asarray(x) n = x.size # Initialize Jacobian matrix J = np.empty((n, n)) # Create epsilon vector if rel_eps is not None: mode = 'relative' eps_vec = rel_eps else: mode = 'absolute' eps_vec = abs_eps if not isIterable(eps_vec): eps_vec = np.array([eps_vec] * n) if mode == 'relative': eps = x * eps_vec else: eps = eps_vec # Perturb each state by epsilon on both sides, re-evaluate derivatives # and assemble Jacobian matrix ei = np.zeros(n) for i in range(n): ei[i] = 1 J[:, i] = derivative(dfunc, x, eps * ei, method=method) ei[i] = 0 return J def classifyFixedPoint(x, dfunc): ''' Characterize the stability of a fixed point by numerically evaluating its Jacobian matrix and evaluating the sign of the real part of its associated eigenvalues. :param x: n-states vector :param dfunc: derivatives function, taking an array of n states and returning an array of n derivatives ''' # Compute Jacobian numerically # print(f'x = {x}, dfunx(x) = {dfunc(x)}') eps_machine = np.sqrt(np.finfo(float).eps) J = jacobian(dfunc, x, rel_eps=eps_machine, method='forward') # Compute eigenvalues and eigenvectors eigvals, eigvecs = linalg.eig(J) logger.debug(f"eigenvalues = {[f'({x.real:.2e} + {x.imag:.2e}j)' for x in eigvals]}") # Determine fixed point stability based on eigenvalues is_neg_eigvals = eigvals.real < 0 if is_neg_eigvals.all(): # all real parts negative -> stable key = 'stable' elif is_neg_eigvals.any(): # both posivie and negative real parts -> saddle key = 'saddle' else: # all real parts positive -> unstable key = 'unstable' return eigvals, key def findModifiedEq(x0, dfunc, *args): ''' Find an equilibrium variable in a modified system by searching for its derivative root within an interval around its original equilibrium. :param x0: equilibrium value in original system. :param func: derivative function, taking the variable as first parameter. :param *args: remaining arguments needed for the derivative function. 
:return: variable equilibrium value in modified system. ''' is_iterable = [isIterable(arg) for arg in args] if any(is_iterable): if not all(is_iterable): raise ValueError('mix of iterables and non-iterables') lengths = [len(arg) for arg in args] if not all(n == lengths[0] for n in lengths): raise ValueError(f'inputs are not of the same size: {lengths}') n = lengths[0] res = [] for i in range(n): x = [arg[i] for arg in args] res.append(findModifiedEq(x0, dfunc, *x)) return np.array(res) else: return brentq(lambda x: dfunc(x, *args), x0 * 1e-4, x0 * 1e3, xtol=1e-16) def swapFirstLetterCase(s): if s[0].islower(): return s.capitalize() else: return s[0].lower() + s[1:] def getPow10(x, direction='up'): ''' Get the power of 10 that is closest to a number, in either direction("down" or "up"). ''' round_method = {'up': np.ceil, 'down': np.floor}[direction] return np.power(10, round_method(np.log10(x))) def rotAroundPoint2D(x, theta, p): ''' Rotate a 2D vector around a center point by a given angle. :param x: 2D coordinates vector :param theta: rotation angle (in rad) :param p: 2D center point coordinates :return: 2D rotated coordinates vector ''' n1, n2 = x.shape if n1 != 2: if n2 == 2: x = x.T else: raise ValueError('x should be a 2-by-n vector') # Rotation matrix R = np.array([ [np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)], ]) # Broadcast center point to input vector ptile = np.tile(p, (x.shape[1], 1)).T # Subtract, rotate and add return R.dot(x - ptile) + ptile def getKey(keyfile='pushbullet.key'): dir_path = os.path.dirname(os.path.realpath(__file__)) package_root = os.path.abspath(os.path.join(dir_path, os.pardir)) fpath = os.path.join(package_root, keyfile) if not os.path.isfile(fpath): raise FileNotFoundError('pushbullet API key file not found') with open(fpath) as f: encoded_key = f.readlines()[0] return base64.b64decode(str.encode(encoded_key)).decode() def sendPushNotification(msg): try: key = getKey() pb = Pushbullet(key) dt = datetime.datetime.now() s = dt.strftime('%Y-%m-%d %H:%M:%S') pb.push_note('Code Messenger', f'{s}\n{msg}') except FileNotFoundError: logger.error(f'Could not send push notification: "{msg}"') def alert(func): ''' Run a function, and send a push notification upon completion, or if an error is raised during its execution. ''' @wraps(func) def wrapper(*args, **kwargs): try: out = func(*args, **kwargs) sendPushNotification(f'completed "{func.__name__}" execution successfully') return out except BaseException as e: sendPushNotification(f'error during "{func.__name__}" execution: {e}') raise e return wrapper def sunflower(n, radius=1, alpha=1): ''' Generate a population of uniformly distributed 2D data points in a unit circle. :param n: number of data points :param alpha: coefficient determining evenness of the boundary :return: 2D matrix of Cartesian (x, y) positions ''' nbounds = np.round(alpha * np.sqrt(n)) # number of boundary points phi = (np.sqrt(5) + 1) / 2 # golden ratio k = np.arange(1, n + 1) # index vector theta = 2 * np.pi * k / phi**2 # angle vector r = np.sqrt((k - 1) / (n - nbounds - 1)) # radius vector r[r > 1] = 1 x = r * np.cos(theta) y = r * np.sin(theta) return radius * np.vstack((x, y)) def filecode(model, *args): ''' Generate file code given a specific combination of model input parameters. 
''' # If meta dictionary was passed, generate inputs list from it if len(args) == 1 and isinstance(args[0], dict): meta = args[0].copy() if meta['simkey'] == 'ASTIM' and 'fs' not in meta: meta['fs'] = meta['model']['fs'] meta['method'] = meta['model']['method'] meta['qss_vars'] = None for k in ['simkey', 'model', 'tcomp', 'dt', 'atol']: if k in meta: del meta[k] args = list(meta.values()) # Otherwise, transform args tuple into list else: args = list(args) # If any argument is an iterable -> transform it to a continous string for i in range(len(args)): if isIterable(args[i]): args[i] = ''.join([str(x) for x in args[i]]) # Create file code by joining string-encoded inputs with underscores codes = model.filecodes(*args).values() return '_'.join([x for x in codes if x is not None]) def simAndSave(model, *args, **kwargs): ''' Simulate the model and save the results in a specific output directory. :param *args: list of arguments provided to the simulation function :param **kwargs: optional arguments dictionary :return: output filepath ''' # Extract output directory and overwrite boolean from keyword arguments. outputdir = kwargs.pop('outputdir', '.') overwrite = kwargs.pop('overwrite', True) # Set data and meta to None data, meta = None, None # Extract drive object from args drive, *other_args = args # If drive is searchable and not fully resolved if drive.is_searchable: if not drive.is_resolved: # Call simulate to perform titration out = model.simulate(*args) # If titration yields nothing -> no file produced -> return None if out is None: logger.warning('returning None') return None # Store data and meta data, meta = out # Update args list with resovled drive try: args = (meta['drive'], *other_args) except KeyError: args = (meta['source'], *other_args) # Check if a output file corresponding to sim inputs is found in the output directory # That check is performed prior to running the simulation, such that # it is not run if the file is present and overwrite is set ot false. fname = f'{model.filecode(*args)}.pkl' fpath = os.path.join(outputdir, fname) existing_file_msg = f'File "{fname}" already present in directory "{outputdir}"' existing_file = os.path.isfile(fpath) # If file exists and overwrite is set ot false -> return if existing_file and not overwrite: logger.warning(f'{existing_file_msg} -> preserving') return fpath # Run simulation if not already done (for titration cases) if data is None: data, meta = model.simulate(*args) # Raise warning if an existing file is overwritten if existing_file: logger.warning(f'{existing_file_msg} -> overwriting') # Save output file and return output filepath with open(fpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) logger.debug('simulation data exported to "%s"', fpath) return fpath def moveItem(l, value, itarget): ''' Move a list item to a specific target index. :param l: list object :param value: value of the item to move :param itarget: target index :return: re-ordered list. 
''' # Get absolute target index if itarget < 0: itarget += len(l) assert itarget < len(l), f'target index {itarget} exceeds list size ({len(l)})' # Get index corresponding to element and delete entry from list iref = l.index(value) new_l = l.copy() del new_l[iref] # Return re-organized list return new_l[:itarget] + [value] + new_l[itarget:] def gaussian(x, mu=0., sigma=1., A=1.): return A * np.exp(-((x - mu) / sigma)**2 / 2) def isPickable(obj): try: pickle.dumps(obj) except Exception: return False return True def resolveFuncArgs(func, *args, **kwargs): ''' Return a dictionary of positional and keyword arguments upon function call, adding defaults from simfunc signature if not provided at call time. ''' bound_args = signature(func).bind(*args, **kwargs) bound_args.apply_defaults() return dict(bound_args.arguments) def getMeta(model, simfunc, *args, **kwargs): ''' Construct an informative dictionary about the model and simulation parameters. ''' # Retrieve function call arguments args_dict = resolveFuncArgs(simfunc, model, *args, **kwargs) # Construct meta dictionary meta = {'simkey': model.simkey} for k, v in args_dict.items(): if k == 'self': meta['model'] = v.meta else: meta[k] = v return meta def bounds(arr): ''' Return the bounds or a numpy array / list. ''' return (np.nanmin(arr), np.nanmax(arr)) def addColumn(df, key, arr, preceding_key=None): ''' Add a new column to a dataframe, right after a specific column. ''' df[key] = arr if preceding_key is not None: cols = df.columns.tolist()[:-1] preceding_index = cols.index(preceding_key) df = df[cols[:preceding_index + 1] + [key] + cols[preceding_index + 1:]] return df def integerSuffix(n): return 'th' if 4 <= n % 100 <= 20 else {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th') def customStrftime(fmt, dt_obj): return dt_obj.strftime(fmt).replace('{S}', str(dt_obj.day) + integerSuffix(dt_obj.day)) def friendlyLogspace(xmin, xmax, bases=None): ''' Define a "friendly" logspace between two bounds. ''' if bases is None: bases = [1, 2, 5] bases = np.asarray(bases) bounds = np.array([xmin, xmax]) logbounds = np.log10(bounds) bounds_orders = np.floor(logbounds) orders = np.arange(bounds_orders[0], bounds_orders[1] + 1) factors = np.power(10., np.floor(orders)) seq = np.hstack([bases * f for f in factors]) if xmax > seq.max(): seq = np.hstack((seq, xmax)) seq = seq[np.logical_and(seq >= xmin, seq <= xmax)] if xmin not in seq: seq = np.hstack((xmin, seq)) if xmax not in seq: seq = np.hstack((seq, xmax)) return seq def differing(d1, d2, subdkey=None, diff=None): ''' Find differences in values across two dictionaries (recursively). 
:param d1: first dictionary :param d2: second dictionary :param subdkey: specific sub-dictionary attribute key for objects :param diff: existing diff list to append to :return: list of (key, value1, value2) tuples for each differing values ''' # Initilize diff list if diff is None: diff = [] # Check that the two dicts have the same structure if sorted(list(d1.keys())) != sorted(list(d2.keys())): raise ValueError('inconsistent inputs') # For each key - values triplet for k in d1.keys(): # If values are dicts themselves, loop recursively through them if isinstance(d1[k], dict): diff = differing(d1[k], d2[k], subdkey=subdkey, diff=diff) # If values are objects with a specific sub-dictionary attribute, # loop recursively through them elif hasattr(d1[k], subdkey): diff = differing(getattr(d1[k], subdkey), getattr(d2[k], subdkey), subdkey=subdkey, diff=diff) # Otherwise else: # If values differ, add the key - values triplet to the diff list if d1[k] != d2[k]: diff.append((k, d1[k], d2[k])) # Return the diff list return diff def extractCommonPrefix(labels): ''' Extract a common prefix and a list of suffixes from a list of labels. ''' prefix = os.path.commonprefix(labels) if len(prefix) == 0: return None return prefix, [s.split(prefix)[1] for s in labels] def cycleAvg(t, y, T): ''' Cycle-average a vector according to a given periodicity. :param t: time vector ;param y: signal vector :param T: periodicity :return: cycle-averaged signal vector ''' t -= t[0] n = int(np.ceil(t[-1] / T)) return np.array([ np.mean(y[np.where((t >= i * T) & (t < (i + 1) * T))[0]]) for i in range(n)]) class TimeSeries(pd.DataFrame): ''' Wrapper around pandas DataFrame to store timeseries data. ''' time_key = 't' stim_key = 'stimstate' def __init__(self, t, stim, dout): super().__init__(data={ self.time_key: t, self.stim_key: stim, **dout }) @property def time(self): return self[self.time_key].values @property def tbounds(self): return self.time.min(), self.time.max() @property def stim(self): return self[self.stim_key].values @property def inputs(self): return [self.time_key, self.stim_key] @property def outputs(self): return list(set(self.columns.values) - set(self.inputs)) def interpCol(self, t, k, kind): ''' Interpolate a column according to a new time vector. ''' kind = 'nearest' if k == self.stim_key else 'linear' self[k] = interp1d(self.time, self[k].values, kind=kind)(t) def interp1d(self, t): ''' Interpolate the entire dataframe according to a new time vector. ''' for k in self.outputs: self.interpCol(t, k, 'linear') self.interpCol(t, self.stim_key, 'nearest') self[self.time_key] = t def resample(self, dt): ''' Resample dataframe at regular time step. ''' tmin, tmax = self.tbounds n = int((tmax - tmin) / dt) + 1 self.interp1d(np.linspace(tmin, tmax, n)) def cycleAveraged(self, T): ''' Cycle-average a periodic solution. ''' t = np.arange(self.time[0], self.time[-1], T) stim = interp1d(self.time, self.stim, kind='nearest')(t) outputs = {k: cycleAvg(self.time, self[k].values, T) for k in self.outputs} return self.__class__(t, stim, outputs) def prepend(self, t0=0): ''' Repeat first row outputs for a preceding time. ''' if t0 > self.time.min(): raise ValueError('t0 greater than minimal time value') self.loc[-1] = self.iloc[0] # repeat first row self.index = self.index + 1 # shift index self.sort_index(inplace=True) self[self.time_key][0] = t0 self[self.stim_key][0] = 0 def bound(self, tbounds): ''' Restrict all columns of dataframe to indexes corresponding to time values within specific bounds. 
''' tmin, tmax = tbounds return self[np.logical_and(self.time >= tmin, self.time <= tmax)].reset_index(drop=True) def checkAgainst(self, other): assert isinstance(other, self.__class__), 'classes do not match' assert all(self.keys() == other.keys()), 'differing keys' for k in self.inputs: assert all(self[k].values == other[k].values), f'{k} vectors do not match' def operate(self, other, op): ''' Generic arithmetic operator. ''' self.checkAgainst(other) return self.__class__( self.time, self.stim, {k: getattr(self[k].values, op)(other[k].values) for k in self.outputs} ) def __add__(self, other): ''' Addition operator. ''' return self.operate(other, '__add__') def __sub__(self, other): ''' Subtraction operator. ''' return self.operate(other, '__sub__') def __mul__(self, other): ''' Multiplication operator. ''' return self.operate(other, '__mul__') def __truediv__(self, other): ''' Division operator. ''' return self.operate(other, '__truediv__') def pairwise(iterable): ''' s -> (s0,s1), (s1,s2), (s2, s3), ... ''' a, b = itertools.tee(iterable) next(b, None) return list(zip(a, b)) def padleft(x): return np.pad(x, (1, 0), 'edge') def padright(x): return np.pad(x, (0, 1), 'edge') + + +def timeThreshold(t, y, dy_thr): + ''' Find time interval required to reach a given threshold in a non-monotonous signal. ''' + y -= y[0] # remove initial offset + ifirst = np.where(y > dy_thr)[0][0] + return np.interp(dy_thr, y[:ifirst + 1], t[:ifirst + 1]) + + +def flatten(din): + ''' Flatten a two level dictionary ''' + dout = {} + for k, v in din.items(): + for k2, v2 in v.items(): + dout[f'{k} - {k2}'] = v2 + return dout
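
As a quick sanity check, here is a minimal usage sketch for the two utility functions added at the end of PySONIC/utils.py. It is illustrative only (not part of the diff) and the input values are hypothetical:

import numpy as np
from PySONIC.utils import flatten, timeThreshold

# flatten: collapse a two-level dictionary into a single level with "outer - inner" keys
d = {'sonic': {'a': 1, 'b': 2}, 'full': {'a': 3, 'b': 4}}
print(flatten(d))  # {'sonic - a': 1, 'sonic - b': 2, 'full - a': 3, 'full - b': 4}

# timeThreshold: time needed for a signal to first rise by a given amount above its initial value
t = np.linspace(0., 1., 101)        # time vector (s)
y = 10. * (1 - np.exp(-t / 0.2))    # hypothetical saturating signal
print(timeThreshold(t, y, 5.))      # ~0.139 s (linearly interpolated crossing time)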