diff --git a/PySONIC/core/batches.py b/PySONIC/core/batches.py index aee9aa1..cef1ce2 100644 --- a/PySONIC/core/batches.py +++ b/PySONIC/core/batches.py @@ -1,364 +1,364 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-03-23 21:30:07 +# @Last Modified time: 2021-03-26 19:36:52 ''' Utility functions used in simulations ''' import os import abc import csv import logging import numpy as np import pandas as pd import multiprocess as mp from ..utils import logger, si_format, isIterable class Consumer(mp.Process): ''' Generic consumer process, taking tasks from a queue and outputing results in another queue. ''' def __init__(self, queue_in, queue_out): mp.Process.__init__(self) self.queue_in = queue_in self.queue_out = queue_out logger.debug('Starting %s', self.name) def run(self): while True: nextTask = self.queue_in.get() if nextTask is None: logger.debug('Exiting %s', self.name) self.queue_in.task_done() break answer = nextTask() self.queue_in.task_done() self.queue_out.put(answer) return class Worker: ''' Generic worker class calling a specific function with a given set of parameters. ''' def __init__(self, wid, func, args, kwargs, loglevel): ''' Worker constructor. :param wid: worker ID :param func: function object :param args: list of method arguments :param kwargs: dictionary of optional method arguments :param loglevel: logging level ''' self.id = wid self.func = func self.args = args self.kwargs = kwargs self.loglevel = loglevel def __call__(self): ''' Caller to the function with specific parameters. ''' logger.setLevel(self.loglevel) return self.id, self.func(*self.args, **self.kwargs) class Batch: ''' Generic interface to run batches of function calls. ''' def __init__(self, func, queue): ''' Batch constructor. :param func: function object :param queue: list of list of function parameters ''' self.func = func self.queue = queue def __call__(self, *args, **kwargs): ''' Call the internal run method. ''' return self.run(*args, **kwargs) def getNConsumers(self): ''' Determine number of consumers based on queue length and number of available CPUs. ''' return min(mp.cpu_count(), len(self.queue)) def start(self): ''' Create tasks and results queues, and start consumers. ''' mp.freeze_support() self.tasks = mp.JoinableQueue() self.results = mp.Queue() self.consumers = [Consumer(self.tasks, self.results) for i in range(self.getNConsumers())] for c in self.consumers: c.start() @staticmethod def resolve(params): if isinstance(params, list): args = params kwargs = {} elif isinstance(params, tuple): args, kwargs = params return args, kwargs def assign(self, loglevel): ''' Assign tasks to workers. ''' for i, params in enumerate(self.queue): args, kwargs = self.resolve(params) worker = Worker(i, self.func, args, kwargs, loglevel) self.tasks.put(worker, block=False) def join(self): ''' Put all tasks to None and join the queue. ''' for i in range(len(self.consumers)): self.tasks.put(None, block=False) self.tasks.join() def get(self): ''' Extract and re-order results. ''' outputs, idxs = [], [] for i in range(len(self.queue)): wid, out = self.results.get() outputs.append(out) idxs.append(wid) return [x for _, x in sorted(zip(idxs, outputs))] def stop(self): ''' Close tasks and results queues. ''' self.tasks.close() self.results.close() def run(self, mpi=False, loglevel=logging.INFO): ''' Run batch with or without multiprocessing. ''' if mpi: self.start() self.assign(loglevel) self.join() outputs = self.get() self.stop() else: outputs = [] for params in self.queue: args, kwargs = self.resolve(params) outputs.append(self.func(*args, **kwargs)) return outputs @staticmethod def createQueue(*dims): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps. :param dims: list of lists (or 1D arrays) of input parameters :return: list of parameters (list) for each simulation ''' ndims = len(dims) dims_in = [dims[1], dims[0]] inds_out = [1, 0] if ndims > 2: dims_in += dims[2:] inds_out += list(range(2, ndims)) queue = np.stack(np.meshgrid(*dims_in), -1).reshape(-1, ndims) queue = queue[:, inds_out] return queue.tolist() @staticmethod def printQueue(queue, nmax=20): if len(queue) <= nmax: for x in queue: print(x) else: for x in queue[:nmax // 2]: print(x) print(f'... {len(queue) - nmax} more entries ...') for x in queue[-nmax // 2:]: print(x) class LogBatch(metaclass=abc.ABCMeta): ''' Generic interface to a simulation batch in with real-time input:output caching in a specific log file. ''' delimiter = '\t' # csv delimiter rtol = 1e-9 atol = 1e-16 def __init__(self, inputs, root='.'): ''' Construtor. :param inputs: array of batch inputs :param root: root for IO operations ''' self.inputs = inputs self.root = root self.fpath = self.filepath() @property def root(self): return self._root @root.setter def root(self, value): if not os.path.isdir(value): raise ValueError(f'{value} is not a valid directory') self._root = value @property @abc.abstractmethod def in_key(self): ''' Input key. ''' raise NotImplementedError @property @abc.abstractmethod def out_keys(self): ''' Output keys. ''' raise NotImplementedError @property @abc.abstractmethod def suffix(self): ''' filename suffix ''' raise NotImplementedError @property @abc.abstractmethod def unit(self): ''' Input unit. ''' raise NotImplementedError @property def in_label(self): ''' Input label. ''' return f'{self.in_key} ({self.unit})' def rangecode(self, x, label, unit): ''' String describing a batch input range. ''' bounds_str = si_format([x.min(), x.max()], 1, space="") return '{0}{1}{3}-{2}{3}_{4}'.format(label.replace(' ', '_'), *bounds_str, unit, x.size) @property def inputscode(self): ''' String describing the batch inputs. ''' return self.rangecode(self.inputs, self.in_key, self.unit) @abc.abstractmethod def corecode(self): ''' String describing the batch core components. ''' raise NotImplementedError def filecode(self): ''' String fully describing the batch. ''' return f'{self.corecode()}_{self.inputscode}_{self.suffix}_results' def filename(self): ''' Batch associated filename. ''' return f'{self.filecode()}.csv' def filepath(self): ''' Batch associated filepath. ''' return os.path.join(self.root, self.filename()) def createLogFile(self): ''' Create batch log file if it does not exist. ''' if not os.path.isfile(self.fpath): logger.debug(f'creating batch log file: "{self.fpath}"') self.writeLabels() else: logger.debug(f'existing batch log file: "{self.fpath}"') def writeLabels(self): ''' Write the column labels of the batch log file. ''' with open(self.fpath, 'w') as csvfile: writer = csv.writer(csvfile, delimiter=self.delimiter) writer.writerow([self.in_label, *self.out_keys]) def writeEntry(self, entry): ''' Write a new input(s):ouput(s) entry in the batch log file. ''' with open(self.fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=self.delimiter) writer.writerow(entry) def getLogData(self): ''' Retrieve the batch log file data (inputs and outputs) as a dataframe. ''' return pd.read_csv(self.fpath, sep=self.delimiter).sort_values(self.in_label) def getInput(self): ''' Retrieve the logged batch inputs as an array. ''' return self.getLogData()[self.in_label].values def getSerializedOutput(self): ''' Retrieve the logged batch outputs as an array (if 1 key) or dataframe (if several). ''' if len(self.out_keys) == 1: return self.getLogData()[self.out_keys[0]].values else: return pd.DataFrame({k: self.getLogData()[k].values for k in self.out_keys}) def getOutput(self): return self.getSerializedOutput() def getEntryIndex(self, entry): ''' Get the index corresponding to a given entry. ''' inputs = self.getInput() if len(inputs) == 0: raise ValueError(f'no entries in batch') close = np.isclose(inputs, entry, rtol=self.rtol, atol=self.atol) imatches = np.where(close)[0] if len(imatches) == 0: raise ValueError(f'{entry} entry not found in batch log') elif len(imatches) > 1: raise ValueError(f'duplicate {entry} entry found in batch log') return imatches[0] def getEntryOutput(self, entry): imatch = self.getEntryIndex(entry) return self.getSerializedOutput()[imatch] def isEntry(self, value): ''' Check if a given input is logged in the batch log file. ''' try: self.getEntryIndex(value) return True except ValueError: return False @abc.abstractmethod def compute(self, x): ''' Compute the necessary output(s) for a given input. ''' raise NotImplementedError def computeAndLog(self, x): ''' Compute output(s) and log new entry only if input is not already in the log file. ''' if not self.isEntry(x): logger.debug(f'entry not found: "{x}"') out = self.compute(x) if not isIterable(x): x = [x] if not isIterable(out): out = [out] entry = [*x, *out] if not self.mpi: self.writeEntry(entry) return entry else: logger.debug(f'existing entry: "{x}"') return None def run(self, mpi=False): ''' Run the batch and return the output(s). ''' self.createLogFile() if len(self.getLogData()) < len(self.inputs): batch = Batch(self.computeAndLog, [[x] for x in self.inputs]) self.mpi = mpi outputs = batch.run(mpi=mpi, loglevel=logger.level) outputs = filter(lambda x: x is not None, outputs) if mpi: for out in outputs: self.writeEntry(out) self.mpi = False else: logger.debug('all entries already present') return self.getOutput() diff --git a/PySONIC/core/bls.py b/PySONIC/core/bls.py index 19ef1ec..9b8e1a3 100644 --- a/PySONIC/core/bls.py +++ b/PySONIC/core/bls.py @@ -1,823 +1,828 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-03-23 17:16:05 +# @Last Modified time: 2021-03-26 18:28:04 from enum import Enum import os import json import numpy as np import scipy.integrate as integrate from scipy.optimize import brentq, curve_fit from .model import Model from .lookups import EffectiveVariablesLookup from .solvers import PeriodicSolver from .drives import Drive, AcousticDrive from ..utils import logger, si_format, isIterable, LOOKUP_DIR from ..constants import * class PmCompMethod(Enum): ''' Enum: types of computation method for the intermolecular pressure ''' direct = 1 predict = 2 def LennardJones(x, beta, alpha, C, m, n): ''' Generic expression of a Lennard-Jones function, adapted for the context of symmetric deflection (distance = 2x). :param x: deflection (i.e. half-distance) :param beta: x-shifting factor :param alpha: x-scaling factor :param C: y-scaling factor :param m: exponent of the repulsion term :param n: exponent of the attraction term :return: Lennard-Jones potential at given distance (2x) ''' return C * (np.power((alpha / (2 * x + beta)), m) - np.power((alpha / (2 * x + beta)), n)) def lookup(func): ''' Load parameters from lookup file, or compute them and store them in lookup file. ''' lookup_path = os.path.join(os.path.split(__file__)[0], 'bls_lookups.json') def wrapper(obj): akey = f'{obj.a * 1e9:.1f}' Qkey = f'{obj.Qm0 * 1e5:.2f}' # Open lookup files try: with open(lookup_path, 'r') as fh: lookups = json.load(fh) except FileNotFoundError: lookups = {} # If info not in lookups, compute parameters and add them if akey not in lookups or Qkey not in lookups[akey]: func(obj) if akey not in lookups: lookups[akey] = {Qkey: {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta}} else: lookups[akey][Qkey] = {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta} logger.debug('Saving BLS derived parameters to lookup file') with open(lookup_path, 'w') as fh: json.dump(lookups, fh, indent=2) # If lookup exists, load parameters from it else: logger.debug('Loading BLS derived parameters from lookup file') obj.LJ_approx = lookups[akey][Qkey]['LJ_approx'] obj.Delta = lookups[akey][Qkey]['Delta_eq'] return wrapper class BilayerSonophore(Model): ''' Definition of the Bilayer Sonophore Model - geometry - pressure terms - cavitation dynamics ''' # BIOMECHANICAL PARAMETERS T = 309.15 # Temperature (K) delta0 = 2.0e-9 # Thickness of the leaflet (m) Delta_ = 1.4e-9 # Initial gap between the two leaflets on a non-charged membrane at equil. (m) pDelta = 1.0e5 # Attraction/repulsion pressure coefficient (Pa) m = 5.0 # Exponent in the repulsion term (dimensionless) n = 3.3 # Exponent in the attraction term (dimensionless) rhoL = 1075.0 # Density of the surrounding fluid (kg/m^3) muL = 7.0e-4 # Dynamic viscosity of the surrounding fluid (Pa.s) muS = 0.035 # Dynamic viscosity of the leaflet (Pa.s) kA = 0.24 # Area compression modulus of the leaflet (N/m) alpha = 7.56 # Tissue shear loss modulus frequency coefficient (Pa.s) C0 = 0.62 # Initial gas molar concentration in the surrounding fluid (mol/m^3) kH = 1.613e5 # Henry's constant (Pa.m^3/mol) P0 = 1.0e5 # Static pressure in the surrounding fluid (Pa) Dgl = 3.68e-9 # Diffusion coefficient of gas in the fluid (m^2/s) xi = 0.5e-9 # Boundary layer thickness for gas transport across leaflet (m) c = 1515.0 # Speed of sound in medium (m/s) # BIOPHYSICAL PARAMETERS epsilon0 = 8.854e-12 # Vacuum permittivity (F/m) epsilonR = 1.0 # Relative permittivity of intramembrane cavity (dimensionless) rel_Zmin = -0.49 # relative deflection range lower bound (in multiples of Delta) tscale = 'us' # relevant temporal scale of the model simkey = 'MECH' # keyword used to characterize simulations made with this model def __init__(self, a, Cm0, Qm0, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Extract resting constants and geometry self.Cm0 = Cm0 self.Qm0 = Qm0 self.a = a self.d = embedding_depth self.S0 = np.pi * self.a**2 # Initialize null elastic modulus for tissue self.kA_tissue = 0. # Compute Pm params self.computePMparams() # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def copy(self): return self.__class__(self.a, self.Cm0, self.Qm0, embedding_depth=self.d) @property def a(self): return self._a @a.setter def a(self, value): if value <= 0.: raise ValueError('Sonophore radius must be positive') self._a = value @property def Cm0(self): return self._Cm0 @Cm0.setter def Cm0(self, value): if value <= 0.: raise ValueError('Resting membrane capacitance must be positive') self._Cm0 = value @property def d(self): return self._d @d.setter def d(self, value): if value < 0.: raise ValueError('Embedding depth cannot be negative') self._d = value def __repr__(self): s = f'{self.__class__.__name__}({self.a * 1e9:.1f} nm' if self.d > 0.: s += f', d={si_format(self.d, precision=1)}m' return f'{s})' @property def meta(self): return { 'a': self.a, 'd': self.d, 'Cm0': self.Cm0, 'Qm0': self.Qm0, } @classmethod def initFromMeta(cls, d): return cls(d['a'], d['Cm0'], d['Qm0']) @staticmethod def inputs(): return { 'a': { 'desc': 'sonophore radius', 'label': 'a', 'unit': 'm', 'precision': 0 }, 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'precision': 1 }, **AcousticDrive.inputs() } def filecodes(self, drive, Qm, PmCompMethod='predict'): if isIterable(Qm): Qm_code = f'{Qm.min() * 1e5:.1f}nCcm2_{Qm.max() * 1e5:.1f}nCcm2_{Qm.size}' else: Qm_code = f'{Qm * 1e5:.1f}nCcm2' return { 'simkey': self.simkey, 'a': f'{self.a * 1e9:.0f}nm', **drive.filecodes, 'Qm': Qm_code } @staticmethod def getPltVars(wl='df["', wr='"]'): return { 'Pac': { 'desc': 'acoustic pressure', 'label': 'P_{AC}', 'unit': 'kPa', 'factor': 1e-3, 'func': f'meta["drive"].compute({wl}t{wr})' }, 'Z': { 'desc': 'leaflets deflection', 'label': 'Z', 'unit': 'nm', 'factor': 1e9, 'bounds': (-1.0, 10.0) }, 'ng': { 'desc': 'gas content', 'label': 'n_g', 'unit': '10^{-22}\ mol', 'factor': 1e22, 'bounds': (1.0, 15.0) }, 'Pmavg': { 'desc': 'average intermolecular pressure', 'label': 'P_M', 'unit': 'kPa', 'factor': 1e-3, 'func': f'PMavgpred({wl}Z{wr})' }, 'Telastic': { 'desc': 'leaflet elastic tension', 'label': 'T_E', 'unit': 'mN/m', 'factor': 1e3, 'func': f'TEleaflet({wl}Z{wr})' }, 'Cm': { 'desc': 'membrane capacitance', 'label': 'C_m', 'unit': 'uF/cm^2', 'factor': 1e2, 'bounds': (0.0, 1.5), 'func': f'v_capacitance({wl}Z{wr})' } } @property def pltScheme(self): return { 'P_{AC}': ['Pac'], 'Z': ['Z'], 'n_g': ['ng'] } @property def Zmin(self): return self.rel_Zmin * self.Delta def curvrad(self, Z): ''' Leaflet curvature radius (signed variable) :param Z: leaflet apex deflection (m) :return: leaflet curvature radius (m) ''' if Z == 0.0: return np.inf else: return (self.a**2 + Z**2) / (2 * Z) def v_curvrad(self, Z): ''' Vectorized curvrad function ''' return np.array(list(map(self.curvrad, Z))) def surface(self, Z): ''' Surface area of the stretched leaflet (spherical cap formula) :param Z: leaflet apex deflection (m) :return: stretched leaflet surface (m^2) ''' return np.pi * (self.a**2 + Z**2) def volume(self, Z): ''' Volume of the inter-leaflet space (cylinder +/- 2 spherical caps) :param Z: leaflet apex deflection (m) :return: bilayer sonophore inner volume (m^3) ''' return np.pi * self.a**2 * self.Delta\ * (1 + (Z / (3 * self.Delta) * (3 + Z**2 / self.a**2))) def arealStrain(self, Z): ''' Areal strain of the stretched leaflet epsilon = (S - S0)/S0 = (Z/a)^2 :param Z: leaflet apex deflection (m) :return: areal strain (dimensionless) ''' return (Z / self.a)**2 def logRelGap(self, Z): ''' Logarithm of relative sonophore deflection for a given deflection Z. ''' return np.log((2 * Z + self.Delta) / self.Delta) def capacitance(self, Z): ''' Membrane capacitance (parallel-plate capacitor evaluated at average inter-layer distance) :param Z: leaflet apex deflection (m) :return: capacitance per unit area (F/m2) ''' if Z == 0.0: return self.Cm0 else: Z2 = (self.a**2 - Z**2 - Z * self.Delta) / (2 * Z) return self.Cm0 * self.Delta / self.a**2 * (Z + Z2 * self.logRelGap(Z)) def v_capacitance(self, Z): ''' Vectorized capacitance function ''' return np.array(list(map(self.capacitance, Z))) def derCapacitance(self, Z, U): ''' Evolution of membrane capacitance :param Z: leaflet apex deflection (m) :param U: leaflet apex deflection velocity (m/s) :return: time derivative of capacitance per unit area (F/m2.s) ''' ratio1 = (Z**2 + self.a**2) / (Z * (2 * Z + self.Delta)) ratio2 = (Z**2 + self.a**2) / (2 * Z**2) * self.logRelGap(Z) dCmdZ = self.Cm0 * self.Delta / self.a**2 * (ratio1 - ratio2) return dCmdZ * U @staticmethod def localDeflection(r, Z, R): ''' Local leaflet deflection at specific radial distance (signed) :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local transverse leaflet deviation (m) ''' if np.abs(Z) == 0.0: return 0.0 else: return np.sign(Z) * (np.sqrt(R**2 - r**2) - np.abs(R) + np.abs(Z)) def PMlocal(self, r, Z, R): ''' Local intermolecular pressure :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local intermolecular pressure (Pa) ''' z = self.localDeflection(r, Z, R) relgap = (2 * z + self.Delta) / self.Delta_ return self.pDelta * ((1 / relgap)**self.m - (1 / relgap)**self.n) def PMavg(self, Z, R, S): ''' Average intermolecular pressure across the leaflet (computed by quadratic integration) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :param S: surface of the stretched leaflet (m^2) :return: averaged intermolecular resultant pressure (Pa) .. warning:: quadratic integration is computationally expensive. ''' # Integrate intermolecular force over an infinitely thin ring of radius r from 0 to a fTotal, _ = integrate.quad(lambda r, Z, R: 2 * np.pi * r * self.PMlocal(r, Z, R), 0, self.a, args=(Z, R)) return fTotal / S def v_PMavg(self, Z, R, S): ''' Vectorized PMavg function ''' return np.array(list(map(self.PMavg, Z, R, S))) def LJfitPMavg(self): ''' Determine optimal parameters of a Lennard-Jones expression approximating the average intermolecular pressure. These parameters are obtained by a nonlinear fit of the Lennard-Jones function for a range of deflection values between predetermined Zmin and Zmax. :return: 3-tuple with optimized LJ parameters for PmAvg prediction (Map) and the standard and max errors of the prediction in the fitting range (in Pascals) ''' # Determine lower bound of deflection range: when Pm = Pmmax PMmax = LJFIT_PM_MAX # Pa Zlb_range = (self.Zmin, 0.0) Zlb = brentq(lambda Z, Pmmax: self.PMavg(Z, self.curvrad(Z), self.surface(Z)) - PMmax, *Zlb_range, args=(PMmax), xtol=1e-16) # Create vectors for geometric variables Zub = 2 * self.a Z = np.arange(Zlb, Zub, 1e-11) Pmavg = self.v_PMavg(Z, self.v_curvrad(Z), self.surface(Z)) # Compute optimal nonlinear fit of custom LJ function with initial guess x0_guess = self.delta0 C_guess = 0.1 * self.pDelta nrep_guess = self.m nattr_guess = self.n pguess = (x0_guess, C_guess, nrep_guess, nattr_guess) popt, _ = curve_fit(lambda x, x0, C, nrep, nattr: LennardJones(x, self.Delta, x0, C, nrep, nattr), Z, Pmavg, p0=pguess, maxfev=100000) (x0_opt, C_opt, nrep_opt, nattr_opt) = popt Pmavg_fit = LennardJones(Z, self.Delta, x0_opt, C_opt, nrep_opt, nattr_opt) # Compute prediction error residuals = Pmavg - Pmavg_fit ss_res = np.sum(residuals**2) N = residuals.size std_err = np.sqrt(ss_res / N) max_err = max(np.abs(residuals)) logger.debug('LJ approx: x0 = %.2f nm, C = %.2f kPa, m = %.2f, n = %.2f', x0_opt * 1e9, C_opt * 1e-3, nrep_opt, nattr_opt) LJ_approx = {"x0": x0_opt, "C": C_opt, "nrep": nrep_opt, "nattr": nattr_opt} return (LJ_approx, std_err, max_err) @lookup def computePMparams(self): # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: D_eq = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Find optimal Lennard-Jones parameters to approximate PMavg (self.LJ_approx, std_err, _) = self.LJfitPMavg() assert std_err < PMAVG_STD_ERR_MAX, 'High error in PmAvg nonlinear fit:'\ ' std_err = %.2f Pa' % std_err def PMavgpred(self, Z): ''' Approximated average intermolecular pressure (using nonlinearly fitted Lennard-Jones function) :param Z: leaflet apex deflection (m) :return: predicted average intermolecular pressure (Pa) ''' return LennardJones(Z, self.Delta, self.LJ_approx['x0'], self.LJ_approx['C'], self.LJ_approx['nrep'], self.LJ_approx['nattr']) def Pelec(self, Z, Qm): ''' Electrical pressure term :param Z: leaflet apex deflection (m) :param Qm: membrane charge density (C/m2) :return: electrical pressure (Pa) ''' relS = self.S0 / self.surface(Z) abs_perm = self.epsilon0 * self.epsilonR # F/m return - relS * Qm**2 / (2 * abs_perm) # Pa def findDeltaEq(self, Qm): ''' Compute the Delta that cancels out the (Pm + Pec) equation at Z = 0 for a given membrane charge density, using the Brent method to refine the pressure root iteratively. :param Qm: membrane charge density (C/m2) :return: equilibrium value (m) and associated pressure (Pa) ''' def dualPressure(Delta): x = (self.Delta_ / Delta) return (self.pDelta * (x**self.m - x**self.n) + self.Pelec(0.0, Qm)) Delta_eq = brentq(dualPressure, 0.1 * self.Delta_, 2.0 * self.Delta_, xtol=1e-16) logger.debug('∆eq = %.2f nm', Delta_eq * 1e9) return (Delta_eq, dualPressure(Delta_eq)) def gasFlux(self, Z, P): ''' Gas molar flux through the sonophore boundary layers :param Z: leaflet apex deflection (m) :param P: internal gas pressure (Pa) :return: gas molar flux (mol/s) ''' dC = self.C0 - P / self.kH return 2 * self.surface(Z) * self.Dgl * dC / self.xi @classmethod def gasmol2Pa(cls, ng, V): ''' Internal gas pressure for a given molar content :param ng: internal molar content (mol) :param V: sonophore inner volume (m^3) :return: internal gas pressure (Pa) ''' return ng * Rg * cls.T / V @classmethod def gasPa2mol(cls, P, V): ''' Internal gas molar content for a given pressure :param P: internal gas pressure (Pa) :param V: sonophore inner volume (m^3) :return: internal gas molar content (mol) ''' return P * V / (Rg * cls.T) def PtotQS(self, Z, ng, Qm, Pac, Pm_comp_method): ''' Net quasi-steady pressure for a given acoustic pressure (Ptot = Pm + Pg + Pec - P0 - Pac) :param Z: leaflet apex deflection (m) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: acoustic pressure (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: total balance pressure (Pa) ''' if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) return Pm + self.gasmol2Pa(ng, self.volume(Z)) - self.P0 - Pac + self.Pelec(Z, Qm) def balancedefQS(self, ng, Qm, Pac=0.0, Pm_comp_method=PmCompMethod.predict): ''' Quasi-steady equilibrium deflection for a given acoustic pressure (computed by approximating the root of quasi-steady pressure) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: leaflet deflection canceling quasi-steady pressure (m) ''' Zbounds = (self.Zmin, self.a) - Plb, Pub = [self.PtotQS(x, ng, Qm, Pac, Pm_comp_method) for x in Zbounds] - assert (Plb > 0 > Pub), '[{}, {}] is not a sign changing interval for PtotQS'.format(*Zbounds) + PQS = [self.PtotQS(x, ng, Qm, Pac, Pm_comp_method) for x in Zbounds] + if not (PQS[0] > 0 > PQS[1]): + s = 'P_QS not changing sign within [{:.2f}, {:.2f}] nm interval: '.format( + *np.array(Zbounds) * 1e9) + s += ', '.join([ + f'P_QS({Z * 1e9:.2f} nm) = {si_format(P, 2)}Pa' for Z, P in zip(Zbounds, PQS)]) + raise ValueError(s) return brentq(self.PtotQS, *Zbounds, args=(ng, Qm, Pac, Pm_comp_method), xtol=1e-16) def TEleaflet(self, Z): ''' Elastic tension in leaflet :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA * self.arealStrain(Z) def setTissueModulus(self, drive): ''' Set the frequency-dependent elastic modulus of the surrounding tissue. ''' G_tissue = self.alpha * drive.modulationFrequency # G'' (Pa) self.kA_tissue = 2 * G_tissue * self.d # kA of the tissue layer (N/m) def TEtissue(self, Z): ''' Elastic tension in surrounding viscoelastic layer :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA_tissue * self.arealStrain(Z) def TEtot(self, Z): ''' Total elastic tension (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.TEleaflet(Z) + self.TEtissue(Z) def PEtot(self, Z, R): ''' Total elastic tension pressure (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: elastic tension pressure (Pa) ''' return - self.TEtot(Z) / R @classmethod def PVleaflet(cls, U, R): ''' Viscous stress pressure in leaflet :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: leaflet viscous stress pressure (Pa) ''' return - 12 * U * cls.delta0 * cls.muS / R**2 @classmethod def PVfluid(cls, U, R): ''' Viscous stress pressure in surrounding medium :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: fluid viscous stress pressure (Pa) ''' return - 4 * U * cls.muL / np.abs(R) @classmethod def accP(cls, Ptot, R): ''' Leaflet transverse acceleration resulting from pressure imbalance :param Ptot: net pressure (Pa) :param R: leaflet curvature radius (m) :return: pressure-driven acceleration (m/s^2) ''' return Ptot / (cls.rhoL * np.abs(R)) @staticmethod def accNL(U, R): ''' Leaflet transverse nonlinear acceleration :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: nonlinear acceleration term (m/s^2) .. note:: A simplified version of nonlinear acceleration (neglecting dR/dH) is used here. ''' # return - (3/2 - 2*R/H) * U**2 / R return -(3 * U**2) / (2 * R) @staticmethod def checkInputs(drive, Qm, Pm_comp_method): ''' Check validity of stimulation parameters :param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :param Pm_comp_method: type of method used to compute average intermolecular pressure ''' if not isinstance(drive, Drive): raise TypeError(f'Invalid "drive" parameter (must be an "Drive" object)') if not (isinstance(Qm, float) or isIterable(Qm)): raise TypeError(f'Invalid "Qm" parameter (must be a scalar or T-periodic vector)') if isIterable(Qm): if len(Qm) != drive.nPerCycle: raise ValueError( f'Qm size ({Qm.size}) differs from drive granularity ({drive.nPerCycle})') Qmin, Qmax = CHARGE_RANGE if np.min(Qm) < Qmin or np.max(Qm) > Qmax: raise ValueError( f'Invalid applied charge: {Qm * 1e5} nC/cm2 (must be within [{Qmin * 1e5}, {Qmax * 1e5}] interval') if not isinstance(Pm_comp_method, PmCompMethod): raise TypeError('Invalid Pm computation method (must be "PmCompmethod" type)') def derivatives(self, t, y, drive, Qm, Pm_comp_method=PmCompMethod.predict): ''' Evolution of the mechanical system :param t: time instant (s) :param y: vector of HH system variables at time t :param drive: acoustic drive object :param Qm: membrane charge density (F/m2) :param Pm_comp_method: computation method for average intermolecular pressure :return: vector of mechanical system derivatives at time t ''' # Split input vector explicitly U, Z, ng = y # Correct deflection value is below critical compression if Z < self.Zmin: logger.warning('Deflection out of range: Z = %.2f nm', Z * 1e9) Z = self.Zmin # Compute curvature radius R = self.curvrad(Z) # Compute total pressure Pg = self.gasmol2Pa(ng, self.volume(Z)) if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) Pac = drive.compute(t) Pv = self.PVleaflet(U, R) + self.PVfluid(U, R) Ptot = Pm + Pg - self.P0 - Pac + self.PEtot(Z, R) + Pv + self.Pelec(Z, Qm) # Compute derivatives dUdt = self.accP(Ptot, R) + self.accNL(U, R) dZdt = U dngdt = self.gasFlux(Z, Pg) # Return derivatives vector return [dUdt, dZdt, dngdt] def computeInitialDeflection(self, drive, Qm, dt, Pm_comp_method=PmCompMethod.predict): ''' Compute non-zero deflection value for a small perturbation (solving quasi-steady equation). ''' Pac = drive.compute(dt) return self.balancedefQS(self.ng0, Qm, Pac, Pm_comp_method) @classmethod @Model.checkOutputDir def simQueue(cls, freqs, amps, charges, **kwargs): drives = AcousticDrive.createQueue(freqs, amps) queue = [] for drive in drives: for Qm in charges: queue.append([drive, Qm]) return queue def initialConditions(self, *args, **kwargs): ''' Compute simulation initial conditions. ''' # Compute initial non-zero deflection Z = self.computeInitialDeflection(*args, **kwargs) # Return initial conditions dictionary return { 'U': [0.] * 2, 'Z': [0., Z], 'ng': [self.ng0] * 2, } def simCycles(self, drive, Qm, nmax=None, nmin=None, Pm_comp_method=PmCompMethod.predict): ''' Simulate for a specific number of cycles or until periodic stabilization, for a specific set of ultrasound parameters, and return output data in a dataframe. :param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :param n: number of cycles (optional) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: output dataframe ''' # Set the tissue elastic modulus self.setTissueModulus(drive) # Adapt Qm(t) function to charge input type if isinstance(Qm, float): # If float, simply use a constant applied charge Qm0, Qm_t = Qm, lambda t: Qm elif isIterable(Qm): # If iterable, recast as 1d array, check size, and use a time-varying charge Qm0, Qm_t = Qm[0], lambda t: Qm[int((t % drive.periodicity) / drive.dt)] else: raise ValueError('unknown charge input type') # Compute initial conditions y0 = self.initialConditions(drive, Qm0, drive.dt, Pm_comp_method=Pm_comp_method) # Initialize solver and compute solution solver = PeriodicSolver( drive.periodicity, # periodicty y0.keys(), # variables list lambda t, y: self.derivatives(t, y, drive, Qm_t(t), Pm_comp_method), # dfunc primary_vars=['Z', 'ng'], # primary variables dt=drive.dt # time step ) data = solver(y0, nmax=nmax, nmin=nmin) # Remove velocity timeries from solution del data['U'] # Return solution dataframe return data @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, Qm, Pm_comp_method=PmCompMethod.predict): ''' Wrapper around the simUntilConvergence method, with decorators. ''' return self.simCycles(drive, Qm, Pm_comp_method=Pm_comp_method) def desc(self, meta): Qm = meta['Qm'] if isIterable(Qm): Qstr = f'US-periodic function within [{Qm.min() * 1e5:.2f}, {Qm.max() * 1e5:.2f}] nC/cm2' else: Qstr = f'{si_format(Qm * 1e-4, 2)}C/cm2' return f'{self}: simulation @ {meta["drive"].desc}, Q = {Qstr}' @Model.logDesc def getZlast(self, drive, Qm): ''' Run simulation and extract deflection vector from last cycle. ''' return self.simCycles(drive, Qm).tail(NPC_DENSE)['Z'].values # m def getRelCmCycle(self, *args, **kwargs): ''' Run simulation and extract relative capacitance vector from last cycle. ''' return self.v_capacitance(self.getZlast(*args, **kwargs)) / self.Cm0 @property def Cm_lkp_filename(self): return f'Cm_lkp_{self.a * 1e9:.0f}nm.pkl' @property def Cm_lkp_filepath(self): return os.path.join(LOOKUP_DIR, self.Cm_lkp_filename) @property def Cm_lkp(self): return EffectiveVariablesLookup.fromPickle(self.Cm_lkp_filepath) def getGammaLookup(self): return self.Cm_lkp.reduce(lambda x, **kwargs: np.ptp(x, **kwargs) / 2, 't') diff --git a/PySONIC/core/nbls.py b/PySONIC/core/nbls.py index 81e95a8..fc49c76 100644 --- a/PySONIC/core/nbls.py +++ b/PySONIC/core/nbls.py @@ -1,712 +1,714 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-03-23 21:31:15 +# @Last Modified time: 2021-03-26 18:42:04 import logging import numpy as np from .solvers import EventDrivenSolver, HybridSolver from .bls import BilayerSonophore from .pneuron import PointNeuron from .model import Model from .drives import AcousticDrive, ElectricDrive from .protocols import * from ..utils import * from ..constants import * from ..postpro import getFixedPoints from .lookups import EffectiveVariablesLookup from ..neurons import getPointNeuron class NeuronalBilayerSonophore(BilayerSonophore): ''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance at initialization, to define the electro-mechanical NICE model and its SONIC variant. ''' tscale = 'ms' # relevant temporal scale of the model simkey = 'ASTIM' # keyword used to characterize simulations made with this model def __init__(self, a, pneuron, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param pneuron: point-neuron model :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' self.pneuron = pneuron super().__init__(a, pneuron.Cm0, pneuron.Qm0, embedding_depth=embedding_depth) @property def a_str(self): return f'{self.a * 1e9:.1f} nm' def __repr__(self): s = f'{self.__class__.__name__}({self.a_str}, {self.pneuron}' if self.d > 0.: s += f', d={si_format(self.d, precision=1)}m' return f'{s})' def copy(self): return self.__class__(self.a, self.pneuron, embedding_depth=self.d) def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.a == other.a and self.pneuron == other.pneuron and self.d == other.d @property def pneuron(self): return self._pneuron @pneuron.setter def pneuron(self, value): if not isinstance(value, PointNeuron): raise ValueError(f'{value} is not a valid PointNeuron instance') if not hasattr(self, '_pneuron') or value != self._pneuron: self._pneuron = value if hasattr(self, 'a'): super().__init__(self.a, self.pneuron.Cm0, self.pneuron.Qm0, embedding_depth=self.d) @property def meta(self): return { 'neuron': self.pneuron.name, 'a': self.a, 'd': self.d, } @classmethod def initFromMeta(cls, meta): return cls(meta['a'], getPointNeuron(meta['neuron']), embedding_depth=meta['d']) def params(self): return {**super().params(), **self.pneuron.params()} def getPltVars(self, wrapleft='df["', wrapright='"]'): return {**super().getPltVars(wrapleft, wrapright), **self.pneuron.getPltVars(wrapleft, wrapright)} @property def pltScheme(self): return self.pneuron.pltScheme def filecode(self, *args): return Model.filecode(self, *args) @staticmethod def inputs(): # Get parent input vars and supress irrelevant entries inputvars = BilayerSonophore.inputs() del inputvars['Qm'] # Fill in current input vars in appropriate order inputvars.update({ **AcousticDrive.inputs(), 'fs': { 'desc': 'sonophore membrane coverage fraction', 'label': 'f_s', 'unit': '\%', 'factor': 1e2, 'precision': 0 }, 'method': None }) return inputvars def filecodes(self, drive, pp, fs, method, qss_vars): codes = { 'simkey': self.simkey, 'neuron': self.pneuron.name, 'nature': pp.nature, 'a': f'{self.a * 1e9:.0f}nm', **drive.filecodes, **pp.filecodes, } codes['fs'] = f'fs{fs * 1e2:.0f}%' if fs < 1 else None codes['method'] = method codes['qss_vars'] = qss_vars return codes @staticmethod def interpEffVariable(key, Qm, stim, lkp): ''' Interpolate Q-dependent effective variable along various stimulation states of a solution. :param key: lookup variable key :param Qm: charge density solution vector :param stim: stimulation state solution vector :param lkp: 2D lookup object :return: interpolated effective variable vector ''' x = np.zeros(stim.size) stim_vals = np.unique(stim) for s in stim_vals: x[stim == s] = lkp.project('A', s).interpVar1D(Qm[stim == s], key) return x @staticmethod def spatialAverage(fs, x, x0): ''' fs-modulated spatial averaging. ''' return fs * x + (1 - fs) * x0 @timer def computeEffVars(self, drive, fs, Qm0, Qm_overtones=None): ''' Compute "effective" coefficients of the HH system for a specific acoustic stimulus and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param drive: acoustic drive object :param fs: list of sonophore membrane coverage fractions :param Qm: imposed charge density (C/m2) :return: list with computation time and a list of dictionaries of effective variables ''' if not isIterable(fs): fs = [fs] if Qm_overtones is None: # Constant Qm profile Qm_cycle = Qm0 novertones = 0 else: # Qm profile as Fourier series - A_Qm, phi_Qm = Qm_overtones + A_Qm, phi_Qm = list(zip(*Qm_overtones)) Qm_fft = np.hstack(([Qm0 + 0j], A_Qm * (np.cos(phi_Qm) + 1j * np.sin(phi_Qm)))) Qm_cycle = np.fft.irfft(Qm_fft, n=drive.nPerCycle) * drive.nPerCycle novertones = len(A_Qm) # Run simulation and extract capacitance vector from last cycle Z_cycle = super().simCycles(drive, Qm_cycle).tail(drive.nPerCycle)['Z'].values # m Cm_cycle = self.v_capacitance(Z_cycle) # F/m2 # For each coverage fraction effvars_list = [] for x in fs: # Compute membrane potential vector Vm_cycle = Qm_cycle / self.spatialAverage(x, Cm_cycle, self.Cm0) * 1e3 # mV # Compute effective (cycle-average) membrane potential effvars = {'V': np.mean(Vm_cycle)} # If Qm overtones were provided, compute Vm overtones if novertones > 0: # classic Fourier coefficients Vm_coeffs = np.fft.rfft(Vm_cycle)[:novertones + 1] / drive.nPerCycle # amplitude-phase formalism A_Vm, phi_Vm = np.abs(Vm_coeffs), np.angle(Vm_coeffs) for i in range(1, novertones + 1): effvars[f'A_V{i}'] = A_Vm[i] effvars[f'phi_V{i}'] = phi_Vm[i] # Add computed effective rates effvars.update(self.pneuron.getEffRates(Vm_cycle)) # Append to list effvars_list.append(effvars) # Log process log = f'{self}: lookups @ {drive.desc}, Qm0 = {Qm0 * 1e5:.2f} nC/cm2' if Qm_overtones is not None: - log += f', Qm_overtones = {Qm_overtones}' + log = log + ', ' + ', '.join([ + f'Qm{i + 1} = ({x[0] * 1e5:.2f} nC/cm2, {x[1]:.2f} rad)' + for i, x in enumerate(Qm_overtones)]) if len(fs) > 1: log += f', fs = {fs.min() * 1e2:.0f} - {fs.max() * 1e2:.0f}%' logger.info(log) # Return effective coefficients return effvars_list def getLookupFileName(self, a=None, f=None, A=None, fs=None, novertones=0.): fname = f'{self.pneuron.name}_lookups' if a is not None: fname += f'_{a * 1e9:.0f}nm' if f is not None: fname += f'_{f * 1e-3:.0f}kHz' if A is not None: fname += f'_{A * 1e-3:.0f}kPa' if fs is not None: fname += f'_fs{fs:.2f}' if novertones > 0: fname += f'_{novertones}overtones' return f'{fname}.pkl' def getLookupFilePath(self, *args, **kwargs): return os.path.join(LOOKUP_DIR, self.getLookupFileName(*args, **kwargs)) def getLookup(self, *args, **kwargs): keep_tcomp = kwargs.pop('keep_tcomp', False) lookup_path = self.getLookupFilePath(*args, **kwargs) lkp = EffectiveVariablesLookup.fromPickle(lookup_path) if not keep_tcomp: del lkp.tables['tcomp'] return lkp def getLookup2D(self, f, fs): proj_kwargs = {'a': self.a, 'f': f, 'fs': fs} proj_str = f'a = {si_format(self.a)}m, f = {si_format(f)}Hz, fs = {fs * 1e2:.0f}%' logger.debug(f'loading {self.pneuron} lookup for {proj_str}') if fs < 1.: kwargs = proj_kwargs.copy() kwargs['fs'] = None else: kwargs = {'fs': fs} return self.getLookup(**kwargs).projectN(proj_kwargs) def fullDerivatives(self, t, y, drive, fs): ''' Compute the full system derivatives. :param t: specific instant in time (s) :param y: vector of state variables :param drive: acoustic drive object :param fs: sonophore membrane coverage fraction (-) :return: vector of derivatives ''' dydt_mech = BilayerSonophore.derivatives( self, t, y[:3], drive, y[3]) dydt_elec = self.pneuron.derivatives( t, y[3:], Cm=self.spatialAverage(fs, self.capacitance(y[1]), self.Cm0)) return dydt_mech + dydt_elec def effDerivatives(self, t, y, lkp1d, qss_vars): ''' Compute the derivatives of the n-ODE effective system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param lkp: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :param qss_vars: list of QSS variables :return: vector of effective system derivatives at time t ''' # Unpack values and interpolate lookup at current charge density Qm, *states = y lkp0d = lkp1d.interpolate1D(Qm) # Compute states dictionary from differential and QSS variables states_dict = {} i = 0 for k in self.pneuron.statesNames(): if k in qss_vars: states_dict[k] = self.pneuron.quasiSteadyStates()[k](lkp0d) else: states_dict[k] = states[i] i += 1 # Compute charge density derivative dQmdt = - self.pneuron.iNet(lkp0d['V'], states_dict) * 1e-3 # Compute states derivative vector only for differential variable dstates = [] for k in self.pneuron.statesNames(): if k not in qss_vars: dstates.append(self.pneuron.derEffStates()[k](lkp0d, states_dict)) return [dQmdt, *dstates] def deflectionDependentVm(self, Qm, Z, fs): ''' Compute deflection (and sonopphore coverage fraction) dependent voltage profile. ''' return Qm / self.spatialAverage(fs, self.v_capacitance(Z), self.Cm0) * 1e3 # mV def fullInitialConditions(self, *args, **kwargs): ''' Compute simulation initial conditions. ''' y0 = super().initialConditions(*args, **kwargs) y0.update({ 'Qm': [self.Qm0] * 2, **{k: [self.pneuron.steadyStates()[k](self.pneuron.Vm0)] * 2 for k in self.pneuron.statesNames()} }) return y0 def __simFull(self, drive, pp, fs): # Compute initial conditions y0 = self.fullInitialConditions(drive, self.Qm0, drive.dt) # Initialize solver and compute solution solver = EventDrivenSolver( lambda x: setattr(solver.drive, 'xvar', drive.xvar * x), # eventfunc y0.keys(), # variables list lambda t, y: self.fullDerivatives(t, y, solver.drive, fs), # dfunc event_params={'drive': drive.copy().updatedX(0.)}, # event parameters dt=drive.dt) # time step data = solver( y0, pp.stimEvents(), pp.tstop, target_dt=CLASSIC_TARGET_DT, log_period=pp.tstop / 100 if logger.getEffectiveLevel() <= logging.INFO else None, # logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' ) # Remove velocity and add voltage timeseries to solution del data['U'] data = addColumn( data, 'Vm', self.deflectionDependentVm(data['Qm'], data['Z'], fs), preceding_key='Qm') # Return solution dataframe return data def __simHybrid(self, drive, pp, fs): # Compute initial conditions y0 = self.fullInitialConditions(drive, self.Qm0, drive.dt) # Initialize solver and compute solution solver = HybridSolver( y0.keys(), lambda t, y: self.fullDerivatives(t, y, solver.drive, fs), # dfunc lambda t, y, Cm: self.pneuron.derivatives( t, y, Cm=self.spatialAverage(fs, Cm, self.Cm0)), # dfunc_sparse lambda yref: self.capacitance(yref[1]), # predfunc lambda x: setattr(solver.drive, 'xvar', drive.xvar * x), # eventfunc drive.periodicity, # periodicity ['U', 'Z', 'ng'], # fast-evolving variables drive.dt, # dense time step drive.dt_sparse, # sparse time step event_params={'drive': drive.copy().updatedX(0.)}, # event parameters primary_vars=['Z', 'ng'] # primary variables ) data = solver( y0, pp.stimEvents(), pp.tstop, HYBRID_UPDATE_INTERVAL, target_dt=CLASSIC_TARGET_DT, log_period=pp.tstop / 100 if logger.getEffectiveLevel() < logging.INFO else None, logfunc=lambda y: f'Qm = {y[3] * 1e5:.2f} nC/cm2' ) # Remove velocity and add voltage timeseries to solution del data['U'] data = addColumn( data, 'Vm', self.deflectionDependentVm(data['Qm'], data['Z'], fs), preceding_key='Qm') # Return solution dataframe return data def __simSonic(self, drive, pp, fs, qss_vars=None, pavg=False): # Load appropriate 2D lookup lkp = self.getLookup2D(drive.f, fs) # Adapt lookup and pulsing protocol if pulse-average mode is selected if pavg: lkp = lkp * pp.DC + lkp.project('A', 0.).tile('A', lkp.refs['A']) * (1 - pp.DC) tstim = (int(pp.tstim * pp.PRF) - 1 + pp.DC) / pp.PRF pp = TimeProtocol(tstim, pp.tstim + pp.toffset - tstim) # Determine QSS and differential variables, and create optional QSS lookup if qss_vars is None: qss_vars = [] else: lkp_QSS = EffectiveVariablesLookup( lkp.refs, {k: self.pneuron.quasiSteadyStates()[k](lkp) for k in qss_vars}) diff_vars = [item for item in self.pneuron.statesNames() if item not in qss_vars] # Set initial conditions y0 = { 'Qm': self.Qm0, **{k: self.pneuron.steadyStates()[k](self.pneuron.Vm0) for k in diff_vars} } # Initialize solver and compute solution solver = EventDrivenSolver( lambda x: setattr(solver, 'lkp', lkp.project('A', drive.xvar * x)), # eventfunc y0.keys(), # variables list lambda t, y: self.effDerivatives(t, y, solver.lkp, qss_vars), # dfunc event_params={'lkp': lkp.project('A', 0.)}, # event parameters dt=self.pneuron.chooseTimeStep()) # time step data = solver( y0, pp.stimEvents(), pp.tstop, log_period=pp.tstop / 100 if pp.tstop >= 5 else None, max_nsamples=MAX_NSAMPLES_EFFECTIVE) # Interpolate Vm and QSS variables along charge vector and store them in solution dataframe data = addColumn( data, 'Vm', self.interpEffVariable('V', data['Qm'], data['stimstate'] * drive.A, lkp), preceding_key='Qm') for k in qss_vars: data[k] = self.interpEffVariable(k, data['Qm'], data['stimstate'] * drive.A, lkp_QSS) # Add dummy deflection and gas content vectors to solution for key in ['Z', 'ng']: data[key] = np.full(data['t'].size, np.nan) # Return solution dataframe return data def intMethods(self): ''' Listing of model integration methods. ''' return { 'full': self.__simFull, 'hybrid': self.__simHybrid, 'sonic': self.__simSonic } @classmethod @Model.checkOutputDir def simQueue(cls, freqs, amps, durations, offsets, PRFs, DCs, fs, methods, qss_vars, **kwargs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param freqs: list (or 1D-array) of US frequencies :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :param fs: sonophore membrane coverage fractions (-) :params methods: integration methods :param qss_vars: QSS variables :return: list of parameters (list) for each simulation ''' if ('full' in methods or 'hybrid' in methods) and kwargs['outputdir'] is None: logger.warning('Running cumbersome simulation(s) without file saving') if amps is None: amps = [None] drives = AcousticDrive.createQueue(freqs, amps) protocols = PulsedProtocol.createQueue(durations, offsets, PRFs, DCs) queue = [] for drive in drives: for pp in protocols: for cov in fs: for method in methods: queue.append([drive, pp, cov, method, qss_vars]) return queue @classmethod @Model.checkOutputDir def simQueueBurst(cls, freqs, amps, durations, PRFs, DCs, BRFs, nbursts, fs, methods, qss_vars, **kwargs): if ('full' in methods or 'hybrid' in methods) and kwargs['outputdir'] is None: logger.warning('Running cumbersome simulation(s) without file saving') if amps is None: amps = [None] drives = AcousticDrive.createQueue(freqs, amps) protocols = BurstProtocol.createQueue(durations, PRFs, DCs, BRFs, nbursts) queue = [] for drive in drives: for pp in protocols: for cov in fs: for method in methods: queue.append([drive, pp, cov, method, qss_vars]) return queue def checkInputs(self, drive, pp, fs, method, qss_vars): PointNeuron.checkInputs(drive, pp) _, xevents, = zip(*pp.stimEvents()) if np.any(np.array([xevents]) < 0.): raise ValueError('Invalid time protocol: contains negative modulators') if not isinstance(fs, float): raise TypeError(f'Invalid "fs" parameter (must be float typed)') if qss_vars is not None: if not isIterable(qss_vars) or not isinstance(qss_vars[0], str): raise ValueError('Invalid QSS variables: must be None or an iterable of strings') sn = self.pneuron.statesNames() for item in qss_vars: if item not in sn: raise ValueError(f'Invalid QSS variable: {item} (must be in {sn}') if method not in list(self.intMethods().keys()): raise ValueError(f'Invalid integration method: "{method}"') @Model.logNSpikes @Model.checkTitrate @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, pp, fs=1., method='sonic', qss_vars=None): ''' Simulate the electro-mechanical model for a specific set of US stimulation parameters, and return output data in a dataframe. :param drive: acoustic drive object :param pp: pulse protocol object :param fs: sonophore membrane coverage fraction (-) :param method: selected integration method :return: output dataframe ''' # Set the tissue elastic modulus self.setTissueModulus(drive) # Call appropriate simulation function and return simfunc = self.intMethods()[method] simargs = [drive, pp, fs] if method == 'sonic': simargs.append(qss_vars) return simfunc(*simargs) def desc(self, meta): method = meta['method'] if 'method' in meta else meta['model']['method'] fs = meta['fs'] if 'fs' in meta else meta['model']['fs'] s = f'{self}: {method} simulation @ {meta["drive"].desc}, {meta["pp"].desc}' if fs < 1.0: s += f', fs = {(fs * 1e2):.2f}%' if 'qss_vars' in meta and meta['qss_vars'] is not None: s += f" - QSS ({', '.join(meta['qss_vars'])})" return s @staticmethod def getNSpikes(data): return PointNeuron.getNSpikes(data) def getArange(self, drive): return (0., self.getLookup().refs['A'].max()) @property def titrationFunc(self): return self.pneuron.titrationFunc @logCache(os.path.join(os.path.split(__file__)[0], 'astim_titrations.log')) def titrate(self, drive, pp, fs=1., method='sonic', qss_vars=None, xfunc=None, Arange=None): ''' Use a binary search to determine the threshold amplitude needed to obtain neural excitation for a given frequency and pulsed protocol. :param drive: unresolved acoustic drive object :param pp: pulse protocol object :param fs: sonophore membrane coverage fraction (-) :param method: integration method :return: determined threshold amplitude (Pa) ''' return super().titrate(drive, pp, fs=fs, method=method, qss_vars=qss_vars, xfunc=xfunc, Arange=Arange) def getQuasiSteadyStates(self, f, amps=None, charges=None, DC=1.0, squeeze_output=False): ''' Compute the quasi-steady state values of the neuron's gating variables for a combination of US amplitudes, charge densities, at a specific US frequency and duty cycle. :param f: US frequency (Hz) :param amps: US amplitudes (Pa) :param charges: membrane charge densities (C/m2) :param DC: duty cycle :return: 4-tuple with reference values of US amplitude and charge density, as well as interpolated Vmeff and QSS gating variables ''' # Get DC-averaged lookups interpolated at the appropriate amplitudes and charges lkp = self.getLookup().projectDC(amps=amps, DC=DC).projectN({'a': self.a, 'f': f}) if charges is not None: lkp = lkp.project('Q', charges) # Specify dimensions with A as the first axis lkp.move('A', 0) # Compute QSS states using these lookups QSS = EffectiveVariablesLookup( lkp.refs, {k: v(lkp) for k, v in self.pneuron.quasiSteadyStates().items()}) # Compress outputs if needed if squeeze_output: QSS = QSS.squeeze() lkp = lkp.squeeze() return lkp, QSS def iNetQSS(self, Qm, f, A, DC): ''' Compute quasi-steady state net membrane current for a given combination of US parameters and a given membrane charge density. :param Qm: membrane charge density (C/m2) :param f: US frequency (Hz) :param A: US amplitude (Pa) :param DC: duty cycle (-) :return: net membrane current (mA/m2) ''' lkp, QSS = self.getQuasiSteadyStates( f, amps=A, charges=Qm, DC=DC, squeeze_output=True) return self.pneuron.iNet(lkp['V'], QSS) # mA/m2 def fixedPointsQSS(self, f, A, DC, lkp, dQdt): ''' Compute QSS fixed points along the charge dimension for a given combination of US parameters, and determine their stability. :param f: US frequency (Hz) :param A: US amplitude (Pa) :param DC: duty cycle (-) :param lkp: lookup dictionary for effective variables along charge dimension :param dQdt: charge derivative profile along charge dimension :return: 2-tuple with values of stable and unstable fixed points ''' logger.debug(f'A = {A * 1e-3:.2f} kPa, DC = {DC * 1e2:.0f}%') # Extract fixed points from QSS charge variation profile def dfunc(Qm): return - self.iNetQSS(Qm, f, A, DC) fixed_points = getFixedPoints( lkp.refs['Q'], dQdt, filter='both', der_func=dfunc).tolist() dfunc = lambda x: np.array(self.effDerivatives(_, x, lkp)) # classified_fixed_points = {'stable': [], 'unstable': [], 'saddle': []} classified_fixed_points = [] np.set_printoptions(precision=2) # For each fixed point for i, Qm in enumerate(fixed_points): # Re-compute QSS at fixed point *_, QSS = self.getQuasiSteadyStates(f, amps=A, charges=Qm, DC=DC, squeeze_output=True) # Classify fixed point stability by numerically evaluating its Jacobian and # computing its eigenvalues x = np.array([Qm, *QSS.tables.values()]) eigvals, key = classifyFixedPoint(x, dfunc) # classified_fixed_points[key].append(Qm) classified_fixed_points.append((x, eigvals, key)) # eigenvalues.append(eigvals) logger.debug(f'{key} point @ Q = {(Qm * 1e5):.1f} nC/cm2') # eigenvalues = np.array(eigenvalues).T # print(eigenvalues.shape) return classified_fixed_points def isStableQSS(self, f, A, DC): lookups, QSS = self.getQuasiSteadyStates( f, amps=A, DCs=DC, squeeze_output=True) dQdt = -self.pneuron.iNet(lookups['V'], QSS.tables) # mA/m2 classified_fixed_points = self.fixedPointsQSS(f, A, DC, lookups, dQdt) return len(classified_fixed_points['stable']) > 0 class DrivenNeuronalBilayerSonophore(NeuronalBilayerSonophore): simkey = 'DASTIM' # keyword used to characterize simulations made with this model def __init__(self, Idrive, *args, **kwargs): self.Idrive = Idrive super().__init__(*args, **kwargs) def __repr__(self): return super().__repr__()[:-1] + f', Idrive = {self.Idrive:.2f} mA/m2)' @classmethod def initFromMeta(cls, meta): return cls(meta['Idrive'], meta['a'], getPointNeuron(meta['neuron']), embedding_depth=meta['d']) def params(self): return {**{'Idrive': self.Idrive}, **super().params()} @staticmethod def inputs(): return { **NeuronalBilayerSonophore.inputs(), 'Idrive': ElectricDrive.inputs()['I'] } @property def meta(self): return { **super().meta, 'Idrive': self.Idrive } def filecodes(self, *args): return { **super().filecodes(*args), 'Idrive': f'Idrive{self.Idrive:.1f}mAm2' } def fullDerivatives(self, *args): dydt = super().fullDerivatives(*args) dydt[3] += self.Idrive * 1e-3 return dydt def effDerivatives(self, *args): dQmdt, *dstates = super().effDerivatives(*args) dQmdt += self.Idrive * 1e-3 return [dQmdt, *dstates] diff --git a/scripts/run_lookups.py b/scripts/run_lookups.py index bf553ab..25b5c58 100644 --- a/scripts/run_lookups.py +++ b/scripts/run_lookups.py @@ -1,220 +1,233 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-06-02 17:50:10 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2021-03-23 21:34:38 +# @Last Modified time: 2021-03-26 19:50:50 ''' Create lookup table for specific neuron. ''' import os import itertools import logging import numpy as np from PySONIC.utils import logger, isIterable, alert from PySONIC.core import NeuronalBilayerSonophore, Batch, Lookup, AcousticDrive from PySONIC.parsers import MechSimParser from PySONIC.constants import DQ_LOOKUP # @alert def computeAStimLookup(pneuron, aref, fref, Aref, fsref, Qref, novertones=0, test=False, mpi=False, loglevel=logging.INFO): ''' Run simulations of the mechanical system for a multiple combinations of imposed sonophore radius, US frequencies, acoustic amplitudes charge densities and (spatially-averaged) sonophore membrane coverage fractions, compute effective coefficients and store them in a dictionary of n-dimensional arrays. :param pneuron: point-neuron model :param aref: array of sonophore radii (m) :param fref: array of acoustic drive frequencies (Hz) :param Aref: array of acoustic drive amplitudes (Pa) :param Qref: array of membrane charge densities (C/m2) :param fsref: acoustic drive phase (rad) :param mpi: boolean statting wether or not to use multiprocessing :param loglevel: logging level :return: lookups dictionary ''' descs = { 'a': 'sonophore radii', 'f': 'US frequencies', 'A': 'US amplitudes', 'fs': 'sonophore membrane coverage fractions', 'overtones': 'charge Fourier overtones' } # Populate reference vectors dictionary refs = { 'a': aref, # nm 'f': fref, # Hz 'A': Aref, # Pa 'Q': Qref # C/m2 } err_span = 'cannot span {} for more than 1 {}' # If multiple sonophore coverage values, ensure that only 1 value of # sonophore radius and US frequency are provided if fsref.size > 1 or fsref[0] != 1.: for x in ['a', 'f']: assert refs[x].size == 1, err_span.format(descs['fs'], descs[x]) # Add sonophore coverage vector to references refs['fs'] = fsref # If charge overtones are required, ensure that only 1 value of # sonophore radius, US frequency and coverage fraction are provided if novertones > 0: for x in ['a', 'f', 'fs']: assert refs[x].size == 1, err_span.format(descs['overtones'], descs[x]) # If charge overtones are required, downsample charge and US amplitude input vectors if novertones > 0: nQmax = 50 if len(refs['Q']) > nQmax: refs['Q'] = np.linspace(refs['Q'][0], refs['Q'][-1], nQmax) nAmax = 15 if len(refs['A']) > nAmax: refs['A'] = np.insert( np.logspace(np.log10(refs['A'][1]), np.log10(refs['A'][-1]), num=nAmax - 1), 0, 0.0) # If test case, reduce all vector dimensions to their instrinsic bounds if test: refs = {k: np.array([v.min(), v.max()]) if v.size > 1 else v for k, v in refs.items()} # Check validity of all reference vectors for key, values in refs.items(): if not isIterable(values): raise TypeError(f'Invalid {descs[key]} (must be provided as list or numpy array)') if not all(isinstance(x, float) for x in values): raise TypeError(f'Invalid {descs[key]} (must all be float typed)') if len(values) == 0: raise ValueError(f'Empty {key} array') if key in ('a', 'f') and min(values) <= 0: raise ValueError(f'Invalid {descs[key]} (must all be strictly positive)') if key in ('A', 'fs') and min(values) < 0: raise ValueError(f'Invalid {descs[key]} (must all be positive or null)') - # Get references dimensions - dims = np.array([x.size for x in refs.values()]) - # Create simulation queue per sonophore radius drives = AcousticDrive.createQueue(refs['f'], refs['A']) queue = [] for drive in drives: for Qm in refs['Q']: queue.append([drive, refs['fs'], Qm]) # Add charge overtones to queue if required if novertones > 0: + # Default references nAQ, nphiQ = 5, 5 AQ_ref = np.linspace(0, 100e-5, nAQ) # C/m2 phiQ_ref = np.linspace(0, 2 * np.pi, nphiQ, endpoint=False) # rad + # Downsample if test mode is on if test: AQ_ref = np.array([AQ_ref.min(), AQ_ref.max()]) phiQ_ref = np.array([phiQ_ref.min(), phiQ_ref.max()]) - Qovertones_dims = [] + # Construct refs dict specific to Qm overtones + Qovertones_refs = {} for i in range(novertones): - Qovertones_dims += [AQ_ref, phiQ_ref] - Qovertones = Batch.createQueue(*Qovertones_dims) + Qovertones_refs[f'AQ{i + 1}'] = AQ_ref + Qovertones_refs[f'phiQ{i + 1}'] = phiQ_ref + # Create associated batch queue + Qovertones = Batch.createQueue(*Qovertones_refs.values()) Qovertones = [list(zip(x, x[1:]))[::2] for x in Qovertones] + # Merge with main queue (moving Qm overtones into kwargs) queue = list(itertools.product(queue, Qovertones)) queue = [(x[0], {'Qm_overtones': x[1]}) for x in queue] + # Update main refs dict, and reset 'fs' as last dictionary key + refs.update(Qovertones_refs) + refs['fs'] = refs.pop('fs') + + # Get references dimensions + dims = np.array([x.size for x in refs.values()]) - # Print queue (or redcued view of it) + # Print queue (or reduced view of it) + logger.info('batch queue:') Batch.printQueue(queue) # Run simulations and populate outputs logger.info('Starting simulation batch for %s neuron', pneuron.name) outputs = [] for a in aref: nbls = NeuronalBilayerSonophore(a, pneuron) batch = Batch(nbls.computeEffVars, queue) outputs += batch(mpi=mpi, loglevel=loglevel) # Split comp times and effvars from outputs effvars, tcomps = [list(x) for x in zip(*outputs)] effvars = list(itertools.chain.from_iterable(effvars)) # Make sure outputs size matches inputs dimensions product nout = len(effvars) - assert nout == dims.prod(), 'Number of outputs does not match number of combinations' + ncombs = dims.prod() + if nout != ncombs: + raise ValueError( + f'Number of outputs ({nout}) does not match number of input combinations ({ncombs})') # Reshape effvars into nD arrays and add them to lookups dictionary - logger.info('Reshaping output into lookup tables') + logger.info(f'Reshaping {nout}-entries output into {tuple(dims)} lookup tables') varkeys = list(effvars[0].keys()) tables = {} for key in varkeys: effvar = [effvars[i][key] for i in range(nout)] tables[key] = np.array(effvar).reshape(dims) # Reshape computation times, tile over extra fs dimension, and add it as a lookup table tcomps = np.array(tcomps).reshape(dims[:-1]) tcomps = np.moveaxis(np.array([tcomps for i in range(dims[-1])]), 0, -1) tables['tcomp'] = tcomps # Construct and return lookup object return Lookup(refs, tables) def main(): parser = MechSimParser(outputdir='.') parser.addNeuron() parser.addTest() parser.defaults['neuron'] = 'RS' parser.defaults['radius'] = np.array([16.0, 32.0, 64.0]) # nm parser.defaults['freq'] = np.array([20., 100., 500., 1e3, 2e3, 3e3, 4e3]) # kHz parser.defaults['amp'] = np.insert( np.logspace(np.log10(0.1), np.log10(600), num=50), 0, 0.0) # kPa parser.defaults['charge'] = np.nan parser.add_argument('--novertones', type=int, default=0, help='Number of Fourier overtones') args = parser.parse() logger.setLevel(args['loglevel']) for pneuron in args['neuron']: # Determine charge vector charges = args['charge'] if charges.size == 1 and np.isnan(charges[0]): Qmin, Qmax = pneuron.Qbounds charges = np.arange(Qmin, Qmax + DQ_LOOKUP, DQ_LOOKUP) # C/m2 # Number of Fourier overtones novertones = args['novertones'] # Determine output filename input_args = {'a': args['radius'], 'f': args['freq'], 'A': args['amp'], 'fs': args['fs']} fname_args = {k: v[0] if v.size == 1 else None for k, v in input_args.items()} fname_args['novertones'] = novertones lookup_fpath = NeuronalBilayerSonophore(32e-9, pneuron).getLookupFilePath(**fname_args) # Combine inputs into single list inputs = [args[x] for x in ['radius', 'freq', 'amp', 'fs']] + [charges] # Adapt inputs and output filename if test case if args['test']: fcode, fext = os.path.splitext(lookup_fpath) lookup_fpath = f'{fcode}_test{fext}' # Check if lookup file already exists if os.path.isfile(lookup_fpath): logger.warning( f'"{lookup_fpath}" file already exists and will be overwritten. Continue? (y/n)') user_str = input() if user_str not in ['y', 'Y']: logger.error('%s Lookup creation canceled', pneuron.name) return # Compute lookup lkp = computeAStimLookup(pneuron, *inputs, novertones=novertones, test=args['test'], mpi=args['mpi'], loglevel=args['loglevel']) logger.info(f'Generated lookup: {lkp}') # Save lookup in PKL file logger.info('Saving %s neuron lookup in file: "%s"', pneuron.name, lookup_fpath) lkp.toPickle(lookup_fpath) if __name__ == '__main__': main()