diff --git a/PySONIC/core/batches.py b/PySONIC/core/batches.py index 4c8d542..70b230c 100644 --- a/PySONIC/core/batches.py +++ b/PySONIC/core/batches.py @@ -1,146 +1,161 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-07-01 10:10:45 +# @Last Modified time: 2019-09-17 16:03:19 ''' Utility functions used in simulations ''' import logging import multiprocess as mp import numpy as np from ..utils import logger class Consumer(mp.Process): ''' Generic consumer process, taking tasks from a queue and outputing results in another queue. ''' def __init__(self, queue_in, queue_out): mp.Process.__init__(self) self.queue_in = queue_in self.queue_out = queue_out logger.debug('Starting %s', self.name) def run(self): while True: nextTask = self.queue_in.get() if nextTask is None: logger.debug('Exiting %s', self.name) self.queue_in.task_done() break answer = nextTask() self.queue_in.task_done() self.queue_out.put(answer) return class Worker: ''' Generic worker class calling a specific function with a given set of parameters. ''' - def __init__(self, wid, func, params, loglevel): + def __init__(self, wid, func, args, kwargs, loglevel): ''' Worker constructor. :param wid: worker ID :param func: function object - :param params: list of method parameters + :param args: list of method arguments + :param kwargs: dictionary of optional method arguments :param loglevel: logging level ''' self.id = wid self.func = func - self.params = params + self.args = args + self.kwargs = kwargs self.loglevel = loglevel def __call__(self): ''' Caller to the function with specific parameters. ''' logger.setLevel(self.loglevel) - return self.id, self.func(*self.params) + return self.id, self.func(*self.args, **self.kwargs) class Batch: ''' Generic interface to run batches of function calls. ''' def __init__(self, func, queue): ''' Batch constructor. :param func: function object :param queue: list of list of function parameters ''' self.func = func self.queue = queue def __call__(self, *args, **kwargs): ''' Call the internal run method. ''' return self.run(*args, **kwargs) def getNConsumers(self): ''' Determine number of consumers based on queue length and number of available CPUs. ''' return min(mp.cpu_count(), len(self.queue)) def start(self): ''' Create tasks and results queues, and start consumers. ''' mp.freeze_support() self.tasks = mp.JoinableQueue() self.results = mp.Queue() self.consumers = [Consumer(self.tasks, self.results) for i in range(self.getNConsumers())] for c in self.consumers: c.start() + @staticmethod + def resolve(params): + if isinstance(params, list): + args = params + kwargs = {} + elif isinstance(params, tuple): + args, kwargs = params + return args, kwargs + def assign(self, loglevel): ''' Assign tasks to workers. ''' for i, params in enumerate(self.queue): - worker = Worker(i, self.func, params, loglevel) + args, kwargs = self.resolve(params) + worker = Worker(i, self.func, args, kwargs, loglevel) self.tasks.put(worker, block=False) def join(self): ''' Put all tasks to None and join the queue. ''' for i in range(len(self.consumers)): self.tasks.put(None, block=False) self.tasks.join() def get(self): ''' Extract and re-order results. ''' outputs, idxs = [], [] for i in range(len(self.queue)): wid, out = self.results.get() outputs.append(out) idxs.append(wid) return [x for _, x in sorted(zip(idxs, outputs))] def stop(self): ''' Close tasks and results queues. ''' self.tasks.close() self.results.close() def run(self, mpi=False, loglevel=logging.INFO): ''' Run batch with or without multiprocessing. ''' if mpi: self.start() self.assign(loglevel) self.join() outputs = self.get() self.stop() else: - outputs = [self.func(*params) for params in self.queue] + outputs = [] + for params in self.queue: + args, kwargs = self.resolve(params) + outputs.append(self.func(*args, **kwargs)) return outputs @staticmethod def createQueue(*dims): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. :param dims: list of lists (or 1D arrays) of input parameters :return: list of parameters (list) for each simulation ''' ndims = len(dims) dims_in = [dims[1], dims[0]] inds_out = [1, 0] if ndims > 2: dims_in += dims[2:] inds_out += list(range(2, ndims)) queue = np.stack(np.meshgrid(*dims_in), -1).reshape(-1, ndims) queue = queue[:, inds_out] return queue.tolist() diff --git a/PySONIC/core/bls.py b/PySONIC/core/bls.py index 051ddcb..3d394bf 100644 --- a/PySONIC/core/bls.py +++ b/PySONIC/core/bls.py @@ -1,796 +1,796 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-09-17 14:54:30 +# @Last Modified time: 2019-09-17 16:03:03 from enum import Enum import os import json import numpy as np import pandas as pd import scipy.integrate as integrate from scipy.optimize import brentq, curve_fit from .batches import Batch from .model import Model from .simulators import PeriodicSimulator from ..utils import logger, si_format, debug from ..constants import * class PmCompMethod(Enum): ''' Enum: types of computation method for the intermolecular pressure ''' direct = 1 predict = 2 def LennardJones(x, beta, alpha, C, m, n): ''' Generic expression of a Lennard-Jones function, adapted for the context of symmetric deflection (distance = 2x). :param x: deflection (i.e. half-distance) :param beta: x-shifting factor :param alpha: x-scaling factor :param C: y-scaling factor :param m: exponent of the repulsion term :param n: exponent of the attraction term :return: Lennard-Jones potential at given distance (2x) ''' return C * (np.power((alpha / (2 * x + beta)), m) - np.power((alpha / (2 * x + beta)), n)) def lookup(func): ''' Load parameters from lookup file, or compute them and sotre them in lookup file. ''' lookup_path = os.path.join(os.path.split(__file__)[0], 'bls_lookups.json') def wrapper(obj): # lookup_path = obj.getLookupsPath() akey = '{:.1f}'.format(obj.a * 1e9) Qkey = '{:.2f}'.format(obj.Qm0 * 1e5) # Open lookup files try: with open(lookup_path, 'r') as fh: lookups = json.load(fh) except FileNotFoundError: lookups = {} # If info not in lookups, compute parameters and add them if akey not in lookups or Qkey not in lookups[akey]: func(obj) if akey not in lookups: lookups[akey] = {Qkey: {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta}} else: lookups[akey][Qkey] = {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta} logger.debug('Saving BLS derived parameters to lookup file') with open(lookup_path, 'w') as fh: json.dump(lookups, fh, indent=2) # If lookup exists, load parameters from it else: logger.debug('Loading BLS derived parameters from lookup file') obj.LJ_approx = lookups[akey][Qkey]['LJ_approx'] obj.Delta = lookups[akey][Qkey]['Delta_eq'] return wrapper class BilayerSonophore(Model): ''' Definition of the Bilayer Sonophore Model - geometry - pressure terms - cavitation dynamics ''' # BIOMECHANICAL PARAMETERS T = 309.15 # Temperature (K) delta0 = 2.0e-9 # Thickness of the leaflet (m) Delta_ = 1.4e-9 # Initial gap between the two leaflets on a non-charged membrane at equil. (m) pDelta = 1.0e5 # Attraction/repulsion pressure coefficient (Pa) m = 5.0 # Exponent in the repulsion term (dimensionless) n = 3.3 # Exponent in the attraction term (dimensionless) rhoL = 1075.0 # Density of the surrounding fluid (kg/m^3) muL = 7.0e-4 # Dynamic viscosity of the surrounding fluid (Pa.s) muS = 0.035 # Dynamic viscosity of the leaflet (Pa.s) kA = 0.24 # Area compression modulus of the leaflet (N/m) alpha = 7.56 # Tissue shear loss modulus frequency coefficient (Pa.s) C0 = 0.62 # Initial gas molar concentration in the surrounding fluid (mol/m^3) kH = 1.613e5 # Henry's constant (Pa.m^3/mol) P0 = 1.0e5 # Static pressure in the surrounding fluid (Pa) Dgl = 3.68e-9 # Diffusion coefficient of gas in the fluid (m^2/s) xi = 0.5e-9 # Boundary layer thickness for gas transport across leaflet (m) c = 1515.0 # Speed of sound in medium (m/s) # BIOPHYSICAL PARAMETERS epsilon0 = 8.854e-12 # Vacuum permittivity (F/m) epsilonR = 1.0 # Relative permittivity of intramembrane cavity (dimensionless) tscale = 'us' # relevant temporal scale of the model simkey = 'MECH' # keyword used to characterize simulations made with this model def __init__(self, a, Cm0, Qm0, Fdrive=None, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Extract resting constants and geometry self.Cm0 = Cm0 self.Qm0 = Qm0 self.a = a self.d = embedding_depth self.S0 = np.pi * self.a**2 # Derive frequency-dependent tissue elastic modulus if Fdrive is not None: G_tissue = self.alpha * Fdrive # G'' (Pa) self.kA_tissue = 2 * G_tissue * self.d # kA of the tissue layer (N/m) else: self.kA_tissue = 0. # Compute Pm params self.computePMparams() # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def __repr__(self): s = '{}({:.1f} nm'.format(self.__class__.__name__, self.a * 1e9) if self.d > 0.: s += ', d={}m'.format(si_format(self.d, precision=1, space=' ')) return s + ')' @staticmethod def inputs(): return { 'a': { 'desc': 'sonophore radius', 'label': 'a', 'unit': 'nm', 'factor': 1e9, 'precision': 0 }, 'Fdrive': { 'desc': 'US frequency', 'label': 'f', 'unit': 'kHz', 'factor': 1e-3, 'precision': 0 }, 'Adrive': { 'desc': 'US pressure amplitude', 'label': 'A', 'unit': 'kPa', 'factor': 1e-3, 'precision': 2 }, 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'precision': 1 } } def filecodes(self, Fdrive, Adrive, Qm, PmCompMethod='predict'): return { 'simkey': self.simkey, 'a': '{:.0f}nm'.format(self.a * 1e9), 'Fdrive': '{:.0f}kHz'.format(Fdrive * 1e-3), 'Adrive': '{:.2f}kPa'.format(Adrive * 1e-3), 'Qm': '{:.1f}nCcm2'.format(Qm * 1e5) } @staticmethod def getPltVars(wrapleft='df["', wrapright='"]'): return { 'Pac': { 'desc': 'acoustic pressure', 'label': 'P_{AC}', 'unit': 'kPa', 'factor': 1e-3, 'func': 'Pacoustic({0}t{1}, meta["Adrive"] * {0}stimstate{1}, meta["Fdrive"])'.format( wrapleft, wrapright) }, 'Z': { 'desc': 'leaflets deflection', 'label': 'Z', 'unit': 'nm', 'factor': 1e9, 'bounds': (-1.0, 10.0) }, 'ng': { 'desc': 'gas content', 'label': 'n_g', 'unit': '10^{-22}\ mol', 'factor': 1e22, 'bounds': (1.0, 15.0) }, 'Pmavg': { 'desc': 'average intermolecular pressure', 'label': 'P_M', 'unit': 'kPa', 'factor': 1e-3, 'func': 'PMavgpred({0}Z{1})'.format(wrapleft, wrapright) }, 'Telastic': { 'desc': 'leaflet elastic tension', 'label': 'T_E', 'unit': 'mN/m', 'factor': 1e3, 'func': 'TEleaflet({0}Z{1})'.format(wrapleft, wrapright) }, 'Cm': { 'desc': 'membrane capacitance', 'label': 'C_m', 'unit': 'uF/cm^2', 'factor': 1e2, 'bounds': (0.0, 1.5), 'func': 'v_capacitance({0}Z{1})'.format(wrapleft, wrapright) } } @staticmethod def getPltScheme(): return { 'P_{AC}': ['Pac'], 'Z': ['Z'], 'n_g': ['ng'] } def curvrad(self, Z): ''' Leaflet curvature radius (signed variable) :param Z: leaflet apex deflection (m) :return: leaflet curvature radius (m) ''' if Z == 0.0: return np.inf else: return (self.a**2 + Z**2) / (2 * Z) def v_curvrad(self, Z): ''' Vectorized curvrad function ''' return np.array(list(map(self.curvrad, Z))) def surface(self, Z): ''' Surface area of the stretched leaflet (spherical cap formula) :param Z: leaflet apex deflection (m) :return: stretched leaflet surface (m^2) ''' return np.pi * (self.a**2 + Z**2) def volume(self, Z): ''' Volume of the inter-leaflet space (cylinder +/- 2 spherical caps) :param Z: leaflet apex deflection (m) :return: bilayer sonophore inner volume (m^3) ''' return np.pi * self.a**2 * self.Delta\ * (1 + (Z / (3 * self.Delta) * (3 + Z**2 / self.a**2))) def arealStrain(self, Z): ''' Areal strain of the stretched leaflet epsilon = (S - S0)/S0 = (Z/a)^2 :param Z: leaflet apex deflection (m) :return: areal strain (dimensionless) ''' return (Z / self.a)**2 def capacitance(self, Z): ''' Membrane capacitance (parallel-plate capacitor evaluated at average inter-layer distance) :param Z: leaflet apex deflection (m) :return: capacitance per unit area (F/m2) ''' if Z == 0.0: return self.Cm0 else: return ((self.Cm0 * self.Delta / self.a**2) * (Z + (self.a**2 - Z**2 - Z * self.Delta) / (2 * Z) * np.log((2 * Z + self.Delta) / self.Delta))) def v_capacitance(self, Z): ''' Vectorized capacitance function ''' return np.array(list(map(self.capacitance, Z))) def derCapacitance(self, Z, U): ''' Evolution of membrane capacitance :param Z: leaflet apex deflection (m) :param U: leaflet apex deflection velocity (m/s) :return: time derivative of capacitance per unit area (F/m2.s) ''' dCmdZ = ((self.Cm0 * self.Delta / self.a**2) * ((Z**2 + self.a**2) / (Z * (2 * Z + self.Delta)) - ((Z**2 + self.a**2) * np.log((2 * Z + self.Delta) / self.Delta)) / (2 * Z**2))) return dCmdZ * U @staticmethod def localDeflection(r, Z, R): ''' Local leaflet deflection at specific radial distance (signed) :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local transverse leaflet deviation (m) ''' if np.abs(Z) == 0.0: return 0.0 else: return np.sign(Z) * (np.sqrt(R**2 - r**2) - np.abs(R) + np.abs(Z)) @staticmethod def Pacoustic(t, Adrive, Fdrive, phi=np.pi): ''' Time-varying acoustic pressure :param t: time (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) ''' return Adrive * np.sin(2 * np.pi * Fdrive * t - phi) def PMlocal(self, r, Z, R): ''' Local intermolecular pressure :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local intermolecular pressure (Pa) ''' z = self.localDeflection(r, Z, R) relgap = (2 * z + self.Delta) / self.Delta_ return self.pDelta * ((1 / relgap)**self.m - (1 / relgap)**self.n) def PMavg(self, Z, R, S): ''' Average intermolecular pressure across the leaflet (computed by quadratic integration) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :param S: surface of the stretched leaflet (m^2) :return: averaged intermolecular resultant pressure (Pa) .. warning:: quadratic integration is computationally expensive. ''' # Integrate intermolecular force over an infinitely thin ring of radius r from 0 to a fTotal, _ = integrate.quad(lambda r, Z, R: 2 * np.pi * r * self.PMlocal(r, Z, R), 0, self.a, args=(Z, R)) return fTotal / S def v_PMavg(self, Z, R, S): ''' Vectorized PMavg function ''' return np.array(list(map(self.PMavg, Z, R, S))) def LJfitPMavg(self): ''' Determine optimal parameters of a Lennard-Jones expression approximating the average intermolecular pressure. These parameters are obtained by a nonlinear fit of the Lennard-Jones function for a range of deflection values between predetermined Zmin and Zmax. :return: 3-tuple with optimized LJ parameters for PmAvg prediction (Map) and the standard and max errors of the prediction in the fitting range (in Pascals) ''' # Determine lower bound of deflection range: when Pm = Pmmax PMmax = LJFIT_PM_MAX # Pa Zminlb = -0.49 * self.Delta Zminub = 0.0 Zmin = brentq(lambda Z, Pmmax: self.PMavg(Z, self.curvrad(Z), self.surface(Z)) - PMmax, Zminlb, Zminub, args=(PMmax), xtol=1e-16) # Create vectors for geometric variables Zmax = 2 * self.a Z = np.arange(Zmin, Zmax, 1e-11) Pmavg = self.v_PMavg(Z, self.v_curvrad(Z), self.surface(Z)) # Compute optimal nonlinear fit of custom LJ function with initial guess x0_guess = self.delta0 C_guess = 0.1 * self.pDelta nrep_guess = self.m nattr_guess = self.n pguess = (x0_guess, C_guess, nrep_guess, nattr_guess) popt, _ = curve_fit(lambda x, x0, C, nrep, nattr: LennardJones(x, self.Delta, x0, C, nrep, nattr), Z, Pmavg, p0=pguess, maxfev=100000) (x0_opt, C_opt, nrep_opt, nattr_opt) = popt Pmavg_fit = LennardJones(Z, self.Delta, x0_opt, C_opt, nrep_opt, nattr_opt) # Compute prediction error residuals = Pmavg - Pmavg_fit ss_res = np.sum(residuals**2) N = residuals.size std_err = np.sqrt(ss_res / N) max_err = max(np.abs(residuals)) logger.debug('LJ approx: x0 = %.2f nm, C = %.2f kPa, m = %.2f, n = %.2f', x0_opt * 1e9, C_opt * 1e-3, nrep_opt, nattr_opt) LJ_approx = {"x0": x0_opt, "C": C_opt, "nrep": nrep_opt, "nattr": nattr_opt} return (LJ_approx, std_err, max_err) @lookup def computePMparams(self): # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: D_eq = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Find optimal Lennard-Jones parameters to approximate PMavg (self.LJ_approx, std_err, _) = self.LJfitPMavg() assert std_err < PMAVG_STD_ERR_MAX, 'High error in PmAvg nonlinear fit:'\ ' std_err = %.2f Pa' % std_err def PMavgpred(self, Z): ''' Approximated average intermolecular pressure (using nonlinearly fitted Lennard-Jones function) :param Z: leaflet apex deflection (m) :return: predicted average intermolecular pressure (Pa) ''' return LennardJones(Z, self.Delta, self.LJ_approx['x0'], self.LJ_approx['C'], self.LJ_approx['nrep'], self.LJ_approx['nattr']) def Pelec(self, Z, Qm): ''' Electrical pressure term :param Z: leaflet apex deflection (m) :param Qm: membrane charge density (C/m2) :return: electrical pressure (Pa) ''' relS = self.S0 / self.surface(Z) abs_perm = self.epsilon0 * self.epsilonR # F/m return - relS * Qm**2 / (2 * abs_perm) # Pa def findDeltaEq(self, Qm): ''' Compute the Delta that cancels out the (Pm + Pec) equation at Z = 0 for a given membrane charge density, using the Brent method to refine the pressure root iteratively. :param Qm: membrane charge density (C/m2) :return: equilibrium value (m) and associated pressure (Pa) ''' def dualPressure(Delta): x = (self.Delta_ / Delta) return (self.pDelta * (x**self.m - x**self.n) + self.Pelec(0.0, Qm)) Delta_eq = brentq(dualPressure, 0.1 * self.Delta_, 2.0 * self.Delta_, xtol=1e-16) logger.debug('∆eq = %.2f nm', Delta_eq * 1e9) return (Delta_eq, dualPressure(Delta_eq)) def gasFlux(self, Z, P): ''' Gas molar flux through the sonophore boundary layers :param Z: leaflet apex deflection (m) :param P: internal gas pressure (Pa) :return: gas molar flux (mol/s) ''' dC = self.C0 - P / self.kH return 2 * self.surface(Z) * self.Dgl * dC / self.xi @classmethod def gasmol2Pa(cls, ng, V): ''' Internal gas pressure for a given molar content :param ng: internal molar content (mol) :param V: sonophore inner volume (m^3) :return: internal gas pressure (Pa) ''' return ng * Rg * cls.T / V @classmethod def gasPa2mol(cls, P, V): ''' Internal gas molar content for a given pressure :param P: internal gas pressure (Pa) :param V: sonophore inner volume (m^3) :return: internal gas molar content (mol) ''' return P * V / (Rg * cls.T) def PtotQS(self, Z, ng, Qm, Pac, Pm_comp_method): ''' Net quasi-steady pressure for a given acoustic pressure (Ptot = Pm + Pg + Pec - P0 - Pac) :param Z: leaflet apex deflection (m) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: acoustic pressure (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: total balance pressure (Pa) ''' if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) return Pm + self.gasmol2Pa(ng, self.volume(Z)) - self.P0 - Pac + self.Pelec(Z, Qm) def balancedefQS(self, ng, Qm, Pac=0.0, Pm_comp_method=PmCompMethod.predict): ''' Quasi-steady equilibrium deflection for a given acoustic pressure (computed by approximating the root of quasi-steady pressure) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: leaflet deflection canceling quasi-steady pressure (m) ''' lb, ub = -0.49 * self.Delta, self.a Plb, Pub = [self.PtotQS(x, ng, Qm, Pac, Pm_comp_method) for x in [lb, ub]] assert (Plb > 0 > Pub), '[{}, {}] is not a sign changing interval for PtotQS'.format(lb, ub) return brentq(self.PtotQS, lb, ub, args=(ng, Qm, Pac, Pm_comp_method), xtol=1e-16) def TEleaflet(self, Z): ''' Elastic tension in leaflet :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA * self.arealStrain(Z) def TEtissue(self, Z): ''' Elastic tension in surrounding viscoelastic layer :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA_tissue * self.arealStrain(Z) def TEtot(self, Z): ''' Total elastic tension (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.TEleaflet(Z) + self.TEtissue(Z) def PEtot(self, Z, R): ''' Total elastic tension pressure (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: elastic tension pressure (Pa) ''' return - self.TEtot(Z) / R @classmethod def PVleaflet(cls, U, R): ''' Viscous stress pressure in leaflet :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: leaflet viscous stress pressure (Pa) ''' return - 12 * U * cls.delta0 * cls.muS / R**2 @classmethod def PVfluid(cls, U, R): ''' Viscous stress pressure in surrounding medium :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: fluid viscous stress pressure (Pa) ''' return - 4 * U * cls.muL / np.abs(R) @classmethod def accP(cls, Ptot, R): ''' Leaflet transverse acceleration resulting from pressure imbalance :param Ptot: net pressure (Pa) :param R: leaflet curvature radius (m) :return: pressure-driven acceleration (m/s^2) ''' return Ptot / (cls.rhoL * np.abs(R)) @staticmethod def accNL(U, R): ''' Leaflet transverse nonlinear acceleration :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: nonlinear acceleration term (m/s^2) .. note:: A simplified version of nonlinear acceleration (neglecting dR/dH) is used here. ''' # return - (3/2 - 2*R/H) * U**2 / R return -(3 * U**2) / (2 * R) @staticmethod def checkInputs(Fdrive, Adrive, Qm, phi): ''' Check validity of stimulation parameters :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param phi: acoustic drive phase (rad) :param Qm: imposed membrane charge density (C/m2) ''' if not all(isinstance(param, float) for param in [Fdrive, Adrive, Qm, phi]): raise TypeError('Invalid stimulation parameters (must be float typed)') if Fdrive <= 0: raise ValueError('Invalid US driving frequency: {} kHz (must be strictly positive)' .format(Fdrive * 1e-3)) if Adrive < 0: raise ValueError('Invalid US pressure amplitude: {} kPa (must be positive or null)' .format(Adrive * 1e-3)) if Qm < CHARGE_RANGE[0] or Qm > CHARGE_RANGE[1]: raise ValueError('Invalid applied charge: {} nC/cm2 (must be within [{}, {}] interval' .format(Qm * 1e5, CHARGE_RANGE[0] * 1e5, CHARGE_RANGE[1] * 1e5)) if phi < 0 or phi >= 2 * np.pi: raise ValueError('Invalid US pressure phase: {:.2f} rad (must be within [0, 2 PI[ rad' .format(phi)) def derivatives(self, t, y, Fdrive, Adrive, Qm, phi, Pm_comp_method=PmCompMethod.predict): ''' Evolution of the mechanical system :param t: time instant (s) :param y: vector of HH system variables at time t :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: membrane charge density (F/m2) :param phi: acoustic drive phase (rad) :param Pm_comp_method: computation method for average intermolecular pressure :return: vector of mechanical system derivatives at time t ''' # Split input vector explicitly U, Z, ng = y # Correct deflection value is below critical compression if Z < -0.5 * self.Delta: logger.warning('Deflection out of range: Z = %.2f nm', Z * 1e9) Z = -0.49 * self.Delta # Compute curvature radius R = self.curvrad(Z) # Compute total pressure Pg = self.gasmol2Pa(ng, self.volume(Z)) if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) Ptot = (Pm + Pg - self.P0 - self.Pacoustic(t, Adrive, Fdrive, phi) + self.PEtot(Z, R) + self.PVleaflet(U, R) + self.PVfluid(U, R) + self.Pelec(Z, Qm)) # Compute derivatives dUdt = self.accP(Ptot, R) + self.accNL(U, R) dZdt = U dngdt = self.gasFlux(Z, Pg) # Return derivatives vector return [dUdt, dZdt, dngdt] def computeInitialDeflection(self, Adrive, Fdrive, phi, Qm, dt, Pm_comp_method=PmCompMethod.predict): ''' Compute non-zero deflection value for a small perturbation (solving quasi-steady equation). ''' Pac = self.Pacoustic(dt, Adrive, Fdrive, phi) return self.balancedefQS(self.ng0, Qm, Pac, Pm_comp_method) @classmethod @Model.checkOutputDir - def simQueue(cls, *args, outputdir=None): + def simQueue(cls, *args, **kwargs): return Batch.createQueue(*args) @Model.addMeta def simulate(self, Fdrive, Adrive, Qm, phi=np.pi, Pm_comp_method=PmCompMethod.predict): ''' Simulate until periodic stabilization for a specific set of ultrasound parameters, and return output data in a dataframe. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param phi: acoustic drive phase (rad) :param Qm: imposed membrane charge density (C/m2) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: 2-tuple with the output dataframe and computation time. ''' logger.info('%s: simulation @ f = %sHz, A = %sPa, Q = %sC/cm2', self, *si_format([Fdrive, Adrive, Qm * 1e-4], 2, space=' ')) # Check validity of stimulation parameters self.checkInputs(Fdrive, Adrive, Qm, phi) # Determine time step dt = 1 / (NPC_DENSE * Fdrive) # Compute initial non-zero deflection Z = self.computeInitialDeflection(Adrive, Fdrive, phi, Qm, dt, Pm_comp_method=Pm_comp_method) # Set initial conditions y0 = np.array([0., 0., self.ng0]) y1 = np.array([0., Z, self.ng0]) # Initialize simulator and compute solution simulator = PeriodicSimulator( lambda t, y: self.derivatives(t, y, Fdrive, Adrive, Qm, phi, Pm_comp_method), ivars_to_check=[1, 2]) t, y, stim = simulator(y1, dt, 1. / Fdrive) # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim, y0=y0) # Set last stimulation state to zero stim[-1] = 0 # Store output in dataframe and return return pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2] }) def meta(self, Fdrive, Adrive, Qm, Pm_comp_method): return { 'simkey': self.simkey, 'a': self.a, 'd': self.d, 'Cm0': self.Cm0, 'Qm0': self.Qm0, 'Fdrive': Fdrive, 'Adrive': Adrive, 'Qm': Qm, 'Pm_comp_method': Pm_comp_method } def getCycleProfiles(self, Fdrive, Adrive, Qm): ''' Simulate mechanical system and compute pressures over the last acoustic cycle :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed membrane charge density (C/m2) :return: dataframe with the time, kinematic and pressure profiles over the last cycle. ''' # Run default simulation and retrieve last cycle solution logger.info('Running mechanical simulation (a = %sm, f = %sHz, A = %sPa)', si_format(self.a, 1), si_format(Fdrive, 1), si_format(Adrive, 1)) data = self.simulate( Fdrive, Adrive, Qm, Pm_comp_method=PmCompMethod.direct)[0].iloc[-NPC_DENSE:, :] # Extract relevant variables and de-offset time vector t, Z, ng = [data[key].values for key in ['t', 'Z', 'ng']] dt = (t[-1] - t[0]) / (NPC_DENSE - 1) t -= t[0] # Compute pressure cyclic profiles logger.info('Computing pressure cyclic profiles') R = self.v_curvrad(Z) U = np.diff(Z) / dt U = np.hstack((U, U[-1])) data = { 't': t, 'Z': Z, 'Cm': self.v_capacitance(Z), 'P_M': self.v_PMavg(Z, R, self.surface(Z)), 'P_Q': self.Pelec(Z, Qm), 'P_{VE}': self.PEtot(Z, R) + self.PVleaflet(U, R), 'P_V': self.PVfluid(U, R), 'P_G': self.gasmol2Pa(ng, self.volume(Z)), 'P_0': - np.ones(Z.size) * self.P0 } return pd.DataFrame(data, columns=data.keys()) diff --git a/PySONIC/core/model.py b/PySONIC/core/model.py index b459c26..3fa5f35 100644 --- a/PySONIC/core/model.py +++ b/PySONIC/core/model.py @@ -1,281 +1,294 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-03 11:53:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-09-17 14:56:49 +# @Last Modified time: 2019-09-17 16:02:45 import os from functools import wraps from inspect import signature, getdoc import pickle import abc import inspect import numpy as np from .batches import Batch from ..utils import logger, loadData, timer, si_format, plural, debug class Model(metaclass=abc.ABCMeta): ''' Generic model interface. ''' @property @abc.abstractmethod def tscale(self): ''' Relevant temporal scale of the model. ''' raise NotImplementedError @property @abc.abstractmethod def simkey(self): ''' Keyword used to characterize simulations made with the model. ''' raise NotImplementedError @property @abc.abstractmethod def __repr__(self): ''' String representation. ''' raise NotImplementedError def params(self): ''' Return a dictionary of all model parameters (class and instance attributes) ''' def toAvoid(p): return (p.startswith('__') and p.endswith('__')) or p.startswith('_abc_') class_attrs = inspect.getmembers(self.__class__, lambda a: not(inspect.isroutine(a))) inst_attrs = inspect.getmembers(self, lambda a: not(inspect.isroutine(a))) class_attrs = [a for a in class_attrs if not toAvoid(a[0])] inst_attrs = [a for a in inst_attrs if not toAvoid(a[0]) and a not in class_attrs] params_dict = {a[0]: a[1] for a in class_attrs + inst_attrs} return params_dict @classmethod def description(cls): return getdoc(cls).split('\n', 1)[0].strip() @staticmethod @abc.abstractmethod def inputs(): ''' Return an informative dictionary on input variables used to simulate the model. ''' raise NotImplementedError @property @abc.abstractmethod def filecodes(self, *args): ''' Return a dictionary of string-encoded inputs used for file naming. ''' raise NotImplementedError def filecode(self, *args): ''' Generate file code given a specific combination of model input parameters. ''' # If meta dictionary was passed, generate inputs list from it if len(args) == 1 and isinstance(args[0], dict): meta = args[0] meta.pop('tcomp', None) nparams = len(signature(self.meta).parameters) args = list(meta.values())[-nparams:] # Create file code by joining string-encoded inputs with underscores codes = self.filecodes(*args).values() return '_'.join([x for x in codes if x is not None]) @classmethod @abc.abstractmethod def getPltVars(self, *args, **kwargs): ''' Return a dictionary with information about all plot variables related to the model. ''' raise NotImplementedError @classmethod @abc.abstractmethod def getPltScheme(self): ''' Return a dictionary model plot variables grouped by context. ''' raise NotImplementedError @staticmethod def checkOutputDir(queuefunc): ''' Check if an outputdir is provided in input arguments, and if so, add it as - the first element of each item in the returned queue. + the first element of each item in the returned queue (along with an "overwrite" + boolean as keyword argument to each item in the queue. ''' @wraps(queuefunc) def wrapper(self, *args, **kwargs): outputdir = kwargs.pop('outputdir') queue = queuefunc(self, *args, **kwargs) if outputdir is not None: - for item in queue: - item.insert(0, outputdir) + overwrite = kwargs.pop('overwrite', True) + queue = queuefunc(self, *args, **kwargs) + for i, params in enumerate(queue): + position_args, keyword_args = Batch.resolve(params) + position_args.insert(0, outputdir) + keyword_args['overwrite'] = overwrite + queue[i] = (position_args, keyword_args) else: if len(queue) > 5: logger.warning('Running more than 5 simulations without file saving') return queue return wrapper - @staticmethod - def checkOverwrite(queue, overwrite): - ''' Check if an outputdir is provided in input arguments, and if so, add it as - the first element of each item in the returned queue. - ''' - for item in queue: - item.append(overwrite) - return queue - @classmethod @abc.abstractmethod - def simQueue(cls, *args, outputdir=None): + def simQueue(cls, *args, outputdir=None, overwrite=True): return NotImplementedError @staticmethod @abc.abstractmethod def checkInputs(self, *args): ''' Check the validity of simulation input parameters. ''' raise NotImplementedError @property @abc.abstractmethod def derivatives(self, *args, **kwargs): ''' Compute ODE derivatives for a specific set of ODE states and external parameters. ''' raise NotImplementedError @property @abc.abstractmethod def simulate(self, *args, **kwargs): ''' Simulate the model's differential system for specific input parameters and return output data in a dataframe. ''' raise NotImplementedError @classmethod @abc.abstractmethod def meta(self, *args): ''' Return an informative dictionary about model and simulation parameters. ''' raise NotImplementedError @staticmethod def addMeta(simfunc): ''' Add an informative dictionary about model and simulation parameters to simulation output ''' @wraps(simfunc) def wrapper(self, *args, **kwargs): data, tcomp = timer(simfunc)(self, *args, **kwargs) logger.debug('completed in %ss', si_format(tcomp, 1)) # Add keyword arguments from simfunc signature if not provided bound_args = signature(simfunc).bind(self, *args, **kwargs) bound_args.apply_defaults() target_args = dict(bound_args.arguments) # Try to retrieve meta information try: meta_params_names = list(signature(self.meta).parameters.keys()) meta_params = [target_args[k] for k in meta_params_names] meta = self.meta(*meta_params) except KeyError as err: logger.error(f'Could not find {err} parameter in "{simfunc.__name__}" function') meta = {} # Add computation time to it meta['tcomp'] = tcomp # Return data with meta dict return data, meta return wrapper @staticmethod def logNSpikes(simfunc): ''' Log number of detected spikes on charge profile of simulation output. ''' @wraps(simfunc) def wrapper(self, *args, **kwargs): out = simfunc(self, *args, **kwargs) if out is None: return None data, meta = out nspikes = self.getNSpikes(data) logger.debug('{} spike{} detected'.format(nspikes, plural(nspikes))) return data, meta return wrapper @staticmethod def checkTitrate(argname): ''' If no None provided in the list of input parameters, perform a titration to find the threshold parameter and add it to the list. ''' def wrapper_with_args(simfunc): @wraps(simfunc) def wrapper(self, *args, **kwargs): # Get argument index from function signature func_args = list(signature(simfunc).parameters.keys())[1:] iarg = func_args.index(argname) # If argument is None if args[iarg] is None: # Generate new args list without argument args = list(args) new_args = args.copy() del new_args[iarg] # Perform titration to find threshold argument value xthr = self.titrate(*new_args) if np.isnan(xthr): logger.error(f'Could not find threshold {argname}') return None # Re-insert it into arguments list args[iarg] = xthr # Execute simulation function return simfunc(self, *args, **kwargs) return wrapper return wrapper_with_args - def simAndSave(self, outdir, *args, overwrite=True): + def simAndSave(self, outdir, *args, **kwargs): ''' Simulate the model and save the results in a specific output directory. :param outdir: ouput directory :param *args: list of arguments provided to the simulation function - :param overwrite: boolean stating whether or not to overwrite a corresponding - output file if it is found in the directory. That check is performed prior - to running the simulation, such that it is not run if the file is present and - overwrite is set ot false. + :param **kwargs: optional arguments dictionary :return: output filepath ''' + + # boolean stating whether or not to overwrite a corresponding output file if it is found + # in the directory. + overwrite = kwargs.pop('overwrite') + + # Set data and meta to None data, meta = None, None + + # None in sim args -> titration case -> store data and meta, and add threshold amp to args if None in args: args = list(args) iNone = next(i for i, arg in enumerate(args) if arg is None) out = self.simulate(*args) if out is None: logger.warning('returning None') return None data, meta = out args[iNone] = meta['Adrive'] + # Check if a output file corresponding to sim inputs is found in the output directory + # That check is performed prior to running the simulation, such that + # it is not run if the file is present and overwrite is set ot false. fname = f'{self.filecode(*args)}.pkl' fpath = os.path.join(outdir, fname) existing_file_msg = f'File "{fname}" already present in directory "{outdir}"' existing_file = os.path.isfile(fpath) + + # If file exists and overwrite is set ot false -> return if existing_file and not overwrite: logger.warning(f'{existing_file_msg} -> preserving') - else: - if data is None: - data, meta = self.simulate(*args) - if existing_file: - logger.warning(f'{existing_file_msg} -> overwriting') - with open(fpath, 'wb') as fh: - pickle.dump({'meta': meta, 'data': data}, fh) - logger.debug('simulation data exported to "%s"', fpath) + return fpath + + # Run simulation if not already done (for titration cases) + if data is None: + data, meta = self.simulate(*args) + + # Raise warning if an existing file is overwritten + if existing_file: + logger.warning(f'{existing_file_msg} -> overwriting') + + # Save output file and return output filepath + with open(fpath, 'wb') as fh: + pickle.dump({'meta': meta, 'data': data}, fh) + logger.debug('simulation data exported to "%s"', fpath) return fpath def getOutput(self, outdir, *args): ''' Get simulation output data for a specific parameters combination, by looking for an output file into a specific directory. If a corresponding output file is not found in the specified directory, the model is first run and results are saved in the output file. ''' fpath = '{}/{}.pkl'.format(outdir, self.filecode(*args)) if not os.path.isfile(fpath): self.simAndSave(outdir, *args) return loadData(fpath) diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index 6ac20a7..a1bf3f7 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,634 +1,634 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-09-17 14:58:06 +# @Last Modified time: 2019-09-17 16:10:37 from copy import deepcopy import logging import numpy as np import numdifftools as nd import pandas as pd from scipy import linalg from .simulators import PWSimulator, HybridSimulator, PeriodicSimulator from .bls import BilayerSonophore from .pneuron import PointNeuron from .model import Model from .batches import Batch from ..utils import * from ..constants import * from ..postpro import getFixedPoints from .lookups import SmartLookup, SmartDict, fixLookup NEURONS_LOOKUP_DIR = os.path.abspath(os.path.split(__file__)[0] + "/../neurons/") class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ASTIM' # keyword used to characterize simulations made with this model def __init__(self, a, pneuron, Fdrive=None, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param pneuron: point-neuron model :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Check validity of input parameters if not isinstance(pneuron, PointNeuron): raise ValueError('Invalid neuron type: "{}" (must inherit from PointNeuron class)' .format(pneuron.name)) self.pneuron = pneuron # Initialize BilayerSonophore parent object BilayerSonophore.__init__(self, a, pneuron.Cm0, pneuron.Qm0(), embedding_depth) def __repr__(self): s = '{}({:.1f} nm, {}'.format(self.__class__.__name__, self.a * 1e9, self.pneuron) if self.d > 0.: s += ', d={}m'.format(si_format(self.d, precision=1, space=' ')) return s + ')' def params(self): return {**super().params(), **self.pneuron.params()} def getPltVars(self, wrapleft='df["', wrapright='"]'): return {**super().getPltVars(wrapleft, wrapright), **self.pneuron.getPltVars(wrapleft, wrapright)} def getPltScheme(self): return self.pneuron.getPltScheme() def filecode(self, *args): return Model.filecode(self, *args) @staticmethod def inputs(): # Get parent input vars and supress irrelevant entries bls_vars = BilayerSonophore.inputs() pneuron_vars = PointNeuron.inputs() del bls_vars['Qm'] del pneuron_vars['Astim'] # Fill in current input vars in appropriate order inputvars = bls_vars inputvars.update(pneuron_vars) inputvars['fs'] = { 'desc': 'sonophore membrane coverage fraction', 'label': 'f_s', 'unit': '\%', 'factor': 1e2, 'precision': 0 } inputvars['method'] = None return inputvars def filecodes(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs, method): # Get parent codes and supress irrelevant entries bls_codes = super().filecodes(Fdrive, Adrive, 0.0) pneuron_codes = self.pneuron.filecodes(0.0, tstim, toffset, PRF, DC) for x in [bls_codes, pneuron_codes]: del x['simkey'] del bls_codes['Qm'] del pneuron_codes['Astim'] # Fill in current codes in appropriate order codes = { 'simkey': self.simkey, 'neuron': pneuron_codes.pop('neuron'), 'nature': pneuron_codes.pop('nature') } codes.update(bls_codes) codes.update(pneuron_codes) codes['fs'] = 'fs{:.0f}%'.format(fs * 1e2) if fs < 1 else None codes['method'] = method return codes @staticmethod def interpOnOffVariable(key, Qm, stim, lkp): ''' Interpolate Q-dependent effective variable along ON and OFF periods of a solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkp: dictionary of lookups for ON and OFF states :return: interpolated effective variable vector ''' x = np.zeros(stim.size) x[stim == 0] = lkp['OFF'].interpVar(Qm[stim == 0], 'Q', key) x[stim == 1] = lkp['ON'].interpVar(Qm[stim == 1], 'Q', key) return x @staticmethod def spatialAverage(fs, x, x0): ''' fs-modulated spatial averaging. ''' return fs * x + (1 - fs) * x0 def computeEffVars(self, Fdrive, Adrive, Qm, fs): ''' Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param fs: list of sonophore membrane coverage fractions :return: list with computation time and a list of dictionaries of effective variables ''' # Run simulation and retrieve deflection and gas content vectors from last cycle data, meta = BilayerSonophore.simulate(self, Fdrive, Adrive, Qm) Z_last = data.loc[-NPC_DENSE:, 'Z'].values # m Cm_last = self.v_capacitance(Z_last) # F/m2 # For each coverage fraction effvars = [] for x in fs: # Compute membrane capacitance and membrane potential vectors Cm = self.spatialAverage(x, Cm_last, self.Cm0) # F/m2 Vm = Qm / Cm * 1e3 # mV # Compute average cycle value for membrane potential and rate constants effvars.append({**{'V': np.mean(Vm)}, **self.pneuron.getEffRates(Vm)}) # Log process log = '{}: lookups @ {}Hz, {}Pa, {:.2f} nC/cm2'.format( self, *si_format([Fdrive, Adrive], precision=1, space=' '), Qm * 1e5) if len(fs) > 1: log += ', fs = {:.0f} - {:.0f}%'.format(fs.min() * 1e2, fs.max() * 1e2) log += ', tcomp = {:.3f} s'.format(meta['tcomp']) logger.info(log) # Return effective coefficients return [meta['tcomp'], effvars] def getLookupFileName(self, a=None, Fdrive=None, Adrive=None, fs=False): fname = '{}_lookups'.format(self.pneuron.name) if a is not None: fname += '_{:.0f}nm'.format(a * 1e9) if Fdrive is not None: fname += '_{:.0f}kHz'.format(Fdrive * 1e-3) if Adrive is not None: fname += '_{:.0f}kPa'.format(Adrive * 1e-3) if fs is True: fname += '_fs' return '{}.pkl'.format(fname) def getLookupFilePath(self, *args, **kwargs): return os.path.join(NEURONS_LOOKUP_DIR, self.getLookupFileName(*args, **kwargs)) def getLookup(self, *args, **kwargs): lookup_path = self.getLookupFilePath(*args, **kwargs) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) with open(lookup_path, 'rb') as fh: frame = pickle.load(fh) if 'ng' in frame['lookup']: del frame['lookup']['ng'] refs = frame['input'] # Move fs to last reference dimension keys = list(refs.keys()) if 'fs' in keys and keys.index('fs') < len(keys) - 1: del keys[keys.index('fs')] keys.append('fs') refs = {k: refs[k] for k in keys} lkp = SmartLookup(refs, frame['lookup']) return fixLookup(lkp) def getLookup2D(self, Fdrive, fs): if fs < 1: lkp2d = self.getLookup(a=self.a, Fdrive=Fdrive, fs=True).project('fs', fs) else: lkp2d = self.getLookup().projectN({'a': self.a, 'f': Fdrive}) return lkp2d def fullDerivatives(self, t, y, Fdrive, Adrive, phi, fs): ''' Compute the full system derivatives. :param t: specific instant in time (s) :param y: vector of state variables :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param phi: acoustic drive phase (rad) :param fs: sonophore membrane coevrage fraction (-) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives( self, t, y[:3], Fdrive, Adrive, y[3], phi) dydt_elec = self.pneuron.derivatives( t, y[3:], Cm=self.spatialAverage(fs, self.capacitance(y[1]), self.Cm0)) return dydt_mech + dydt_elec def effDerivatives(self, t, y, lkp1d): ''' Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t ''' Qm, *states = y states_dict = dict(zip(self.pneuron.statesNames(), states)) lkp0d = lkp1d.interpolate1D('Q', Qm) dQmdt = - self.pneuron.iNet(lkp0d['V'], states_dict) * 1e-3 return [dQmdt, *self.pneuron.getDerEffStates(lkp0d, states_dict)] def __simFull(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs, phi=np.pi): # Determine time step dt = 1 / (NPC_DENSE * Fdrive) # Compute initial non-zero deflection Z = self.computeInitialDeflection(Adrive, Fdrive, phi, self.Qm0, dt) # Set initial conditions ss0 = self.pneuron.getSteadyStates(self.pneuron.Vm0) y0 = np.concatenate(([0., 0., self.ng0, self.Qm0], ss0)) y1 = np.concatenate(([0., Z, self.ng0, self.Qm0], ss0)) # Initialize simulator and compute solution logger.debug('Computing detailed solution') simulator = PWSimulator( lambda t, y: self.fullDerivatives(t, y, Fdrive, Adrive, phi, fs), lambda t, y: self.fullDerivatives(t, y, 0., 0., 0., fs)) t, y, stim = simulator( y1, dt, tstim, toffset, PRF, DC, target_dt=CLASSIC_TARGET_DT, print_progress=logger.getEffectiveLevel() <= logging.INFO, monitor_func=None) # monitor_func=lambda t, y: f't = {t * 1e3:.3f} ms, Qm = {y[3] * 1e5:.2f} nC/cm2') # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim, y0=y0) # Store output in dataframe and return data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.spatialAverage( fs, self.v_capacitance(data['Z'].values), self.Cm0) * 1e3 # mV for i in range(len(self.pneuron.states)): data[self.pneuron.statesNames()[i]] = y[:, i + 4] return data def __simHybrid(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs, phi=np.pi): # Determine time steps dt_dense, dt_sparse = [1. / (n * Fdrive) for n in [NPC_DENSE, NPC_SPARSE]] # Compute initial non-zero deflection Z = self.computeInitialDeflection(Adrive, Fdrive, phi, self.Qm0, dt_dense) # Set initial conditions ss0 = self.pneuron.getSteadyStates(self.pneuron.Vm0) y0 = np.concatenate(([0., 0., self.ng0, self.Qm0], ss0)) y1 = np.concatenate(([0., Z, self.ng0, self.Qm0], ss0)) # Initialize simulator and compute solution is_dense_var = np.array([True] * 3 + [False] * (len(self.pneuron.states) + 1)) logger.debug('Computing hybrid solution') simulator = HybridSimulator( lambda t, y: self.fullDerivatives(t, y, Fdrive, Adrive, phi, fs), lambda t, y: self.fullDerivatives(t, y, 0., 0., 0., fs), lambda t, y, Cm: self.pneuron.derivatives( t, y, Cm=self.spatialAverage(fs, Cm, self.Cm0)), lambda yref: self.capacitance(yref[1]), is_dense_var, ivars_to_check=[1, 2]) t, y, stim = simulator(y1, dt_dense, dt_sparse, 1. / Fdrive, tstim, toffset, PRF, DC) # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim, y0=y0) # Store output in dataframe and return data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Z': y[:, 1], 'ng': y[:, 2], 'Qm': y[:, 3] }) data['Vm'] = data['Qm'].values / self.spatialAverage( fs, self.v_capacitance(data['Z'].values), self.Cm0) * 1e3 # mV for i in range(len(self.pneuron.states)): data[self.pneuron.statesNames()[i]] = y[:, i + 4] return data def __simSonic(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs): # Load appropriate 2D lookups lkp2d = self.getLookup2D(Fdrive, fs) # Interpolate 2D lookups at zero and US amplitude logger.debug('Interpolating lookups at A = %.2f kPa and A = 0', Adrive * 1e-3) lkps1d = {'ON': lkp2d.project('A', Adrive), 'OFF': lkp2d.project('A', 0.)} # Set initial conditions y0 = np.concatenate(([self.Qm0], self.pneuron.getSteadyStates(self.pneuron.Vm0))) # Initialize simulator and compute solution logger.debug('Computing effective solution') simulator = PWSimulator( lambda t, y: self.effDerivatives(t, y, lkps1d['ON']), lambda t, y: self.effDerivatives(t, y, lkps1d['OFF'])) t, y, stim = simulator(y0, self.pneuron.chooseTimeStep(), tstim, toffset, PRF, DC) # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim) # Store output in dataframe and return data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Qm': y[:, 0] }) data['Vm'] = self.interpOnOffVariable('V', data['Qm'].values, stim, lkps1d) for key in ['Z', 'ng']: data[key] = np.full(t.size, np.nan) for i in range(len(self.pneuron.states)): data[self.pneuron.statesNames()[i]] = y[:, i + 1] return data @classmethod @Model.checkOutputDir - def simQueue(cls, freqs, amps, durations, offsets, PRFs, DCs, fs, methods, outputdir=None): + def simQueue(cls, freqs, amps, durations, offsets, PRFs, DCs, fs, methods, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :param fs: sonophore membrane coverage fractions (-) :params methods: integration methods :return: list of parameters (list) for each simulation ''' method_ids = list(range(len(methods))) if ('full' in methods or 'hybrid' in methods) and outputdir is None: logger.warning('Running cumbersome simulation(s) without file saving') if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += Batch.createQueue( freqs, amps, durations, offsets, min(PRFs), 1.0, fs, method_ids) if np.any(DCs != 1.0): queue += Batch.createQueue( freqs, amps, durations, offsets, PRFs, DCs[DCs != 1.0], fs, method_ids) for item in queue: if np.isnan(item[1]): item[1] = None item[-1] = methods[int(item[-1])] return queue @Model.logNSpikes @Model.checkTitrate('Adrive') @Model.addMeta def simulate(self, Fdrive, Adrive, tstim, toffset, PRF=100., DC=1., fs=1., method='sonic'): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param fs: sonophore membrane coverage fraction (-) :param method: selected integration method :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: simulation @ f = %sHz, A = %sPa, t = %ss (%ss offset)%s%s', self, si_format(Fdrive, 0, space=' '), si_format(Adrive, 2, space=' '), *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else ''), ', fs = {:.2f}%'.format(fs * 1e2) if fs < 1.0 else '') # Check validity of stimulation parameters BilayerSonophore.checkInputs(Fdrive, Adrive, 0.0, 0.0) PointNeuron.checkInputs(Adrive, tstim, toffset, PRF, DC) # Call appropriate simulation function and return try: simfunc = { 'full': self.__simFull, 'hybrid': self.__simHybrid, 'sonic': self.__simSonic }[method] except KeyError: raise ValueError('Invalid integration method: "{}"'.format(method)) return simfunc(Fdrive, Adrive, tstim, toffset, PRF, DC, fs) def meta(self, Fdrive, Adrive, tstim, toffset, PRF, DC, fs, method): return { 'simkey': self.simkey, 'neuron': self.pneuron.name, 'a': self.a, 'd': self.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'fs': fs, 'method': method } @staticmethod def getNSpikes(data): return PointNeuron.getNSpikes(data) @logCache(os.path.join(os.path.split(__file__)[0], 'astim_titrations.log')) def titrate(self, Fdrive, tstim, toffset, PRF=100., DC=1., fs=1., method='sonic', xfunc=None, Arange=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency, duration, PRF and duty cycle. :param Fdrive: US frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param fs: sonophore membrane coverage fraction (-) :param method: integration method :param xfunc: function determining whether condition is reached from simulation output :param Arange: search interval for Adrive, iteratively refined :return: determined threshold amplitude (Pa) ''' # Default output function if xfunc is None: xfunc = self.pneuron.titrationFunc # Default amplitude interval if Arange is None: Arange = [0., self.getLookup().refs['A'].max()] return binarySearch( lambda x: xfunc(self.simulate(*x)[0]), [Fdrive, tstim, toffset, PRF, DC, fs, method], 1, Arange, THRESHOLD_CONV_RANGE_ASTIM ) def getQuasiSteadyStates(self, Fdrive, amps=None, charges=None, DCs=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities and duty cycles, at a specific US frequency. :param Fdrive: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DCs: duty cycle value(s) :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges lkp = self.getLookup().projectDCs(amps=amps, DCs=DCs).projectN({'a': self.a, 'f': Fdrive}) if charges is not None: lkp = lkp.project('Q', charges) # Specify dimensions with A and DC as the first two axes A_axis = lkp.getAxisIndex('A') lkp.move('A', 0) lkp.move('DC', 1) nA, nDC = lkp.dims()[:2] # Compute QSS states using these lookups QSS = {k: np.empty(lkp.dims()) for k in self.pneuron.statesNames()} for iA in range(nA): for iDC in range(nDC): lkp1d = SmartDict({k: v[iA, iDC] for k, v in lkp.items()}) QSS_1D = {k: v(lkp1d) for k, v in self.pneuron.quasiSteadyStates().items()} for k in QSS.keys(): QSS[k][iA, iDC] = QSS_1D[k] QSS = SmartLookup(lkp.refs, QSS) for item in [lkp, QSS]: item.move('A', A_axis) item.move('DC', -1) # Compress outputs if needed if squeeze_output: QSS = QSS.squeeze() lkp = lkp.squeeze() return lkp, QSS def iNetQSS(self, Qm, Fdrive, Adrive, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' lkp, QSS = self.getQuasiSteadyStates( Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) return self.pneuron.iNet(lkp['V'], QSS) # mA/m2 def fixedPointsQSS(self, Fdrive, Adrive, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param Fdrive: US frequency (Hz) :param Adrive: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' pltvars = self.getPltVars() logger.debug('A = {:.2f} kPa, DC = {:.0f}%'.format(Adrive * 1e-3, DC * 1e2)) # Extract fixed points from QSS charge variation profile def dfunc(Qm): return - self.iNetQSS(Qm, Fdrive, Adrive, DC) fixed_points = getFixedPoints( lkp.refs['Q'], dQdt, filter='both', der_func=dfunc).tolist() dfunc = lambda x: np.array(self.effDerivatives(_, x, lkp)) classified_fixed_points = {'stable': [], 'unstable': [], 'saddle': []} np.set_printoptions(precision=2) # For each fixed point for i, Qm in enumerate(fixed_points): # Re-compute QSS at fixed point *_, QSS = self.getQuasiSteadyStates(Fdrive, amps=Adrive, charges=Qm, DCs=DC, squeeze_output=True) # Approximate the system's Jacobian matrix at the fixed-point and compute its eigenvalues x = np.array([Qm, *QSS.tables.values()]) print(f'x = {x}, dfunx(x) = {dfunc(x)}') eps_machine = np.sqrt(np.finfo(float).eps) J = jacobian(dfunc, x, rel_eps=eps_machine, method='forward') # Jfunc = nd.Jacobian(dfunc, order=3) # J = Jfunc(x) # print('------------------ Jacobian ------------------') # names = ['Q'] + self.pneuron.statesNames() # for name, Jline in zip(names, J): # print(f'd(d{name}dt) = {Jline}') # Determine fixed point stability based on eigenvalues eigvals, eigvecs = linalg.eig(J) s = ['({0.real:.2e} + {0.imag:.2e}j)'.format(x) for x in eigvals] print(f'eigenvalues = {s}') is_real_eigvals = np.isreal(eigvals) print(is_real_eigvals) is_neg_eigvals = eigvals.real < 0 if np.all(is_neg_eigvals): key = 'stable' elif np.any(is_neg_eigvals): key = 'saddle' else: key = 'unstable' classified_fixed_points[key].append(Qm) logger.debug(f'{key} fixed point @ Q = {(Qm * 1e5):.1f} nC/cm2') return classified_fixed_points def isStableQSS(self, Fdrive, Adrive, DC): lookups, QSS = self.getQuasiSteadyStates( Fdrive, amps=Adrive, DCs=DC, squeeze_output=True) dQdt = -self.pneuron.iNet(lookups['V'], QSS.tables) # mA/m2 classified_fixed_points = self.fixedPointsQSS(Fdrive, Adrive, DC, lookups, dQdt) return len(classified_fixed_points['stable']) > 0 def titrateQSS(self, Fdrive, DC=1., Arange=None): # Default amplitude interval if Arange is None: Arange = [0., self.getLookup().refs['A'].max()] # Titration function def xfunc(x): if self.pneuron.name == 'STN': return self.isStableQSS(*x) else: return not self.isStableQSS(*x) return binarySearch( xfunc, [Fdrive, DC], 1, Arange, THRESHOLD_CONV_RANGE_ASTIM) diff --git a/PySONIC/core/pneuron.py b/PySONIC/core/pneuron.py index 650157b..9dcfa34 100644 --- a/PySONIC/core/pneuron.py +++ b/PySONIC/core/pneuron.py @@ -1,583 +1,583 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-03 11:53:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-09-17 14:59:13 +# @Last Modified time: 2019-09-17 16:10:59 import abc import inspect import numpy as np import pandas as pd from .batches import Batch from .model import Model from .lookups import SmartLookup from .simulators import PWSimulator from ..postpro import detectSpikes, computeFRProfile from ..constants import * from ..utils import * class PointNeuron(Model): ''' Generic point-neuron model interface. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ESTIM' # keyword used to characterize simulations made with this model def __repr__(self): return self.__class__.__name__ @property @classmethod @abc.abstractmethod def name(cls): ''' Neuron name. ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Cm0(cls): ''' Neuron's resting capacitance (F/cm2). ''' raise NotImplementedError @property @classmethod @abc.abstractmethod def Vm0(cls): ''' Neuron's resting membrane potential(mV). ''' raise NotImplementedError @classmethod def Qm0(cls): return cls.Cm0 * cls.Vm0 * 1e-3 # C/cm2 @staticmethod def inputs(): return { 'Astim': { 'desc': 'current density amplitude', 'label': 'A', 'unit': 'mA/m2', 'factor': 1e0, 'precision': 1 }, 'tstim': { 'desc': 'stimulus duration', 'label': 't_{stim}', 'unit': 'ms', 'factor': 1e3, 'precision': 0 }, 'toffset': { 'desc': 'offset duration', 'label': 't_{offset}', 'unit': 'ms', 'factor': 1e3, 'precision': 0 }, 'PRF': { 'desc': 'pulse repetition frequency', 'label': 'PRF', 'unit': 'Hz', 'factor': 1e0, 'precision': 0 }, 'DC': { 'desc': 'duty cycle', 'label': 'DC', 'unit': '%', 'factor': 1e2, 'precision': 2 } } @classmethod def filecodes(cls, Astim, tstim, toffset, PRF, DC): is_CW = DC == 1. return { 'simkey': cls.simkey, 'neuron': cls.name, 'nature': 'CW' if is_CW else 'PW', 'Astim': '{:.1f}mAm2'.format(Astim), 'tstim': '{:.0f}ms'.format(tstim * 1e3), 'toffset': None, 'PRF': 'PRF{:.2f}Hz'.format(PRF) if not is_CW else None, 'DC': 'DC{:.2f}%'.format(DC * 1e2) if not is_CW else None } @classmethod def getPltVars(cls, wrapleft='df["', wrapright='"]'): pltvars = { 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'bounds': ((cls.Vm0 - 20.0) * cls.Cm0 * 1e2, 60) }, 'Vm': { 'desc': 'membrane potential', 'label': 'V_m', 'unit': 'mV', 'bounds': (-150, 70) }, 'ELeak': { 'constant': 'obj.ELeak', 'desc': 'non-specific leakage current resting potential', 'label': 'V_{leak}', 'unit': 'mV', 'ls': '--', 'color': 'k' } } for cname in cls.getCurrentsNames(): cfunc = getattr(cls, cname) cargs = inspect.getargspec(cfunc)[0][1:] pltvars[cname] = { 'desc': inspect.getdoc(cfunc).splitlines()[0], 'label': 'I_{{{}}}'.format(cname[1:]), 'unit': 'A/m^2', 'factor': 1e-3, 'func': '{}({})'.format(cname, ', '.join(['{}{}{}'.format(wrapleft, a, wrapright) for a in cargs])) } for var in cargs: if var != 'Vm': pltvars[var] = { 'desc': cls.states[var], 'label': var, 'bounds': (-0.1, 1.1) } pltvars['iNet'] = { 'desc': inspect.getdoc(getattr(cls, 'iNet')).splitlines()[0], 'label': 'I_{net}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'iNet({0}Vm{1}, {2}{3}{4})'.format( wrapleft, wrapright, wrapleft[:-1], cls.statesNames(), wrapright[1:]), 'ls': '--', 'color': 'black' } pltvars['dQdt'] = { 'desc': inspect.getdoc(getattr(cls, 'dQdt')).splitlines()[0], 'label': 'dQ_m/dt', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'dQdt({0}Vm{1}, {2}{3}{4})'.format( wrapleft, wrapright, wrapleft[:-1], cls.statesNames(), wrapright[1:]), 'ls': '--', 'color': 'black' } pltvars['iCap'] = { 'desc': inspect.getdoc(getattr(cls, 'iCap')).splitlines()[0], 'label': 'I_{cap}', 'unit': 'A/m^2', 'factor': 1e-3, 'func': 'iCap({0}t{1}, {0}Vm{1})'.format(wrapleft, wrapright) } for rate in cls.rates: if 'alpha' in rate: prefix, suffix = 'alpha', rate[5:] else: prefix, suffix = 'beta', rate[4:] pltvars['{}'.format(rate)] = { 'label': '\\{}_{{{}}}'.format(prefix, suffix), 'unit': 'ms^{-1}', 'factor': 1e-3 } pltvars['FR'] = { 'desc': 'riring rate', 'label': 'FR', 'unit': 'Hz', 'factor': 1e0, # 'bounds': (0, 1e3), 'func': f'firingRateProfile({wrapleft[:-2]})' } return pltvars @classmethod def iCap(cls, t, Vm): ''' Capacitive current. ''' dVdt = np.insert(np.diff(Vm) / np.diff(t), 0, 0.) return cls.Cm0 * dVdt @classmethod def getPltScheme(cls): pltscheme = { 'Q_m': ['Qm'], 'V_m': ['Vm'] } pltscheme['I'] = cls.getCurrentsNames() + ['iNet'] for cname in cls.getCurrentsNames(): if 'Leak' not in cname: key = 'i_{{{}}}\ kin.'.format(cname[1:]) cargs = inspect.getargspec(getattr(cls, cname))[0][1:] pltscheme[key] = [var for var in cargs if var not in ['Vm', 'Cai']] return pltscheme @classmethod def statesNames(cls): ''' Return a list of names of all state variables of the model. ''' return list(cls.states.keys()) @classmethod @abc.abstractmethod def derStates(cls): ''' Dictionary of states derivatives functions ''' raise NotImplementedError @classmethod def getDerStates(cls, Vm, states): ''' Compute states derivatives array given a membrane potential and states dictionary ''' return np.array([cls.derStates()[k](Vm, states) for k in cls.statesNames()]) @classmethod @abc.abstractmethod def steadyStates(cls): ''' Return a dictionary of steady-states functions ''' raise NotImplementedError @classmethod def getSteadyStates(cls, Vm): ''' Compute array of steady-states for a given membrane potential ''' return np.array([cls.steadyStates()[k](Vm) for k in cls.statesNames()]) @classmethod def getDerEffStates(cls, lkp, states): ''' Compute effective states derivatives array given lookups and states dictionaries. ''' return np.array([ cls.derEffStates()[k](lkp, states) for k in cls.statesNames()]) @classmethod def getEffRates(cls, Vm): ''' Compute array of effective rate constants for a given membrane potential vector. ''' return {k: np.mean(np.vectorize(v)(Vm)) for k, v in cls.effRates().items()} @classmethod def getLookup(cls): ''' Get lookup of membrane potential rate constants interpolated along the neuron's charge physiological range. ''' Qmin, Qmax = expandRange(*cls.Qbounds(), 10.) Qref = np.arange(Qmin, Qmax, 1e-5) # C/m2 Vref = Qref / cls.Cm0 * 1e3 # mV tables = {k: np.vectorize(v)(Vref) for k, v in cls.effRates().items()} return SmartLookup({'Q': Qref}, {**{'V': Vref}, **tables}) @classmethod @abc.abstractmethod def currents(cls): ''' Dictionary of ionic currents functions (returning current densities in mA/m2) ''' @classmethod def iNet(cls, Vm, states): ''' net membrane current :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: current per unit area (mA/m2) ''' return sum([cfunc(Vm, states) for cfunc in cls.currents().values()]) @classmethod def dQdt(cls, Vm, states): ''' membrane charge density variation rate :param Vm: membrane potential (mV) :states: states of ion channels gating and related variables :return: variation rate (mA/m2) ''' return -cls.iNet(Vm, states) @classmethod def titrationFunc(cls, *args, **kwargs): ''' Default titration function. ''' return cls.isExcited(*args, **kwargs) @staticmethod def currentToConcentrationRate(z_ion, depth): ''' Compute the conversion factor from a specific ionic current (in mA/m2) into a variation rate of submembrane ion concentration (in M/s). :param: z_ion: ion valence :param depth: submembrane depth (m) :return: conversion factor (Mmol.m-1.C-1) ''' return 1e-6 / (z_ion * depth * FARADAY) @staticmethod def nernst(z_ion, Cion_in, Cion_out, T): ''' Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 @staticmethod def vtrap(x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) @staticmethod def efun(x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) @classmethod def ghkDrive(cls, Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * cls.efun(-x) # M eCout = Cion_out * cls.efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 @classmethod def getCurrentsNames(cls): return list(cls.currents().keys()) @staticmethod def firingRateProfile(*args, **kwargs): return computeFRProfile(*args, **kwargs) @classmethod def Qbounds(cls): ''' Determine bounds of membrane charge physiological range for a given neuron. ''' return np.array([np.round(cls.Vm0 - 25.0), 50.0]) * cls.Cm0 * 1e-3 # C/m2 @classmethod def isVoltageGated(cls, state): ''' Determine whether a given state is purely voltage-gated or not.''' return 'alpha{}'.format(state.lower()) in cls.rates @classmethod @Model.checkOutputDir - def simQueue(cls, amps, durations, offsets, PRFs, DCs, outputdir=None): + def simQueue(cls, amps, durations, offsets, PRFs, DCs, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: list of parameters (list) for each simulation ''' if amps is None: amps = [np.nan] DCs = np.array(DCs) queue = [] if 1.0 in DCs: queue += Batch.createQueue(amps, durations, offsets, min(PRFs), 1.0) if np.any(DCs != 1.0): queue += Batch.createQueue(amps, durations, offsets, PRFs, DCs[DCs != 1.0]) for item in queue: if np.isnan(item[0]): item[0] = None return queue @staticmethod def checkInputs(Astim, tstim, toffset, PRF, DC): ''' Check validity of electrical stimulation parameters. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) ''' if not all(isinstance(param, float) for param in [Astim, tstim, toffset, DC]): raise TypeError('Invalid stimulation parameters (must be float typed)') if tstim <= 0: raise ValueError('Invalid stimulus duration: {} ms (must be strictly positive)' .format(tstim * 1e3)) if toffset < 0: raise ValueError('Invalid stimulus offset: {} ms (must be positive or null)' .format(toffset * 1e3)) if DC <= 0.0 or DC > 1.0: raise ValueError('Invalid duty cycle: {} (must be within ]0; 1])'.format(DC)) if DC < 1.0: if not isinstance(PRF, float): raise TypeError('Invalid PRF value (must be float typed)') if PRF is None: raise AttributeError('Missing PRF value (must be provided when DC < 1)') if PRF < 1 / tstim: raise ValueError('Invalid PRF: {} Hz (PR interval exceeds stimulus duration)' .format(PRF)) def chooseTimeStep(self): ''' Determine integration time step based on intrinsic temporal properties. ''' return DT_EFFECTIVE @classmethod def derivatives(cls, t, y, Cm=None, Iinj=0.): ''' Compute system derivatives for a given mambrane capacitance and injected current. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param Cm: membrane capacitance (F/m2) :param Iinj: injected current (mA/m2) :return: vector of system derivatives at time t ''' if Cm is None: Cm = cls.Cm0 Qm, *states = y Vm = Qm / Cm * 1e3 # mV states_dict = dict(zip(cls.statesNames(), states)) dQmdt = (Iinj - cls.iNet(Vm, states_dict)) * 1e-3 # A/m2 return [dQmdt, *cls.getDerStates(Vm, states_dict)] @Model.logNSpikes @Model.checkTitrate('Astim') @Model.addMeta def simulate(self, Astim, tstim, toffset, PRF=100., DC=1.0): ''' Simulate a specific neuron model for a specific set of electrical parameters, and return output data in a dataframe. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 2-tuple with the output dataframe and computation time. ''' logger.info( '%s: simulation @ A = %sA/m2, t = %ss (%ss offset)%s', self, si_format(Astim, 2, space=' '), *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format( si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # Check validity of stimulation parameters self.checkInputs(Astim, tstim, toffset, PRF, DC) # Set initial conditions y0 = np.array((self.Qm0(), *self.getSteadyStates(self.Vm0))) # Initialize simulator and compute solution logger.debug('Computing solution') simulator = PWSimulator( lambda t, y: self.derivatives(t, y, Iinj=Astim), lambda t, y: self.derivatives(t, y, Iinj=0.)) t, y, stim = simulator( y0, self.chooseTimeStep(), tstim, toffset, PRF, DC) # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim) # Store output in dataframe and return data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Qm': y[:, 0], 'Vm': y[:, 0] / self.Cm0 * 1e3, }) for i in range(len(self.states)): data[self.statesNames()[i]] = y[:, i + 1] return data @classmethod def meta(cls, Astim, tstim, toffset, PRF, DC): return { 'simkey': cls.simkey, 'neuron': cls.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC } @staticmethod def getNSpikes(data): ''' Compute number of spikes in charge profile of simulation output. :param data: dataframe containing output time series :return: number of detected spikes ''' return detectSpikes(data)[0].size @staticmethod def getStabilizationValue(data): ''' Determine stabilization value from the charge profile of a simulation output. :param data: dataframe containing output time series :return: charge stabilization value (or np.nan if no stabilization detected) ''' # Extract charge signal posterior to observation window t, Qm = [data[key].values for key in ['t', 'Qm']] if t.max() <= TMIN_STABILIZATION: raise ValueError('solution length is too short to assess stabilization') Qm = Qm[t > TMIN_STABILIZATION] # Compute variation range Qm_range = np.ptp(Qm) logger.debug('%.2f nC/cm2 variation range over the last %.0f ms, Qmf = %.2f nC/cm2', Qm_range * 1e5, TMIN_STABILIZATION * 1e3, Qm[-1] * 1e5) # Return final value only if stabilization is detected if np.ptp(Qm) < QSS_Q_DIV_THR: return Qm[-1] else: return np.nan @classmethod def isExcited(cls, data): ''' Determine if neuron is excited from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is excited or not ''' return cls.getNSpikes(data) > 0 @classmethod def isSilenced(cls, data): ''' Determine if neuron is silenced from simulation output. :param data: dataframe containing output time series :return: boolean stating whether neuron is silenced or not ''' return not np.isnan(cls.getStabilizationValue(data)) def titrate(self, tstim, toffset, PRF, DC, xfunc=None, Arange=(0., 2 * AMP_UPPER_BOUND_ESTIM)): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given duration, PRF and duty cycle. :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param xfunc: function determining whether condition is reached from simulation output :param Arange: search interval for Astim, iteratively refined :return: excitation threshold amplitude (mA/m2) ''' # Default output function if xfunc is None: xfunc = self.titrationFunc return binarySearch( lambda x: xfunc(self.simulate(*x)[0]), [tstim, toffset, PRF, DC], 0, Arange, THRESHOLD_CONV_RANGE_ESTIM ) diff --git a/PySONIC/core/vclamp.py b/PySONIC/core/vclamp.py index 8abcd93..452148c 100644 --- a/PySONIC/core/vclamp.py +++ b/PySONIC/core/vclamp.py @@ -1,159 +1,159 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2019-08-14 13:49:25 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-09-17 14:58:40 +# @Last Modified time: 2019-09-17 16:12:22 import numpy as np import pandas as pd from .batches import Batch from .model import Model from .pneuron import PointNeuron from .simulators import PWSimulator from ..constants import * from ..utils import * class VoltageClamp(Model): tscale = 'ms' # relevant temporal scale of the model simkey = 'VCLAMP' # keyword used to characterize simulations made with this model def __init__(self, pneuron): ''' Constructor of the class. :param pneuron: point-neuron model ''' # Check validity of input parameters if not isinstance(pneuron, PointNeuron): raise ValueError('Invalid neuron type: "{}" (must inherit from PointNeuron class)' .format(pneuron.name)) self.pneuron = pneuron def __repr__(self): return '{}({})'.format(self.__class__.__name__, self.pneuron) def params(self): return self.pneuron.params() def getPltVars(self, wrapleft='df["', wrapright='"]'): return self.pneuron.getPltVars(wrapleft, wrapright) def getPltScheme(self): return self.pneuron.getPltScheme() def filecode(self, *args): return Model.filecode(self, *args) @staticmethod def inputs(): # Get pneuron input vars and replace stimulation current by held and step voltages inputvars = PointNeuron.inputs() for key in ['Astim', 'PRF', 'DC']: del inputvars[key] inputvars.update({ 'Vhold': { 'desc': 'held membrane potential', 'label': 'V_{hold}', 'unit': 'mV', 'precision': 0 }, 'Vstep': { 'desc': 'step membrane potential', 'label': 'V_{step}', 'unit': 'mV', 'precision': 0 } }) return inputvars def filecodes(self, Vhold, Vstep, tstim, toffset): return { 'simkey': self.simkey, 'neuron': self.pneuron.name, 'Vhold': '{:.1f}mV'.format(Vhold), 'Vstep': '{:.1f}mV'.format(Vstep), 'tstim': '{:.0f}ms'.format(tstim * 1e3), 'toffset': None } @classmethod @Model.checkOutputDir - def simQueue(cls, *args, outputdir=None): + def simQueue(cls, *args, **kwargs): return Batch.createQueue(*args) @staticmethod def checkInputs(Vhold, Vstep, tstim, toffset): ''' Check validity of stimulation parameters. :param Vhold: held membrane potential (mV) :param Vstep: step membrane potential (mV) :param tstim: pulse duration (s) :param toffset: offset duration (s) ''' if not all(isinstance(param, float) for param in [Vhold, Vstep, tstim, toffset]): raise TypeError('Invalid stimulation parameters (must be float typed)') if tstim <= 0: raise ValueError('Invalid stimulus duration: {} ms (must be strictly positive)' .format(tstim * 1e3)) if toffset < 0: raise ValueError('Invalid stimulus offset: {} ms (must be positive or null)' .format(toffset * 1e3)) def derivatives(self, t, y, Vm=None): if Vm is None: Vm = self.pneuron.Vm0 states_dict = dict(zip(self.pneuron.statesNames(), y)) return self.pneuron.getDerStates(Vm, states_dict) @Model.addMeta def simulate(self, Vhold, Vstep, tstim, toffset): logger.info( '%s: simulation @ Vhold = %sV, Vstep = %sV, t = %ss (%ss offset)', self, *si_format([Vhold * 1e-3, Vstep * 1e-3, tstim, toffset], 1, space=' ')) # Check validity of stimulation parameters self.checkInputs(Vhold, Vstep, tstim, toffset) # Set initial conditions y0 = self.pneuron.getSteadyStates(Vhold) # Initialize simulator and compute solution logger.debug('Computing solution') simulator = PWSimulator( lambda t, y: self.derivatives(t, y, Vm=Vstep), lambda t, y: self.derivatives(t, y, Vm=Vhold)) t, y, stim = simulator( y0, DT_EFFECTIVE, tstim, toffset, None, 1.) # Prepend initial conditions (prior to stimulation) t, y, stim = simulator.prependSolution(t, y, stim) # Compute clamped membrane potential vector Vm = np.zeros(stim.size) Vm[stim == 0] = Vhold Vm[stim == 1] = Vstep # Store output in dataframe and return data = pd.DataFrame({ 't': t, 'stimstate': stim, 'Qm': Vm * 1e-3 * self.pneuron.Cm0, 'Vm': Vm, }) for i in range(len(self.pneuron.states)): data[self.pneuron.statesNames()[i]] = y[:, i] return data def meta(self, Vhold, Vstep, tstim, toffset): return { 'simkey': self.simkey, 'neuron': self.pneuron.name, 'Vhold': Vhold, 'Vstep': Vstep, 'tstim': tstim, 'toffset': toffset, } diff --git a/scripts/run_mech.py b/scripts/run_mech.py index 55d28d4..6facc3c 100644 --- a/scripts/run_mech.py +++ b/scripts/run_mech.py @@ -1,39 +1,39 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-11-21 10:46:56 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-08-30 17:00:09 +# @Last Modified time: 2019-09-17 15:32:27 ''' Run simulations of the NICE mechanical model. ''' from PySONIC.core import BilayerSonophore, Batch from PySONIC.utils import logger from PySONIC.parsers import MechSimParser def main(): # Parse command line arguments parser = MechSimParser() args = parser.parse() logger.setLevel(args['loglevel']) # Run MECH batch logger.info("Starting mechanical simulation batch") queue = BilayerSonophore.simQueue(*parser.parseSimInputs(args), outputdir=args['outputdir']) output = [] for a in args['radius']: for d in args['embedding']: for Cm0 in args['Cm0']: for Qm0 in args['Qm0']: bls = BilayerSonophore(a, Cm0, Qm0, embedding_depth=d) batch = Batch(bls.simAndSave if args['save'] else bls.simulate, queue) output += batch(mpi=args['mpi'], loglevel=args['loglevel']) # Plot resulting profiles if args['plot'] is not None: parser.parsePlot(args, output) if __name__ == '__main__': main()