diff --git a/PySONIC/constants.py b/PySONIC/constants.py index b1ed663..e9e7f6d 100644 --- a/PySONIC/constants.py +++ b/PySONIC/constants.py @@ -1,78 +1,79 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-11-04 13:23:31 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-03-31 15:37:58 +# @Last Modified time: 2020-04-15 19:22:09 ''' Numerical constants used in the package. ''' # -------------------------- Biophysical constants -------------------------- FARADAY = 9.64853e4 # Faraday constant (C/mol) Rg = 8.31342 # Universal gas constant (Pa.m^3.mol^-1.K^-1 or J.mol^-1.K^-1) Z_Ca = 2 # Calcium valence Z_Na = 1 # Sodium valence Z_K = 1 # Potassium valence CELSIUS_2_KELVIN = 273.15 # Celsius to Kelvin conversion constant # -------------------------- Intermolecular pressure fitting -------------------------- LJFIT_PM_MAX = 1e8 # Pm value at the deflection lower bound for LJ fitting (Pa) PNET_EQ_MAX = 1e-1 # Pnet error threshold at computed equilibrium position (Pa) PMAVG_STD_ERR_MAX = 5e3 # error threshold in intermolecular pressure nonlinear fit (Pa) # -------------------------- Lookups pre-computing -------------------------- DQ_LOOKUP = 1e-5 # charge density interval step for lookup tables # -------------------------- Simulations -------------------------- MAX_RMSE_PTP_RATIO = 1e-4 # threshold RMSE / peak-to-peak ratio for periodic convergence Z_ERR_MAX = 1e-11 # periodic convergence threshold for deflection (m) NG_ERR_MAX = 1e-24 # periodic convergence threshold for gas content (mol) NCYCLES_MAX = 10 # max number of cycles in periodic simulations CHARGE_RANGE = (-300e-5, 150e-5) # physiological charge range constraining the membrane (C/m2) SOLVER_NSTEPS = 1000 # max number of steps during one ODE solver call CLASSIC_TARGET_DT = 1e-8 # target time step in output arrays of detailed simulations NPC_DENSE = 1000 # nb of samples per acoustic period in detailed simulations NPC_SPARSE = 40 # nb of samples per acoustic period in sparse simulations +MIN_SPARSE_DT = 1e-12 # minimal time step used during sparse integration (s) HYBRID_UPDATE_INTERVAL = 5e-4 # time interval between two hybrid integrations (s) DT_EFFECTIVE = 5e-5 # time step for effective integration (s) MIN_SAMPLES_PER_PULSE_INTERVAL = 1 # minimal number of time points per pulse interval (TON of TOFF) # -------------------------- Post-processing -------------------------- SPIKE_MIN_DT = 5e-4 # minimal time interval for spike detection on charge signal (s) SPIKE_MIN_QAMP = 3e-5 # threshold amplitude for spike detection on charge signal (C/m2) SPIKE_MIN_QPROM = 20e-5 # threshold prominence for spike detection on charge signal (C/m2) SPIKE_MIN_VAMP = 3.0 # threshold amplitude for spike detection on potential signal (mV) SPIKE_MIN_VPROM = 20.0 # threshold prominence for spike detection on potential signal (mV) MIN_NSPIKES_SPECTRUM = 3 # minimum number of spikes to compute firing rate spectrum # -------------------------- Titrations -------------------------- ESTIM_AMP_UPPER_BOUND = 1e5 # initial current density upper bound for titration (mA/m2) ESTIM_AMP_INITIAL = 1e0 # initial ESTIM titration amplitude (mA/m2) ESTIM_REL_CONV_THR = 1e-2 # relative ESTIM titration convergence threshold ASTIM_AMP_INITIAL = 1e4 # initial ASTIM titration amplitude (Pa) ASTIM_ABS_CONV_THR = 1e2 # absolute ASTIM titration convergence threshold (Pa) ASTIM_REL_CONV_THR = 1e0 # relative ASTIM titration convergence threshold (Pa) # -------------------------- QSS stability analysis -------------------------- 
QSS_REL_OFFSET = .05 # relative state perturbation amplitude: s = s0 * (1 +/- x) QSS_HISTORY_INTERVAL = 30e-3 # recent history interval (s) QSS_INTEGRATION_INTERVAL = 1e-3 # iterative integration interval (s) QSS_MAX_INTEGRATION_DURATION = 1000e-3 # max. iterative integration duration (s) QSS_Q_CONV_THR = 1e-7 # max. charge deviation to infer convergence (C/m2) QSS_Q_DIV_THR = 1e-4 # min. charge deviation to infer divergence (C/m2) TMIN_STABILIZATION = 500e-3 # time window for stabilization analysis (s) def getConstantsDict(): cdict = {} for k, v in globals().items(): if not k.startswith('__') and k != 'getConstantsDict': cdict[k] = v return cdict diff --git a/PySONIC/core/__init__.py b/PySONIC/core/__init__.py index d5063dd..07375cb 100644 --- a/PySONIC/core/__init__.py +++ b/PySONIC/core/__init__.py @@ -1,48 +1,49 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-06-06 13:36:00 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-02-20 16:35:41 +# @Last Modified time: 2020-04-15 20:48:13 from types import MethodType import inspect import sys -from .simulators import * +from .solvers import * from .batches import * from .model import * from .pneuron import * from .bls import * from .translators import * from .nbls import * from .vclamp import * from .lookups import * from .protocols import * from .drives import * from ..neurons import getPointNeuron def getModelsDict(): ''' Construct a dictionary of all model classes, indexed by simulation key. ''' current_module = sys.modules[__name__] models_dict = {} for _, obj in inspect.getmembers(current_module): if inspect.isclass(obj) and hasattr(obj, 'simkey') and isinstance(obj.simkey, str): models_dict[obj.simkey] = obj return models_dict + # Add an initFromMeta method to the PointNeuron class (done here to avoid circular import) PointNeuron.initFromMeta = MethodType( lambda self, meta: getPointNeuron(meta['neuron']), PointNeuron) models_dict = getModelsDict() def getModel(meta): ''' Return appropriate model object based on a dictionary of meta-information. ''' simkey = meta['simkey'] try: return models_dict[simkey].initFromMeta(meta['model']) except KeyError: raise ValueError(f'Unknown model type: {simkey}') diff --git a/PySONIC/core/bls.py b/PySONIC/core/bls.py index f64d70e..a5c6701 100644 --- a/PySONIC/core/bls.py +++ b/PySONIC/core/bls.py @@ -1,814 +1,809 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-09 09:10:22 +# @Last Modified time: 2020-04-15 20:16:08 from enum import Enum import os import json import numpy as np import pandas as pd import scipy.integrate as integrate from scipy.optimize import brentq, curve_fit from .model import Model -from .simulators import PeriodicSimulator +from .solvers import PeriodicSolver from .drives import Drive, AcousticDrive from ..utils import logger, si_format from ..constants import * class PmCompMethod(Enum): ''' Enum: types of computation method for the intermolecular pressure ''' direct = 1 predict = 2 def LennardJones(x, beta, alpha, C, m, n): ''' Generic expression of a Lennard-Jones function, adapted for the context of symmetric deflection (distance = 2x). :param x: deflection (i.e.
half-distance) :param beta: x-shifting factor :param alpha: x-scaling factor :param C: y-scaling factor :param m: exponent of the repulsion term :param n: exponent of the attraction term :return: Lennard-Jones potential at given distance (2x) ''' return C * (np.power((alpha / (2 * x + beta)), m) - np.power((alpha / (2 * x + beta)), n)) def lookup(func): ''' Load parameters from lookup file, or compute them and store them in lookup file. ''' lookup_path = os.path.join(os.path.split(__file__)[0], 'bls_lookups.json') def wrapper(obj): akey = f'{obj.a * 1e9:.1f}' Qkey = f'{obj.Qm0 * 1e5:.2f}' # Open lookup files try: with open(lookup_path, 'r') as fh: lookups = json.load(fh) except FileNotFoundError: lookups = {} # If info not in lookups, compute parameters and add them if akey not in lookups or Qkey not in lookups[akey]: func(obj) if akey not in lookups: lookups[akey] = {Qkey: {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta}} else: lookups[akey][Qkey] = {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta} logger.debug('Saving BLS derived parameters to lookup file') with open(lookup_path, 'w') as fh: json.dump(lookups, fh, indent=2) # If lookup exists, load parameters from it else: logger.debug('Loading BLS derived parameters from lookup file') obj.LJ_approx = lookups[akey][Qkey]['LJ_approx'] obj.Delta = lookups[akey][Qkey]['Delta_eq'] return wrapper class BilayerSonophore(Model): ''' Definition of the Bilayer Sonophore Model - geometry - pressure terms - cavitation dynamics ''' # BIOMECHANICAL PARAMETERS T = 309.15 # Temperature (K) delta0 = 2.0e-9 # Thickness of the leaflet (m) Delta_ = 1.4e-9 # Initial gap between the two leaflets on a non-charged membrane at equil. (m) pDelta = 1.0e5 # Attraction/repulsion pressure coefficient (Pa) m = 5.0 # Exponent in the repulsion term (dimensionless) n = 3.3 # Exponent in the attraction term (dimensionless) rhoL = 1075.0 # Density of the surrounding fluid (kg/m^3) muL = 7.0e-4 # Dynamic viscosity of the surrounding fluid (Pa.s) muS = 0.035 # Dynamic viscosity of the leaflet (Pa.s) kA = 0.24 # Area compression modulus of the leaflet (N/m) alpha = 7.56 # Tissue shear loss modulus frequency coefficient (Pa.s) C0 = 0.62 # Initial gas molar concentration in the surrounding fluid (mol/m^3) kH = 1.613e5 # Henry's constant (Pa.m^3/mol) P0 = 1.0e5 # Static pressure in the surrounding fluid (Pa) Dgl = 3.68e-9 # Diffusion coefficient of gas in the fluid (m^2/s) xi = 0.5e-9 # Boundary layer thickness for gas transport across leaflet (m) c = 1515.0 # Speed of sound in medium (m/s) # BIOPHYSICAL PARAMETERS epsilon0 = 8.854e-12 # Vacuum permittivity (F/m) epsilonR = 1.0 # Relative permittivity of intramembrane cavity (dimensionless) rel_Zmin = -0.49 # relative deflection range lower bound (in multiples of Delta) tscale = 'us' # relevant temporal scale of the model simkey = 'MECH' # keyword used to characterize simulations made with this model def __init__(self, a, Cm0, Qm0, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Extract resting constants and geometry self.Cm0 = Cm0 self.Qm0 = Qm0 self.a = a self.d = embedding_depth self.S0 = np.pi * self.a**2 # Initialize null elastic modulus for tissue self.kA_tissue = 0. 
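# Note: the remaining initialization steps derive the equilibrium gap Delta and a Lennard-Jones
# approximation of the intermolecular pressure (computePMparams), then the resting inter-leaflet
# volume V0 = pi * a^2 * Delta and gas content ng0 = P0 * V0 / (Rg * T), i.e. the ideal gas law
# implemented by gasPa2mol below.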
# Compute Pm params self.computePMparams() # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def copy(self): return self.__class__(self.a, self.Cm0, self.Qm0, embedding_depth=self.d) @property def a(self): return self._a @a.setter def a(self, value): if value <= 0.: raise ValueError('Sonophore radius must be positive') self._a = value @property def Cm0(self): return self._Cm0 @Cm0.setter def Cm0(self, value): if value <= 0.: raise ValueError('Resting membrane capacitance must be positive') self._Cm0 = value @property def d(self): return self._d @d.setter def d(self, value): if value < 0.: raise ValueError('Embedding depth cannot be negative') self._d = value def __repr__(self): s = f'{self.__class__.__name__}({self.a * 1e9:.1f} nm' if self.d > 0.: s += f', d={si_format(self.d, precision=1)}m' return f'{s})' @property def meta(self): return { 'a': self.a, 'd': self.d, 'Cm0': self.Cm0, 'Qm0': self.Qm0, } @classmethod def initFromMeta(cls, d): return cls(d['a'], d['Cm0'], d['Qm0']) @staticmethod def inputs(): return { 'a': { 'desc': 'sonophore radius', 'label': 'a', 'unit': 'nm', 'factor': 1e9, 'precision': 0 }, 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'precision': 1 }, **AcousticDrive.inputs() } def filecodes(self, drive, Qm, PmCompMethod='predict'): return { 'simkey': self.simkey, 'a': f'{self.a * 1e9:.0f}nm', **drive.filecodes, 'Qm': f'{Qm * 1e5:.1f}nCcm2' } @staticmethod def getPltVars(wrapleft='df["', wrapright='"]'): return { 'Pac': { 'desc': 'acoustic pressure', 'label': 'P_{AC}', 'unit': 'kPa', 'factor': 1e-3, 'func': f'meta["drive"].compute({wrapleft}t{wrapright})' }, 'Z': { 'desc': 'leaflets deflection', 'label': 'Z', 'unit': 'nm', 'factor': 1e9, 'bounds': (-1.0, 10.0) }, 'ng': { 'desc': 'gas content', 'label': 'n_g', 'unit': '10^{-22}\ mol', 'factor': 1e22, 'bounds': (1.0, 15.0) }, 'Pmavg': { 'desc': 'average intermolecular pressure', 'label': 'P_M', 'unit': 'kPa', 'factor': 1e-3, 'func': f'PMavgpred({wrapleft}Z{wrapright})' }, 'Telastic': { 'desc': 'leaflet elastic tension', 'label': 'T_E', 'unit': 'mN/m', 'factor': 1e3, 'func': f'TEleaflet({wrapleft}Z{wrapright})' }, 'Cm': { 'desc': 'membrane capacitance', 'label': 'C_m', 'unit': 'uF/cm^2', 'factor': 1e2, 'bounds': (0.0, 1.5), 'func': f'v_capacitance({wrapleft}Z{wrapright})' } } @property def pltScheme(self): return { 'P_{AC}': ['Pac'], 'Z': ['Z'], 'n_g': ['ng'] } @property def Zmin(self): return self.rel_Zmin * self.Delta def curvrad(self, Z): ''' Leaflet curvature radius (signed variable) :param Z: leaflet apex deflection (m) :return: leaflet curvature radius (m) ''' if Z == 0.0: return np.inf else: return (self.a**2 + Z**2) / (2 * Z) def v_curvrad(self, Z): ''' Vectorized curvrad function ''' return np.array(list(map(self.curvrad, Z))) def surface(self, Z): ''' Surface area of the stretched leaflet (spherical cap formula) :param Z: leaflet apex deflection (m) :return: stretched leaflet surface (m^2) ''' return np.pi * (self.a**2 + Z**2) def volume(self, Z): ''' Volume of the inter-leaflet space (cylinder +/- 2 spherical caps) :param Z: leaflet apex deflection (m) :return: bilayer sonophore inner volume (m^3) ''' return np.pi * self.a**2 * self.Delta\ * (1 + (Z / (3 * self.Delta) * (3 + Z**2 / self.a**2))) def arealStrain(self, Z): ''' Areal strain of the stretched leaflet epsilon = (S - S0)/S0 = (Z/a)^2 :param Z: leaflet apex deflection (m) :return: areal strain (dimensionless) ''' return (Z / 
self.a)**2 def capacitance(self, Z): ''' Membrane capacitance (parallel-plate capacitor evaluated at average inter-layer distance) :param Z: leaflet apex deflection (m) :return: capacitance per unit area (F/m2) ''' if Z == 0.0: return self.Cm0 else: return ((self.Cm0 * self.Delta / self.a**2) * (Z + (self.a**2 - Z**2 - Z * self.Delta) / (2 * Z) * np.log((2 * Z + self.Delta) / self.Delta))) def v_capacitance(self, Z): ''' Vectorized capacitance function ''' return np.array(list(map(self.capacitance, Z))) def derCapacitance(self, Z, U): ''' Evolution of membrane capacitance :param Z: leaflet apex deflection (m) :param U: leaflet apex deflection velocity (m/s) :return: time derivative of capacitance per unit area (F/m2.s) ''' dCmdZ = ((self.Cm0 * self.Delta / self.a**2) * ((Z**2 + self.a**2) / (Z * (2 * Z + self.Delta)) - ((Z**2 + self.a**2) * np.log((2 * Z + self.Delta) / self.Delta)) / (2 * Z**2))) return dCmdZ * U @staticmethod def localDeflection(r, Z, R): ''' Local leaflet deflection at specific radial distance (signed) :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local transverse leaflet deviation (m) ''' if np.abs(Z) == 0.0: return 0.0 else: return np.sign(Z) * (np.sqrt(R**2 - r**2) - np.abs(R) + np.abs(Z)) def PMlocal(self, r, Z, R): ''' Local intermolecular pressure :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local intermolecular pressure (Pa) ''' z = self.localDeflection(r, Z, R) relgap = (2 * z + self.Delta) / self.Delta_ return self.pDelta * ((1 / relgap)**self.m - (1 / relgap)**self.n) def PMavg(self, Z, R, S): ''' Average intermolecular pressure across the leaflet (computed by quadratic integration) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :param S: surface of the stretched leaflet (m^2) :return: averaged intermolecular resultant pressure (Pa) .. warning:: quadratic integration is computationally expensive. ''' # Integrate intermolecular force over an infinitely thin ring of radius r from 0 to a fTotal, _ = integrate.quad(lambda r, Z, R: 2 * np.pi * r * self.PMlocal(r, Z, R), 0, self.a, args=(Z, R)) return fTotal / S def v_PMavg(self, Z, R, S): ''' Vectorized PMavg function ''' return np.array(list(map(self.PMavg, Z, R, S))) def LJfitPMavg(self): ''' Determine optimal parameters of a Lennard-Jones expression approximating the average intermolecular pressure. These parameters are obtained by a nonlinear fit of the Lennard-Jones function for a range of deflection values between predetermined Zmin and Zmax. 
:return: 3-tuple with optimized LJ parameters for PmAvg prediction (Map) and the standard and max errors of the prediction in the fitting range (in Pascals) ''' # Determine lower bound of deflection range: when Pm = Pmmax PMmax = LJFIT_PM_MAX # Pa Zlb_range = (self.Zmin, 0.0) Zlb = brentq(lambda Z, Pmmax: self.PMavg(Z, self.curvrad(Z), self.surface(Z)) - PMmax, *Zlb_range, args=(PMmax), xtol=1e-16) # Create vectors for geometric variables Zub = 2 * self.a Z = np.arange(Zlb, Zub, 1e-11) Pmavg = self.v_PMavg(Z, self.v_curvrad(Z), self.surface(Z)) # Compute optimal nonlinear fit of custom LJ function with initial guess x0_guess = self.delta0 C_guess = 0.1 * self.pDelta nrep_guess = self.m nattr_guess = self.n pguess = (x0_guess, C_guess, nrep_guess, nattr_guess) popt, _ = curve_fit(lambda x, x0, C, nrep, nattr: LennardJones(x, self.Delta, x0, C, nrep, nattr), Z, Pmavg, p0=pguess, maxfev=100000) (x0_opt, C_opt, nrep_opt, nattr_opt) = popt Pmavg_fit = LennardJones(Z, self.Delta, x0_opt, C_opt, nrep_opt, nattr_opt) # Compute prediction error residuals = Pmavg - Pmavg_fit ss_res = np.sum(residuals**2) N = residuals.size std_err = np.sqrt(ss_res / N) max_err = max(np.abs(residuals)) logger.debug('LJ approx: x0 = %.2f nm, C = %.2f kPa, m = %.2f, n = %.2f', x0_opt * 1e9, C_opt * 1e-3, nrep_opt, nattr_opt) LJ_approx = {"x0": x0_opt, "C": C_opt, "nrep": nrep_opt, "nattr": nattr_opt} return (LJ_approx, std_err, max_err) @lookup def computePMparams(self): # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: D_eq = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Find optimal Lennard-Jones parameters to approximate PMavg (self.LJ_approx, std_err, _) = self.LJfitPMavg() assert std_err < PMAVG_STD_ERR_MAX, 'High error in PmAvg nonlinear fit:'\ ' std_err = %.2f Pa' % std_err def PMavgpred(self, Z): ''' Approximated average intermolecular pressure (using nonlinearly fitted Lennard-Jones function) :param Z: leaflet apex deflection (m) :return: predicted average intermolecular pressure (Pa) ''' return LennardJones(Z, self.Delta, self.LJ_approx['x0'], self.LJ_approx['C'], self.LJ_approx['nrep'], self.LJ_approx['nattr']) def Pelec(self, Z, Qm): ''' Electrical pressure term :param Z: leaflet apex deflection (m) :param Qm: membrane charge density (C/m2) :return: electrical pressure (Pa) ''' relS = self.S0 / self.surface(Z) abs_perm = self.epsilon0 * self.epsilonR # F/m return - relS * Qm**2 / (2 * abs_perm) # Pa def findDeltaEq(self, Qm): ''' Compute the Delta that cancels out the (Pm + Pec) equation at Z = 0 for a given membrane charge density, using the Brent method to refine the pressure root iteratively. 
:param Qm: membrane charge density (C/m2) :return: equilibrium value (m) and associated pressure (Pa) ''' def dualPressure(Delta): x = (self.Delta_ / Delta) return (self.pDelta * (x**self.m - x**self.n) + self.Pelec(0.0, Qm)) Delta_eq = brentq(dualPressure, 0.1 * self.Delta_, 2.0 * self.Delta_, xtol=1e-16) logger.debug('∆eq = %.2f nm', Delta_eq * 1e9) return (Delta_eq, dualPressure(Delta_eq)) def gasFlux(self, Z, P): ''' Gas molar flux through the sonophore boundary layers :param Z: leaflet apex deflection (m) :param P: internal gas pressure (Pa) :return: gas molar flux (mol/s) ''' dC = self.C0 - P / self.kH return 2 * self.surface(Z) * self.Dgl * dC / self.xi @classmethod def gasmol2Pa(cls, ng, V): ''' Internal gas pressure for a given molar content :param ng: internal molar content (mol) :param V: sonophore inner volume (m^3) :return: internal gas pressure (Pa) ''' return ng * Rg * cls.T / V @classmethod def gasPa2mol(cls, P, V): ''' Internal gas molar content for a given pressure :param P: internal gas pressure (Pa) :param V: sonophore inner volume (m^3) :return: internal gas molar content (mol) ''' return P * V / (Rg * cls.T) def PtotQS(self, Z, ng, Qm, Pac, Pm_comp_method): ''' Net quasi-steady pressure for a given acoustic pressure (Ptot = Pm + Pg + Pec - P0 - Pac) :param Z: leaflet apex deflection (m) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: acoustic pressure (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: total balance pressure (Pa) ''' if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) return Pm + self.gasmol2Pa(ng, self.volume(Z)) - self.P0 - Pac + self.Pelec(Z, Qm) def balancedefQS(self, ng, Qm, Pac=0.0, Pm_comp_method=PmCompMethod.predict): ''' Quasi-steady equilibrium deflection for a given acoustic pressure (computed by approximating the root of quasi-steady pressure) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: leaflet deflection canceling quasi-steady pressure (m) ''' Zbounds = (self.Zmin, self.a) Plb, Pub = [self.PtotQS(x, ng, Qm, Pac, Pm_comp_method) for x in Zbounds] assert (Plb > 0 > Pub), '[{}, {}] is not a sign changing interval for PtotQS'.format(*Zbounds) return brentq(self.PtotQS, *Zbounds, args=(ng, Qm, Pac, Pm_comp_method), xtol=1e-16) def TEleaflet(self, Z): ''' Elastic tension in leaflet :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA * self.arealStrain(Z) def setTissueModulus(self, drive): ''' Set the frequency-dependent elastic modulus of the surrounding tissue. 
''' G_tissue = self.alpha * drive.modulationFrequency # G'' (Pa) self.kA_tissue = 2 * G_tissue * self.d # kA of the tissue layer (N/m) def TEtissue(self, Z): ''' Elastic tension in surrounding viscoelastic layer :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA_tissue * self.arealStrain(Z) def TEtot(self, Z): ''' Total elastic tension (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.TEleaflet(Z) + self.TEtissue(Z) def PEtot(self, Z, R): ''' Total elastic tension pressure (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: elastic tension pressure (Pa) ''' return - self.TEtot(Z) / R @classmethod def PVleaflet(cls, U, R): ''' Viscous stress pressure in leaflet :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: leaflet viscous stress pressure (Pa) ''' return - 12 * U * cls.delta0 * cls.muS / R**2 @classmethod def PVfluid(cls, U, R): ''' Viscous stress pressure in surrounding medium :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: fluid viscous stress pressure (Pa) ''' return - 4 * U * cls.muL / np.abs(R) @classmethod def accP(cls, Ptot, R): ''' Leaflet transverse acceleration resulting from pressure imbalance :param Ptot: net pressure (Pa) :param R: leaflet curvature radius (m) :return: pressure-driven acceleration (m/s^2) ''' return Ptot / (cls.rhoL * np.abs(R)) @staticmethod def accNL(U, R): ''' Leaflet transverse nonlinear acceleration :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: nonlinear acceleration term (m/s^2) .. note:: A simplified version of nonlinear acceleration (neglecting dR/dH) is used here. 
''' # return - (3/2 - 2*R/H) * U**2 / R return -(3 * U**2) / (2 * R) @staticmethod def checkInputs(drive, Qm, Pm_comp_method): ''' Check validity of stimulation parameters :param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :param Pm_comp_method: type of method used to compute average intermolecular pressure ''' if not isinstance(drive, Drive): raise TypeError(f'Invalid "drive" parameter (must be an "Drive" object)') if not isinstance(Qm, float): raise TypeError(f'Invalid "Qm" parameter (must be float typed)') Qmin, Qmax = CHARGE_RANGE if Qm < Qmin or Qm > Qmax: raise ValueError( f'Invalid applied charge: {Qm * 1e5} nC/cm2 (must be within [{Qmin * 1e5}, {Qmax * 1e5}] interval') if not isinstance(Pm_comp_method, PmCompMethod): raise TypeError('Invalid Pm computation method (must be "PmCompmethod" type)') def derivatives(self, t, y, drive, Qm, Pm_comp_method=PmCompMethod.predict): ''' Evolution of the mechanical system :param t: time instant (s) :param y: vector of HH system variables at time t :param drive: acoustic drive object :param Qm: membrane charge density (F/m2) :param Pm_comp_method: computation method for average intermolecular pressure :return: vector of mechanical system derivatives at time t ''' # Split input vector explicitly U, Z, ng = y # Correct deflection value is below critical compression if Z < self.Zmin: logger.warning('Deflection out of range: Z = %.2f nm', Z * 1e9) Z = self.Zmin # Compute curvature radius R = self.curvrad(Z) # Compute total pressure Pg = self.gasmol2Pa(ng, self.volume(Z)) if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) Pac = drive.compute(t) Pv = self.PVleaflet(U, R) + self.PVfluid(U, R) Ptot = Pm + Pg - self.P0 - Pac + self.PEtot(Z, R) + Pv + self.Pelec(Z, Qm) # Compute derivatives dUdt = self.accP(Ptot, R) + self.accNL(U, R) dZdt = U dngdt = self.gasFlux(Z, Pg) # Return derivatives vector return [dUdt, dZdt, dngdt] def computeInitialDeflection(self, drive, Qm, dt, Pm_comp_method=PmCompMethod.predict): ''' Compute non-zero deflection value for a small perturbation (solving quasi-steady equation). ''' Pac = drive.compute(dt) return self.balancedefQS(self.ng0, Qm, Pac, Pm_comp_method) @classmethod @Model.checkOutputDir def simQueue(cls, freqs, amps, charges, **kwargs): drives = AcousticDrive.createQueue(freqs, amps) queue = [] for drive in drives: for Qm in charges: queue.append([drive, Qm]) return queue + def computeInitialConditions(self, *args, **kwargs): + ''' Compute simulation initial conditions. ''' + # Compute initial non-zero deflection + Z = self.computeInitialDeflection(*args, **kwargs) + + # Return initial conditions dictionary + return { + 'U': [0.] * 2, + 'Z': [0., Z], + 'ng': [self.ng0] * 2, + } + def simCycles(self, drive, Qm, n=None, Pm_comp_method=PmCompMethod.predict): ''' Simulate for a specific number of cycles or until periodic stabilization, for a specific set of ultrasound parameters, and return output data in a dataframe. 
:param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :param n: number of cycles (optional) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: output dataframe ''' # Determine time step dt = drive.dt - # Determine stop function - if n is not None: - stopfunc = lambda t, _, T: t[-1] > (n - 1) * T - else: - stopfunc = None - # Set the tissue elastic modulus self.setTissueModulus(drive) - # Compute initial non-zero deflection - Z = self.computeInitialDeflection(drive, Qm, dt, Pm_comp_method=Pm_comp_method) - - # Set initial conditions - y0 = np.array([0., 0., self.ng0]) - y1 = np.array([0., Z, self.ng0]) + # Compute initial conditions + y0 = self.computeInitialConditions(drive, Qm, dt, Pm_comp_method=Pm_comp_method) - # Initialize simulator and compute solution - simulator = PeriodicSimulator( + # Initialize solver and compute solution + solver = PeriodicSolver( + y0.keys(), lambda t, y: self.derivatives(t, y, drive, Qm, Pm_comp_method), - ivars_to_check=[1, 2], stopfunc=stopfunc) - t, y, stim = simulator(y1, dt, drive.periodicity) + drive.periodicity, dt=dt, primary_vars=['Z', 'ng']) + data = solver(y0, nmax=n) - # Prepend initial conditions (prior to stimulation) - t, y, stim = simulator.prependSolution(t, y, stim, y0=y0) + # Remove velocity timeries from solution + del data['U'] - # Set last stimulation state to zero - stim[-1] = 0 - - # Store output in dataframe and return - return pd.DataFrame({ - 't': t, - 'stimstate': stim, - 'Z': y[:, 1], - 'ng': y[:, 2] - }) + # Return solution dataframe + return data @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, Qm, Pm_comp_method=PmCompMethod.predict): ''' Wrapper around the simUntilConvergence method, with decorators. ''' return self.simCycles(drive, Qm, Pm_comp_method=Pm_comp_method) def desc(self, meta): return f'{self}: simulation @ {meta["drive"].desc}, Q = {si_format(meta["Qm"] * 1e-4, 2)}C/cm2' def getCycleProfiles(self, drive, Qm): ''' Simulate mechanical system and compute pressures over the last acoustic cycle :param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :return: dataframe with the time, kinematic and pressure profiles over the last cycle. 
''' # Run default simulation and retrieve last cycle solution logger.info(f'Running mechanical simulation (a = {si_format(self.a, 1)}m, {drive.desc})') data = self.simulate( drive, Qm, Pm_comp_method=PmCompMethod.direct)[0].iloc[-drive.nPerCycle:, :] # Extract relevant variables and de-offset time vector t, Z, ng = [data[key].values for key in ['t', 'Z', 'ng']] dt = (t[-1] - t[0]) / (NPC_DENSE - 1) t -= t[0] # Compute pressure cyclic profiles logger.info('Computing pressure cyclic profiles') R = self.v_curvrad(Z) U = np.diff(Z) / dt U = np.hstack((U, U[-1])) data = { 't': t, 'Z': Z, 'Cm': self.v_capacitance(Z), 'P_M': self.v_PMavg(Z, R, self.surface(Z)), 'P_Q': self.Pelec(Z, Qm), 'P_{VE}': self.PEtot(Z, R) + self.PVleaflet(U, R), 'P_V': self.PVfluid(U, R), 'P_G': self.gasmol2Pa(ng, self.volume(Z)), 'P_0': - np.ones(Z.size) * self.P0 } return pd.DataFrame(data, columns=data.keys()) diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index 79874e1..84d7590 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,698 +1,671 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-10 16:07:06 +# @Last Modified time: 2020-04-15 20:20:33 import logging import numpy as np -import pandas as pd -from .simulators import PWSimulator, HybridSimulator +from .solvers import EventDrivenSolver, HybridSolver from .bls import BilayerSonophore from .pneuron import PointNeuron from .model import Model from .drives import Drive, AcousticDrive from .protocols import TimeProtocol, PulsedProtocol from ..utils import * from ..constants import * from ..postpro import getFixedPoints from .lookups import EffectiveVariablesLookup from ..neurons import getPointNeuron NEURONS_LOOKUP_DIR = os.path.abspath(os.path.split(__file__)[0] + "/../lookups/") class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ASTIM' # keyword used to characterize simulations made with this model def __init__(self, a, pneuron, embedding_depth=0.0): ''' Constructor of the class. 
:param a: in-plane radius of the sonophore structure within the membrane (m) :param pneuron: point-neuron model :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' self.pneuron = pneuron super().__init__(a, pneuron.Cm0, pneuron.Qm0, embedding_depth=embedding_depth) @property def a_str(self): return f'{self.a * 1e9:.1f} nm' def __repr__(self): s = f'{self.__class__.__name__}({self.a_str}, {self.pneuron}' if self.d > 0.: s += f', d={si_format(self.d, precision=1)}m' return f'{s})' def copy(self): return self.__class__(self.a, self.pneuron, embedding_depth=self.d) def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.a == other.a and self.pneuron == other.pneuron and self.d == other.d @property def pneuron(self): return self._pneuron @pneuron.setter def pneuron(self, value): if not isinstance(value, PointNeuron): raise ValueError(f'{value} is not a valid PointNeuron instance') if not hasattr(self, '_pneuron') or value != self._pneuron: self._pneuron = value if hasattr(self, 'a'): super().__init__(self.a, self.pneuron.Cm0, self.pneuron.Qm0, embedding_depth=self.d) @property def meta(self): return { 'neuron': self.pneuron.name, 'a': self.a, 'd': self.d, } @classmethod def initFromMeta(cls, meta): return cls(meta['a'], getPointNeuron(meta['neuron']), embedding_depth=meta['d']) def params(self): return {**super().params(), **self.pneuron.params()} def getPltVars(self, wrapleft='df["', wrapright='"]'): return {**super().getPltVars(wrapleft, wrapright), **self.pneuron.getPltVars(wrapleft, wrapright)} @property def pltScheme(self): return self.pneuron.pltScheme def filecode(self, *args): return Model.filecode(self, *args) @staticmethod def inputs(): # Get parent input vars and supress irrelevant entries inputvars = BilayerSonophore.inputs() del inputvars['Qm'] # Fill in current input vars in appropriate order inputvars.update({ **AcousticDrive.inputs(), 'fs': { 'desc': 'sonophore membrane coverage fraction', 'label': 'f_s', 'unit': '\%', 'factor': 1e2, 'precision': 0 }, 'method': None }) return inputvars def filecodes(self, drive, pp, fs, method, qss_vars): codes = { 'simkey': self.simkey, 'neuron': self.pneuron.name, 'nature': pp.nature, 'a': f'{self.a * 1e9:.0f}nm', **drive.filecodes, **pp.filecodes, } codes['fs'] = f'fs{fs * 1e2:.0f}%' if fs < 1 else None codes['method'] = method codes['qss_vars'] = qss_vars return codes @staticmethod - def interpOnOffVariable(key, Qm, stim, lkp): - ''' Interpolate Q-dependent effective variable along ON and OFF periods of a solution. + def interpEffVariable(key, Qm, stim, lkp): + ''' Interpolate Q-dependent effective variable along various stimulation states of a solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector - :param lkp: dictionary of lookups for ON and OFF states + :param lkp: 2D lookup object :return: interpolated effective variable vector ''' x = np.zeros(stim.size) - x[stim == 0] = lkp['OFF'].interpVar1D(Qm[stim == 0], key) - x[stim == 1] = lkp['ON'].interpVar1D(Qm[stim == 1], key) + stim_vals = np.unique(stim) + for s in stim_vals: + x[stim == s] = lkp.project('A', s).interpVar1D(Qm[stim == s], key) return x @staticmethod def spatialAverage(fs, x, x0): ''' fs-modulated spatial averaging. ''' return fs * x + (1 - fs) * x0 @timer def computeEffVars(self, drive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific acoustic stimulus and charge density. 
A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param drive: acoustic drive object :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' # Run simulation and retrieve deflection and gas content vectors from last cycle data = BilayerSonophore.simCycles(self, drive, Qm) Z_last = data.loc[-NPC_DENSE:, 'Z'].values # m Cm_last = self.v_capacitance(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = self.spatialAverage(x, Cm_last, self.Cm0) # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({**{'V': np.mean(Vm)}, **self.pneuron.getEffRates(Vm)}) # Log process log = f'{self}: lookups @ {drive.desc}, {Qm * 1e5:.2f} nC/cm2' if len(fs) > 1: log += f', fs = {fs.min() * 1e2:.0f} - {fs.max() * 1e2:.0f}%' logger.info(log) # Return effective coefficients return effvars def getLookupFileName(self, a=None, f=None, A=None, fs=False): fname = f'{self.pneuron.name}_lookups' if a is not None: fname += f'_{a * 1e9:.0f}nm' if f is not None: fname += f'_{f * 1e-3:.0f}kHz' if A is not None: fname += f'_{A * 1e-3:.0f}kPa' if fs is True: fname += '_fs' return f'{fname}.pkl' def getLookupFilePath(self, *args, **kwargs): return os.path.join(NEURONS_LOOKUP_DIR, self.getLookupFileName(*args, **kwargs)) def getLookup(self, *args, **kwargs): keep_tcomp = kwargs.pop('keep_tcomp', False) lookup_path = self.getLookupFilePath(*args, **kwargs) lkp = EffectiveVariablesLookup.fromPickle(lookup_path) if not keep_tcomp: del lkp.tables['tcomp'] return lkp def getLookup2D(self, f, fs): proj_kwargs = {'a': self.a, 'f': f, 'fs': fs} if fs < 1.: kwargs = proj_kwargs.copy() kwargs['fs'] = True else: kwargs = {} return self.getLookup(**kwargs).projectN(proj_kwargs) def fullDerivatives(self, t, y, drive, fs): ''' Compute the full system derivatives. :param t: specific instant in time (s) :param y: vector of state variables :param drive: acoustic drive object :param fs: sonophore membrane coverage fraction (-) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives( self, t, y[:3], drive, y[3]) dydt_elec = self.pneuron.derivatives( t, y[3:], Cm=self.spatialAverage(fs, self.capacitance(y[1]), self.Cm0)) return dydt_mech + dydt_elec def effDerivatives(self, t, y, lkp1d, qss_vars): ''' Compute the derivatives of the n-ODE effective system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. 
:param qss_vars: list of QSS variables :return: vector of effective system derivatives at time t ''' # Unpack values and interpolate lookup at current charge density Qm, *states = y lkp0d = lkp1d.interpolate1D(Qm) # Compute states dictionary from differential and QSS variables states_dict = {} i = 0 for k in self.pneuron.statesNames(): if k in qss_vars: states_dict[k] = self.pneuron.quasiSteadyStates()[k](lkp0d) else: states_dict[k] = states[i] i += 1 # Compute charge density derivative dQmdt = - self.pneuron.iNet(lkp0d['V'], states_dict) * 1e-3 # Compute states derivative vector only for differential variable dstates = [] for k in self.pneuron.statesNames(): if k not in qss_vars: dstates.append(self.pneuron.derEffStates()[k](lkp0d, states_dict)) return [dQmdt, *dstates] + def deflectionDependentVm(self, Qm, Z, fs): + ''' Compute deflection (and sonopphore coverage fraction) dependent voltage profile. ''' + return Qm / self.spatialAverage(fs, self.v_capacitance(Z), self.Cm0) * 1e3 # mV + + def computeInitialConditions(self, *args, **kwargs): + ''' Compute simulation initial conditions. ''' + y0 = super().computeInitialConditions(*args, **kwargs) + y0.update({ + 'Qm': [self.Qm0] * 2, + **{k: [self.pneuron.steadyStates()[k](self.pneuron.Vm0)] * 2 + for k in self.pneuron.statesNames()} + }) + return y0 + def __simFull(self, drive, pp, fs): # Determine time step dt = drive.dt - # Compute initial non-zero deflection - Z = self.computeInitialDeflection(drive, self.Qm0, dt) - - # Set initial conditions - ss0 = self.pneuron.getSteadyStates(self.pneuron.Vm0) - y0 = np.concatenate(([0., 0., self.ng0, self.Qm0], ss0)) - y1 = np.concatenate(([0., Z, self.ng0, self.Qm0], ss0)) - - drive_OFF = drive.copy() - drive_OFF.A = 0 - - # Initialize simulator and compute solution - logger.debug('Computing detailed solution') - simulator = PWSimulator( - lambda t, y: self.fullDerivatives(t, y, drive, fs), - lambda t, y: self.fullDerivatives(t, y, drive_OFF, fs)) - t, y, stim = simulator( - y1, dt, pp, - target_dt=CLASSIC_TARGET_DT, - print_progress=logger.getEffectiveLevel() <= logging.INFO, - monitor_func=None) - # monitor_func=lambda t, y: f't = {t * 1e3:.5f} ms, Qm = {y[3] * 1e5:.2f} nC/cm2') - - # Prepend initial conditions (prior to stimulation) - t, y, stim = simulator.prependSolution(t, y, stim, y0=y0) - - # Store output in dataframe and return - data = pd.DataFrame({ - 't': t, - 'stimstate': stim, - 'Z': y[:, 1], - 'ng': y[:, 2], - 'Qm': y[:, 3] - }) - data['Vm'] = data['Qm'].values / self.spatialAverage( - fs, self.v_capacitance(data['Z'].values), self.Cm0) * 1e3 # mV - for i in range(len(self.pneuron.states)): - data[self.pneuron.statesNames()[i]] = y[:, i + 4] + # Compute initial conditions + y0 = self.computeInitialConditions(drive, self.Qm0, dt) + + # Initialize solver and compute solution + solver = EventDrivenSolver( + y0.keys(), + lambda t, y: self.fullDerivatives(t, y, solver.drive, fs), + lambda x: setattr(solver.drive, 'A', drive.A * x), + event_params={'drive': drive.copy().updatedX(0.)}, + dt=dt) + data = solver( + y0, pp.stimEvents(), pp.ttotal, target_dt=CLASSIC_TARGET_DT, + log_period=pp.ttotal / 100 if logger.getEffectiveLevel() < logging.INFO else None, + logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' + ) + + # Remove velocity and add voltage timeseries to solution + del data['U'] + data = addColumn( + data, 'Vm', self.deflectionDependentVm(data['Qm'], data['Z'], fs), preceding_key='Qm') + + # Return solution dataframe return data def __simHybrid(self, drive, pp, fs): # Determine time 
steps dt_dense, dt_sparse = [drive.dt, drive.dt_sparse] - # Compute initial non-zero deflection - Z = self.computeInitialDeflection(drive, self.Qm0, dt_dense) + # Compute initial conditions + y0 = self.computeInitialConditions(drive, self.Qm0, dt_dense) - # Set initial conditions - ss0 = self.pneuron.getSteadyStates(self.pneuron.Vm0) - y0 = np.concatenate(([0., 0., self.ng0, self.Qm0], ss0)) - y1 = np.concatenate(([0., Z, self.ng0, self.Qm0], ss0)) - - drive_OFF = drive.copy() - drive_OFF.A = 0 - - # Initialize simulator and compute solution - is_dense_var = np.array([True] * 3 + [False] * (len(self.pneuron.states) + 1)) - logger.debug('Computing hybrid solution') - simulator = HybridSimulator( - lambda t, y: self.fullDerivatives(t, y, drive, fs), - lambda t, y: self.fullDerivatives(t, y, drive_OFF, fs), + # Initialize solver and compute solution + dense_vars = ['U', 'Z', 'ng'] + solver = HybridSolver( + y0.keys(), + lambda t, y: self.fullDerivatives(t, y, solver.drive, fs), # dfunc lambda t, y, Cm: self.pneuron.derivatives( - t, y, Cm=self.spatialAverage(fs, Cm, self.Cm0)), - lambda yref: self.capacitance(yref[1]), - is_dense_var, - ivars_to_check=[1, 2]) - t, y, stim = simulator(y1, dt_dense, dt_sparse, drive.periodicity, pp) - - # Prepend initial conditions (prior to stimulation) - t, y, stim = simulator.prependSolution(t, y, stim, y0=y0) - - # Store output in dataframe and return - data = pd.DataFrame({ - 't': t, - 'stimstate': stim, - 'Z': y[:, 1], - 'ng': y[:, 2], - 'Qm': y[:, 3] - }) - data['Vm'] = data['Qm'].values / self.spatialAverage( - fs, self.v_capacitance(data['Z'].values), self.Cm0) * 1e3 # mV - for i in range(len(self.pneuron.states)): - data[self.pneuron.statesNames()[i]] = y[:, i + 4] + t, y, Cm=self.spatialAverage(fs, Cm, self.Cm0)), # dfunc_sparse + lambda yref: self.capacitance(yref[1]), # predfunc + lambda x: setattr(solver.drive, 'A', drive.A * x), # eventfunc + drive.periodicity, dense_vars, dt_dense, dt_sparse, + event_params={'drive': drive.copy().updatedX(0.)}, + primary_vars=['Z', 'ng'] + ) + data = solver( + y0, pp.stimEvents(), pp.ttotal, target_dt=CLASSIC_TARGET_DT, + log_period=pp.ttotal / 100 if logger.getEffectiveLevel() < logging.INFO else None, + logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' + ) + + # Remove velocity and add voltage timeseries to solution + del data['U'] + data = addColumn( + data, 'Vm', self.deflectionDependentVm(data['Qm'], data['Z'], fs), preceding_key='Qm') + + # Return solution dataframe return data def __simSonic(self, drive, pp, fs, qss_vars=None, pavg=False): - # Load appropriate 2D lookups - lkp2d = self.getLookup2D(drive.f, fs) - - # Interpolate 2D lookups at zero and US amplitude - logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', drive.A * 1e-3) - lkps1d = {'ON': lkp2d.project('A', drive.A), 'OFF': lkp2d.project('A', 0.)} + # Load appropriate 2D lookup + lkp = self.getLookup2D(drive.f, fs) - # Adapt lookups and pulsing protocol if pulse-average mode is selected + # Adapt lookup and pulsing protocol if pulse-average mode is selected if pavg: - lkps1d['ON'] = lkps1d['ON'] * pp.DC + lkps1d['OFF'] * (1 - pp.DC) + lkp = lkp * pp.DC + lkp.project('A', 0.).tile('A', lkp.refs['A']) * (1 - pp.DC) tstim = (int(pp.tstim * pp.PRF) - 1 + pp.DC) / pp.PRF - toffset = pp.tstim + pp.toffset - tstim - pp = TimeProtocol(tstim, toffset) + pp = TimeProtocol(tstim, pp.tstim + pp.toffset - tstim) - # # Determine QSS and differential variables + # Determine QSS and differential variables, and create optional QSS lookup if qss_vars 
is None: qss_vars = [] + else: + lkp_QSS = EffectiveVariablesLookup( + lkp.refs, {k: self.pneuron.quasiSteadyStates()[k](lkp) for k in qss_vars}) diff_vars = [item for item in self.pneuron.statesNames() if item not in qss_vars] - # Create 1D lookup of QSS variables with reference charge vector - QSS_1D_lkp = { - key: EffectiveVariablesLookup( - lkps1d['ON'].refs, - {k: self.pneuron.quasiSteadyStates()[k](val) for k in qss_vars}) - for key, val in lkps1d.items()} - # Set initial conditions - sstates = [self.pneuron.steadyStates()[k](self.pneuron.Vm0) for k in diff_vars] - y0 = np.array([self.Qm0, *sstates]) - - # Initialize simulator and compute solution - logger.debug('Computing effective solution') - simulator = PWSimulator( - lambda t, y: self.effDerivatives(t, y, lkps1d['ON'], qss_vars), - lambda t, y: self.effDerivatives(t, y, lkps1d['OFF'], qss_vars)) - t, y, stim = simulator(y0, self.pneuron.chooseTimeStep(), pp) - - # Prepend initial conditions (prior to stimulation) - t, y, stim = simulator.prependSolution(t, y, stim) - - # Store output vectors in dataframe: time, stim state, charge, potential - # and other differential variables - data = pd.DataFrame({ - 't': t, - 'stimstate': stim, - 'Qm': y[:, 0] - }) - data['Vm'] = self.interpOnOffVariable('V', data['Qm'].values, stim, lkps1d) - for key in ['Z', 'ng']: - data[key] = np.full(t.size, np.nan) - for i, k in enumerate(diff_vars): - data[k] = y[:, i + 1] + y0 = { + 'Qm': self.Qm0, + **{k: self.pneuron.steadyStates()[k](self.pneuron.Vm0) for k in diff_vars} + } - # Interpolate QSS variables along charge vector and store them in dataframe + # Initialize solver and compute solution + solver = EventDrivenSolver( + y0.keys(), + lambda t, y: self.effDerivatives(t, y, solver.lkp, qss_vars), + lambda x: setattr(solver, 'lkp', lkp.project('A', drive.A * x)), + dt=self.pneuron.chooseTimeStep()) + data = solver(y0, pp.stimEvents(), pp.ttotal) + + # Interpolate Vm and QSS variables along charge vector and store them in solution dataframe + data = addColumn( + data, 'Vm', self.interpEffVariable('V', data['Qm'], data['stimstate'] * drive.A, lkp), + preceding_key='Qm') for k in qss_vars: - data[k] = self.interpOnOffVariable(k, data['Qm'].values, stim, QSS_1D_lkp) + data[k] = self.interpEffVariable(k, data['Qm'], data['stimstate'] * drive.A, lkp_QSS) + + # Add dummy deflection and gas content vectors to solution + for key in ['Z', 'ng']: + data[key] = np.full(data['t'].size, np.nan) + # Return solution dataframe return data def intMethods(self): ''' Listing of model integration methods. ''' return { - 'full': self.__simFull, - 'hybrid': self.__simHybrid, - 'sonic': self.__simSonic + 'full': self.__simFull, + 'hybrid': self.__simHybrid, + 'sonic': self.__simSonic } @classmethod @Model.checkOutputDir def simQueue(cls, freqs, amps, durations, offsets, PRFs, DCs, fs, methods, qss_vars, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. 
:param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :param fs: sonophore membrane coverage fractions (-) :params methods: integration methods :param qss_vars: QSS variables :return: list of parameters (list) for each simulation ''' if ('full' in methods or 'hybrid' in methods) and kwargs['outputdir'] is None: logger.warning('Running cumbersome simulation(s) without file saving') if amps is None: amps = [None] drives = AcousticDrive.createQueue(freqs, amps) protocols = PulsedProtocol.createQueue(durations, offsets, PRFs, DCs) queue = [] for drive in drives: for pp in protocols: for cov in fs: for method in methods: queue.append([drive, pp, cov, method, qss_vars]) return queue def checkInputs(self, drive, pp, fs, method, qss_vars): if not isinstance(drive, Drive): raise TypeError(f'Invalid "drive" parameter (must be an "Drive" object)') if not isinstance(pp, PulsedProtocol): raise TypeError('Invalid pulsed protocol (must be "PulsedProtocol" instance)') if not isinstance(fs, float): raise TypeError(f'Invalid "fs" parameter (must be float typed)') if qss_vars is not None: if not isIterable(qss_vars) or not isinstance(qss_vars[0], str): raise ValueError('Invalid QSS variables: must be None or an iterable of strings') sn = self.pneuron.statesNames() for item in qss_vars: if item not in sn: raise ValueError(f'Invalid QSS variable: {item} (must be in {sn}') if method not in list(self.intMethods().keys()): raise ValueError(f'Invalid integration method: "{method}"') @Model.logNSpikes @Model.checkTitrate @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, pp, fs=1., method='sonic', qss_vars=None): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param drive: acoustic drive object :param pp: pulse protocol object :param fs: sonophore membrane coverage fraction (-) :param method: selected integration method :return: output dataframe ''' # Set the tissue elastic modulus self.setTissueModulus(drive) # Call appropriate simulation function and return simfunc = self.intMethods()[method] simargs = [drive, pp, fs] if method == 'sonic': simargs.append(qss_vars) return simfunc(*simargs) def desc(self, meta): method = meta['method'] if 'method' in meta else meta['model']['method'] fs = meta['fs'] if 'fs' in meta else meta['model']['fs'] s = f'{self}: {method} simulation @ {meta["drive"].desc}, {meta["pp"].desc}' if fs < 1.0: s += f', fs = {(fs * 1e2):.2f}%' if 'qss_vars' in meta and meta['qss_vars'] is not None: s += f" - QSS ({', '.join(meta['qss_vars'])})" return s @staticmethod def getNSpikes(data): return PointNeuron.getNSpikes(data) def getArange(self, drive): return (0., self.getLookup().refs['A'].max()) @property def titrationFunc(self): return self.pneuron.titrationFunc @logCache(os.path.join(os.path.split(__file__)[0], 'astim_titrations.log')) def titrate(self, drive, pp, fs=1., method='sonic', qss_vars=None, xfunc=None, Arange=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency and pulsed protocol. 
:param drive: unresolved acoustic drive object :param pp: pulse protocol object :param fs: sonophore membrane coverage fraction (-) :param method: integration method :return: determined threshold amplitude (Pa) ''' return super().titrate(drive, pp, fs=fs, method=method, qss_vars=qss_vars, xfunc=xfunc, Arange=Arange) def getQuasiSteadyStates(self, f, amps=None, charges=None, DC=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities, at a specific US frequency and duty cycle. :param f: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DC: duty cycle :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges lkp = self.getLookup().projectDC(amps=amps, DC=DC).projectN({'a': self.a, 'f': f}) if charges is not None: lkp = lkp.project('Q', charges) # Specify dimensions with A as the first axis A_axis = lkp.getAxisIndex('A') lkp.move('A', 0) nA = lkp.dims()[0] # Compute QSS states using these lookups QSS = EffectiveVariablesLookup( lkp.refs, {k: v(lkp) for k, v in self.pneuron.quasiSteadyStates().items()}) # Compress outputs if needed if squeeze_output: QSS = QSS.squeeze() lkp = lkp.squeeze() return lkp, QSS def iNetQSS(self, Qm, f, A, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param f: US frequency (Hz) :param A: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' lkp, QSS = self.getQuasiSteadyStates( f, amps=A, charges=Qm, DC=DC, squeeze_output=True) return self.pneuron.iNet(lkp['V'], QSS) # mA/m2 def fixedPointsQSS(self, f, A, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. 
:param f: US frequency (Hz) :param A: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' pltvars = self.getPltVars() logger.debug(f'A = {A * 1e-3:.2f} kPa, DC = {DC * 1e2:.0f}%') # Extract fixed points from QSS charge variation profile def dfunc(Qm): return - self.iNetQSS(Qm, f, A, DC) fixed_points = getFixedPoints( lkp.refs['Q'], dQdt, filter='both', der_func=dfunc).tolist() dfunc = lambda x: np.array(self.effDerivatives(_, x, lkp)) # classified_fixed_points = {'stable': [], 'unstable': [], 'saddle': []} classified_fixed_points = [] np.set_printoptions(precision=2) # For each fixed point for i, Qm in enumerate(fixed_points): # Re-compute QSS at fixed point *_, QSS = self.getQuasiSteadyStates(f, amps=A, charges=Qm, DC=DC, squeeze_output=True) # Classify fixed point stability by numerically evaluating its Jacobian and # computing its eigenvalues x = np.array([Qm, *QSS.tables.values()]) eigvals, key = classifyFixedPoint(x, dfunc) # classified_fixed_points[key].append(Qm) classified_fixed_points.append((x, eigvals, key)) # eigenvalues.append(eigvals) logger.debug(f'{key} point @ Q = {(Qm * 1e5):.1f} nC/cm2') # eigenvalues = np.array(eigenvalues).T # print(eigenvalues.shape) return classified_fixed_points def isStableQSS(self, f, A, DC): lookups, QSS = self.getQuasiSteadyStates( f, amps=A, DCs=DC, squeeze_output=True) dQdt = -self.pneuron.iNet(lookups['V'], QSS.tables) # mA/m2 classified_fixed_points = self.fixedPointsQSS(f, A, DC, lookups, dQdt) return len(classified_fixed_points['stable']) > 0 class DrivenNeuronalBilayerSonophore(NeuronalBilayerSonophore): simkey = 'DASTIM' # keyword used to characterize simulations made with this model def __init__(self, Idrive, *args, **kwargs): self.Idrive = Idrive super().__init__(*args, **kwargs) def __repr__(self): return super().__repr__()[:-1] + f', Idrive = {self.Idrive:.2f} mA/m2)' @classmethod def initFromMeta(cls, meta): return cls(meta['Idrive'], meta['a'], getPointNeuron(meta['neuron']), embedding_depth=meta['d']) def params(self): return {**{'Idrive': self.Idrive}, **super().params()} @staticmethod def inputs(): inputvars = NeuronalBilayerSonophore.inputs() inputvars['Idrive'] = { 'desc': 'driving current density', 'label': 'I_{drive}', 'unit': 'mA/m2', 'factor': 1e0, 'precision': 0 } return inputvars def filecodes(self, *args): codes = super().filecodes(*args) codes['Idrive'] = f'Idrive{self.Idrive:.1f}mAm2' return codes def fullDerivatives(self, *args): dydt = super().fullDerivatives(*args) dydt[3] += self.Idrive * 1e-3 return dydt def effDerivatives(self, *args): dQmdt, *dstates = super().effDerivatives(*args) dQmdt += self.Idrive * 1e-3 return [dQmdt, *dstates] def meta(self, drive, pp, fs, method, qss_vars): d = super().meta(drive, pp, fs, method, qss_vars) d['Idrive'] = self.Idrive return d \ No newline at end of file diff --git a/PySONIC/core/pneuron.py b/PySONIC/core/pneuron.py index ab36a30..220f529 100644 --- a/PySONIC/core/pneuron.py +++ b/PySONIC/core/pneuron.py @@ -1,552 +1,545 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-03 11:53:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-13 15:54:42 +# @Last Modified time: 2020-04-15 12:46:23 import abc import inspect import numpy as np -import pandas as pd from .protocols import PulsedProtocol from .model import Model 
from .lookups import EffectiveVariablesLookup -from .simulators import PWSimulator +from .solvers import EventDrivenSolver from .drives import Drive, ElectricDrive from ..postpro import detectSpikes, computeFRProfile from ..constants import * from ..utils import * class PointNeuron(Model): ''' Generic point-neuron model interface. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ESTIM' # keyword used to characterize simulations made with this model def __repr__(self): return self.__class__.__name__ def copy(self): return self.__class__() def __eq__(self, other): if not isinstance(other, PointNeuron): return False return self.name == other.name @property @classmethod @abc.abstractmethod def name(cls): ''' Neuron name. ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Cm0(cls): ''' Neuron's resting capacitance (F/m2). ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Vm0(cls): ''' Neuron's resting membrane potential(mV). ''' raise NotImplementedError @property def Qm0(self): return self.Cm0 * self.Vm0 * 1e-3 # C/m2 @property def meta(self): return {'neuron': self.name} @staticmethod def inputs(): return ElectricDrive.inputs() @classmethod def filecodes(cls, drive, pp): return { 'simkey': cls.simkey, 'neuron': cls.name, 'nature': pp.nature, **drive.filecodes, **pp.filecodes } @classmethod def normalizedQm(cls, Qm): ''' Compute membrane charge density normalized by resting capacitance. :param Qm: membrane charge density (Q/m2) :return: normalized charge density (mV) ''' return Qm / cls.Cm0 * 1e3 @classmethod def getPltVars(cls, wrapleft='df["', wrapright='"]'): pltvars = { 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'bounds': ((cls.Vm0 - 20.0) * cls.Cm0 * 1e2, 60) }, 'Qm/Cm0': { 'desc': 'membrane charge density over resting capacitance', 'label': 'Q_m / C_{m0}', 'unit': 'mV', 'bounds': (-150, 70), 'func': f"normalizedQm({wrapleft}Qm{wrapright})" }, 'Vm': { 'desc': 'membrane potential', 'label': 'V_m', 'unit': 'mV', 'bounds': (-150, 70) }, 'ELeak': { 'constant': 'obj.ELeak', 'desc': 'non-specific leakage current resting potential', 'label': 'V_{leak}', 'unit': 'mV', 'ls': '--', 'color': 'k' } } for cname in cls.getCurrentsNames(): cfunc = getattr(cls, cname) cargs = inspect.getargspec(cfunc)[0][1:] pltvars[cname] = { 'desc': inspect.getdoc(cfunc).splitlines()[0], 'label': f'I_{{{cname[1:]}}}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f"{cname}({', '.join([f'{wrapleft}{a}{wrapright}' for a in cargs])})" } for var in cargs: if var != 'Vm': pltvars[var] = { 'desc': cls.states[var], 'label': var, 'bounds': (-0.1, 1.1) } pltvars['iNet'] = { 'desc': inspect.getdoc(getattr(cls, 'iNet')).splitlines()[0], 'label': 'I_{net}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f'iNet({wrapleft}Vm{wrapright}, {wrapleft[:-1]}{cls.statesNames()}{wrapright[1:]})', 'ls': '--', 'color': 'black' } pltvars['dQdt'] = { 'desc': inspect.getdoc(getattr(cls, 'dQdt')).splitlines()[0], 'label': 'dQ_m/dt', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f'dQdt({wrapleft}Vm{wrapright}, {wrapleft[:-1]}{cls.statesNames()}{wrapright[1:]})', 'ls': '--', 'color': 'black' } pltvars['iCap'] = { 'desc': inspect.getdoc(getattr(cls, 'iCap')).splitlines()[0], 'label': 'I_{cap}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': f'iCap({wrapleft}t{wrapright}, {wrapleft}Vm{wrapright})' } for rate in cls.rates: if 'alpha' in rate: prefix, suffix = 'alpha', rate[5:] else: prefix, suffix = 'beta', rate[4:] pltvars[rate] = { 'label': 
'\\{}_{{{}}}'.format(prefix, suffix), 'unit': 'ms^{-1}', 'factor': 1e-3 } pltvars['FR'] = { 'desc': 'firing rate', 'label': 'FR', 'unit': 'Hz', 'factor': 1e0, # 'bounds': (0, 1e3), 'func': f'firingRateProfile({wrapleft[:-2]})' } return pltvars @classmethod def iCap(cls, t, Vm): ''' Capacitive current. ''' dVdt = np.insert(np.diff(Vm) / np.diff(t), 0, 0.) return cls.Cm0 * dVdt @property def pltScheme(self): pltscheme = { 'Q_m': ['Qm'], 'V_m': ['Vm'] } pltscheme['I'] = self.getCurrentsNames() + ['iNet'] for cname in self.getCurrentsNames(): if 'Leak' not in cname: key = f'i_{{{cname[1:]}}}\ kin.' cargs = inspect.getargspec(getattr(self, cname))[0][1:] pltscheme[key] = [var for var in cargs if var not in ['Vm', 'Cai']] return pltscheme @classmethod def statesNames(cls): ''' Return a list of names of all state variables of the model. ''' return list(cls.states.keys()) @classmethod @abc.abstractmethod def derStates(cls): ''' Dictionary of states derivatives functions ''' raise NotImplementedError @classmethod def getDerStates(cls, Vm, states): ''' Compute states derivatives array given a membrane potential and states dictionary ''' return np.array([cls.derStates()[k](Vm, states) for k in cls.statesNames()]) @classmethod @abc.abstractmethod def steadyStates(cls): ''' Return a dictionary of steady-states functions ''' raise NotImplementedError @classmethod def getSteadyStates(cls, Vm): ''' Compute array of steady-states for a given membrane potential ''' return np.array([cls.steadyStates()[k](Vm) for k in cls.statesNames()]) @classmethod def getDerEffStates(cls, lkp, states): ''' Compute effective states derivatives array given lookups and states dictionaries. ''' return np.array([ cls.derEffStates()[k](lkp, states) for k in cls.statesNames()]) @classmethod def getEffRates(cls, Vm): ''' Compute array of effective rate constants for a given membrane potential vector. ''' return {k: np.mean(np.vectorize(v)(Vm)) for k, v in cls.effRates().items()} def getLookup(self): ''' Get lookup of membrane potential rate constants interpolated along the neuron's charge physiological range. ''' Qmin, Qmax = expandRange(*self.Qbounds, exp_factor=10.) Qref = np.arange(Qmin, Qmax, 1e-5) # C/m2 Vref = Qref / self.Cm0 * 1e3 # mV tables = {k: np.vectorize(v)(Vref) for k, v in self.effRates().items()} return EffectiveVariablesLookup({'Q': Qref}, {'V': Vref, **tables}) @classmethod @abc.abstractmethod def currents(cls): ''' Dictionary of ionic currents functions (returning current densities in mA/m2) ''' @classmethod def iNet(cls, Vm, states): ''' net membrane current :param Vm: membrane potential (mV) :param states: states of ion channels gating and related variables :return: current per unit area (mA/m2) ''' return sum([cfunc(Vm, states) for cfunc in cls.currents().values()]) @classmethod def dQdt(cls, Vm, states): ''' membrane charge density variation rate :param Vm: membrane potential (mV) :param states: states of ion channels gating and related variables :return: variation rate (mA/m2) ''' return -cls.iNet(Vm, states) @classmethod def titrationFunc(cls, *args, **kwargs): ''' Default titration function. ''' return cls.isExcited(*args, **kwargs) @staticmethod def currentToConcentrationRate(z_ion, depth): ''' Compute the conversion factor from a specific ionic current (in mA/m2) into a variation rate of submembrane ion concentration (in M/s).
:param: z_ion: ion valence :param depth: submembrane depth (m) :return: conversion factor (Mmol.m-1.C-1) ''' return 1e-6 / (z_ion * depth * FARADAY) @staticmethod def nernst(z_ion, Cion_in, Cion_out, T): ''' Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 @staticmethod def vtrap(x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) @staticmethod def efun(x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) @classmethod def ghkDrive(cls, Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * cls.efun(-x) # M eCout = Cion_out * cls.efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 @classmethod def xBG(cls, Vref, Vm): ''' Compute dimensionless Borg-Graham ratio for a given voltage. :param Vref: reference voltage membrane (mV) :param Vm: membrane potential (mV) :return: dimensionless ratio ''' return (Vm - Vref) * FARADAY / (Rg * cls.T) * 1e-3 # [-] @classmethod def alphaBG(cls, alpha0, zeta, gamma, Vref, Vm): ''' Compute the activation rate constant for a given voltage and temperature, using a Borg-Graham formalism. :param alpha0: pre-exponential multiplying factor :param zeta: effective valence of the gating particle :param gamma: normalized position of the transition state within the membrane :param Vref: membrane voltage at which alpha = alpha0 (mV) :param Vm: membrane potential (mV) :return: rate constant (in alpha0 units) ''' return alpha0 * np.exp(-zeta * gamma * cls.xBG(Vref, Vm)) @classmethod def betaBG(cls, beta0, zeta, gamma, Vref, Vm): ''' Compute the inactivation rate constant for a given voltage and temperature, using a Borg-Graham formalism. :param beta0: pre-exponential multiplying factor :param zeta: effective valence of the gating particle :param gamma: normalized position of the transition state within the membrane :param Vref: membrane voltage at which beta = beta0 (mV) :param Vm: membrane potential (mV) :return: rate constant (in beta0 units) ''' return beta0 * np.exp(zeta * (1 - gamma) * cls.xBG(Vref, Vm)) @classmethod def getCurrentsNames(cls): return list(cls.currents().keys()) @staticmethod def firingRateProfile(*args, **kwargs): return computeFRProfile(*args, **kwargs) @property def Qbounds(self): ''' Determine bounds of membrane charge physiological range for a given neuron. ''' return np.array([np.round(self.Vm0 - 25.0), 50.0]) * self.Cm0 * 1e-3 # C/m2 @classmethod def isVoltageGated(cls, state): ''' Determine whether a given state is purely voltage-gated or not.''' return f'alpha{state.lower()}' in cls.rates @classmethod @Model.checkOutputDir def simQueue(cls, amps, durations, offsets, PRFs, DCs, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. 
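As a quick numerical check of the nernst helper defined above (the concentrations below are illustrative values, not taken from the package): a monovalent cation with a 150 mM intracellular / 5 mM extracellular gradient at 36 °C (309.15 K) yields a reversal potential of roughly -90 mV.

    from PySONIC.core.pneuron import PointNeuron

    # nernst is a staticmethod, so it can be called on the abstract class directly
    EK = PointNeuron.nernst(1, 150e-3, 5e-3, 309.15)  # valence, Cin (M), Cout (M), T (K)
    print(f'{EK:.1f} mV')  # ~ -90.6 mV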
:param amps: list (or 1D-array) of electric current amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: list of parameters (list) for each simulation ''' if amps is None: amps = [None] drives = ElectricDrive.createQueue(amps) protocols = PulsedProtocol.createQueue(durations, offsets, PRFs, DCs) queue = [] for drive in drives: for pp in protocols: queue.append([drive, pp]) return queue @staticmethod def checkInputs(drive, pp): ''' Check validity of electrical stimulation parameters. :param drive: electric drive object :param pp: pulse protocol object ''' if not isinstance(drive, Drive): raise TypeError(f'Invalid "drive" parameter (must be a "Drive" object)') if not isinstance(pp, PulsedProtocol): raise TypeError('Invalid pulsed protocol (must be "PulsedProtocol" instance)') def chooseTimeStep(self): ''' Determine integration time step based on intrinsic temporal properties. ''' return DT_EFFECTIVE @classmethod def derivatives(cls, t, y, Cm=None, Iinj=0.): ''' Compute system derivatives for a given membrane capacitance and injected current. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param Cm: membrane capacitance (F/m2) :param Iinj: injected current (mA/m2) :return: vector of system derivatives at time t ''' if Cm is None: Cm = cls.Cm0 Qm, *states = y Vm = Qm / Cm * 1e3 # mV states_dict = dict(zip(cls.statesNames(), states)) dQmdt = (Iinj - cls.iNet(Vm, states_dict)) * 1e-3 # A/m2 return [dQmdt, *cls.getDerStates(Vm, states_dict)] @Model.logNSpikes @Model.checkTitrate @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, pp): ''' Simulate a specific neuron model for a set of simulation parameters, and return output data in a dataframe. :param drive: electric drive object :param pp: pulse protocol object - :return: output dataframe + :return: output DataFrame ''' - # Set initial conditions - y0 = np.array((self.Qm0, *self.getSteadyStates(self.Vm0))) - - # Initialize simulator and compute solution - logger.debug('Computing solution') - simulator = PWSimulator( - lambda t, y: self.derivatives(t, y, Iinj=drive.I), - lambda t, y: self.derivatives(t, y, Iinj=0.)) - t, y, stim = simulator( - y0, self.chooseTimeStep(), pp) - - # Prepend initial conditions (prior to stimulation) - t, y, stim = simulator.prependSolution(t, y, stim) - - # Store output in dataframe and return - data = pd.DataFrame({ - 't': t, - 'stimstate': stim, - 'Qm': y[:, 0], - 'Vm': y[:, 0] / self.Cm0 * 1e3, - }) - for i in range(len(self.states)): - data[self.statesNames()[i]] = y[:, i + 1] + y0 = { + 'Qm': self.Qm0, + **{k: self.steadyStates()[k](self.Vm0) for k in self.statesNames()} + } + + # Initialize solver and compute solution + solver = EventDrivenSolver( + y0.keys(), + lambda t, y: self.derivatives(t, y, Iinj=solver.A), + lambda x: setattr(solver, 'A', drive.I * x), + dt=self.chooseTimeStep()) + data = solver(y0, pp.stimEvents(), pp.ttotal) + + # Add Vm time series to solution + data = addColumn(data, 'Vm', data['Qm'].values / self.Cm0 * 1e3, preceding_key='Qm') + + # Return solution dataframe return data def desc(self, meta): return f'{self}: simulation @ {meta["drive"].desc}, {meta["pp"].desc}' @staticmethod def getNSpikes(data): ''' Compute number of spikes in charge profile of simulation output.
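The rewritten simulate method above delegates stimulus gating to an EventDrivenSolver: the derivatives function reads the current drive amplitude from the solver object, the event function rescales that amplitude, and the (time, value) pairs returned by pp.stimEvents() fire the events during integration. A minimal standalone sketch of the same pattern on a toy first-order relaxation system (variable names and parameter values are illustrative, not taken from the package):

    import numpy as np
    from PySONIC.core.solvers import EventDrivenSolver

    tau = 1e-3   # relaxation time constant (s)
    Aref = 1.0   # reference drive amplitude (a.u.)

    solver = EventDrivenSolver(
        ['y'],                                             # differential variable names
        lambda t, y: np.array([(solver.A - y[0]) / tau]),  # derivatives read the drive level
        lambda x: setattr(solver, 'A', Aref * x),          # event: rescale the drive level
        event_params={'A': 0.},                            # drive initially off
        dt=1e-5)

    # Drive ON at t = 0, OFF at t = 5 ms, integrate until t = 10 ms
    data = solver({'y': 0.}, [(0., 1), (5e-3, 0)], 10e-3)
    print(data.columns.tolist())  # ['t', 'stimstate', 'y']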
:param data: dataframe containing output time series :return: number of detected spikes ''' return detectSpikes(data)[0].size @staticmethod def getStabilizationValue(data): ''' Determine stabilization value from the charge profile of a simulation output. :param data: dataframe containing output time series :return: charge stabilization value (or np.nan if no stabilization detected) ''' # Extract charge signal posterior to observation window t, Qm = [data[key].values for key in ['t', 'Qm']] if t.max() <= TMIN_STABILIZATION: raise ValueError('solution length is too short to assess stabilization') Qm = Qm[t > TMIN_STABILIZATION] # Compute variation range Qm_range = np.ptp(Qm) logger.debug('%.2f nC/cm2 variation range over the last %.0f ms, Qmf = %.2f nC/cm2', Qm_range * 1e5, TMIN_STABILIZATION * 1e3, Qm[-1] * 1e5) # Return final value only if stabilization is detected if np.ptp(Qm) < QSS_Q_DIV_THR: return Qm[-1] else: return np.nan @classmethod def isExcited(cls, data): ''' Determine if neuron is excited from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is excited or not ''' return cls.getNSpikes(data) > 0 @classmethod def isSilenced(cls, data): ''' Determine if neuron is silenced from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is silenced or not ''' return not np.isnan(cls.getStabilizationValue(data)) def getArange(self, drive): return drive.xvar_range diff --git a/PySONIC/core/protocols.py b/PySONIC/core/protocols.py index 03f501f..ac5baad 100644 --- a/PySONIC/core/protocols.py +++ b/PySONIC/core/protocols.py @@ -1,241 +1,241 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2019-11-12 18:04:45 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-05 17:52:08 +# @Last Modified time: 2020-04-15 20:43:24 import numpy as np from ..utils import si_format, StimObject from .batches import Batch class TimeProtocol(StimObject): def __init__(self, tstim, toffset): ''' Class constructor. 
:param tstim: pulse duration (s) :param toffset: offset duration (s) ''' self.tstim = tstim self.toffset = toffset @property def tstim(self): return self._tstim @tstim.setter def tstim(self, value): value = self.checkFloat('tstim', value) self.checkPositiveOrNull('tstim', value) self._tstim = value @property def toffset(self): return self._toffset @toffset.setter def toffset(self, value): value = self.checkFloat('toffset', value) self.checkPositiveOrNull('toffset', value) self._toffset = value @property def ttotal(self): return self.tstim + self.toffset def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.tstim == other.tstim and self.toffset == other.toffset def __repr__(self): params = [f'{si_format(x, 1, space="")}s' for x in [self.tstim, self.toffset]] return f'{self.__class__.__name__}({", ".join(params)})' @property def desc(self): return f'{si_format(self.tstim, 1)}s stim, {si_format(self.toffset, 1)}s offset' @property def filecodes(self): return {'tstim': f'{(self.tstim * 1e3):.0f}ms', 'toffset': None} @staticmethod def inputs(): return { 'tstim': { 'desc': 'stimulus duration', 'label': 't_{stim}', 'unit': 'ms', 'factor': 1e3, 'precision': 0 }, 'toffset': { 'desc': 'offset duration', 'label': 't_{offset}', 'unit': 'ms', 'factor': 1e3, 'precision': 0 } } @classmethod def createQueue(cls, durations, offsets): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :return: list of parameters (list) for each simulation ''' return [cls(*item) for item in Batch.createQueue(durations, offsets)] def tOFFON(self): ''' Return vector of times of OFF-ON transitions (in s). ''' return np.array([0.]) def tONOFF(self): ''' Return vector of times of ON-OFF transitions (in s). ''' return np.array([self.tstim]) def stimEvents(self): ''' Return time-value pairs for each transition in stimulation state. ''' t_on_off = self.tONOFF() t_off_on = self.tOFFON() pairs = list(zip(t_off_on, [1] * len(t_off_on))) + list(zip(t_on_off, [0] * len(t_on_off))) return sorted(pairs, key=lambda x: x[0]) class PulsedProtocol(TimeProtocol): def __init__(self, tstim, toffset, PRF=100., DC=1.): ''' Class constructor. 
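The stimEvents pairs constructed above are exactly what the new event-driven solvers consume as stimulation events; a quick worked example with illustrative durations:

    from PySONIC.core.protocols import TimeProtocol

    tp = TimeProtocol(100e-3, 50e-3)  # 100 ms stimulus followed by a 50 ms offset
    print(tp.stimEvents())            # [(0.0, 1), (0.1, 0)]: ON at t = 0, OFF at t = 100 ms
    print(tp.ttotal)                  # 0.15 (s), used as the solver's stopping time

PulsedProtocol (defined next) refines this into one ON-OFF pair per pulse through its tOFFON and tONOFF overrides.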
:param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) ''' super().__init__(tstim, toffset) self.DC = DC self.PRF = PRF @property def DC(self): return self._DC @DC.setter def DC(self, value): value = self.checkFloat('DC', value) self.checkBounded('DC', value, (0., 1.)) self._DC = value @property def PRF(self): return self._PRF @PRF.setter def PRF(self, value): value = self.checkFloat('PRF', value) self.checkPositiveOrNull('PRF', value) if self.DC < 1.: self.checkBounded('PRF', value, (1 / self.tstim, np.inf)) self._PRF = value def __eq__(self, other): if not isinstance(other, self.__class__): return False return super().__eq__(other) and self.PRF == other.PRF and self.DC == other.DC def __repr__(self): params = [f'{si_format(self.PRF, 1, space="")}Hz', f'{self.DC:.2f}'] return f'{super().__repr__()[:-1]}, {", ".join(params)})' @property def T_ON(self): return self.DC / self.PRF @property def T_OFF(self): return (1 - self.DC) / self.PRF @property def npulses(self): return int(np.round(self.tstim * self.PRF)) @property def desc(self): s = super().desc if self.DC < 1: s += f', {si_format(self.PRF, 2)}Hz PRF, {(self.DC * 1e2):.1f}% DC' return s @property def isCW(self): return self.DC == 1. @property def nature(self): return 'CW' if self.isCW else 'PW' @property def filecodes(self): if self.isCW: d = {'PRF': None, 'DC': None} else: d = {'PRF': f'PRF{self.PRF:.2f}Hz', 'DC': f'DC{self.DC * 1e2:04.1f}%'} return {**super().filecodes, **d} @staticmethod def inputs(): d = { 'PRF': { 'desc': 'pulse repetition frequency', 'label': 'PRF', 'unit': 'Hz', 'factor': 1e0, 'precision': 0 }, 'DC': { 'desc': 'duty cycle', 'label': 'DC', 'unit': '%', 'factor': 1e2, 'precision': 2 } } return {**TimeProtocol.inputs(), **d} @classmethod def createQueue(cls, durations, offsets, PRFs, DCs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: list of parameters (list) for each simulation ''' DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += Batch.createQueue(durations, offsets, min(PRFs), 1.0) if np.any(DCs != 1.0): queue += Batch.createQueue(durations, offsets, PRFs, DCs[DCs != 1.0]) queue = [cls(*item) for item in queue] return queue def tOFFON(self): if self.DC == 1: return super().tOFFON() else: return np.arange(self.npulses) / self.PRF def tONOFF(self): if self.DC == 1: return super().tONOFF() else: return self.tOFFON() + self.DC / self.PRF diff --git a/PySONIC/core/simulators.py b/PySONIC/core/simulators.py deleted file mode 100644 index f7c2f79..0000000 --- a/PySONIC/core/simulators.py +++ /dev/null @@ -1,552 +0,0 @@ -# -*- coding: utf-8 -*- -# @Author: Theo Lemaire -# @Email: theo.lemaire@epfl.ch -# @Date: 2019-05-28 14:45:12 -# @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-02-03 16:55:09 - -import abc -import numpy as np -from scipy.integrate import ode, odeint, solve_ivp -from tqdm import tqdm - -from ..utils import * -from ..constants import * - - -class Simulator(metaclass=abc.ABCMeta): - ''' Generic interface to simulator object. ''' - - @staticmethod - def initialize(y0): - ''' Initialize global arrays. 
- - :param y0: vector of initial conditions - :return: 3-tuple with the initialized time vector, solution matrix and state vector - ''' - t = np.array([0.]) - y = np.atleast_2d(y0) - stim = np.array([1]) - return t, y, stim - - @staticmethod - def appendSolution(t, y, stim, tnew, ynew, is_on): - ''' Append to time vector, solution matrix and state vector. - - :param t: preceding time vector - :param y: preceding solution matrix - :param stim: preceding stimulation state vector - :param tnew: integration time vector for current interval - :param ynew: derivative function for current interval - :param is_on: stimulation state for current interval - :return: 3-tuple with the appended time vector, solution matrix and state vector - ''' - t = np.concatenate((t, tnew[1:])) - y = np.concatenate((y, ynew[1:]), axis=0) - stim = np.concatenate((stim, np.ones(tnew.size - 1) * is_on)) - return t, y, stim - - @staticmethod - def prependSolution(t, y, stim, y0=None): - ''' Prepend initial conditions (prior to stimulation) to solution vectors. ''' - if y0 is None: - y0 = y[0, :] - return np.insert(t, 0, 0.), np.vstack((y0, y)), np.insert(stim, 0, 0) - - @classmethod - def integrate(cls, t, y, stim, tnew, dfunc, is_on, use_adaptive_dt=False): - ''' Integrate system for a time interval and append to preceding solution arrays. - - :param t: preceding time vector - :param y: preceding solution matrix - :param stim: preceding stimulation state vector - :param tnew: integration time vector for current interval - :param dfunc: derivative function for current interval - :param is_on: stimulation state for current interval - :return: 3-tuple with the appended time vector, solution matrix and state vector - ''' - if tnew.size == 0: - return t, y, stim - if use_adaptive_dt: - ynew = solve_ivp(dfunc, [tnew[0], tnew[-1]], y[-1], t_eval=tnew, method='LSODA').y.T - else: - ynew = odeint(dfunc, y[-1], tnew, tfirst=True) - return cls.appendSolution(t, y, stim, tnew, ynew, is_on) - - @staticmethod - def downsample(t, y, stim, target_dt): - ''' Resample a solution to a new target time step. - - :param t: time vector - :param y: solution matrix - :param stim: stimulation state vector - :target_dt: target time step after resampling - :return: 3-tuple with the downsampled time vector, solution matrix and state vector - ''' - dt = t[2] - t[1] - if dt >= target_dt: - logger.warning(f'Hyper-sampling not supported -> keeping Fs = {si_format(1 / dt, 2)}Hz') - return t, y, stim - - rf = int(np.round(target_dt / dt)) - logger.debug(f'Downsampling output arrays by factor {rf} (Fs = {si_format(1 / (dt * rf), 2)}Hz)') - return t[::rf], y[::rf, :], stim[::rf] - - @abc.abstractmethod - def compute(self): - ''' Abstract compute method. ''' - return 'Should never reach here' - - def __call__(self, *args, **kwargs): - ''' Call and return compute method ''' - return self.compute(*args, **kwargs) - - -class PeriodicSimulator(Simulator): - - def __init__(self, dfunc, stopfunc=None, ivars_to_check=None): - ''' Initialize simulator with specific derivative and stop functions - - :param dfunc: derivative function - :param stopfunc: function estimating stopping criterion - :param ivars_to_check: solution indexes of variables to check for stability - ''' - self.dfunc = dfunc - self.ivars_to_check = ivars_to_check - if stopfunc is not None: - self.stopfunc = stopfunc - else: - self.stopfunc = self.isPeriodicallyStable - - @staticmethod - def getNPerCycle(dt, T): - ''' Compute number of samples per cycle given a time step and a specific periodicity. 
- - :param dt: integration time step (s) - :param T: periodicity (s) - :return: number of samples per cycle - ''' - return int(np.round(T / dt)) + 1 - - @classmethod - def getTimeReference(cls, dt, T): - ''' Compute reference integration time vector for a specific periodicity. - - :param dt: integration time step (s) - :param T: periodicity (s) - :return: time vector for 1 periodic cycle - ''' - return np.linspace(0, T, cls.getNPerCycle(dt, T)) - - def isPeriodicallyStable(self, t, y, T): - ''' Assess the periodic stabilization of a solution, by evaluating the deviation - of system variables between the last two periods. - - :param t: time vector - :param y: solution matrix - :param T: periodicity (s) - :return: boolean stating whether the solution is periodically stable or not - ''' - # Extract the 2 cycles of interest from the solution - n = self.getNPerCycle(t[1] - t[0], T) - 1 - y_last = y[-n:, :] - y_prec = y[-2 * n:-n, :] - - # For each variable of interest, evaluate the RMSE between the two cycles, the - # variation range over the last cycle, and the ratio of these 2 quantities - ratios = np.array([rmse(y_last[:, ivar], y_prec[:, ivar]) / np.ptp(y_last[:, ivar]) - for ivar in self.ivars_to_check]) - - # Classify the solution as periodically stable only if all RMSE/PTP ratios - # are below critical threshold - is_periodically_stable = np.all(ratios < MAX_RMSE_PTP_RATIO) - - return is_periodically_stable - - def isAsymptoticallyStable(self, t, y, T): - ''' Assess the asymptotically stabilization of a solution, by evaluating the deviation - of system variables from their initial values. - - :param t: time vector - :param y: solution matrix - :param T: periodicity (s) - :return: boolean stating whether the solution is asymptotically stable or not - ''' - n_history = int(self.t_history // T) # size of history - n = self.getNPerCycle(t[1] - t[0], T) - 1 - - # For each variable to be checked - convs = [None] * len(self.ivars_to_check) - for i, ivar in enumerate(self.ivars_to_check): - xdev = y[::n, ivar] - self.refs[i] - # print(np.array(xdev) * 1e5) - - # If last deviation is too large -> divergence - if np.abs(xdev[-1]) > self.div_thr[i]: - return -1 - - # If last deviation or average deviation in recent history - # is small enough -> convergence - for x in [xdev[-1], np.mean(xdev[-n_history:])]: - if np.abs(x) < self.conv_thr[i]: - convs[i] = 1 - - # print(convs) - - return np.all(convs == 1) - - def stopFuncTmp(self, t, y, T): - return self.isAsymptoticallyStable(t, y, T) != 0 - - def compute(self, y0, dt, T, t0=0., nmax=NCYCLES_MAX): - ''' Simulate system with a specific periodicity until stopping criterion is met. 
- - :param y0: 1D vector of initial conditions - :param dt: integration time step (s) - :param T: periodicity (s) - :param t0: starting time - :return: 3-tuple with the time profile, the effective solution matrix and a state vector - ''' - - # If none specified, set all variables to be checked for stability - if self.ivars_to_check is None: - self.ivars_to_check = range(y0.size) - - # Get reference time vector - tref = self.getTimeReference(dt, T) - - # Initialize global arrays - t, y, stim = self.initialize(y0) - - # Integrate system for a few cycles until stopping criterion is met - icycle = 0 - stop = False - while not stop and icycle < nmax: - t, y, stim = self.integrate(t, y, stim, tref + icycle * T, self.dfunc, True) - if icycle > 0: - stop = self.stopfunc(t, y, T) - icycle += 1 - t += t0 - - # Log stopping criterion - t_str = f't = {t[-1] * 1e3:.5f} ms' - if icycle == nmax: - logger.warning(f'{t_str}: criterion not met -> stopping after {icycle} cycles') - else: - logger.debug(f'{t_str}: stopping criterion met after {icycle} cycles') - - # Return output variables - return t, y, stim - - -class OnOffSimulator(Simulator): - - def __init__(self, dfunc_on, dfunc_off): - ''' Initialize simulator with specific derivative functions - - :param dfunc_on: derivative function for ON periods - :param dfunc_off: derivative function for OFF periods - ''' - self.dfunc_on = dfunc_on - self.dfunc_off = dfunc_off - - @staticmethod - def getTimeReference(dt, tstim, toffset): - ''' Compute reference integration time vectors for a specific stimulus application pattern. - - :param dt: integration time step (s) - :param tstim: duration of US stimulation (s) - :param toffset: duration of the offset (s) - :return: 2-tuple with time vectors for ON and OFF periods - ''' - t_on = np.linspace(0, tstim, int(np.round(tstim / dt)) + 1) - t_off = np.linspace(tstim, tstim + toffset, int(np.round(toffset / dt))) - return t_on, t_off - - def compute(self, y0, dt, tp, target_dt=None): - ''' Simulate system for a specific stimulus application pattern. - - :param y0: 1D vector of initial conditions - :param dt: integration time step (s) - :param tp: time protocol object - :param target_dt: target time step after resampling - :return: 3-tuple with the time profile, the effective solution matrix and a state vector - ''' - tstim, toffset = tp.tstim, tp.toffset - - # Get reference time vectors - t_on, t_off = self.getTimeReference(dt, tstim, toffset) - - # Initialize global arrays - t, y, stim = self.initialize(y0) - - # Integrate ON and OFF periods - t, y, stim = self.integrate(t, y, stim, t_on, self.dfunc_on, True) - t, y, stim = self.integrate(t, y, stim, t_off, self.dfunc_off, False) - - # Resample solution if specified - if target_dt is not None: - t, y, stim = self.downsample(t, y, stim, target_dt) - - # Return output variables - return t, y, stim - - -class PWSimulator(Simulator): - - def __init__(self, dfunc_on, dfunc_off): - ''' Initialize simulator with specific derivative functions - - :param dfunc_on: derivative function for ON periods - :param dfunc_off: derivative function for OFF periods - ''' - self.dfunc_on = dfunc_on - self.dfunc_off = dfunc_off - - @staticmethod - def getTimeReference(dt, tstim, toffset, PRF, DC): - ''' Compute reference integration time vectors for a specific stimulus application pattern. 
- - :param dt: integration time step (s) - :param tstim: duration of US stimulation (s) - :param toffset: duration of the offset (s) - :param PRF: pulse repetition frequency (Hz) - :param DC: pulse duty cycle (-) - :return: 3-tuple with time vectors for stimulus ON and OFF periods and stimulus offset - ''' - - # Compute vector sizes - T_ON = DC / PRF - T_OFF = (1 - DC) / PRF - - # For high-PRF pulsed protocols: adapt time step to ensure minimal - # number of samples during TON or TOFF - dt_warning_msg = 'high-PRF protocol: lowering time step to %.2e s to properly integrate %s' - for key, T in {'TON': T_ON, 'TOFF': T_OFF}.items(): - if T > 0 and T / dt < MIN_SAMPLES_PER_PULSE_INTERVAL: - dt = T / MIN_SAMPLES_PER_PULSE_INTERVAL - logger.warning(dt_warning_msg, dt, key) - - # Initializing accurate time vectors pulse ON and OFF periods, as well as offset - t_on = np.linspace(0, T_ON, int(np.round(T_ON / dt)) + 1) - t_off = np.linspace(T_ON, 1 / PRF, int(np.round(T_OFF / dt))) - t_offset = np.linspace(tstim, tstim + toffset, int(np.round(toffset / dt))) - - return t_on, t_off, t_offset - - @staticmethod - def adjustPRF(tstim, PRF, DC, print_progress): - ''' Adjust the PRF in case of continuous wave stimulus, in order to obtain the desired - number of integration interval(s) during stimulus. - - :param tstim: duration of US stimulation (s) - :param PRF: pulse repetition frequency (Hz) - :param DC: pulse duty cycle (-) - :param print_progress: boolean specifying whether to show a progress bar - :return: adjusted PRF value (Hz) - ''' - if DC < 1.0: # if PW stimuli, then no change - return PRF - else: # if CW stimuli, then divide integration according to presence of progress bar - return {True: 100., False: 1.}[print_progress] / tstim - - @staticmethod - def getNPulses(tstim, PRF): - ''' Calculate number of pulses from stimulus temporal pattern. - - :param tstim: duration of US stimulation (s) - :param toffset: duration of the offset (s) - :return: number of pulses during the stimulus - ''' - return int(np.round(tstim * PRF)) - - def compute(self, y0, dt, pp, target_dt=None, - print_progress=False, monitor_func=None): - ''' Simulate system for a specific stimulus application pattern. 
- - :param y0: 1D vector of initial conditions - :param dt: integration time step (s) - :param pp: pulse protocol object - :param target_dt: target time step after resampling - :param monitor: string specifying how to monitor integration to show a progress bar - :return: 3-tuple with the time profile, the effective solution matrix and a state vector - ''' - tstim, toffset, PRF, DC = pp.tstim, pp.toffset, pp.PRF, pp.DC - - # Adjust PRF and get number of pulses - if tstim > 0: - PRF = self.adjustPRF(tstim, PRF, DC, print_progress) - npulses = self.getNPulses(tstim, PRF) - - # Get reference time vectors - t_on, t_off, t_offset = self.getTimeReference(dt, tstim, toffset, PRF, DC) - - # Initialize global arrays - t, y, stim = self.initialize(y0) - - # Divide offset interval if print progress is True - if print_progress: - nslices_offset = int(np.round(toffset * npulses / tstim)) - else: - nslices_offset = 1 - - # Compute reference time vector for offset slices - tslice = toffset / nslices_offset - nperslice = int(np.round(tslice / dt)) - tref_offset = np.linspace(0, tslice, nperslice) + tstim - - # Initialize progress bar if no monitoring function provided - if print_progress: - ntot = npulses + nslices_offset - if monitor_func is None: - setHandler(logger, TqdmHandler(my_log_formatter)) - pbar = tqdm(total=ntot) - else: - logger.debug('integrating stimulus') - - # Integrate ON and OFF intervals of each pulse - for i in range(npulses): - for j, (tref, func) in enumerate(zip([t_on, t_off], [self.dfunc_on, self.dfunc_off])): - t, y, stim = self.integrate(t, y, stim, tref + i / PRF, func, j == 0) - - # Monitor progress - if print_progress: - if monitor_func is None: - pbar.update() - else: - logger.debug(f'slice {i + 1}/{ntot}: {monitor_func(t[-1], y[-1, :])}') - - # Integrate offset interval - if print_progress and monitor_func is not None: - logger.debug('integrating offset') - for i in range(nslices_offset): - t, y, stim = self.integrate(t, y, stim, tref_offset + i * tslice, self.dfunc_off, False) - - # Monitor progress - if print_progress: - if monitor_func is None: - pbar.update() - else: - logger.debug(f'slice {npulses + i + 1}/{ntot}: {monitor_func(t[-1], y[-1, :])}') - - # Terminate progress bar if any - if print_progress: - if monitor_func is None: - pbar.close() - else: - logger.debug('integration completed') - - # Resample solution if specified - if target_dt is not None: - t, y, stim = self.downsample(t, y, stim, target_dt) - - # Return output variables - return t, y, stim - - -class HybridSimulator(PWSimulator): - - def __init__(self, dfunc_on, dfunc_off, dfunc_sparse, predfunc, is_dense_var, - stopfunc=None, ivars_to_check=None): - ''' Initialize simulator with specific derivative functions - - :param dfunc_on: derivative function for ON periods - :param dfunc_off: derivative function for OFF periods - :param dfunc_sparse: derivative function for sparse integration - :param predfunc: function computing the extra arguments necessary for sparse integration - :param is_dense_var: boolean array stating for each variable if it evolves fast or not - :param stopfunc: function estimating stopping criterion - :param ivars_to_check: solution indexes of variables to check for stability - ''' - PWSimulator.__init__(self, dfunc_on, dfunc_off) - self.sparse_solver = ode(dfunc_sparse) - self.sparse_solver.set_integrator('dop853', nsteps=SOLVER_NSTEPS, atol=1e-12) - self.predfunc = predfunc - self.is_dense_var = is_dense_var - self.is_sparse_var = np.invert(is_dense_var) - self.stopfunc = stopfunc - 
self.ivars_to_check = ivars_to_check - - def integrate(self, t, y, stim, tnew, dfunc, is_on): - ''' Integrate system for a time interval and append to preceding solution arrays, - using a hybrid scheme: - - - First, the full ODE system is integrated for a few cycles with a dense time - granularity until a stopping criterion is met - - Second, the profiles of all variables over the last cycle are downsampled to a - far lower (i.e. sparse) sampling rate - - Third, a subset of the ODE system is integrated with a sparse time granularity, - for the remaining of the time interval, while the remaining variables are - periodically expanded from their last cycle profile. - - :param t: preceding time vector - :param y: preceding solution matrix - :param stim: preceding stimulation state vector - :param tnew: integration time vector for current interval - :param dfunc: derivative function for current interval - :param is_on: stimulation state for current interval - :return: 3-tuple with the appended time vector, solution matrix and state vector - ''' - - if tnew.size == 0: - return t, y, stim - - # Initialize periodic solver - dense_solver = PeriodicSimulator( - dfunc, stopfunc=self.stopfunc, ivars_to_check=self.ivars_to_check) - dt_dense = tnew[1] - tnew[0] - npc_dense = dense_solver.getNPerCycle(dt_dense, self.T) - - # Until final integration time is reached - while t[-1] < tnew[-1]: - logger.debug(f't = {t[-1] * 1e3:.5f} ms: starting new hybrid integration') - - # Integrate dense system until stopping criterion is met - tdense, ydense, stimdense = dense_solver.compute(y[-1], dt_dense, self.T, t0=t[-1]) - t, y, stim = self.appendSolution(t, y, stim, tdense, ydense, is_on) - - # Resample signals over last cycle to match sparse time step - tlast, ylast, stimlast = self.downsample( - tdense[-npc_dense:], ydense[-npc_dense:], stimdense[-npc_dense:], - self.dt_sparse) - npc_sparse = tlast.size - - # Integrate until either the rest of the interval or max update interval is reached - t0 = tdense[-1] - tf = min(tnew[-1], tdense[0] + HYBRID_UPDATE_INTERVAL) - nsparse = int(np.round((tf - t0) / self.dt_sparse)) - tsparse = np.linspace(t0, tf, nsparse) - ysparse = np.empty((nsparse, y.shape[1])) - ysparse[0] = y[-1] - self.sparse_solver.set_initial_value(y[-1, self.is_sparse_var], t[-1]) - for j in range(1, tsparse.size): - self.sparse_solver.set_f_params(self.predfunc(ylast[j % npc_sparse])) - self.sparse_solver.integrate(tsparse[j]) - if not self.sparse_solver.successful(): - raise ValueError(f'integration error at t = {tsparse[j] * 1e3:.5f} ms') - ysparse[j, self.is_dense_var] = ylast[j % npc_sparse, self.is_dense_var] - ysparse[j, self.is_sparse_var] = self.sparse_solver.y - t, y, stim = self.appendSolution(t, y, stim, tsparse, ysparse, is_on) - - return t, y, stim - - def compute(self, y0, dt_dense, dt_sparse, T, pp, print_progress=False): - ''' Simulate system for a specific stimulus application pattern. 
- - :param y0: 1D vector of initial conditions - :param dt_dense: dense integration time step (s) - :param dt_sparse: sparse integration time step (s) - :param T: periodicity (s) - :param pp: pulse protocol object - :param print_progress: boolean specifying whether to show a progress bar - :return: 3-tuple with the time profile, the effective solution matrix and a state vector - ''' - - # Set periodicity and sparse time step - self.T = T - self.dt_sparse = dt_sparse - - # Call and return parent compute method - return PWSimulator.compute( - self, y0, dt_dense, pp, - target_dt=None, print_progress=print_progress) diff --git a/PySONIC/core/solvers.py b/PySONIC/core/solvers.py new file mode 100644 index 0000000..9100b06 --- /dev/null +++ b/PySONIC/core/solvers.py @@ -0,0 +1,568 @@ +# -*- coding: utf-8 -*- +# @Author: Theo Lemaire +# @Email: theo.lemaire@epfl.ch +# @Date: 2019-05-28 14:45:12 +# @Last Modified by: Theo Lemaire +# @Last Modified time: 2020-04-15 20:00:03 + +import abc +import numpy as np +import pandas as pd +from scipy.interpolate import interp1d +from scipy.integrate import ode, odeint, solve_ivp +from tqdm import tqdm + +from ..utils import * +from ..constants import * + + +class ODESolver(metaclass=abc.ABCMeta): + ''' Generic interface to ODE solver object. ''' + + def __init__(self, ykeys, dfunc, dt=None): + ''' Initialization. + + :param ykeys: list of differential variables names + :param dfunc: derivative function + :param dt: integration time step (s) + ''' + self.ykeys = ykeys + self.dfunc = dfunc + self.dt = dt + + def checkFunc(self, key, value): + if not callable(value): + raise ValueError(f'{key} function must be a callable object') + + @property + def ykeys(self): + return self._ykeys + + @ykeys.setter + def ykeys(self, value): + if not isIterable(value): + value = list(value) + for item in value: + if not isinstance(item, str): + raise ValueError('ykeys must be a list of strings') + self._ykeys = value + + @property + def dfunc(self): + return self._dfunc + + @dfunc.setter + def dfunc(self, value): + self.checkFunc('derivative', value) + self._dfunc = value + + @property + def dt(self): + return self._dt + + @dt.setter + def dt(self, value): + if value is None: + self._dt = None + else: + if not isinstance(value, float): + raise ValueError('time step must be float-typed') + if value <= 0: + raise ValueError('time step must be strictly positive') + self._dt = value + + def getNSamples(self, t0, tend, dt=None): + ''' Get the number of samples required to integrate from an initial to a final time with + a specific time step. + + :param t0: initial time (s) + :param tend: final time (s) + :param dt: integration time step (s) + :return: number of required samples, rounded to nearest integer + ''' + if dt is None: + dt = self.dt + return int(np.round((tend - t0) / dt)) + + def getTimeVector(self, t0, tend, **kwargs): + ''' Get the time vector required to integrate from an initial to a final time with + a specific time step. + + :param t0: initial time (s) + :param tend: final time (s) + :return: vector going from current time to target time with appropriate step (s) + ''' + return np.linspace(t0, tend, self.getNSamples(t0, tend, **kwargs)) + + def initialize(self, y0, t0=0.): + ''' Initialize time vector and solution matrix. 
+ + :param y0: dictionary of initial conditions + ''' + keys = list(y0.keys()) + if len(keys) != len(self.ykeys): + raise ValueError("Initial conditions do not match system's dimensions") + for k in keys: + if k not in self.ykeys: + raise ValueError(f'{k} is not a differential variable') + y0 = {k: np.asarray(v) if isIterable(v) else np.array([v]) for k, v in y0.items()} + ref_size = y0[keys[0]].size + if not all(v.size == ref_size for v in y0.values()): + raise ValueError('dimensions of initial conditions are inconsistent') + self.y = np.array(list(y0.values())).T + self.t = np.ones(self.y.shape[0]) * t0 + self.x = np.zeros(self.t.size) + + def append(self, t, y): + ''' Append to time vector and solution matrix. + + :param t: integration time vector for current interval + :param y: derivative function for current interval + ''' + self.t = np.concatenate((self.t, t)) + self.y = np.concatenate((self.y, y), axis=0) + self.x = np.concatenate((self.x, np.ones(t.size) * self.xref)) + + def bound(self, tbounds): + ''' Bound to time vector and solution matrix. ''' + i_bounded = np.logical_and(self.t >= tbounds[0], self.t <= tbounds[1]) + self.t = self.t[i_bounded] + self.y = self.y[i_bounded, :] + self.x = self.x[i_bounded] + + def integrateUntil(self, target_t, remove_first=False): + ''' Integrate system for a time interval and append to preceding solution arrays. + + :param target_t: target time (s) + :param dt: integration time step (s) + ''' + if target_t < self.t[-1]: + raise ValueError(f'target time ({target_t} s) precedes current time {self.t[-1]} s') + elif target_t == self.t[-1]: + t, y = self.t[-1], self.y[-1] + if self.dt is None: + sol = solve_ivp( + self.dfunc, [self.t[-1], target_t], self.y[-1], t_eval=t, method='LSODA') + t, y = sol.t, sol.y.T + else: + t = self.getTimeVector(self.t[-1], target_t) + y = odeint(self.dfunc, self.y[-1], t, tfirst=True) + if remove_first: + t, y = t[1:], y[1:] + self.append(t, y) + + def resampleArray(self, t, y, target_dt): + tnew = self.getTimeVector(t[0], t[-1], dt=target_dt) + ynew = np.array([np.interp(tnew, t, x) for x in y.T]).T + return tnew, ynew + + def resample(self, target_dt): + ''' Resample solution to a new target time step. + + :target_dt: target time step (s) + ''' + tnew, self.y = self.resampleArray(self.t, self.y, target_dt) + self.x = interp1d(self.t, self.x, kind='nearest', assume_sorted=True)(tnew) + self.t = tnew + + @abc.abstractmethod + def solve(self, y0): + ''' Simulate system for a given time interval with specific initial conditions. + + :param y0: dictionary of initial conditions + ''' + raise NotImplementedError + + @property + def solution(self): + ''' Return solution as a pandas dataframe. ''' + return pd.DataFrame({ + 't': self.t, + 'stimstate': self.x, + **{k: self.y[:, i] for i, k in enumerate(self.ykeys)} + }) + + def __call__(self, *args, target_dt=None, **kwargs): + ''' Call solve method and return solution dataframe. ''' + self.solve(*args, **kwargs) + if target_dt is not None: + self.resample(target_dt) + return self.solution + + +class PeriodicSolver(ODESolver): + ''' ODE solver with specific periodicity. ''' + + def __init__(self, ykeys, dfunc, T, primary_vars=None, **kwargs): + ''' Initialization. 
+ + :param ykeys: list of differential variables names + :param dfunc: derivative function + :param T: periodicity (s) + :param primary_vars: keys of the primary solution variables to check for stability + ''' + super().__init__(ykeys, dfunc, **kwargs) + self.T = T + self.primary_vars = primary_vars + + @property + def T(self): + return self._T + + @T.setter + def T(self, value): + if not isinstance(value, float): + raise ValueError('periodicity must be float-typed') + if value <= 0: + raise ValueError('periodicity must be strictly positive') + self._T = value + + @property + def primary_vars(self): + return self._primary_vars + + @primary_vars.setter + def primary_vars(self, value): + if value is None: # If none specified, set all variables to be checked for stability + value = self.ykeys + if not isIterable(value): + value = [value] + for item in value: + if item not in self.ykeys: + raise ValueError(f'{item} is not a differential variable') + self._primary_vars = value + + @property + def i_primary_vars(self): + return [self.ykeys.index(k) for k in self.primary_vars] + + @property + def xref(self): + return 1. + + def getNPerCycle(self, dt): + ''' Compute number of samples per cycle given a time step and a specific periodicity. + + :param dt: integration time step (s) + :return: number of samples per cycle + ''' + if isIterable(dt): # if time vector is provided, compute dt from its last 2 elements + dt = dt[-1] - dt[-2] + return int(np.round(self.T / dt)) + 1 + + def getLastCycle(self, i=0): + ''' Get solution vector for the last ith cycle. ''' + n = self.getNPerCycle(self.t) - 1 + if i == 0: + return self.y[-n:] + return self.y[-(i + 1) * n:-i * n] + + def isPeriodicallyStable(self): + ''' Assess the periodic stabilization of a solution, by evaluating the deviation + of system variables between the last two periods. + + :return: boolean stating whether the solution is periodically stable or not + ''' + # Extract the 2 cycles of interest from the solution + y_last, y_prec = self.getLastCycle(i=0), self.getLastCycle(i=1) + + # For each variable of interest, evaluate the RMSE between the two cycles, the + # variation range over the last cycle, and the ratio of these 2 quantities + ratios = np.array([rmse(y_last[:, i], y_prec[:, i]) / np.ptp(y_last[:, i]) + for i in self.i_primary_vars]) + + # Classify the solution as periodically stable only if all RMSE/PTP ratios + # are below critical threshold + return np.all(ratios < MAX_RMSE_PTP_RATIO) + + def integrateCycle(self): + ''' Integrate system for a cycle. ''' + self.integrateUntil(self.t[-1] + self.T, remove_first=True) + + def solve(self, y0, nmax=None, **kwargs): + ''' Simulate system with a specific periodicity until stopping criterion is met. + + :param y0: dictionary of initial conditions + :param nmax: maximum number of integration cycles (optional) + ''' + if nmax is None: + nmax = NCYCLES_MAX + + # Initialize system + if y0 is not None: + self.initialize(y0, **kwargs) + + # Integrate system for 2 cycles + for i in range(2): + self.integrateCycle() + + # Keep integrating system cyclically until stopping criterion is met + while not self.isPeriodicallyStable() and i < nmax: + self.integrateCycle() + i += 1 + + # Log stopping criterion + t_str = f't = {self.t[-1] * 1e3:.5f} ms' + if i == nmax: + logger.warning(f'{t_str}: criterion not met -> stopping after {i} cycles') + else: + logger.debug(f'{t_str}: stopping criterion met after {i} cycles') + + +class EventDrivenSolver(ODESolver): + ''' Event-driven ODE solver. 
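PeriodicSolver above appends one period of solution at a time and stops once the RMSE / peak-to-peak criterion is met for all primary variables. A minimal standalone sketch on a sinusoidally driven relaxation system (parameter values are illustrative):

    import numpy as np
    from PySONIC.core.solvers import PeriodicSolver

    f = 100.    # drive frequency (Hz)
    T = 1. / f  # periodicity handed to the solver (s)
    tau = 1e-3  # relaxation time constant (s)

    solver = PeriodicSolver(
        ['y'],
        lambda t, y: np.array([(np.sin(2 * np.pi * f * t) - y[0]) / tau]),
        T,
        dt=T / 1000)

    data = solver({'y': 0.})   # integrates cycle by cycle until periodic convergence
    print(data['t'].iloc[-1])  # total integrated time: a whole number of periods

Passing target_dt to the call resamples the returned solution through the resample method defined higher up.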
''' + + def __init__(self, ykeys, dfunc, eventfunc, event_params=None, **kwargs): + ''' Initialization. + + :param ykeys: list of differential variables names + :param dfunc: derivatives function + :param eventfunc: function called on each event + :param event_params: dictionary of parameters used by the derivatives function + ''' + super().__init__(ykeys, dfunc, **kwargs) + self.eventfunc = eventfunc + self.assignEventParams(event_params) + + def assignEventParams(self, event_params): + ''' Assign event parameters as instance attributes. ''' + if event_params is not None: + for k, v in event_params.items(): + setattr(self, k, v) + + @property + def eventfunc(self): + return self._eventfunc + + @eventfunc.setter + def eventfunc(self, value): + self.checkFunc('event', value) + self._eventfunc = value + + @property + def xref(self): + return self._xref + + @xref.setter + def xref(self, value): + self._xref = value + + def initialize(self, *args, **kwargs): + self.xref = 0 + super().initialize(*args, **kwargs) + + def fireEvent(self, xevent): + ''' Call event function and set new xref value. ''' + if xevent is not None: + if xevent == 'log': + self.logProgress() + else: + self.eventfunc(xevent) + self.xref = xevent + + def initLog(self, logfunc, n): + self.logfunc = logfunc + if self.logfunc is None: + setHandler(logger, TqdmHandler(my_log_formatter)) + self.pbar = tqdm(total=n) + else: + self.np = n + logger.debug('integrating stimulus') + + def logProgress(self): + if self.logfunc is None: + self.pbar.update() + else: + logger.debug(f't = {self.t[-1] * 1e3:.5f} ms: {self.logfunc(self.y[-1])}') + + def terminateLog(self): + if self.logfunc is None: + self.pbar.close() + else: + logger.debug('integration completed') + + def sortEvents(self, events): + return sorted(events, key=lambda x: x[0]) + + def solve(self, y0, events, tstop, log_period=None, logfunc=None, **kwargs): + ''' Simulate system for a specific stimulus application pattern. + + :param y0: 1D vector of initial conditions + :param events: list of events + :param tstop: stopping time (s) + ''' + # Sort events according to occurrence time + events = self.sortEvents(events) + + # Make sure all events occur before tstop + if events[-1][0] > tstop: + raise ValueError('all events must occur before stopping time') + + if log_period is not None: # Add log events if any + tlogs = np.arange(kwargs.get('t0', 0.), tstop, log_period)[1:] + if tstop not in tlogs: + tlogs = np.hstack((tlogs, [tstop])) + events = self.sortEvents(events + [(t, 'log') for t in tlogs]) + self.initLog(logfunc, tlogs.size) + else: # Otherwise, add None event at tstop + events.append((tstop, None)) + + # Initialize system + self.initialize(y0, **kwargs) + + # For each upcoming event + for i, (tevent, xevent) in enumerate(events): + self.integrateUntil( # integrate until event time + tevent, + remove_first=i > 0 and events[i - 1][1] == 'log') + self.fireEvent(xevent) # fire event + + # Terminate log if any + if log_period is not None: + self.terminateLog() + + +class HybridSolver(EventDrivenSolver, PeriodicSolver): + + def __init__(self, ykeys, dfunc, dfunc_sparse, predfunc, eventfunc, T, + dense_vars, dt_dense, dt_sparse, **kwargs): + ''' Initialization. 
+ + :param ykeys: list of differential variables names + :param dfunc: derivatives function + :param dfunc_sparse: derivatives function for sparse integration periods + :param predfunc: function computing the extra arguments necessary for sparse integration + :param eventfunc: function called on each event + :param T: periodicity (s) + :param dense_vars: list of fast-evolving differential variables + :param dt_dense: dense integration time step (s) + :param dt_sparse: sparse integration time step (s) + ''' + PeriodicSolver.__init__( + self, ykeys, dfunc, T, primary_vars=kwargs.get('primary_vars', None), dt=dt_dense) + self.eventfunc = eventfunc + self.assignEventParams(kwargs.get('event_params', None)) + self.predfunc = predfunc + self.dense_vars = dense_vars + self.dt_sparse = dt_sparse + self.sparse_solver = ode(dfunc_sparse) + self.sparse_solver.set_integrator('dop853', nsteps=SOLVER_NSTEPS, atol=1e-12) + + @property + def predfunc(self): + return self._predfunc + + @predfunc.setter + def predfunc(self, value): + self.checkFunc('prediction', value) + self._predfunc = value + + @property + def dense_vars(self): + return self._dense_vars + + @dense_vars.setter + def dense_vars(self, value): + if value is None: # If none specified, set all variables as dense variables + value = self.ykeys + if not isIterable(value): + value = [value] + for item in value: + if item not in self.ykeys: + raise ValueError(f'{item} is not a differential variable') + self._dense_vars = value + + @property + def is_dense_var(self): + return np.array([x in self.dense_vars for x in self.ykeys]) + + @property + def is_sparse_var(self): + return np.invert(self.is_dense_var) + + def integrateSparse(self, ysparse, tf): + ''' Integrate sparse system until a specific time. ''' + # Compute number of samples in the sparse cycle solution + npc = ysparse.shape[0] + + # Initialize time vector and solution array for the current interval + n = int(np.ceil((tf - self.t[-1]) / self.dt_sparse)) + t = np.linspace(self.t[-1], tf, n + 1)[1:] + y = np.empty((n, self.y.shape[1])) + + # Initialize sparse integrator + self.sparse_solver.set_initial_value(self.y[-1, self.is_sparse_var], self.t[-1]) + for i, tt in enumerate(t): + # Integrate to next time only if dt is above given threshold + if tt - self.sparse_solver.t > MIN_SPARSE_DT: + self.sparse_solver.set_f_params(self.predfunc(ysparse[i % npc])) + self.sparse_solver.integrate(tt) + if not self.sparse_solver.successful(): + raise ValueError(f'integration error at t = {tt * 1e3:.5f} ms') + + # Assign solution values (computed and propagated) to sparse solution array + y[i, self.is_dense_var] = ysparse[i % npc, self.is_dense_var] + y[i, self.is_sparse_var] = self.sparse_solver.y + + # Append to global solution + self.append(t, y) + + def solve(self, y0, events, tstop, logfunc=None, **kwargs): + ''' Integrate system using a hybrid scheme: + + - First, the full ODE system is integrated for a few cycles with a dense time + granularity until a stopping criterion is met + - Second, the profiles of all variables over the last cycle are downsampled to a + far lower (i.e. sparse) sampling rate + - Third, a subset of the ODE system is integrated with a sparse time granularity, + for the remaining of the time interval, while the remaining variables are + periodically expanded from their last cycle profile. 
+ ''' + # Sort events according to occurrence time + events = self.sortEvents(events) + + # Make sure all events occur before tstop + if events[-1][0] > tstop: + raise ValueError('all events must occur before stopping time') + # Add None event at tstop + events.append((tstop, None)) + + # Initialize system + self.initialize(y0) + + # Initialize event iterator + ievent = iter(events) + tevent, xevent = next(ievent) + stop = False + + # While final event is not reached + while not stop: + # Determine end-time of current interval + tend = min(tevent, self.t[-1] + HYBRID_UPDATE_INTERVAL) + + # If time interval encompasses at least one cycle, solve periodic system + nmax = int(np.round((tend - self.t[-1]) / self.T)) + if nmax > 0: + PeriodicSolver.solve(self, None, nmax=nmax) + + # If end-time of current interval has been exceeded, bound solution to that time + if self.t[-1] > tend: + self.bound((self.t[0], tend)) + + # If end-time of current interval has not been reached + if self.t[-1] < tend: + # Get solution over last cycle and resample it to sparse time step + ylast = self.getLastCycle() + tlast = self.t[-ylast.shape[0]:] + _, ysparse = self.resampleArray(tlast, ylast, self.dt_sparse) + + # Integrate sparse system for the rest of the current interval + self.integrateSparse(ysparse, tend) + + # If end-time corresponds to event, fire it and move to next event + if self.t[-1] == tevent: + self.fireEvent(xevent) + try: + tevent, xevent = next(ievent) + except StopIteration: + stop = True diff --git a/PySONIC/core/vclamp.py b/PySONIC/core/vclamp.py index f97ff0d..48bb90d 100644 --- a/PySONIC/core/vclamp.py +++ b/PySONIC/core/vclamp.py @@ -1,156 +1,150 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2019-08-14 13:49:25 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-02-20 17:00:01 +# @Last Modified time: 2020-04-15 20:43:31 import numpy as np import pandas as pd -from .batches import Batch from .protocols import TimeProtocol from .model import Model from .pneuron import PointNeuron -from .simulators import OnOffSimulator +from .solvers import EventDrivenSolver from .drives import Drive, VoltageDrive from ..constants import * from ..utils import * from ..neurons import getPointNeuron class VoltageClamp(Model): tscale = 'ms' # relevant temporal scale of the model simkey = 'VCLAMP' # keyword used to characterize simulations made with this model def __init__(self, pneuron): ''' Constructor of the class. 
:param pneuron: point-neuron model ''' # Check validity of input parameters if not isinstance(pneuron, PointNeuron): raise ValueError( f'Invalid neuron type: "{pneuron.name}" (must inherit from PointNeuron class)') self.pneuron = pneuron def __repr__(self): return f'{self.__class__.__name__}({self.pneuron})' def copy(self): return self.__class__(self.pneuron) @property def meta(self): return {'neuron': self.pneuron.name} @classmethod def initFromMeta(cls, meta): return cls(getPointNeuron(meta['neuron'])) def params(self): return self.pneuron.params() def getPltVars(self, wrapleft='df["', wrapright='"]'): return self.pneuron.getPltVars(wrapleft, wrapright) @property def pltScheme(self): return self.pneuron.pltScheme def filecode(self, *args): return Model.filecode(self, *args) @staticmethod def inputs(): return VoltageDrive.inputs() def filecodes(self, drive, tp): return { 'simkey': self.simkey, 'neuron': self.pneuron.name, **drive.filecodes, **tp.filecodes } @classmethod @Model.checkOutputDir def simQueue(cls, holds, steps, durations, offsets, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. :param holds: list (or 1D-array) of held membrane potentials :param steps: list (or 1D-array) of step membrane potentials :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :return: list of parameters (list) for each simulation ''' drives = VoltageDrive.createQueue(holds, steps) protocols = TimeProtocol.createQueue(durations, offsets) queue = [] for drive in drives: for tp in protocols: queue.append([drive, tp]) return queue @staticmethod def checkInputs(drive, tp): ''' Check validity of stimulation parameters. :param drive: voltage drive object :param tp: time protocol object ''' if not isinstance(drive, Drive): raise TypeError(f'Invalid "drive" parameter (must be an "Drive" object)') if not isinstance(tp, TimeProtocol): raise TypeError('Invalid time protocol (must be "TimeProtocol" instance)') def derivatives(self, t, y, Vm=None): if Vm is None: Vm = self.pneuron.Vm0 states_dict = dict(zip(self.pneuron.statesNames(), y)) return self.pneuron.getDerStates(Vm, states_dict) @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, tp): ''' Simulate a specific neuron model for a set of simulation parameters, and return output data in a dataframe. 
:param drive: voltage drive object :param tp: time protocol object :return: output dataframe ''' # Set initial conditions - y0 = self.pneuron.getSteadyStates(drive.Vhold) + y0 = {k: self.pneuron.steadyStates()[k](drive.Vhold) for k in self.pneuron.statesNames()} + # y0 = self.pneuron.getSteadyStates(drive.Vhold) - # Initialize simulator and compute solution - logger.debug('Computing solution') - simulator = OnOffSimulator( - lambda t, y: self.derivatives(t, y, Vm=drive.Vstep), - lambda t, y: self.derivatives(t, y, Vm=drive.Vhold)) - t, y, stim = simulator(y0, DT_EFFECTIVE, tp) - - # Prepend initial conditions (prior to stimulation) - t, y, stim = simulator.prependSolution(t, y, stim) + # Initialize solver and compute solution + solver = EventDrivenSolver( + y0.keys(), + lambda t, y: self.derivatives(t, y, Vm=solver.V), + lambda x: setattr(solver, 'V', (drive.Vstep - drive.Vhold) * x + drive.Vhold), + dt=DT_EFFECTIVE) + data = solver(y0, tp.stimEvents(), tp.ttotal) # Compute clamped membrane potential vector - Vm = np.zeros(stim.size) - Vm[stim == 0] = drive.Vhold - Vm[stim == 1] = drive.Vstep - - # Store output in dataframe and return - data = pd.DataFrame({ - 't': t, - 'stimstate': stim, - 'Qm': Vm * 1e-3 * self.pneuron.Cm0, - 'Vm': Vm, - }) - for i in range(len(self.pneuron.states)): - data[self.pneuron.statesNames()[i]] = y[:, i] + Vm = np.zeros(len(data)) + Vm[data['stimstate'] == 0] = drive.Vhold + Vm[data['stimstate'] == 1] = drive.Vstep + + # Add Qm and Vm timeries to solution + data = addColumn(data, 'Qm', Vm * 1e-3 * self.pneuron.Cm0, preceding_key='stimstate') + data = addColumn(data, 'Vm', Vm, preceding_key='Qm') + + # Return solution dataframe return data def desc(self, meta): return f'{self}: simulation @ {meta["drive"].desc}, {meta["tp"].desc}' diff --git a/PySONIC/utils.py b/PySONIC/utils.py index 50560bf..8fb3504 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,961 +1,971 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-19 22:30:46 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-10 18:30:51 +# @Last Modified time: 2020-04-15 12:37:08 ''' Definition of generic utility functions used in other modules ''' import csv from functools import wraps import operator import time from inspect import signature import os import abc from shutil import get_terminal_size import lockfile import math import pickle import json from tqdm import tqdm import logging import tkinter as tk from tkinter import filedialog import base64 import datetime import numpy as np from scipy.optimize import brentq from scipy import linalg import colorlog from pushbullet import Pushbullet # Package logger my_log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%') def setHandler(logger, handler): for h in logger.handlers: logger.removeHandler(h) logger.addHandler(handler) return logger def setLogger(name, formatter): handler = colorlog.StreamHandler() handler.setFormatter(formatter) logger = colorlog.getLogger(name) logger.addHandler(handler) return logger class TqdmHandler(logging.StreamHandler): def __init__(self, formatter): logging.StreamHandler.__init__(self) self.setFormatter(formatter) def emit(self, record): msg = self.format(record) tqdm.write(msg) logger = setLogger('PySONIC', my_log_formatter) def fillLine(text, char='-', 
totlength=None): ''' Surround a text with repetitions of a specific character in order to fill a line to a given total length. :param text: text to be surrounded :param char: surrounding character :param totlength: target number of characters in filled text line :return: filled text line ''' if totlength is None: totlength = get_terminal_size().columns - 1 ndashes = totlength - len(text) - 2 if ndashes < 2: return text else: nside = ndashes // 2 nleft, nright = nside, nside if ndashes % 2 == 1: nright += 1 return f'{char * nleft} {text} {char * nright}' # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): if x == 0: factor = 1e0 prefix = '' else: sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) vals = [tmp[1] for tmp in sorted_si_prefixes] ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 factor = vals[ix] prefix = sorted_si_prefixes[ix][0] return f'{x / factor:.{precision}f}{space}{prefix}' elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: print(type(x)) def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' sci_string = f'{number:.{precision}e}' value, exponent = sci_string.split("e") value, exponent = float(value), int(exponent) val_str = f'{value} * ' if value != 1. else '' return f'{val_str}10^{{{exponent}}}' def plural(n): if n < 0: raise ValueError(f'Cannot format negative integer (n = {n})') if n == 0: return '' else: return 's' def rmse(x1, x2): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean()) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). 
:param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def convertPKL2JSON(): for pkl_filepath in OpenFilesDialog('pkl')[0]: logger.info(f'Processing {pkl_filepath} ...') json_filepath = f'{os.path.splitext(pkl_filepath)[0]}.json' with open(pkl_filepath, 'rb') as fpkl, open(json_filepath, 'w') as fjson: data = pickle.load(fpkl) json.dump(data, fjson, ensure_ascii=False, sort_keys=True, indent=4) logger.info('All done!') def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames( filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname ) if len(filenames) == 0: raise ValueError('no input file selected') par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) return filenames, par_dir def selectDirDialog(title='Select directory'): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() directory = filedialog.askdirectory(title=title) if directory == '': raise ValueError('no directory selected') return directory def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. :param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) if len(filename_out) == 0: raise ValueError('no output filepath selected') return filename_out def loadData(fpath, frequency=1): ''' Load dataframe and metadata dictionary from pickle file. ''' logger.info('Loading data from "%s"', os.path.basename(fpath)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'].iloc[::frequency] meta = frame['meta'] return df, meta def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def expandRange(xmin, xmax, exp_factor=2): if xmin > xmax: raise ValueError('values must be provided in (min, max) order') xptp = xmax - xmin xmid = (xmin + xmax) / 2 xdev = xptp * exp_factor / 2 return (xmid - xdev, xmin + xdev) def isIterable(x): for t in [list, tuple, np.ndarray]: if isinstance(x, t): return True return False def isWithin(name, val, bounds, rel_tol=1e-9): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. 
:param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' if isIterable(val): return np.array([isWithin(name, v, bounds, rel_tol) for v in val]) if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError(f'{name} value ({val}) out of [{bounds[0]}, {bounds[1]}] interval') def getDistribution(xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return getDistribution(xmin, xmax, nx, scale=scale) def getIndex(container, value): ''' Return the index of a float / string value in a list / array :param container: list / 1D-array of elements :param value: value to search for :return: index of value (if found) ''' if isinstance(value, float): container = np.array(container) imatches = np.where(np.isclose(container, value, rtol=1e-9, atol=1e-16))[0] if len(imatches) == 0: raise ValueError(f'{value} not found in {container}') return imatches[0] elif isinstance(value, str): return container.index(value) def funcSig(func, args, kwargs): args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] return f'{func.__name__}({", ".join(args_repr + kwargs_repr)})' def debug(func): ''' Print the function signature and return value. ''' @wraps(func) def wrapper_debug(*args, **kwargs): print(f'Calling {funcSig(func, args, kwargs)}') value = func(*args, **kwargs) print(f"{func.__name__!r} returned {value!r}") return value return wrapper_debug def timer(func): ''' Monitor and return the runtime of the decorated function. ''' @wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() value = func(*args, **kwargs) end_time = time.perf_counter() run_time = end_time - start_time return value, run_time return wrapper def alignWithFuncDef(func, args, kwargs): ''' Align a set of provided positional and keyword arguments with the arguments signature in a specific function definition. 
:param func: function object :param args: list of provided positional arguments :param kwargs: dictionary of provided keyword arguments :return: 2-tuple with the modified arguments and ''' # Get positional and keyword arguments from function signature sig_params = {k: v for k, v in signature(func).parameters.items()} sig_args = list(filter(lambda x: x.default == x.empty, sig_params.values())) sig_kwargs = {k: v.default for k, v in sig_params.items() if v.default != v.empty} sig_nargs = len(sig_args) kwarg_keys = list(sig_kwargs.keys()) # Restrain provided positional arguments to those that are also positional in signature new_args = args[:sig_nargs] # Construct hybrid keyword arguments dictionary from: # - remaining positional arguments # - provided keyword arguments # - default keyword arguments new_kwargs = sig_kwargs for i, x in enumerate(args[sig_nargs:]): new_kwargs[kwarg_keys[i]] = x for k, v in kwargs.items(): new_kwargs[k] = v return new_args, new_kwargs def alignWithMethodDef(method, args, kwargs): args, kwargs = alignWithFuncDef(method, [None] + list(args), kwargs) return tuple(args[1:]), kwargs def logCache(fpath, delimiter='\t', out_type=float): ''' Add an extra IO memoization functionality to a function using file caching, to avoid repetitions of tedious computations with identical inputs. ''' def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # If function has history -> do not log if 'history' in kwargs: return func(*args, **kwargs) # Modify positional and keyword arguments to match function signature, if needed args, kwargs = alignWithFuncDef(func, args, kwargs) # Translate args and kwargs into string signature fsignature = funcSig(func, args, kwargs) # If entry present in log, return corresponding output if os.path.isfile(fpath): with open(fpath, 'r', newline='') as f: reader = csv.reader(f, delimiter=delimiter) for row in reader: if row[0] == fsignature: logger.debug(f'entry found in "{os.path.basename(fpath)}"') return out_type(row[1]) # Otherwise, compute output and log it into file before returning out = func(*args, **kwargs) lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=delimiter) writer.writerow([fsignature, str(out)]) lock.release() return out return wrapper return wrapper_with_args def fileCache(root, fcode_func, ext='json'): def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # Get load and dump functions from file extension try: load_func = { 'json': json.load, 'pkl': pickle.load, 'csv': lambda f: np.loadtxt(f, delimiter=',') }[ext] dump_func = { 'json': json.dump, 'pkl': pickle.dump, 'csv': lambda x, f: np.savetxt(f, x, delimiter=',') }[ext] except KeyError: raise ValueError('Unknown file extension') # Get read and write mode (text or binary) from file extension mode = 'b' if ext == 'pkl' else '' # Get file path from root and function arguments, using fcode function if callable(fcode_func): fcode = fcode_func(*args) else: fcode = fcode_func fpath = os.path.join(os.path.abspath(root), f'{fcode}.{ext}') # If file exists, load output from it if os.path.isfile(fpath): logger.info(f'loading data from "{fpath}"') with open(fpath, 'r' + mode) as f: out = load_func(f) # Otherwise, execute function and create the file to dump the output else: logger.warning(f'reference data file not found: "{fpath}"') out = func(*args, **kwargs) logger.info(f'dumping data in "{fpath}"') lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'w' + mode) 
as f: dump_func(out, f) lock.release() return out return wrapper return wrapper_with_args def derivative(f, x, eps, method='central'): ''' Compute the difference formula for f'(x) with perturbation size eps. :param f: function, taking an array of states and returning an array of derivatives :param x: states vector :param method: difference formula: 'forward', 'backward' or 'central' :param eps: perturbation vector (same size as states vector) :return: numerical approximation of the derivative around the fixed point ''' if isIterable(x): if not isIterable(eps) or len(eps) != len(x): raise ValueError('eps must be the same size as x') elif np.sum(eps != 0.) != 1: raise ValueError('eps must be zero-valued across all but one dimension') eps_val = np.sum(eps) else: eps_val = eps if method == 'central': df = (f(x + eps) - f(x - eps)) / 2 elif method == 'forward': df = f(x + eps) - f(x) elif method == 'backward': df = f(x) - f(x - eps) else: raise ValueError("Method must be 'central', 'forward' or 'backward'.") return df / eps_val def jacobian(dfunc, x, rel_eps=None, abs_eps=None, method='central'): ''' Evaluate the Jacobian matrix of a (time-invariant) system, given a states vector and derivatives function. :param dfunc: derivatives function, taking an array of n states and returning an array of n derivatives :param x: n-states vector :return: n-by-n square Jacobian matrix ''' if sum(e is not None for e in [abs_eps, rel_eps]) != 1: raise ValueError('one (and only one) of "rel_eps" or "abs_eps" parameters must be provided') # Determine vector size x = np.asarray(x) n = x.size # Initialize Jacobian matrix J = np.empty((n, n)) # Create epsilon vector if rel_eps is not None: mode = 'relative' eps_vec = rel_eps else: mode = 'absolute' eps_vec = abs_eps if not isIterable(eps_vec): eps_vec = np.array([eps_vec] * n) if mode == 'relative': eps = x * eps_vec else: eps = eps_vec # Perturb each state by epsilon on both sides, re-evaluate derivatives # and assemble Jacobian matrix ei = np.zeros(n) for i in range(n): ei[i] = 1 J[:, i] = derivative(dfunc, x, eps * ei, method=method) ei[i] = 0 return J def classifyFixedPoint(x, dfunc): ''' Characterize the stability of a fixed point by numerically evaluating its Jacobian matrix and evaluating the sign of the real part of its associated eigenvalues. :param x: n-states vector :param dfunc: derivatives function, taking an array of n states and returning an array of n derivatives ''' # Compute Jacobian numerically # print(f'x = {x}, dfunx(x) = {dfunc(x)}') eps_machine = np.sqrt(np.finfo(float).eps) J = jacobian(dfunc, x, rel_eps=eps_machine, method='forward') # Compute eigenvalues and eigenvectors eigvals, eigvecs = linalg.eig(J) logger.debug(f"eigenvalues = {[f'({x.real:.2e} + {x.imag:.2e}j)' for x in eigvals]}") # Determine fixed point stability based on eigenvalues is_neg_eigvals = eigvals.real < 0 if is_neg_eigvals.all(): # all real parts negative -> stable key = 'stable' elif is_neg_eigvals.any(): # both positive and negative real parts -> saddle key = 'saddle' else: # all real parts positive -> unstable key = 'unstable' return eigvals, key def findModifiedEq(x0, dfunc, *args): ''' Find an equilibrium variable in a modified system by searching for its derivative root within an interval around its original equilibrium. :param x0: equilibrium value in original system. :param dfunc: derivative function, taking the variable as first parameter. :param *args: remaining arguments needed for the derivative function.
:return: variable equilibrium value in modified system. ''' is_iterable = [isIterable(arg) for arg in args] if any(is_iterable): if not all(is_iterable): raise ValueError('mix of iterables and non-iterables') lengths = [len(arg) for arg in args] if not all(n == lengths[0] for n in lengths): raise ValueError(f'inputs are not of the same size: {lengths}') n = lengths[0] res = [] for i in range(n): x = [arg[i] for arg in args] res.append(findModifiedEq(x0, dfunc, *x)) return np.array(res) else: return brentq(lambda x: dfunc(x, *args), x0 * 1e-4, x0 * 1e3, xtol=1e-16) def swapFirstLetterCase(s): if s[0].islower(): return s.capitalize() else: return s[0].lower() + s[1:] def getPow10(x, direction='up'): ''' Get the power of 10 that is closest to a number, in either direction("down" or "up"). ''' round_method = {'up': np.ceil, 'down': np.floor}[direction] return np.power(10, round_method(np.log10(x))) def rotAroundPoint2D(x, theta, p): ''' Rotate a 2D vector around a center point by a given angle. :param x: 2D coordinates vector :param theta: rotation angle (in rad) :param p: 2D center point coordinates :return: 2D rotated coordinates vector ''' n1, n2 = x.shape if n1 != 2: if n2 == 2: x = x.T else: raise ValueError('x should be a 2-by-n vector') # Rotation matrix R = np.array([ [np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)], ]) # Broadcast center point to input vector ptile = np.tile(p, (x.shape[1], 1)).T # Subtract, rotate and add return R.dot(x - ptile) + ptile def getKey(keyfile='pushbullet.key'): dir_path = os.path.dirname(os.path.realpath(__file__)) package_root = os.path.abspath(os.path.join(dir_path, os.pardir)) fpath = os.path.join(package_root, keyfile) if not os.path.isfile(fpath): raise FileNotFoundError('pushbullet API key file not found') with open(fpath) as f: encoded_key = f.readlines()[0] return base64.b64decode(str.encode(encoded_key)).decode() def sendPushNotification(msg): try: key = getKey() pb = Pushbullet(key) dt = datetime.datetime.now() s = dt.strftime('%Y-%m-%d %H:%M:%S') push = pb.push_note('Code Messenger', f'{s}\n{msg}') except FileNotFoundError as e: logger.error(f'Could not send push notification: "{msg}"') def alert(func): ''' Run a function, and send a push notification upon completion, or if an error is raised during its execution. ''' @wraps(func) def wrapper(*args, **kwargs): try: out = func(*args, **kwargs) sendPushNotification(f'completed "{func.__name__}" execution successfully') return out except BaseException as e: sendPushNotification(f'error during "{func.__name__}" execution: {e}') raise e return wrapper def sunflower(n, radius=1, alpha=1): ''' Generate a population of uniformly distributed 2D data points in a unit circle. :param n: number of data points :param alpha: coefficient determining evenness of the boundary :return: 2D matrix of Cartesian (x, y) positions ''' nbounds = np.round(alpha * np.sqrt(n)) # number of boundary points phi = (np.sqrt(5) + 1) / 2 # golden ratio k = np.arange(1, n + 1) # index vector theta = 2 * np.pi * k / phi**2 # angle vector r = np.sqrt((k - 1) / (n - nbounds - 1)) # radius vector r[r > 1] = 1 x = r * np.cos(theta) y = r * np.sin(theta) return radius * np.vstack((x, y)) def filecode(model, *args): ''' Generate file code given a specific combination of model input parameters. 
''' # If meta dictionary was passed, generate inputs list from it if len(args) == 1 and isinstance(args[0], dict): meta = args[0].copy() if meta['simkey'] == 'ASTIM' and 'fs' not in meta: meta['fs'] = meta['model']['fs'] meta['method'] = meta['model']['method'] meta['qss_vars'] = None for k in ['simkey', 'model', 'tcomp', 'dt', 'atol']: if k in meta: del meta[k] args = list(meta.values()) # Otherwise, transform args tuple into list else: args = list(args) # If any argument is an iterable -> transform it to a continuous string for i in range(len(args)): if isIterable(args[i]): args[i] = ''.join([str(x) for x in args[i]]) # Create file code by joining string-encoded inputs with underscores codes = model.filecodes(*args).values() return '_'.join([x for x in codes if x is not None]) def simAndSave(model, *args, **kwargs): ''' Simulate the model and save the results in a specific output directory. :param *args: list of arguments provided to the simulation function :param **kwargs: optional arguments dictionary :return: output filepath ''' # Extract output directory and overwrite boolean from keyword arguments. outputdir = kwargs.pop('outputdir', '.') overwrite = kwargs.pop('overwrite', True) # Set data and meta to None data, meta = None, None # Extract drive object from args drive, *other_args = args # If drive is searchable and not fully resolved if drive.is_searchable: if not drive.is_resolved: # Call simulate to perform titration out = model.simulate(*args) # If titration yields nothing -> no file produced -> return None if out is None: logger.warning('returning None') return None # Store data and meta data, meta = out # Update args list with resolved drive try: args = (meta['drive'], *other_args) except KeyError: args = (meta['source'], *other_args) # Check if an output file corresponding to sim inputs is found in the output directory # That check is performed prior to running the simulation, such that # it is not run if the file is present and overwrite is set to false. fname = f'{model.filecode(*args)}.pkl' fpath = os.path.join(outputdir, fname) existing_file_msg = f'File "{fname}" already present in directory "{outputdir}"' existing_file = os.path.isfile(fpath) # If file exists and overwrite is set to false -> return if existing_file and not overwrite: logger.warning(f'{existing_file_msg} -> preserving') return fpath # Run simulation if not already done (for titration cases) if data is None: data, meta = model.simulate(*args) # Raise warning if an existing file is overwritten if existing_file: logger.warning(f'{existing_file_msg} -> overwriting') # Save output file and return output filepath with open(fpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) logger.debug('simulation data exported to "%s"', fpath) return fpath def moveItem(l, value, itarget): ''' Move a list item to a specific target index. :param l: list object :param value: value of the item to move :param itarget: target index :return: re-ordered list. ''' # Get absolute target index if itarget < 0: itarget += len(l) assert itarget < len(l), f'target index {itarget} exceeds list size ({len(l)})' # Get index corresponding to element and delete entry from list iref = l.index(value) new_l = l.copy() del new_l[iref] # Return re-organized list return new_l[:itarget] + [value] + new_l[itarget:] class StimObject(metaclass=abc.ABCMeta): ''' Generic interface to a simulation object.
''' @staticmethod @abc.abstractmethod def inputs(): raise NotImplementedError def checkInt(self, key, value): if not isinstance(value, int): raise TypeError(f'Invalid {self.inputs()[key]["desc"]} (must be an integer)') return value def checkFloat(self, key, value): if isinstance(value, int): value = float(value) if not isinstance(value, float): raise TypeError(f'Invalid {self.inputs()[key]["desc"]} (must be float typed)') return value def checkStrictlyPositive(self, key, value): if value <= 0: d = self.inputs()[key] raise ValueError(f'Invalid {d["desc"]}: {value * d.get("factor", 1)} {d["unit"]} (must be strictly positive)') def checkPositiveOrNull(self, key, value): if value < 0: d = self.inputs()[key] raise ValueError(f'Invalid {d["desc"]}: {value * d.get("factor", 1)} {d["unit"]} (must be positive or null)') def checkStrictlyNegative(self, key, value): if value >= 0: d = self.inputs()[key] raise ValueError(f'Invalid {d["desc"]}: {value * d.get("factor", 1)} {d["unit"]} (must be strictly negative)') def checkNegativeOrNull(self, key, value): if value > 0: d = self.inputs()[key] raise ValueError(f'Invalid {d["desc"]}: {value * d.get("factor", 1)} {d["unit"]} (must be negative or null)') def checkBounded(self, key, value, bounds): if value < bounds[0] or value > bounds[1]: d = self.inputs()[key] f, u = d.get("factor", 1), d["unit"] bounds_str = f'[{bounds[0] * f}; {bounds[1] * f}] {u}' raise ValueError(f'Invalid {d["desc"]}: {value * f} {u} (must be within {bounds_str})') def gaussian(x, mu=0., sigma=1., A=1.): return A * np.exp(-((x - mu) / sigma)**2) def isPickable(obj): try: pickle.dumps(obj) except Exception as err: return False return True def resolveFuncArgs(func, *args, **kwargs): ''' Return a dictionary of positional and keyword arguments upon function call, adding defaults from simfunc signature if not provided at call time. ''' bound_args = signature(func).bind(*args, **kwargs) bound_args.apply_defaults() return dict(bound_args.arguments) def getMeta(model, simfunc, *args, **kwargs): ''' Construct an informative dictionary about the model and simulation parameters. ''' # Retrieve function call arguments args_dict = resolveFuncArgs(simfunc, model, *args, **kwargs) # Construct meta dictionary meta = {'simkey': model.simkey} for k, v in args_dict.items(): if k == 'self': meta['model'] = v.meta else: meta[k] = v return meta def bounds(arr): ''' Return the bounds or a numpy array / list. ''' return (np.nanmin(arr), np.nanmax(arr)) + + +def addColumn(df, key, arr, preceding_key=None): + ''' Add a new column to a dataframe, right after a specific column. ''' + df[key] = arr + if preceding_key is not None: + cols = df.columns.tolist()[:-1] + preceding_index = cols.index(preceding_key) + df = df[cols[:preceding_index + 1] + [key] + cols[preceding_index + 1:]] + return df diff --git a/README.md b/README.md index 2eeb66d..96fdbc1 100644 --- a/README.md +++ b/README.md @@ -1,228 +1,228 @@ # Description `PySONIC` is a Python implementation of the **multi-Scale Optimized Neuronal Intramembrane Cavitation (SONIC) model [1]**, a computationally efficient and interpretable model of neuronal intramembrane cavitation. It allows to simulate the responses of various neuron types to ultrasonic (and electrical) stimuli. ## Content of repository ### Models The package contains four model classes: - `Model` defines the generic interface of a model, including mandatory attributes and methods for simulating it. - `BilayerSonophore` defines the underlying **biomechanical model of intramembrane cavitation**. 
- `PointNeuron` defines an abstract generic interface to **conductance-based point-neuron electrical models**. It is inherited by classes defining the different neuron types with specific membrane dynamics. - `NeuronalBilayerSonophore` defines the **full electromechanical model for any given neuron type**. To do so, it inherits from `BilayerSonophore` and receives a specific `PointNeuron` object at initialization. All model classes contain a `simulate` method to simulate the underlying model's behavior for a given set of stimulation and physiological parameters. The `NeuronalBilayerSonophore.simulate` method contains an additional `method` argument defining whether to perform a detailed (`full`), coarse-grained (`sonic`) or hybrid (`hybrid`) integration of the differential system. -### Simulators +### Solvers -Numerical integration routines are implemented outside the models, in separate `Simulator` classes: -- `PeriodicSimulator` integrates a differential system periodically until a stable periodic behavior is detected. -- `PWSimulator` integrates a differential system given a specific temporal stimulation pattern (pulse repetition frequency, stimulus duty cycle and post-stimulus offset), using different derivative functions for "ON" (with stimulus) and "OFF" (without stimulus) periods -- `HybridSimulator` inherits from both `PeriodicSimulator`and `PWSimulator`. It integrates a differential system using a hybrid scheme inside each "ON" or "OFF" period: +Numerical integration routines are implemented outside the models, in a separate `solvers` module: +- `PeriodicSolver` integrates a differential system periodically until a stable periodic behavior is detected. +- `EventDrivenSolver` integrates a differential system across a specific set of "events" that modulate the stimulation drive over time. +- `HybridSolver` inherits from both `PeriodicSolver` and `EventDrivenSolver`. It integrates a differential system using a hybrid scheme inside each "ON" or "OFF" period: 1. The full ODE system is integrated for a few cycles with a dense time granularity until periodic stabilization is detected 2. The profiles of all variables over the last cycle are resampled to a far lower (i.e. sparse) sampling rate 3. A subset of the ODE system is integrated with a sparse time granularity, while the remaining variables are periodically expanded from their last cycle profile, until the end of the period or that of a predefined update interval. 4. The process is repeated from step 1 ### Neurons Several conductance-based point-neuron models are implemented that inherit from the `PointNeuron` generic interface.
Among them, several CNS models: - `CorticalRS`: cortical regular spiking (`RS`) neuron - `CorticalFS`: cortical fast spiking (`FS`) neuron - `CorticalLTS`: cortical low-threshold spiking (`LTS`) neuron - `CorticalIB`: cortical intrinsically bursting (`IB`) neuron - `ThalamicRE`: thalamic reticular (`RE`) neuron - `ThalamoCortical`: thalamo-cortical (`TC`) neuron - `OstukaSTN`: subthalamic nucleus (`STN`) neuron But also some valuable models used in peripheral axon models: - `FrankenhaeuserHuxleyNode`: Amphibian (Xenopus) myelinated fiber node (`FHnode`) - `SweeneyNode`: Mammalian (rabbit) myelinated motor fiber node (`SWnode`) - `MRGNode`: Mammalian (human / cat) myelinated fiber node (`MRGnode`) - `SundtSegment`: Mammalian unmyelinated C-fiber segment (`SUseg`) ### Other modules - `batches`: a generic interface to run simulation batches with or without multiprocessing - `parsers`: command line parsing utilities - `plt`: graphing utilities - `postpro`: post-processing utilities (mostly signal features detection) - `constants`: algorithmic constants used across modules and classes - `utils`: generic utilities # Requirements - Python 3.6+ - Package dependencies (numpy, scipy, ...) are installed automatically upon installation of the package. # Installation - Open a terminal. - Install [git-lfs](https://github.com/git-lfs/git-lfs/wiki/Installation) if not already done - Activate a Python3 environment if needed, e.g. on the tnesrv5 machine: ```source /opt/apps/anaconda3/bin activate``` - Check that the appropriate version of pip is activated: ```pip --version``` - Clone the repository and install the python package: ```git clone https://c4science.ch/diffusion/4670/pysonic.git``` ```cd pysonic``` ```pip install -e .``` # Usage ## Python scripts You can easily run simulations of any implemented point-neuron model under both electrical and ultrasonic stimuli, and visualize the simulation results, in just a few lines of code: ```python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2020-04-04 15:26:28 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-04 15:29:52 +# @Last Modified time: 2020-04-13 16:34:56 import logging import matplotlib.pyplot as plt from PySONIC.core import PulsedProtocol, NeuronalBilayerSonophore, ElectricDrive, AcousticDrive from PySONIC.neurons import getPointNeuron from PySONIC.utils import logger from PySONIC.plt import GroupedTimeSeries # Set logging level logger.setLevel(logging.INFO) # Define point-neuron model and corresponding neuronal bilayer sonophore model pneuron = getPointNeuron('RS') a = 32e-9 # sonophore radius (m) nbls = NeuronalBilayerSonophore(a, pneuron) # Define electric and ultrasonic drives ELdrive = ElectricDrive(20.) # mA/m2 USdrive = AcousticDrive( 500e3, # Hz 100e3) # Pa # Set pulsing protocol tstim = 250e-3 # s toffset = 50e-3 # s PRF = 100. # Hz DC = 0.5 # - pp = PulsedProtocol(tstim, toffset, PRF, DC) # Run simulation upon electrical stimulation, and plot results data, meta = pneuron.simulate(ELdrive, pp) GroupedTimeSeries([(data, meta)]).render() # Run simulation upon ultrasonic stimulation, and plot results data, meta = nbls.simulate(USdrive, pp) GroupedTimeSeries([(data, meta)]).render() # Show figures plt.show() ``` ## From the command line ### Running simulations and batches You can easily run simulations of all 3 model types using the dedicated command line scripts. To do so, open a terminal in the `scripts` directory. 
- Use `run_mech.py` for simulations of the **mechanical model** upon **ultrasonic stimulation**. For instance, for a 32 nm radius bilayer sonophore sonicated at 500 kHz and 100 kPa: ```python run_mech.py -a 32 -f 500 -A 100 -p Z``` - Use `run_estim.py` for simulations of **point-neuron models** upon **intracellular electrical stimulation**. For instance, a regular-spiking (RS) neuron injected with 10 mA/m2 intracellular current for 30 ms: ```python run_estim.py -n RS -A 10 --tstim 30 -p Vm``` - Use `run_astim.py` for simulations of **point-neuron models** upon **ultrasonic stimulation**. For instance, for a coarse-grained simulation of a 32 nm radius bilayer sonophore within a regular-spiking (RS) neuron membrane, sonicated at 500 kHz and 100 kPa for 150 ms: ```python run_astim.py -n RS -a 32 -f 500 -A 100 --tstim 150 --method sonic -p Qm``` Additionally, you can run batches of simulations by specifying more than one value for any given stimulation parameter (e.g. `-A 100 200` for sonication with 100 and 200 kPa respectively). These batches can be parallelized using multiprocessing to optimize performance, with the extra argument `--mpi`. ### Saving and visualizing results By default, simulation results are neither shown nor saved. To view results directly upon simulation completion, you can use the `-p [xxx]` option, where `[xxx]` can be `all` (to plot all resulting variables) or a given variable name (e.g. `Z` for membrane deflection, `Vm` for membrane potential, `Qm` for membrane charge density). To save simulation results in binary `.pkl` files, you can use the `-s` option. You will be prompted to choose an output directory, unless you also specify it with the `-o` option. Output files are automatically named from model and simulation parameters to avoid ambiguity. When running simulation batches, it is highly advised to specify the `-s` option in order to save results of each simulation. You can then visualize results at a later stage. To visualize results, use the `plot_timeseries.py` script. You will be prompted to select the output files containing the simulation(s) results. By default, separate figures will be created for each simulation, showing the time profiles of all resulting variables. Here again, you can choose to show only a subset of variables using the `-p [xxx]` option. Moreover, if you select a subset of variables, you can visualize resulting profiles across simulations in comparative figures with the `--compare` option. Several more options are available. To view them, type in: ```python -h``` # Extend the package ## Add other neuron types You can easily add other neuron types into the package, provided that their ion channel populations and underlying voltage-gated dynamics equations are known. To add a new point-neuron model, follow this procedure: 1. Create a new file, and save it in the `neurons` sub-folder, with an explicit name (e.g. `my_neuron.py`). 2. Copy-paste the content of the `template.py` file (also located in the `neurons` sub-folder) into your file. 3. In your file, change the **class name** from `TemplateNeuron` to something more explicit (e.g. `MyNeuron`), and change the **neuron name** accordingly (e.g. `myneuron`). This name is a keyword used to refer to the model from outside the class. Also, comment out the `addSonicFeatures` decorator temporarily. 4. Modify/add **biophysical parameters** of your model (resting parameters, reversal potentials, channel conductances, ionic concentrations, temperatures, diffusion constants, etc...) as class attributes.
If some parameters are not fixed and must be computed, assign them to the class inside a `__new__` method, taking the class (`cls`) as sole argument. 5. Specify a **dictionary of names:descriptions of your different differential states** (i.e. all the differential variables of your model, except for the membrane potential). 6. Modify/add **gating states kinetics** (`alphax` and `betax` methods) that define the voltage-dependent activation and inactivation rates of the different ion channel gates of your model. Those methods take the membrane potential `Vm` as input and return a rate in `s-1`. Alternatively, you can use steady-state open-probabilities (`xinf`) and adaptation time constants (`taux`) methods. 7. Modify the `derStates` method that defines the **derivatives of your different state variables**. These derivatives are defined inside a dictionary, where each state key is paired to a lambda function that takes the membrane potential `Vm` and a states vector `x` as inputs, and returns the associated state derivative (in `/s`). 8. Modify the `steadyStates` method that defines the **steady-state values of your different state variables**. These steady-states are defined inside a dictionary, where each state key is paired to a lambda function that takes the membrane potential `Vm` as only input, and returns the associated steady-state value (in ``). If some steady-states depend on the values of other steady-states, you can proceed as follows: - define all independent steady-states functions in a dictionary called `lambda_dict` - add dependent steady-state functions to the dictionary, calling `lambda_dict[k](Vm)` for each state `k` whose value is required (see the sketch right after this list). 9. Modify/add **membrane currents** (`iXX` methods) of your model. Those methods take relevant gating states and the membrane potential `Vm` as inputs, and must return a current density in `mA/m2`. **You also need to modify the docstring accordingly, as this information is used by the package**. 10. Modify the `currents` method that defines the **membrane currents of your model**. These currents are defined inside a dictionary, where each current key is paired to a lambda function that takes the membrane potential `Vm` and a states vector `x` as inputs, and returns the associated current (in `mA/m2`). 11. Add the neuron class to the package, by importing it in the `__init__.py` file of the `neurons` sub-folder: ```python from .my_neuron import MyNeuron ``` 12. Verify your point-neuron model by running simulations under various electrical stimuli and comparing the output to the neuron's expected behavior. Implement required corrections if any. 13. Uncomment the `addSonicFeatures` decorator on top of your class. **This decorator will automatically parse the `derStates`, `steadyStates` and `currents` methods in order to adapt your neuron model to US stimulation. For this to work, you need to make sure to**: - **keep them as class methods** - **check that all calls to functions that depend solely on `Vm` appear directly in the methods' lambda expressions and are not hidden inside nested function calls.** 14. Pre-compute lookup tables required to run coarse-grained simulations of the neuron model upon ultrasonic stimulation. To do so, go to the `scripts` directory and run the `run_lookups.py` script with the neuron's name as command line argument, e.g.: ```python run_lookups.py -n myneuron --mpi``` If possible, use the `--mpi` argument to enable multiprocessing, as lookups pre-computation greatly benefits from parallelization.
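As a companion to step 8, here is a minimal, hypothetical sketch of a `steadyStates` method in which one steady-state depends on another, following the `lambda_dict` pattern described above. The gating states (`m`, `h`) and the rate methods (`alpham`, `betam`) are illustrative placeholders, not part of the actual template:

```python
@classmethod
def steadyStates(cls):
    # Independent steady-states, defined first in a dictionary of lambda functions
    lambda_dict = {
        'm': lambda Vm: cls.alpham(Vm) / (cls.alpham(Vm) + cls.betam(Vm)),
    }
    # Dependent steady-state, computed by calling an already-defined entry
    lambda_dict['h'] = lambda Vm: 1. - lambda_dict['m'](Vm)
    return lambda_dict
```

Note that every call depending solely on `Vm` appears directly inside the lambda expressions, which keeps the method compatible with the parsing constraints mentioned in step 13.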
That's it! You can now run simulations of your point-neuron model upon ultrasonic stimulation. # Authors Code written and maintained by Theo Lemaire (theo.lemaire@epfl.ch). # License & citation This project is licensed under the MIT License - see the LICENSE file for details. If PySONIC contributes to a project that leads to a scientific publication, please acknowledge this fact by citing [Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation. J. Neural Eng.](https://iopscience.iop.org/article/10.1088/1741-2552/ab1685) DOI: 10.1088/1741-2552/ab1685 # References [1] Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation. J. Neural Eng. diff --git a/README.mdpp b/README.mdpp index 9a54105..1bfee88 100644 --- a/README.mdpp +++ b/README.mdpp @@ -1,180 +1,180 @@ # Description `PySONIC` is a Python implementation of the **multi-Scale Optimized Neuronal Intramembrane Cavitation (SONIC) model [1]**, a computationally efficient and interpretable model of neuronal intramembrane cavitation. It allows to simulate the responses of various neuron types to ultrasonic (and electrical) stimuli. ## Content of repository ### Models The package contains four model classes: - `Model` defines the generic interface of a model, including mandatory attributes and methods for simulating it. - `BilayerSonophore` defines the underlying **biomechanical model of intramembrane cavitation**. - `PointNeuron` defines an abstract generic interface to **conductance-based point-neuron electrical models**. It is inherited by classes defining the different neuron types with specific membrane dynamics. - `NeuronalBilayerSonophore` defines the **full electromechanical model for any given neuron type**. To do so, it inherits from `BilayerSonophore` and receives a specific `PointNeuron` object at initialization. All model classes contain a `simulate` method to simulate the underlying model's behavior for a given set of stimulation and physiological parameters. The `NeuronalBilayerSonophore.simulate` method contains an additional `method` argument defining whether to perform a detailed (`full`), coarse-grained (`sonic`) or hybrid (`hybrid`) integration of the differential system. -### Simulators +### Solvers -Numerical integration routines are implemented outside the models, in separate `Simulator` classes: -- `PeriodicSimulator` integrates a differential system periodically until a stable periodic behavior is detected. -- `PWSimulator` integrates a differential system given a specific temporal stimulation pattern (pulse repetition frequency, stimulus duty cycle and post-stimulus offset), using different derivative functions for "ON" (with stimulus) and "OFF" (without stimulus) periods -- `HybridSimulator` inherits from both `PeriodicSimulator`and `PWSimulator`. It integrates a differential system using a hybrid scheme inside each "ON" or "OFF" period: +Numerical integration routines are implemented outside the models, in a separate `solvers` module: +- `PeriodicSolver` integrates a differential system periodically until a stable periodic behavior is detected. +- `EventDrivenSolver` integrates a differential system across a specific set of "events" that modulate the stimuluation drive over time. 
+- `HybridSolver` inherits from both `PeriodicSolver`and `EventDrivenSolver`. It integrates a differential system using a hybrid scheme inside each "ON" or "OFF" period: 1. The full ODE system is integrated for a few cycles with a dense time granularity until a periodic stabilization detection 2. The profiles of all variables over the last cycle are resampled to a far lower (i.e. sparse) sampling rate 3. A subset of the ODE system is integrated with a sparse time granularity, while the remaining variables are periodically expanded from their last cycle profile, until the end of the period or that of an predefined update interval. 4. The process is repeated from step 1 ### Neurons Several conductance-based point-neuron models are implemented that inherit from the `PointNeuron` generic interface. Among them, several CNS models: - `CorticalRS`: cortical regular spiking (`RS`) neuron - `CorticalFS`: cortical fast spiking (`FS`) neuron - `CorticalLTS`: cortical low-threshold spiking (`LTS`) neuron - `CorticalIB`: cortical intrinsically bursting (`IB`) neuron - `ThalamicRE`: thalamic reticular (`RE`) neuron - `ThalamoCortical`: thalamo-cortical (`TC`) neuron - `OstukaSTN`: subthalamic nucleus (`STN`) neuron But also some valuable models used in peripheral axon models: - `FrankenhaeuserHuxleyNode`: Amphibian (Xenopus) myelinated fiber node (`FHnode`) - `SweeneyNode`: Mammalian (rabbit) myelinated motor fiber node (`SWnode`) - `MRGNode`: Mammalian (human / cat) myelinated fiber node (`MRGnode`) - `SundtSegment`: Mammalian unmyelinated C-fiber segment (`SUseg`) ### Other modules - `batches`: a generic interface to run simulation batches with or without multiprocessing - `parsers`: command line parsing utilities - `plt`: graphing utilities - `postpro`: post-processing utilities (mostly signal features detection) - `constants`: algorithmic constants used across modules and classes - `utils`: generic utilities # Requirements - Python 3.6+ - Package dependencies (numpy, scipy, ...) are installed automatically upon installation of the package. # Installation - Open a terminal. - Install [git-lfs](https://github.com/git-lfs/git-lfs/wiki/Installation) if not already done - Activate a Python3 environment if needed, e.g. on the tnesrv5 machine: ```source /opt/apps/anaconda3/bin activate``` - Check that the appropriate version of pip is activated: ```pip --version``` - Clone the repository and install the python package: ```git clone https://c4science.ch/diffusion/4670/pysonic.git``` ```cd pysonic``` ```pip install -e .``` # Usage ## Python scripts You can easily run simulations of any implemented point-neuron model under both electrical and ultrasonic stimuli, and visualize the simulation results, in just a few lines of code: !INCLUDECODE "examples/run.py" (python) ## From the command line ### Running simulations and batches You can easily run simulations of all 3 model types using the dedicated command line scripts. To do so, open a terminal in the `scripts` directory. - Use `run_mech.py` for simulations of the **mechanical model** upon **ultrasonic stimulation**. For instance, for a 32 nm radius bilayer sonophore sonicated at 500 kHz and 100 kPa: ```python run_mech.py -a 32 -f 500 -A 100 -p Z``` - Use `run_estim.py` for simulations of **point-neuron models** upon **intracellular electrical stimulation**. 
For instance, a regular-spiking (RS) neuron injected with 10 mA/m2 intracellular current for 30 ms: ```python run_estim.py -n RS -A 10 --tstim 30 -p Vm``` - Use `run_astim.py` for simulations of **point-neuron models** upon **ultrasonic stimulation**. For instance, for a coarse-grained simulation of a 32 nm radius bilayer sonophore within a regular-spiking (RS) neuron membrane, sonicated at 500 kHz and 100 kPa for 150 ms: ```python run_astim.py -n RS -a 32 -f 500 -A 100 --tstim 150 --method sonic -p Qm``` Additionally, you can run batches of simulations by specifying more than one value for any given stimulation parameter (e.g. `-A 100 200` for sonication with 100 and 200 kPa respectively). These batches can be parallelized using multiprocessing to optimize performance, with the extra argument `--mpi`. ### Saving and visualizing results By default, simulation results are neither shown, nor saved. To view results directly upon simulation completion, you can use the `-p [xxx]` option, where `[xxx]` can be `all` (to plot all resulting variables) or a given variable name (e.g. `Z` for membrane deflection, `Vm` for membrane potential, `Qm` for membrane charge density). To save simulation results in binary `.pkl` files, you can use the `-s` option. You will be prompted to choose an output directory, unless you also specify it with the `-o ` option. Output files are automatically named from model and simulation parameters to avoid ambiguity. When running simulation batches, it is highly advised to specify the `-s` option in order to save results of each simulation. You can then visualize results at a later stage. To visualize results, use the `plot_timeseries.py` script. You will be prompted to select the output files containing the simulation(s) results. By default, separate figures will be created for each simulation, showing the time profiles of all resulting variables. Here again, you can choose to show only a subset of variables using the `-p [xxx]` option. Moreover, if you select a subset of variables, you can visualize resulting profiles across simulations in comparative figures wih the `--compare` option. Several more options are available. To view them, type in: ```python -h``` # Extend the package ## Add other neuron types You can easily add other neuron types into the package, providing their ion channel populations and underlying voltage-gated dynamics equations are known. To add a new point-neuron model, follow this procedure: 1. Create a new file, and save it in the `neurons` sub-folder, with an explicit name (e.g. `my_neuron.py`). 2. Copy-paste the content of the `template.py` file (also located in the `neurons` sub-folder) into your file. 3. In your file, change the **class name** from `TemplateNeuron` to something more explicit (e.g. `MyNeuron`), and change the **neuron name** accordingly (e.g. `myneuron`). This name is a keyword used to refer to the model from outside the class. Also, comment out the `addSonicFeatures` decorator temporarily. 4. Modify/add **biophysical parameters** of your model (resting parameters, reversal potentials, channel conductances, ionic concentrations, temperatures, diffusion constants, etc...) as class attributes. If some parameters are not fixed and must be computed, assign them to the class inside a `__new__` method, taking the class (`cls`) as sole attribute. 5. Specify a **dictionary of names:descriptions of your different differential states** (i.e. all the differential variables of your model, except for the membrane potential). 6. 
# Extend the package

## Add other neuron types

You can easily add other neuron types into the package, provided that their ion channel populations and underlying voltage-gated dynamics equations are known.

To add a new point-neuron model, follow this procedure (a condensed sketch of the resulting module is given after the list):

1. Create a new file, and save it in the `neurons` sub-folder, with an explicit name (e.g. `my_neuron.py`).
2. Copy-paste the content of the `template.py` file (also located in the `neurons` sub-folder) into your file.
3. In your file, change the **class name** from `TemplateNeuron` to something more explicit (e.g. `MyNeuron`), and change the **neuron name** accordingly (e.g. `myneuron`). This name is a keyword used to refer to the model from outside the class. Also, comment out the `addSonicFeatures` decorator temporarily.
4. Modify/add **biophysical parameters** of your model (resting parameters, reversal potentials, channel conductances, ionic concentrations, temperatures, diffusion constants, etc.) as class attributes. If some parameters are not fixed and must be computed, assign them to the class inside a `__new__` method, taking the class (`cls`) as sole argument.
5. Specify a **dictionary of names:descriptions of your different differential states** (i.e. all the differential variables of your model, except for the membrane potential).
6. Modify/add **gating state kinetics** (`alphax` and `betax` methods) that define the voltage-dependent activation and inactivation rates of the different ion channel gates of your model. Those methods take the membrane potential `Vm` as input and return a rate in `s-1`. Alternatively, you can use steady-state open-probability (`xinf`) and adaptation time constant (`taux`) methods.
7. Modify the `derStates` method that defines the **derivatives of your different state variables**. These derivatives are defined inside a dictionary, where each state key is paired with a lambda function that takes the membrane potential `Vm` and a states vector `x` as inputs, and returns the associated state derivative (in `/s`).
8. Modify the `steadyStates` method that defines the **steady-state values of your different state variables**. These steady-states are defined inside a dictionary, where each state key is paired with a lambda function that takes the membrane potential `Vm` as only input, and returns the associated steady-state value (dimensionless). If some steady-states depend on the values of other steady-states, you can proceed as follows:
   - define all independent steady-state functions in a dictionary called `lambda_dict`
   - add dependent steady-state functions to the dictionary, calling `lambda_dict[k](Vm)` for each state `k` whose value is required.
9. Modify/add **membrane currents** (`iXX` methods) of your model. Those methods take the relevant gating states and the membrane potential `Vm` as inputs, and must return a current density in `mA/m2`. **You also need to modify the docstring accordingly, as this information is used by the package**.
10. Modify the `currents` method that defines the **membrane currents of your model**. These currents are defined inside a dictionary, where each current key is paired with a lambda function that takes the membrane potential `Vm` and a states vector `x` as inputs, and returns the associated current (in `mA/m2`).
11. Add the neuron class to the package, by importing it in the `__init__.py` file of the `neurons` sub-folder:

```python
from .my_neuron import MyNeuron
```

12. Verify your point-neuron model by running simulations under various electrical stimuli and comparing the output to the neuron's expected behavior. Implement the required corrections, if any.
13. Uncomment the `addSonicFeatures` decorator on top of your class. **This decorator will automatically parse the `derStates`, `steadyStates` and `currents` methods in order to adapt your neuron model to US stimulation. For this to work, you need to make sure to**:
    - **keep them as class methods**
    - **check that all calls to functions that depend solely on `Vm` appear directly in the methods' lambda expressions and are not hidden inside nested function calls.**
14. Pre-compute the lookup tables required to run coarse-grained simulations of the neuron model upon ultrasonic stimulation. To do so, go to the `scripts` directory and run the `run_lookups.py` script with the neuron's name as command line argument, e.g.:

```python run_lookups.py -n myneuron --mpi```

If possible, use the `--mpi` argument to enable multiprocessing, as lookup pre-computation greatly benefits from parallelization.

That's it! You can now run simulations of your point-neuron model upon ultrasonic stimulation.
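To make the structure of such a module more concrete, here is a condensed, hypothetical sketch following the conventions described above and used by the existing neuron modules. The class name `MyNeuron`, the `myneuron` keyword, and all parameter values and rate expressions are arbitrary placeholders (not a physiological model), some required attributes may be omitted, and the `template.py` file remains the authoritative starting point:

```python
# Hypothetical sketch of a new point-neuron module (e.g. neurons/my_neuron.py).
# All parameter values and rate expressions are placeholders.
import numpy as np

from ..core import PointNeuron, addSonicFeatures  # import path used by the existing neuron modules


@addSonicFeatures  # keep commented out until the model is verified (steps 3 and 13)
class MyNeuron(PointNeuron):
    ''' Minimal example neuron with a leak current and a delayed-rectifier Potassium current. '''

    name = 'myneuron'   # keyword used to refer to the model from outside the class

    # Biophysical parameters (placeholder values)
    Cm0 = 1e-2          # membrane capacitance (F/m2)
    Vm0 = -70.0         # resting membrane potential (mV)
    EK = -90.0          # Potassium reversal potential (mV)
    ELeak = -70.0       # leak reversal potential (mV)
    gKdbar = 100.0      # maximal delayed-rectifier Potassium conductance (S/m2)
    gLeak = 1.0         # leak conductance (S/m2)

    # Names and descriptions of the differential states (all variables except Vm)
    states = {
        'n': 'iKd activation gate'
    }

    # ------------------ Gating kinetics: voltage-dependent rates (s-1) ------------------

    @classmethod
    def alphan(cls, Vm):
        return 1e3 * 0.02 * np.exp((Vm + 40.) / 10.)   # placeholder expression

    @classmethod
    def betan(cls, Vm):
        return 1e3 * 0.05 * np.exp(-(Vm + 50.) / 20.)  # placeholder expression

    # ------------------ States derivatives (in /s) ------------------

    @classmethod
    def derStates(cls):
        return {
            'n': lambda Vm, x: cls.alphan(Vm) * (1 - x['n']) - cls.betan(Vm) * x['n']
        }

    # ------------------ Steady states (dimensionless) ------------------

    @classmethod
    def steadyStates(cls):
        return {
            'n': lambda Vm: cls.alphan(Vm) / (cls.alphan(Vm) + cls.betan(Vm))
        }

    # ------------------ Membrane currents (in mA/m2) ------------------

    @classmethod
    def iKd(cls, n, Vm):
        ''' delayed-rectifier Potassium current '''
        return cls.gKdbar * n**4 * (Vm - cls.EK)

    @classmethod
    def iLeak(cls, Vm):
        ''' non-specific leakage current '''
        return cls.gLeak * (Vm - cls.ELeak)

    @classmethod
    def currents(cls):
        return {
            'iKd': lambda Vm, x: cls.iKd(x['n'], Vm),
            'iLeak': lambda Vm, _: cls.iLeak(Vm)
        }
```

Defining the derivatives, steady-states and currents as dictionaries of `Vm`-dependent lambda functions is what allows the `addSonicFeatures` decorator to parse the model and adapt it to ultrasonic stimulation.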
# Authors

Code written and maintained by Theo Lemaire (theo.lemaire@epfl.ch).

# License & citation

This project is licensed under the MIT License - see the LICENSE file for details.

If PySONIC contributes to a project that leads to a scientific publication, please acknowledge this fact by citing [Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation. J. Neural Eng.](https://iopscience.iop.org/article/10.1088/1741-2552/ab1685)

DOI: 10.1088/1741-2552/ab1685

# References

[1] Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation. J. Neural Eng. DOI: 10.1088/1741-2552/ab1685