diff --git a/PySONIC/core/bls.py b/PySONIC/core/bls.py index cf42d83..8b40cc5 100644 --- a/PySONIC/core/bls.py +++ b/PySONIC/core/bls.py @@ -1,809 +1,808 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-29 16:16:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-22 15:22:26 +# @Last Modified time: 2020-04-29 12:05:14 from enum import Enum import os import json import numpy as np import pandas as pd import scipy.integrate as integrate from scipy.optimize import brentq, curve_fit from .model import Model from .solvers import PeriodicSolver from .drives import Drive, AcousticDrive from ..utils import logger, si_format from ..constants import * class PmCompMethod(Enum): ''' Enum: types of computation method for the intermolecular pressure ''' direct = 1 predict = 2 def LennardJones(x, beta, alpha, C, m, n): ''' Generic expression of a Lennard-Jones function, adapted for the context of symmetric deflection (distance = 2x). :param x: deflection (i.e. half-distance) :param beta: x-shifting factor :param alpha: x-scaling factor :param C: y-scaling factor :param m: exponent of the repulsion term :param n: exponent of the attraction term :return: Lennard-Jones potential at given distance (2x) ''' return C * (np.power((alpha / (2 * x + beta)), m) - np.power((alpha / (2 * x + beta)), n)) def lookup(func): ''' Load parameters from lookup file, or compute them and store them in lookup file. ''' lookup_path = os.path.join(os.path.split(__file__)[0], 'bls_lookups.json') def wrapper(obj): akey = f'{obj.a * 1e9:.1f}' Qkey = f'{obj.Qm0 * 1e5:.2f}' # Open lookup files try: with open(lookup_path, 'r') as fh: lookups = json.load(fh) except FileNotFoundError: lookups = {} # If info not in lookups, compute parameters and add them if akey not in lookups or Qkey not in lookups[akey]: func(obj) if akey not in lookups: lookups[akey] = {Qkey: {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta}} else: lookups[akey][Qkey] = {'LJ_approx': obj.LJ_approx, 'Delta_eq': obj.Delta} logger.debug('Saving BLS derived parameters to lookup file') with open(lookup_path, 'w') as fh: json.dump(lookups, fh, indent=2) # If lookup exists, load parameters from it else: logger.debug('Loading BLS derived parameters from lookup file') obj.LJ_approx = lookups[akey][Qkey]['LJ_approx'] obj.Delta = lookups[akey][Qkey]['Delta_eq'] return wrapper class BilayerSonophore(Model): ''' Definition of the Bilayer Sonophore Model - geometry - pressure terms - cavitation dynamics ''' # BIOMECHANICAL PARAMETERS T = 309.15 # Temperature (K) delta0 = 2.0e-9 # Thickness of the leaflet (m) Delta_ = 1.4e-9 # Initial gap between the two leaflets on a non-charged membrane at equil. (m) pDelta = 1.0e5 # Attraction/repulsion pressure coefficient (Pa) m = 5.0 # Exponent in the repulsion term (dimensionless) n = 3.3 # Exponent in the attraction term (dimensionless) rhoL = 1075.0 # Density of the surrounding fluid (kg/m^3) muL = 7.0e-4 # Dynamic viscosity of the surrounding fluid (Pa.s) muS = 0.035 # Dynamic viscosity of the leaflet (Pa.s) kA = 0.24 # Area compression modulus of the leaflet (N/m) alpha = 7.56 # Tissue shear loss modulus frequency coefficient (Pa.s) C0 = 0.62 # Initial gas molar concentration in the surrounding fluid (mol/m^3) kH = 1.613e5 # Henry's constant (Pa.m^3/mol) P0 = 1.0e5 # Static pressure in the surrounding fluid (Pa) Dgl = 3.68e-9 # Diffusion coefficient of gas in the fluid (m^2/s) xi = 0.5e-9 # Boundary layer thickness for gas transport across leaflet (m) c = 1515.0 # Speed of sound in medium (m/s) # BIOPHYSICAL PARAMETERS epsilon0 = 8.854e-12 # Vacuum permittivity (F/m) epsilonR = 1.0 # Relative permittivity of intramembrane cavity (dimensionless) rel_Zmin = -0.49 # relative deflection range lower bound (in multiples of Delta) tscale = 'us' # relevant temporal scale of the model simkey = 'MECH' # keyword used to characterize simulations made with this model def __init__(self, a, Cm0, Qm0, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Extract resting constants and geometry self.Cm0 = Cm0 self.Qm0 = Qm0 self.a = a self.d = embedding_depth self.S0 = np.pi * self.a**2 # Initialize null elastic modulus for tissue self.kA_tissue = 0. # Compute Pm params self.computePMparams() # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def copy(self): return self.__class__(self.a, self.Cm0, self.Qm0, embedding_depth=self.d) @property def a(self): return self._a @a.setter def a(self, value): if value <= 0.: raise ValueError('Sonophore radius must be positive') self._a = value @property def Cm0(self): return self._Cm0 @Cm0.setter def Cm0(self, value): if value <= 0.: raise ValueError('Resting membrane capacitance must be positive') self._Cm0 = value @property def d(self): return self._d @d.setter def d(self, value): if value < 0.: raise ValueError('Embedding depth cannot be negative') self._d = value def __repr__(self): s = f'{self.__class__.__name__}({self.a * 1e9:.1f} nm' if self.d > 0.: s += f', d={si_format(self.d, precision=1)}m' return f'{s})' @property def meta(self): return { 'a': self.a, 'd': self.d, 'Cm0': self.Cm0, 'Qm0': self.Qm0, } @classmethod def initFromMeta(cls, d): return cls(d['a'], d['Cm0'], d['Qm0']) @staticmethod def inputs(): return { 'a': { 'desc': 'sonophore radius', 'label': 'a', - 'unit': 'nm', - 'factor': 1e9, + 'unit': 'm', 'precision': 0 }, 'Qm': { 'desc': 'membrane charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, 'precision': 1 }, **AcousticDrive.inputs() } def filecodes(self, drive, Qm, PmCompMethod='predict'): return { 'simkey': self.simkey, 'a': f'{self.a * 1e9:.0f}nm', **drive.filecodes, 'Qm': f'{Qm * 1e5:.1f}nCcm2' } @staticmethod def getPltVars(wrapleft='df["', wrapright='"]'): return { 'Pac': { 'desc': 'acoustic pressure', 'label': 'P_{AC}', 'unit': 'kPa', 'factor': 1e-3, 'func': f'meta["drive"].compute({wrapleft}t{wrapright})' }, 'Z': { 'desc': 'leaflets deflection', 'label': 'Z', 'unit': 'nm', 'factor': 1e9, 'bounds': (-1.0, 10.0) }, 'ng': { 'desc': 'gas content', 'label': 'n_g', 'unit': '10^{-22}\ mol', 'factor': 1e22, 'bounds': (1.0, 15.0) }, 'Pmavg': { 'desc': 'average intermolecular pressure', 'label': 'P_M', 'unit': 'kPa', 'factor': 1e-3, 'func': f'PMavgpred({wrapleft}Z{wrapright})' }, 'Telastic': { 'desc': 'leaflet elastic tension', 'label': 'T_E', 'unit': 'mN/m', 'factor': 1e3, 'func': f'TEleaflet({wrapleft}Z{wrapright})' }, 'Cm': { 'desc': 'membrane capacitance', 'label': 'C_m', 'unit': 'uF/cm^2', 'factor': 1e2, 'bounds': (0.0, 1.5), 'func': f'v_capacitance({wrapleft}Z{wrapright})' } } @property def pltScheme(self): return { 'P_{AC}': ['Pac'], 'Z': ['Z'], 'n_g': ['ng'] } @property def Zmin(self): return self.rel_Zmin * self.Delta def curvrad(self, Z): ''' Leaflet curvature radius (signed variable) :param Z: leaflet apex deflection (m) :return: leaflet curvature radius (m) ''' if Z == 0.0: return np.inf else: return (self.a**2 + Z**2) / (2 * Z) def v_curvrad(self, Z): ''' Vectorized curvrad function ''' return np.array(list(map(self.curvrad, Z))) def surface(self, Z): ''' Surface area of the stretched leaflet (spherical cap formula) :param Z: leaflet apex deflection (m) :return: stretched leaflet surface (m^2) ''' return np.pi * (self.a**2 + Z**2) def volume(self, Z): ''' Volume of the inter-leaflet space (cylinder +/- 2 spherical caps) :param Z: leaflet apex deflection (m) :return: bilayer sonophore inner volume (m^3) ''' return np.pi * self.a**2 * self.Delta\ * (1 + (Z / (3 * self.Delta) * (3 + Z**2 / self.a**2))) def arealStrain(self, Z): ''' Areal strain of the stretched leaflet epsilon = (S - S0)/S0 = (Z/a)^2 :param Z: leaflet apex deflection (m) :return: areal strain (dimensionless) ''' return (Z / self.a)**2 def capacitance(self, Z): ''' Membrane capacitance (parallel-plate capacitor evaluated at average inter-layer distance) :param Z: leaflet apex deflection (m) :return: capacitance per unit area (F/m2) ''' if Z == 0.0: return self.Cm0 else: return ((self.Cm0 * self.Delta / self.a**2) * (Z + (self.a**2 - Z**2 - Z * self.Delta) / (2 * Z) * np.log((2 * Z + self.Delta) / self.Delta))) def v_capacitance(self, Z): ''' Vectorized capacitance function ''' return np.array(list(map(self.capacitance, Z))) def derCapacitance(self, Z, U): ''' Evolution of membrane capacitance :param Z: leaflet apex deflection (m) :param U: leaflet apex deflection velocity (m/s) :return: time derivative of capacitance per unit area (F/m2.s) ''' dCmdZ = ((self.Cm0 * self.Delta / self.a**2) * ((Z**2 + self.a**2) / (Z * (2 * Z + self.Delta)) - ((Z**2 + self.a**2) * np.log((2 * Z + self.Delta) / self.Delta)) / (2 * Z**2))) return dCmdZ * U @staticmethod def localDeflection(r, Z, R): ''' Local leaflet deflection at specific radial distance (signed) :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local transverse leaflet deviation (m) ''' if np.abs(Z) == 0.0: return 0.0 else: return np.sign(Z) * (np.sqrt(R**2 - r**2) - np.abs(R) + np.abs(Z)) def PMlocal(self, r, Z, R): ''' Local intermolecular pressure :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local intermolecular pressure (Pa) ''' z = self.localDeflection(r, Z, R) relgap = (2 * z + self.Delta) / self.Delta_ return self.pDelta * ((1 / relgap)**self.m - (1 / relgap)**self.n) def PMavg(self, Z, R, S): ''' Average intermolecular pressure across the leaflet (computed by quadratic integration) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :param S: surface of the stretched leaflet (m^2) :return: averaged intermolecular resultant pressure (Pa) .. warning:: quadratic integration is computationally expensive. ''' # Integrate intermolecular force over an infinitely thin ring of radius r from 0 to a fTotal, _ = integrate.quad(lambda r, Z, R: 2 * np.pi * r * self.PMlocal(r, Z, R), 0, self.a, args=(Z, R)) return fTotal / S def v_PMavg(self, Z, R, S): ''' Vectorized PMavg function ''' return np.array(list(map(self.PMavg, Z, R, S))) def LJfitPMavg(self): ''' Determine optimal parameters of a Lennard-Jones expression approximating the average intermolecular pressure. These parameters are obtained by a nonlinear fit of the Lennard-Jones function for a range of deflection values between predetermined Zmin and Zmax. :return: 3-tuple with optimized LJ parameters for PmAvg prediction (Map) and the standard and max errors of the prediction in the fitting range (in Pascals) ''' # Determine lower bound of deflection range: when Pm = Pmmax PMmax = LJFIT_PM_MAX # Pa Zlb_range = (self.Zmin, 0.0) Zlb = brentq(lambda Z, Pmmax: self.PMavg(Z, self.curvrad(Z), self.surface(Z)) - PMmax, *Zlb_range, args=(PMmax), xtol=1e-16) # Create vectors for geometric variables Zub = 2 * self.a Z = np.arange(Zlb, Zub, 1e-11) Pmavg = self.v_PMavg(Z, self.v_curvrad(Z), self.surface(Z)) # Compute optimal nonlinear fit of custom LJ function with initial guess x0_guess = self.delta0 C_guess = 0.1 * self.pDelta nrep_guess = self.m nattr_guess = self.n pguess = (x0_guess, C_guess, nrep_guess, nattr_guess) popt, _ = curve_fit(lambda x, x0, C, nrep, nattr: LennardJones(x, self.Delta, x0, C, nrep, nattr), Z, Pmavg, p0=pguess, maxfev=100000) (x0_opt, C_opt, nrep_opt, nattr_opt) = popt Pmavg_fit = LennardJones(Z, self.Delta, x0_opt, C_opt, nrep_opt, nattr_opt) # Compute prediction error residuals = Pmavg - Pmavg_fit ss_res = np.sum(residuals**2) N = residuals.size std_err = np.sqrt(ss_res / N) max_err = max(np.abs(residuals)) logger.debug('LJ approx: x0 = %.2f nm, C = %.2f kPa, m = %.2f, n = %.2f', x0_opt * 1e9, C_opt * 1e-3, nrep_opt, nattr_opt) LJ_approx = {"x0": x0_opt, "C": C_opt, "nrep": nrep_opt, "nattr": nattr_opt} return (LJ_approx, std_err, max_err) @lookup def computePMparams(self): # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: D_eq = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Find optimal Lennard-Jones parameters to approximate PMavg (self.LJ_approx, std_err, _) = self.LJfitPMavg() assert std_err < PMAVG_STD_ERR_MAX, 'High error in PmAvg nonlinear fit:'\ ' std_err = %.2f Pa' % std_err def PMavgpred(self, Z): ''' Approximated average intermolecular pressure (using nonlinearly fitted Lennard-Jones function) :param Z: leaflet apex deflection (m) :return: predicted average intermolecular pressure (Pa) ''' return LennardJones(Z, self.Delta, self.LJ_approx['x0'], self.LJ_approx['C'], self.LJ_approx['nrep'], self.LJ_approx['nattr']) def Pelec(self, Z, Qm): ''' Electrical pressure term :param Z: leaflet apex deflection (m) :param Qm: membrane charge density (C/m2) :return: electrical pressure (Pa) ''' relS = self.S0 / self.surface(Z) abs_perm = self.epsilon0 * self.epsilonR # F/m return - relS * Qm**2 / (2 * abs_perm) # Pa def findDeltaEq(self, Qm): ''' Compute the Delta that cancels out the (Pm + Pec) equation at Z = 0 for a given membrane charge density, using the Brent method to refine the pressure root iteratively. :param Qm: membrane charge density (C/m2) :return: equilibrium value (m) and associated pressure (Pa) ''' def dualPressure(Delta): x = (self.Delta_ / Delta) return (self.pDelta * (x**self.m - x**self.n) + self.Pelec(0.0, Qm)) Delta_eq = brentq(dualPressure, 0.1 * self.Delta_, 2.0 * self.Delta_, xtol=1e-16) logger.debug('∆eq = %.2f nm', Delta_eq * 1e9) return (Delta_eq, dualPressure(Delta_eq)) def gasFlux(self, Z, P): ''' Gas molar flux through the sonophore boundary layers :param Z: leaflet apex deflection (m) :param P: internal gas pressure (Pa) :return: gas molar flux (mol/s) ''' dC = self.C0 - P / self.kH return 2 * self.surface(Z) * self.Dgl * dC / self.xi @classmethod def gasmol2Pa(cls, ng, V): ''' Internal gas pressure for a given molar content :param ng: internal molar content (mol) :param V: sonophore inner volume (m^3) :return: internal gas pressure (Pa) ''' return ng * Rg * cls.T / V @classmethod def gasPa2mol(cls, P, V): ''' Internal gas molar content for a given pressure :param P: internal gas pressure (Pa) :param V: sonophore inner volume (m^3) :return: internal gas molar content (mol) ''' return P * V / (Rg * cls.T) def PtotQS(self, Z, ng, Qm, Pac, Pm_comp_method): ''' Net quasi-steady pressure for a given acoustic pressure (Ptot = Pm + Pg + Pec - P0 - Pac) :param Z: leaflet apex deflection (m) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: acoustic pressure (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: total balance pressure (Pa) ''' if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) return Pm + self.gasmol2Pa(ng, self.volume(Z)) - self.P0 - Pac + self.Pelec(Z, Qm) def balancedefQS(self, ng, Qm, Pac=0.0, Pm_comp_method=PmCompMethod.predict): ''' Quasi-steady equilibrium deflection for a given acoustic pressure (computed by approximating the root of quasi-steady pressure) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: computation method for average intermolecular pressure :return: leaflet deflection canceling quasi-steady pressure (m) ''' Zbounds = (self.Zmin, self.a) Plb, Pub = [self.PtotQS(x, ng, Qm, Pac, Pm_comp_method) for x in Zbounds] assert (Plb > 0 > Pub), '[{}, {}] is not a sign changing interval for PtotQS'.format(*Zbounds) return brentq(self.PtotQS, *Zbounds, args=(ng, Qm, Pac, Pm_comp_method), xtol=1e-16) def TEleaflet(self, Z): ''' Elastic tension in leaflet :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA * self.arealStrain(Z) def setTissueModulus(self, drive): ''' Set the frequency-dependent elastic modulus of the surrounding tissue. ''' G_tissue = self.alpha * drive.modulationFrequency # G'' (Pa) self.kA_tissue = 2 * G_tissue * self.d # kA of the tissue layer (N/m) def TEtissue(self, Z): ''' Elastic tension in surrounding viscoelastic layer :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA_tissue * self.arealStrain(Z) def TEtot(self, Z): ''' Total elastic tension (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.TEleaflet(Z) + self.TEtissue(Z) def PEtot(self, Z, R): ''' Total elastic tension pressure (leaflet + surrounding viscoelastic layer) :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: elastic tension pressure (Pa) ''' return - self.TEtot(Z) / R @classmethod def PVleaflet(cls, U, R): ''' Viscous stress pressure in leaflet :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: leaflet viscous stress pressure (Pa) ''' return - 12 * U * cls.delta0 * cls.muS / R**2 @classmethod def PVfluid(cls, U, R): ''' Viscous stress pressure in surrounding medium :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: fluid viscous stress pressure (Pa) ''' return - 4 * U * cls.muL / np.abs(R) @classmethod def accP(cls, Ptot, R): ''' Leaflet transverse acceleration resulting from pressure imbalance :param Ptot: net pressure (Pa) :param R: leaflet curvature radius (m) :return: pressure-driven acceleration (m/s^2) ''' return Ptot / (cls.rhoL * np.abs(R)) @staticmethod def accNL(U, R): ''' Leaflet transverse nonlinear acceleration :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: nonlinear acceleration term (m/s^2) .. note:: A simplified version of nonlinear acceleration (neglecting dR/dH) is used here. ''' # return - (3/2 - 2*R/H) * U**2 / R return -(3 * U**2) / (2 * R) @staticmethod def checkInputs(drive, Qm, Pm_comp_method): ''' Check validity of stimulation parameters :param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :param Pm_comp_method: type of method used to compute average intermolecular pressure ''' if not isinstance(drive, Drive): raise TypeError(f'Invalid "drive" parameter (must be an "Drive" object)') if not isinstance(Qm, float): raise TypeError(f'Invalid "Qm" parameter (must be float typed)') Qmin, Qmax = CHARGE_RANGE if Qm < Qmin or Qm > Qmax: raise ValueError( f'Invalid applied charge: {Qm * 1e5} nC/cm2 (must be within [{Qmin * 1e5}, {Qmax * 1e5}] interval') if not isinstance(Pm_comp_method, PmCompMethod): raise TypeError('Invalid Pm computation method (must be "PmCompmethod" type)') def derivatives(self, t, y, drive, Qm, Pm_comp_method=PmCompMethod.predict): ''' Evolution of the mechanical system :param t: time instant (s) :param y: vector of HH system variables at time t :param drive: acoustic drive object :param Qm: membrane charge density (F/m2) :param Pm_comp_method: computation method for average intermolecular pressure :return: vector of mechanical system derivatives at time t ''' # Split input vector explicitly U, Z, ng = y # Correct deflection value is below critical compression if Z < self.Zmin: logger.warning('Deflection out of range: Z = %.2f nm', Z * 1e9) Z = self.Zmin # Compute curvature radius R = self.curvrad(Z) # Compute total pressure Pg = self.gasmol2Pa(ng, self.volume(Z)) if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) Pac = drive.compute(t) Pv = self.PVleaflet(U, R) + self.PVfluid(U, R) Ptot = Pm + Pg - self.P0 - Pac + self.PEtot(Z, R) + Pv + self.Pelec(Z, Qm) # Compute derivatives dUdt = self.accP(Ptot, R) + self.accNL(U, R) dZdt = U dngdt = self.gasFlux(Z, Pg) # Return derivatives vector return [dUdt, dZdt, dngdt] def computeInitialDeflection(self, drive, Qm, dt, Pm_comp_method=PmCompMethod.predict): ''' Compute non-zero deflection value for a small perturbation (solving quasi-steady equation). ''' Pac = drive.compute(dt) return self.balancedefQS(self.ng0, Qm, Pac, Pm_comp_method) @classmethod @Model.checkOutputDir def simQueue(cls, freqs, amps, charges, **kwargs): drives = AcousticDrive.createQueue(freqs, amps) queue = [] for drive in drives: for Qm in charges: queue.append([drive, Qm]) return queue def initialConditions(self, *args, **kwargs): ''' Compute simulation initial conditions. ''' # Compute initial non-zero deflection Z = self.computeInitialDeflection(*args, **kwargs) # Return initial conditions dictionary return { 'U': [0.] * 2, 'Z': [0., Z], 'ng': [self.ng0] * 2, } def simCycles(self, drive, Qm, n=None, Pm_comp_method=PmCompMethod.predict): ''' Simulate for a specific number of cycles or until periodic stabilization, for a specific set of ultrasound parameters, and return output data in a dataframe. :param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :param n: number of cycles (optional) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: output dataframe ''' # Set the tissue elastic modulus self.setTissueModulus(drive) # Compute initial conditions y0 = self.initialConditions(drive, Qm, drive.dt, Pm_comp_method=Pm_comp_method) # Initialize solver and compute solution solver = PeriodicSolver( drive.periodicity, # periodicty y0.keys(), # variables list lambda t, y: self.derivatives(t, y, drive, Qm, Pm_comp_method), # dfunc primary_vars=['Z', 'ng'], # primary variables dt=drive.dt # time step ) data = solver(y0, nmax=n) # Remove velocity timeries from solution del data['U'] # Return solution dataframe return data @Model.addMeta @Model.logDesc @Model.checkSimParams def simulate(self, drive, Qm, Pm_comp_method=PmCompMethod.predict): ''' Wrapper around the simUntilConvergence method, with decorators. ''' return self.simCycles(drive, Qm, Pm_comp_method=Pm_comp_method) def desc(self, meta): return f'{self}: simulation @ {meta["drive"].desc}, Q = {si_format(meta["Qm"] * 1e-4, 2)}C/cm2' def getCycleProfiles(self, drive, Qm): ''' Simulate mechanical system and compute pressures over the last acoustic cycle :param drive: acoustic drive object :param Qm: imposed membrane charge density (C/m2) :return: dataframe with the time, kinematic and pressure profiles over the last cycle. ''' # Run default simulation and retrieve last cycle solution logger.info(f'Running mechanical simulation (a = {si_format(self.a, 1)}m, {drive.desc})') data = self.simulate( drive, Qm, Pm_comp_method=PmCompMethod.direct)[0].iloc[-drive.nPerCycle:, :] # Extract relevant variables and de-offset time vector t, Z, ng = [data[key].values for key in ['t', 'Z', 'ng']] dt = (t[-1] - t[0]) / (NPC_DENSE - 1) t -= t[0] # Compute pressure cyclic profiles logger.info('Computing pressure cyclic profiles') R = self.v_curvrad(Z) U = np.diff(Z) / dt U = np.hstack((U, U[-1])) data = { 't': t, 'Z': Z, 'Cm': self.v_capacitance(Z), 'P_M': self.v_PMavg(Z, R, self.surface(Z)), 'P_Q': self.Pelec(Z, Qm), 'P_{VE}': self.PEtot(Z, R) + self.PVleaflet(U, R), 'P_V': self.PVfluid(U, R), 'P_G': self.gasmol2Pa(ng, self.volume(Z)), 'P_0': - np.ones(Z.size) * self.P0 } return pd.DataFrame(data, columns=data.keys()) diff --git a/PySONIC/core/bls_lookups.json b/PySONIC/core/bls_lookups.json index 170917e..06c6b7d 100644 --- a/PySONIC/core/bls_lookups.json +++ b/PySONIC/core/bls_lookups.json @@ -1,1024 +1,1053 @@ { "32.0": { "-80.00": { "LJ_approx": { "x0": 1.7875580514692446e-09, "C": 14506.791031634148, "nrep": 3.911252335063797, "nattr": 0.9495868868453603 }, "Delta_eq": 1.2344323203763867e-09 }, "-71.40": { "LJ_approx": { "x0": 1.710159362626304e-09, "C": 16757.44053535206, "nrep": 3.9149844779001572, "nattr": 0.9876139143736086 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.588820457014353e-09, "C": 21124.28839722447, "nrep": 3.9219530533405726, "nattr": 1.0531179666960837 }, "Delta_eq": 1.302942739961778e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7142977983903395e-09, "C": 16627.43538695451, "nrep": 3.9147721975981384, "nattr": 0.9855168537576823 }, "Delta_eq": 1.2553492695740507e-09 }, "-89.50": { "LJ_approx": { "x0": 1.8913883171160228e-09, "C": 12016.525797229067, "nrep": 3.9069373029335464, "nattr": 0.9021994595277029 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6390264131559902e-09, "C": 19180.97634755811, "nrep": 3.9188840789597705, "nattr": 1.0250251620607604 }, "Delta_eq": 1.281743450351987e-09 }, "-53.00": { "LJ_approx": { "x0": 1.5830321174402216e-09, "C": 21361.655211483354, "nrep": 3.9223254792281588, "nattr": 1.0564645995714745 }, "Delta_eq": 1.305609024046854e-09 }, "-53.58": { "LJ_approx": { "x0": 1.5863754403940291e-09, "C": 21224.206759769622, "nrep": 3.922109852077156, "nattr": 1.0545313303221624 }, "Delta_eq": 1.3040630712174578e-09 }, "-48.87": { "LJ_approx": { "x0": 1.5603070731170595e-09, "C": 22321.280954434333, "nrep": 3.9238276518118833, "nattr": 1.0698008224359472 }, "Delta_eq": 1.3165739825437056e-09 }, "0.00": { "LJ_approx": { "x0": 1.429523524073023e-09, "C": 28748.036227122713, "nrep": 3.9338919786768276, "nattr": 1.1551044201542804 }, "Delta_eq": 1.4e-09 }, "-70.00": { "LJ_approx": { "x0": 1.698788510560293e-09, "C": 17120.631318195712, "nrep": 3.915575488491436, "nattr": 0.9934238714780391 }, "Delta_eq": 1.2603339470322538e-09 }, "-58.00": { "LJ_approx": { "x0": 1.6131662659976035e-09, "C": 20156.47605325608, "nrep": 3.9204295233162925, "nattr": 1.0393049952285334 }, "Delta_eq": 1.2922508866011204e-09 }, "-140.00": { "LJ_approx": { "x0": 3.5396589580589484e-09, "C": 1255.8321160636908, "nrep": 3.879907809497444, "nattr": 0.4190657482583384 }, "Delta_eq": 1.1019265101358682e-09 }, "-200.00": { "LJ_approx": { "x0": 5.467498689940948e-09, "C": 298.4575230665454, "nrep": 3.8382806376855165, "nattr": 0.0014805372073950717 }, "Delta_eq": 1.0050623818936587e-09 }, "-60.00": { "LJ_approx": { "x0": 1.6260780786690872e-09, "C": 19662.79538138057, "nrep": 3.9196487714944097, "nattr": 1.0321270951610706 }, "Delta_eq": 1.2869009570089227e-09 }, "-160.00": { "LJ_approx": { "x0": 5.77921444789264e-09, "C": 204.15927774362973, "nrep": 3.866389064592209, "nattr": 0.03773979793348341 }, "Delta_eq": 1.0662975449878705e-09 + }, + "10.00": { + "LJ_approx": { + "x0": 1.4352403492312405e-09, + "C": 28434.36006445571, + "nrep": 3.9333944930338163, + "nattr": 1.1510029210218344 + }, + "Delta_eq": 1.3954197265639115e-09 + }, + "20.00": { + "LJ_approx": { + "x0": 1.4521735572363723e-09, + "C": 27522.575366234458, + "nrep": 3.93195491401856, + "nattr": 1.1390890902608632 + }, + "Delta_eq": 1.3824571964886577e-09 } }, "64.0": { "-80.00": { "LJ_approx": { "x0": 1.783357531675752e-09, "C": 14639.319598806138, "nrep": 3.9113027551187414, "nattr": 0.9404151935643594 }, "Delta_eq": 1.2344323203763867e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7103254451796522e-09, "C": 16775.90747591089, "nrep": 3.9148582072320104, "nattr": 0.9747613204242506 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7061996856525807e-09, "C": 16906.878806702443, "nrep": 3.9150725853841957, "nattr": 0.976778398349503 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.585264156646392e-09, "C": 21303.176047683613, "nrep": 3.92211004079812, "nattr": 1.0402641595550777 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.886870747500225e-09, "C": 12129.260307725155, "nrep": 3.906945602966425, "nattr": 0.895598309376088 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.635297932833928e-09, "C": 19347.359224637305, "nrep": 3.9190113341505843, "nattr": 1.0129039638328117 }, "Delta_eq": 1.281743450351987e-09 }, "-58.00": { "LJ_approx": { "x0": 1.6095250707781465e-09, "C": 20329.27848623273, "nrep": 3.92057191136993, "nattr": 1.0267862446034561 }, "Delta_eq": 1.2922508866011204e-09 }, "-140.00": { "LJ_approx": { "x0": 3.5398097121754107e-09, "C": 1255.8690004347025, "nrep": 3.8798490490747404, "nattr": 0.4604604439108636 }, "Delta_eq": 1.1019265101358682e-09 }, "-200.00": { "LJ_approx": { "x0": 6.6005456899992844e-09, "C": 144.85407475420143, "nrep": 3.838295127819493, "nattr": 0.00992375961984237 }, "Delta_eq": 1.0050623818936587e-09 }, "-60.00": { "LJ_approx": { "x0": 1.6223930024686038e-09, "C": 19832.380892638503, "nrep": 3.9197835635468294, "nattr": 1.019801114668753 }, "Delta_eq": 1.2869009570089227e-09 }, "-160.00": { "LJ_approx": { "x0": 6.33567252974036e-09, "C": 143.09307106178315, "nrep": 3.866387420765763, "nattr": 0.08081951236969916 }, "Delta_eq": 1.0662975449878705e-09 } }, "50.0": { "-71.90": { "LJ_approx": { "x0": 1.7114794411958874e-09, "C": 16732.841575829825, "nrep": 3.914827883392826, "nattr": 0.9781401389550711 }, "Delta_eq": 1.2553492695740507e-09 } }, "10.0": { "0.00": { "LJ_approx": { "x0": 1.4403460578039628e-09, "C": 27932.27792195569, "nrep": 3.9334138654752686, "nattr": 1.19526523864855 }, "Delta_eq": 1.4e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7286986021591825e-09, "C": 16087.514816365254, "nrep": 3.9147885683678543, "nattr": 1.012616990226475 }, "Delta_eq": 1.2553492695740507e-09 } }, "100.0": { "0.00": { "LJ_approx": { "x0": 1.4254455131143225e-09, "C": 29048.417918044444, "nrep": 3.9342659189249254, "nattr": 1.1351227816904121 }, "Delta_eq": 1.4e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7087681652667724e-09, "C": 16833.83962398515, "nrep": 3.914908533680663, "nattr": 0.9697102045586926 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7046480280451768e-09, "C": 16965.15489682674, "nrep": 3.9151238997284845, "nattr": 0.9716928395857687 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5838767580748841e-09, "C": 21372.412565003375, "nrep": 3.922191259312523, "nattr": 1.0344167918733638 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.8850831984948754e-09, "C": 12173.857567383837, "nrep": 3.906959887367545, "nattr": 0.8923154523792497 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6338405482612552e-09, "C": 19411.96069791924, "nrep": 3.9190798895919405, "nattr": 1.0073171165886772 }, "Delta_eq": 1.281743450351987e-09 } }, "500.0": { "0.00": { "LJ_approx": { "x0": 1.4236928207491738e-09, "C": 29174.423851140436, "nrep": 3.934489087598531, "nattr": 1.1244706663694597 }, "Delta_eq": 1.4e-09 } }, "15.0": { "-71.90": { "LJ_approx": { "x0": 1.722207527516432e-09, "C": 16331.124295102756, "nrep": 3.914721476976037, "nattr": 1.0021304453280393 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7180439454408102e-09, "C": 16459.15520614834, "nrep": 3.9149304238974905, "nattr": 1.0043742068193204 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5959361190370466e-09, "C": 20763.823187183425, "nrep": 3.921785737782814, "nattr": 1.0737976563473925 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.9002701433896942e-09, "C": 11795.576706463997, "nrep": 3.9070059150621814, "nattr": 0.9114123498082551 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.646473044478369e-09, "C": 18846.9803104403, "nrep": 3.918766624746869, "nattr": 1.044220870098494 }, "Delta_eq": 1.281743450351987e-09 } }, "70.0": { "-61.93": { "LJ_approx": { "x0": 1.6349587829329923e-09, "C": 19362.426097728276, "nrep": 3.9190260877993097, "nattr": 1.0116609492531905 }, "Delta_eq": 1.281743450351987e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7099628637265945e-09, "C": 16789.420291577666, "nrep": 3.9148688185890483, "nattr": 0.9736446481917082 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7058388214243274e-09, "C": 16920.455606556396, "nrep": 3.9150834388621805, "nattr": 0.9756530742858303 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.584940743805642e-09, "C": 21319.35643207058, "nrep": 3.9221277053335264, "nattr": 1.0389567310289958 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.886457143504779e-09, "C": 12139.584658299475, "nrep": 3.9069481854501382, "nattr": 0.8948872453823127 }, "Delta_eq": 1.2107230911508513e-09 } }, "150.0": { "-71.90": { "LJ_approx": { "x0": 1.707781542586275e-09, "C": 16870.340248462282, "nrep": 3.914948339378328, "nattr": 0.9660711186661354 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7036651779798684e-09, "C": 17001.86272239105, "nrep": 3.9151643485118117, "nattr": 0.9680342060844059 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5830041112065094e-09, "C": 21415.64649760579, "nrep": 3.9222512220871013, "nattr": 1.0303387653647968 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.883936689519767e-09, "C": 12202.390459945682, "nrep": 3.906974649816042, "nattr": 0.8898102498996743 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6329212539031057e-09, "C": 19452.44141782297, "nrep": 3.9191317215599946, "nattr": 1.0033715654432673 }, "Delta_eq": 1.281743450351987e-09 } }, "16.0": { "-71.90": { "LJ_approx": { "x0": 1.721337196662225e-09, "C": 16363.738563915058, "nrep": 3.9147202446411637, "nattr": 1.0005179992350384 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.717176984027311e-09, "C": 16491.97282711717, "nrep": 3.9149293522242252, "nattr": 1.0027563589061772 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5951507107307484e-09, "C": 20803.713978702526, "nrep": 3.921796023871708, "nattr": 1.0717457519944718 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.899300769652515e-09, "C": 11819.650350288994, "nrep": 3.906993085458305, "nattr": 0.9105930969008011 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6456524054221337e-09, "C": 18883.851022856303, "nrep": 3.918771854603072, "nattr": 1.042336408142498 }, "Delta_eq": 1.281743450351987e-09 }, "-58.00": { "LJ_approx": { "x0": 1.6196423302303851e-09, "C": 19847.346684716063, "nrep": 3.920294617611314, "nattr": 1.0573210567487794 }, "Delta_eq": 1.2922508866011204e-09 }, "-140.00": { "LJ_approx": { "x0": 3.536300349187651e-09, "C": 1259.968827699725, "nrep": 3.8800144975132485, "nattr": 0.35722933384459793 }, "Delta_eq": 1.1019265101358682e-09 }, "-200.00": { "LJ_approx": { "x0": 4.448077576688761e-09, "C": 658.9502615105886, "nrep": 3.8382596040618573, "nattr": -0.0007412836249291593 }, "Delta_eq": 1.0050623818936587e-09 }, "-60.00": { "LJ_approx": { "x0": 1.6326306076113028e-09, "C": 19359.641562109642, "nrep": 3.9195252093199606, "nattr": 1.0498037816897123 }, "Delta_eq": 1.2869009570089227e-09 }, "-160.00": { "LJ_approx": { "x0": 4.9755404141516864e-09, "C": 364.2006750890508, "nrep": 3.866403743827545, "nattr": 0.007733297873632889 }, "Delta_eq": 1.0662975449878705e-09 } }, "40.0": { "-71.90": { "LJ_approx": { "x0": 1.7127556130633909e-09, "C": 16685.139617894773, "nrep": 3.9147997833590855, "nattr": 0.9816110605284186 }, "Delta_eq": 1.2553492695740507e-09 }, "-140.00": { "LJ_approx": { "x0": 3.5400899114625196e-09, "C": 1255.3328685760946, "nrep": 3.8798858604365565, "nattr": 0.4340973480775988 }, "Delta_eq": 1.1019265101358682e-09 } }, "30.0": { "-71.90": { "LJ_approx": { "x0": 1.7147994934556977e-09, "C": 16608.65261608246, "nrep": 3.914764637335829, "nattr": 0.9867268079339511 }, "Delta_eq": 1.2553492695740507e-09 } }, "25.4": { "-71.90": { "LJ_approx": { "x0": 1.7162265501918752e-09, "C": 16555.217050637588, "nrep": 3.9147464050871856, "nattr": 0.9900398844251959 }, "Delta_eq": 1.2553492695740507e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6408385667642563e-09, "C": 19099.831853142834, "nrep": 3.9188405082474103, "nattr": 1.0301853851264013 }, "Delta_eq": 1.281743450351987e-09 } }, "40.3": { "-71.90": { "LJ_approx": { "x0": 1.7127058661258177e-09, "C": 16686.9995037205, "nrep": 3.914800799246736, "nattr": 0.981479342025535 }, "Delta_eq": 1.2553492695740507e-09 }, "-61.93": { "LJ_approx": { "x0": 1.637531858311385e-09, "C": 19247.790954282773, "nrep": 3.918928365198691, "nattr": 1.020450016688262 }, "Delta_eq": 1.281743450351987e-09 } }, "45.0": { "-71.90": { "LJ_approx": { "x0": 1.712051746586677e-09, "C": 16711.456749369456, "nrep": 3.914814642254888, "nattr": 0.979726839958776 }, "Delta_eq": 1.2553492695740507e-09 } }, "20.0": { "-71.90": { "LJ_approx": { "x0": 1.7186388439609698e-09, "C": 16464.85454836168, "nrep": 3.9147266088229755, "nattr": 0.9952322612710219 }, "Delta_eq": 1.2553492695740507e-09 }, "-140.00": { "LJ_approx": { "x0": 3.5381843063817075e-09, "C": 1257.5756585425543, "nrep": 3.8799713679048966, "nattr": 0.38013951841061877 }, "Delta_eq": 1.1019265101358682e-09 } }, "60.0": { "-71.90": { "LJ_approx": { "x0": 1.710603828012074e-09, "C": 16765.5278159216, "nrep": 3.9148503824940146, "nattr": 0.9756024890579372 }, "Delta_eq": 1.2553492695740507e-09 } }, "34.0": { "-71.90": { "LJ_approx": { "x0": 1.7138494069983225e-09, "C": 16644.21766668786, "nrep": 3.9147795488410644, "nattr": 0.9844103642751335 }, "Delta_eq": 1.2553492695740507e-09 } }, "22.6": { "-71.90": { "LJ_approx": { "x0": 1.717347083819261e-09, "C": 16513.244775760173, "nrep": 3.914735582752492, "nattr": 0.9925083846044295 }, "Delta_eq": 1.2553492695740507e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5915583742563881e-09, "C": 20985.87279990122, "nrep": 3.9218681449692334, "nattr": 1.06169568520175 }, "Delta_eq": 1.302942739961778e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7131877388482784e-09, "C": 16642.92622648039, "nrep": 3.9149463907912256, "nattr": 0.9946488696557423 }, "Delta_eq": 1.2566584760815426e-09 } }, "45.3": { "-71.90": { "LJ_approx": { "x0": 1.7120143529753677e-09, "C": 16712.8535932463, "nrep": 3.9148154954695285, "nattr": 0.9796234512533043 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7078889917152291e-09, "C": 16843.200319018524, "nrep": 3.9150287560913037, "nattr": 0.9816938577273149 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5867800612156331e-09, "C": 21227.103472649964, "nrep": 3.922035517445887, "nattr": 1.0460446908119716 }, "Delta_eq": 1.302942739961778e-09 } }, "31.0": { "0.00": { "LJ_approx": { "x0": 1.429704934686545e-09, "C": 28734.55271989346, "nrep": 3.9338783401809607, "nattr": 1.155904475115303 }, "Delta_eq": 1.4e-09 }, "-71.90": { "LJ_approx": { "x0": 1.714533452342314e-09, "C": 16618.61152256, "nrep": 3.9147686086169227, "nattr": 0.9860860917336786 }, "Delta_eq": 1.2553492695740507e-09 } }, "31.1": { "-71.90": { "LJ_approx": { "x0": 1.714515996985078e-09, "C": 16619.26565583038, "nrep": 3.914768856863524, "nattr": 0.986044873313721 }, "Delta_eq": 1.2553492695740507e-09 } }, "45.2": { "-71.90": { "LJ_approx": { "x0": 1.7120203335936594e-09, "C": 16712.630620283762, "nrep": 3.914815347514677, "nattr": 0.9796407085930723 }, "Delta_eq": 1.2553492695740507e-09 } }, "28.0": { "-140.00": { "LJ_approx": { "x0": 3.539662733628618e-09, "C": 1255.7594429416959, "nrep": 3.8799232005747073, "nattr": 0.4091144934965918 }, "Delta_eq": 1.1019265101358682e-09 } }, "52.0": { "-140.00": { "LJ_approx": { "x0": 3.540099825001411e-09, "C": 1255.4092689381769, "nrep": 3.8798640679267495, "nattr": 0.44958440372614994 }, "Delta_eq": 1.1019265101358682e-09 } }, "48.0": { "-140.00": { "LJ_approx": { "x0": 3.540113162797765e-09, "C": 1255.365483546012, "nrep": 3.8798702909870157, "nattr": 0.4451084636623491 }, "Delta_eq": 1.1019265101358682e-09 } }, "25.0": { "-140.00": { "LJ_approx": { "x0": 3.5394591974100957e-09, "C": 1255.9739311868518, "nrep": 3.8799379083844014, "nattr": 0.3998616563003167 }, "Delta_eq": 1.1019265101358682e-09 } }, "24.0": { "-140.00": { "LJ_approx": { "x0": 3.539294542800419e-09, "C": 1256.1755733964683, "nrep": 3.8799434079572483, "nattr": 0.3965246264708772 }, "Delta_eq": 1.1019265101358682e-09 } }, "26.0": { "-71.90": { "LJ_approx": { "x0": 1.716013742853656e-09, "C": 16563.186031095847, "nrep": 3.914748825378261, "nattr": 0.9895571841402089 }, "Delta_eq": 1.2553492695740507e-09 } }, "47.8": { "-71.90": { "LJ_approx": { "x0": 1.7117134574481139e-09, "C": 16724.09882321637, "nrep": 3.914822340960349, "nattr": 0.9787950821308133 }, "Delta_eq": 1.2553492695740507e-09 } }, "45.9": { "-71.90": { "LJ_approx": { "x0": 1.711942249735064e-09, "C": 16715.54886943138, "nrep": 3.9148171027544123, "nattr": 0.9794265342377567 }, "Delta_eq": 1.2553492695740507e-09 } }, "30.3": { "-71.90": { "LJ_approx": { "x0": 1.7147276912050609e-09, "C": 16611.3405217478, "nrep": 3.9147656933151262, "nattr": 0.9865543051062363 }, "Delta_eq": 1.2553492695740507e-09 } }, "43.4": { "-71.90": { "LJ_approx": { "x0": 1.7122593582582404e-09, "C": 16703.696731600092, "nrep": 3.914810082243474, "nattr": 0.9802910111364295 }, "Delta_eq": 1.2553492695740507e-09 } }, "59.7": { "-71.90": { "LJ_approx": { "x0": 1.710624810423359e-09, "C": 16764.744721099407, "nrep": 3.914849818351629, "nattr": 0.9756645210872293 }, "Delta_eq": 1.2553492695740507e-09 } }, "31.6": { "-71.90": { "LJ_approx": { "x0": 1.7144029387800298e-09, "C": 16623.497279426527, "nrep": 3.914770610799719, "nattr": 0.9857697612214258 }, "Delta_eq": 1.2553492695740507e-09 } }, "36.3": { "-71.90": { "LJ_approx": { "x0": 1.7133999698436411e-09, "C": 16661.0351629387, "nrep": 3.9147874692503435, "nattr": 0.9832772435848539 }, "Delta_eq": 1.2553492695740507e-09 } }, "55.7": { "-71.90": { "LJ_approx": { "x0": 1.710943143434582e-09, "C": 16752.866849028967, "nrep": 3.914841318636611, "nattr": 0.9766031714646571 }, "Delta_eq": 1.2553492695740507e-09 } }, "48.5": { "-71.90": { "LJ_approx": { "x0": 1.7116397026523852e-09, "C": 16726.853954116246, "nrep": 3.9148240832062964, "nattr": 0.9785886152529253 }, "Delta_eq": 1.2553492695740507e-09 } }, "42.2": { "-71.90": { "LJ_approx": { "x0": 1.712423436257706e-09, "C": 16697.560003586437, "nrep": 3.9148066437354734, "nattr": 0.980728415714794 }, "Delta_eq": 1.2553492695740507e-09 } }, "27.9": { "-71.90": { "LJ_approx": { "x0": 1.7154107379662085e-09, "C": 16585.76539172651, "nrep": 3.914756263081378, "nattr": 0.9881670164800336 }, "Delta_eq": 1.2553492695740507e-09 } }, "36.8": { "-71.90": { "LJ_approx": { "x0": 1.7133062675842822e-09, "C": 16664.541652779935, "nrep": 3.9147891722285952, "nattr": 0.9830389492355903 }, "Delta_eq": 1.2553492695740507e-09 } }, "24.3": { "-71.90": { "LJ_approx": { "x0": 1.7166571039691342e-09, "C": 16539.090593408535, "nrep": 3.91474189560473, "nattr": 0.9910014451873015 }, "Delta_eq": 1.2553492695740507e-09 } }, "80.0": { "-61.93": { "LJ_approx": { "x0": 1.6344991861630834e-09, "C": 19382.81542285848, "nrep": 3.9190471576123405, "nattr": 1.0099259239851026 }, "Delta_eq": 1.281743450351987e-09 } + }, + "22.0": { + "-71.90": { + "LJ_approx": { + "x0": 1.7176213744119541e-09, + "C": 16502.97011663288, + "nrep": 3.914733365777537, + "nattr": 0.9930970282708068 + }, + "Delta_eq": 1.2553492695740507e-09 + } } } \ No newline at end of file diff --git a/PySONIC/core/stimobj.py b/PySONIC/core/stimobj.py index 77be3d5..9f3cf33 100644 --- a/PySONIC/core/stimobj.py +++ b/PySONIC/core/stimobj.py @@ -1,123 +1,127 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2020-04-21 11:32:49 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-28 13:33:00 +# @Last Modified time: 2020-04-29 11:01:14 import abc from ..utils import isIterable, si_format class StimObject(metaclass=abc.ABCMeta): ''' Generic interface to a simulation object. ''' fcode_replace_pairs = [ ('/', '_per_'), (',', '_'), ('(', ''), (')', ''), (' ', '') ] @abc.abstractmethod def copy(self): ''' String representation. ''' raise NotImplementedError @staticmethod @abc.abstractmethod def inputs(): raise NotImplementedError def xformat(self, x, factor, precision, minfigs, strict_nfigs=False): if isIterable(x): l = [self.xformat(xx, factor, precision, minfigs, strict_nfigs=strict_nfigs) for xx in x] return f'({", ".join(l)})' if isinstance(x, str): return x xf = si_format(x * factor, precision=precision, space='') if strict_nfigs: if minfigs is not None: nfigs = len(xf.split('.')[0]) if nfigs < minfigs: xf = '0' * (minfigs - nfigs) + xf return xf def paramStr(self, k, **kwargs): val = getattr(self, k) if val is None: return None xf = self.xformat( val, self.inputs()[k].get('factor', 1.), self.inputs()[k].get('precision', 0), self.inputs()[k].get('minfigs', None), **kwargs) return f"{xf}{self.inputs()[k].get('unit', '')}" def pdict(self, sf='{key}={value}', **kwargs): d = {k: self.paramStr(k, **kwargs) for k in self.inputs().keys()} return {k: sf.format(key=k, value=v) for k, v in d.items() if v is not None} + @property + def meta(self): + return {k: getattr(self, k) for k in self.inputs().keys()} + def __eq__(self, other): if not isinstance(other, self.__class__): return False for k in self.inputs().keys(): if getattr(self, k) != getattr(other, k): return False return True def __repr__(self): return f'{self.__class__.__name__}({", ".join(self.pdict().values())})' @property def desc(self): return ', '.join(self.pdict(sf='{key} = {value}').values()) def slugify(self, s): for pair in self.fcode_replace_pairs: s = s.replace(*pair) return s @property def filecodes(self): d = self.pdict(sf='{key}_{value}', strict_nfigs=True) return {k: self.slugify(v) for k, v in d.items()} def checkInt(self, key, value): if not isinstance(value, int): raise TypeError(f'Invalid {self.inputs()[key]["desc"]} (must be an integer)') return value def checkFloat(self, key, value): if isinstance(value, int): value = float(value) if not isinstance(value, float): raise TypeError(f'Invalid {self.inputs()[key]["desc"]} (must be float typed)') return value def checkStrictlyPositive(self, key, value): if value <= 0: raise ValueError(f'Invalid {key} (must be strictly positive)') def checkPositiveOrNull(self, key, value): if value < 0: raise ValueError(f'Invalid {key} (must be positive or null)') def checkStrictlyNegative(self, key, value): if value >= 0: raise ValueError(f'Invalid {key} (must be strictly negative)') def checkNegativeOrNull(self, key, value): if value > 0: d = self.inputs()[key] raise ValueError(f'Invalid {key} {d["unit"]} (must be negative or null)') def checkBounded(self, key, value, bounds): if value < bounds[0] or value > bounds[1]: d = self.inputs()[key] f, u = d.get("factor", 1), d["unit"] bounds_str = f'[{bounds[0] * f}; {bounds[1] * f}] {u}' raise ValueError(f'Invalid {d["desc"]}: {value * f} {u} (must be within {bounds_str})') diff --git a/PySONIC/plt/effvars.py b/PySONIC/plt/effvars.py index 2c16bf1..d341556 100644 --- a/PySONIC/plt/effvars.py +++ b/PySONIC/plt/effvars.py @@ -1,253 +1,253 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2018-10-02 01:44:59 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-27 18:08:04 +# @Last Modified time: 2020-04-29 12:14:47 from inspect import signature import numpy as np import matplotlib.pyplot as plt -from ..utils import logger, isWithin +from ..utils import logger, isWithin, getSIpair from ..core import NeuronalBilayerSonophore from .pltutils import setGrid, setNormalizer def isVoltageDependent(func): return 'Vm' in list(signature(func).parameters.keys()) def plotGatingKinetics(pneuron, fs=15): ''' Plot the voltage-dependent steady-states and time constants of activation and inactivation gates of the different ionic currents involved in a specific neuron's membrane. :param pneuron: point-neuron model :param fs: labels and title font size ''' # Input membrane potential vector Vm = np.linspace(-100, 50, 300) xinf_dict = {} taux_dict = {} logger.info('Computing %s neuron gating kinetics', pneuron.name) names = pneuron.states for xname in names: Vm_state = True # Names of functions of interest xinf_func_str = xname.lower() + 'inf' taux_func_str = 'tau' + xname.lower() alphax_func_str = 'alpha' + xname.lower() betax_func_str = 'beta' + xname.lower() # derx_func_str = 'der' + xname.upper() # 1st choice: use xinf and taux function if hasattr(pneuron, xinf_func_str) and hasattr(pneuron, taux_func_str): xinf_func = getattr(pneuron, xinf_func_str) taux_func = getattr(pneuron, taux_func_str) xinf = np.array([xinf_func(v) for v in Vm]) if isinstance(taux_func, float): taux = taux_func * np.ones(len(Vm)) else: taux = np.array([taux_func(v) for v in Vm]) # 2nd choice: use alphax and betax functions elif hasattr(pneuron, alphax_func_str) and hasattr(pneuron, betax_func_str): alphax_func = getattr(pneuron, alphax_func_str) betax_func = getattr(pneuron, betax_func_str) if not isVoltageDependent(alphax_func): Vm_state = False else: alphax = np.array([alphax_func(v) for v in Vm]) if isinstance(betax_func, float): betax = betax_func * np.ones(len(Vm)) else: betax = np.array([betax_func(v) for v in Vm]) taux = 1.0 / (alphax + betax) xinf = taux * alphax # # 3rd choice: use derX choice # elif hasattr(pneuron, derx_func_str): # derx_func = getattr(pneuron, derx_func_str) # xinf = brentq(lambda x: derx_func(pneuron.Vm, x), 0, 1) else: Vm_state = False if not Vm_state: logger.error('%s-state gating kinetics is not Vm-dependent', xname) else: xinf_dict[xname] = xinf taux_dict[xname] = taux fig, axes = plt.subplots(2) fig.suptitle(f'{pneuron.name} neuron: gating dynamics') ax = axes[0] ax.get_xaxis().set_ticklabels([]) ax.set_ylabel('$X_{\infty}$', fontsize=fs) for xname in names: if xname in xinf_dict: ax.plot(Vm, xinf_dict[xname], lw=2, label='$' + xname + '_{\infty}$') ax.legend(fontsize=fs, loc=7) ax = axes[1] ax.set_xlabel('$V_m\ (mV)$', fontsize=fs) ax.set_ylabel('$\\tau_X\ (ms)$', fontsize=fs) for xname in names: if xname in taux_dict: ax.plot(Vm, taux_dict[xname] * 1e3, lw=2, label='$\\tau_{' + xname + '}$') ax.legend(fontsize=fs, loc=7) return fig def plotEffectiveVariables(pneuron, a=None, f=None, A=None, nlevels=10, zscale='lin', cmap=None, fs=12, ncolmax=1): ''' Plot the profiles of effective variables of a specific neuron as a function of charge density and another reference variable (z-variable). For each effective variable, one charge-profile per z-value is plotted, with a color code based on the z-variable value. :param pneuron: point-neuron model :param a: sonophore radius (m) :param f: acoustic drive frequency (Hz) :param A: acoustic pressure amplitude (Pa) :param nlevels: number of levels for the z-variable :param zscale: scale type for the z-variable ('lin' or 'log') :param cmap: colormap name :param fs: figure fontsize :param ncolmax: max number of columns on the figure :return: handle to the created figure ''' if sum(isinstance(x, float) for x in [a, f, A]) < 2: raise ValueError('at least 2 parameters in (a, f, A) must be fixed') if cmap is None: cmap = 'viridis' nbls = NeuronalBilayerSonophore(32e-9, pneuron) pltvars = nbls.getPltVars() # Get lookups and re-organize them lkp = nbls.getLookup().squeeze() Qref = lkp.refs['Q'] lkp.rename('V', 'Vm') lkp['Cm'] = Qref / lkp['Vm'] * 1e3 # uF/cm2 # Sort keys for display keys = lkp.outputs del keys[keys.index('Cm')] del keys[keys.index('Vm')] keys = ['Cm', 'Vm'] + keys # Get reference US-OFF lookups (1D) lookupsoff = lkp.projectOff() # Get 2D lookups at specific combination inputs = {} for k, v in {'a': a, 'f': f, 'A': A}.items(): if v is not None: inputs[k] = v lookups2D = lkp.projectN(inputs) # Get z-variable from remaining inputs for key in lookups2D.inputs: if key != 'Q': zkey = key zref = lookups2D.refs[key] zvar = nbls.inputs()[zkey] zref *= zvar.get('factor', 1.) # Optional: interpolate along z dimension if nlevels specified if zscale == 'log': zref_pos = zref[zref > 0] znew = np.logspace(np.log10(zref_pos.min()), np.log10(zref_pos.max()), nlevels) elif zscale == 'lin': znew = np.linspace(zref.min(), zref.max(), nlevels) else: raise ValueError('unknown scale type (should be "lin" or "log")') znew = np.array([isWithin(zvar['label'], z, (zref.min(), zref.max())) for z in znew]) lookups2D = lookups2D.project(zkey, znew) zref = znew # Define color code mymap = plt.get_cmap(cmap) norm, sm = setNormalizer(mymap, (zref.min(), zref.max()), zscale) # Plot logger.info('plotting') nrows, ncols = setGrid(len(keys), ncolmax=ncolmax) xvar = pltvars['Qm'] Qbounds = np.array([Qref.min(), Qref.max()]) * xvar['factor'] fig, _ = plt.subplots(figsize=(3.5 * ncols, 1 * nrows), squeeze=False) for j, key in enumerate(keys): ax = plt.subplot2grid((nrows, ncols), (j // ncols, j % ncols)) for s in ['right', 'top']: ax.spines[s].set_visible(False) yvar = pltvars[key] if j // ncols == nrows - 1: ax.set_xlabel('$\\rm {}\ ({})$'.format(xvar["label"], xvar["unit"]), fontsize=fs) ax.set_xticks(Qbounds) else: ax.set_xticks([]) ax.spines['bottom'].set_visible(False) ax.xaxis.set_label_coords(0.5, -0.1) ax.yaxis.set_label_coords(-0.02, 0.5) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) ymin = np.inf ymax = -np.inf # Plot effective variable for each selected z-value y0 = lookupsoff[key] for i, z in enumerate(zref): y = lookups2D[key][i] if 'alpha' in key or 'beta' in key: y[y > y0.max() * 2] = np.nan ax.plot(Qref * xvar.get('factor', 1), y * yvar.get('factor', 1), c=sm.to_rgba(z)) ymin = min(ymin, y.min()) ymax = max(ymax, y.max()) # Plot reference variable ax.plot(Qref * xvar.get('factor', 1), y0 * yvar.get('factor', 1), '--', c='k') ymax = max(ymax, y0.max()) ymin = min(ymin, y0.min()) # Set axis y-limits if 'alpha' in key or 'beta' in key: ymax = y0.max() * 2 ylim = [ymin * yvar.get('factor', 1), ymax * yvar.get('factor', 1)] if key == 'Cm': factor = 1e1 ylim = [np.floor(ylim[0] * factor) / factor, np.ceil(ylim[1] * factor) / factor] else: factor = 1 / np.power(10, np.floor(np.log10(ylim[1]))) ylim = [np.floor(ylim[0] * factor) / factor, np.ceil(ylim[1] * factor) / factor] ax.set_yticks(ylim) ax.set_ylim(ylim) ax.set_ylabel('$\\rm {}\ ({})$'.format(yvar["label"], yvar["unit"]), fontsize=fs, rotation=0, ha='right', va='center') fig.suptitle(f'{pneuron.name} neuron: {zvar["label"]} \n modulated effective variables') - # Adjust colorbar factor and unit if zvar is US frequency of amplitude - if zkey in ['f', 'A']: - zvar['unit'] = 'k' + zvar['unit'] - _, sm = setNormalizer(mymap, (zref.min() * 1e-3, zref.max() * 1e-3), zscale) + # Adjust colorbar factor and unit prefix if zvar is US frequency of amplitude + zfactor, zprefix = getSIpair(zref, scale=zscale) + zvar['unit'] = zprefix + zvar['unit'] + _, sm = setNormalizer(mymap, (zref.min() / zfactor, zref.max() / zfactor), zscale) # Plot colorbar fig.subplots_adjust(left=0.20, bottom=0.05, top=0.8, right=0.80, hspace=0.5) cbarax = fig.add_axes([0.10, 0.90, 0.80, 0.02]) fig.colorbar(sm, cax=cbarax, orientation='horizontal') cbarax.set_xlabel(f'{zvar["label"]} ({zvar["unit"]})', fontsize=fs) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) return fig diff --git a/PySONIC/plt/pltutils.py b/PySONIC/plt/pltutils.py index ef18a0b..6618fdc 100644 --- a/PySONIC/plt/pltutils.py +++ b/PySONIC/plt/pltutils.py @@ -1,428 +1,444 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2017-08-21 14:33:36 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-28 17:54:11 +# @Last Modified time: 2020-04-29 12:29:01 ''' Useful functions to generate plots. ''' import re import numpy as np import pandas as pd import matplotlib from matplotlib.patches import Rectangle from matplotlib import cm, colors import matplotlib.pyplot as plt from ..core import getModel -from ..utils import logger, isIterable, loadData, rescale, swapFirstLetterCase +from ..utils import * # Matplotlib parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' def cm2inch(*tupl): inch = 2.54 if isinstance(tupl[0], tuple): return tuple(i / inch for i in tupl[0]) else: return tuple(i / inch for i in tupl) def extractPltVar(model, pltvar, df, meta=None, nsamples=0, name=''): if 'func' in pltvar: s = pltvar['func'] if not s.startswith('meta'): s = f'model.{s}' try: var = eval(s) except AttributeError: var = eval(s.replace('model', 'model.pneuron')) elif 'key' in pltvar: var = df[pltvar['key']] elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = df[name] if isinstance(var, pd.Series): var = var.values var = var.copy() if var.size == nsamples - 1: var = np.insert(var, 0, var[0]) var *= pltvar.get('factor', 1) return var def setGrid(n, ncolmax=3): ''' Determine number of rows and columns in figure grid, based on number of variables to plot. ''' if n <= ncolmax: return (1, n) else: return ((n - 1) // ncolmax + 1, ncolmax) def setNormalizer(cmap, bounds, scale='lin'): norm = { 'lin': colors.Normalize, 'log': colors.LogNorm }[scale](*bounds) sm = cm.ScalarMappable(norm=norm, cmap=cmap) sm._A = [] return norm, sm class GenericPlot: def __init__(self, outputs): ''' Constructor. :param outputs: list / generator of simulation outputs ''' try: iter(outputs) except TypeError: outputs = [outputs] self.outputs = outputs def __call__(self, *args, **kwargs): return self.render(*args, **kwargs) @property def nouts(self): ''' Number of outputs. ''' return sum(1 for _ in self.outputs) def figtitle(self, model, meta): return model.desc(meta) @staticmethod def wraptitle(ax, title, maxwidth=120, sep=':', fs=10, y=1.0): if len(title) > maxwidth: title = '\n'.join(title.split(sep)) y = 0.94 h = ax.set_title(title, fontsize=fs) h.set_y(y) @staticmethod def getData(entry, frequency=1, trange=None): if entry is None: raise ValueError('non-existing data') if isinstance(entry, str): data, meta = loadData(entry, frequency) else: data, meta = entry data = data.iloc[::frequency] if trange is not None: tmin, tmax = trange data = data.loc[(data['t'] >= tmin) & (data['t'] <= tmax)] return data, meta def render(self, *args, **kwargs): raise NotImplementedError @staticmethod def getSimType(fname): ''' Get sim type from filename. ''' mo = re.search('(^[A-Z]*)_(.*).pkl', fname) if not mo: raise ValueError(f'Could not find sim-key in filename: "{fname}"') return mo.group(1) @staticmethod def getModel(*args, **kwargs): return getModel(*args, **kwargs) @staticmethod def getTimePltVar(tscale): ''' Return time plot variable for a given temporal scale. ''' return { 'desc': 'time', 'label': 'time', 'unit': tscale, 'factor': {'ms': 1e3, 'us': 1e6}[tscale], 'onset': {'ms': 1e-3, 'us': 1e-6}[tscale] } @staticmethod def createBackBone(*args, **kwargs): raise NotImplementedError @staticmethod def prettify(ax, xticks=None, yticks=None, xfmt='{:.0f}', yfmt='{:+.0f}'): try: ticks = ax.get_ticks() ticks = (min(ticks), max(ticks)) ax.set_ticks(ticks) ax.set_ticklabels([xfmt.format(x) for x in ticks]) except AttributeError: if xticks is None: xticks = ax.get_xticks() xticks = (min(xticks), max(xticks)) if yticks is None: yticks = ax.get_yticks() yticks = (min(yticks), max(yticks)) ax.set_xticks(xticks) ax.set_yticks(yticks) if xfmt is not None: ax.set_xticklabels([xfmt.format(x) for x in xticks]) if yfmt is not None: ax.set_yticklabels([yfmt.format(y) for y in yticks]) @staticmethod def addInset(fig, ax, inset): ''' Create inset axis. ''' inset_ax = fig.add_axes(ax.get_position()) inset_ax.set_zorder(1) inset_ax.set_xlim(inset['xlims'][0], inset['xlims'][1]) inset_ax.set_ylim(inset['ylims'][0], inset['ylims'][1]) inset_ax.set_xticks([]) inset_ax.set_yticks([]) inset_ax.add_patch(Rectangle((inset['xlims'][0], inset['ylims'][0]), inset['xlims'][1] - inset['xlims'][0], inset['ylims'][1] - inset['ylims'][0], color='w')) return inset_ax @staticmethod def materializeInset(ax, inset_ax, inset): ''' Materialize inset with zoom boox. ''' # Re-position inset axis axpos = ax.get_position() left, right, = rescale(inset['xcoords'], ax.get_xlim()[0], ax.get_xlim()[1], axpos.x0, axpos.x0 + axpos.width) bottom, top, = rescale(inset['ycoords'], ax.get_ylim()[0], ax.get_ylim()[1], axpos.y0, axpos.y0 + axpos.height) inset_ax.set_position([left, bottom, right - left, top - bottom]) for i in inset_ax.spines.values(): i.set_linewidth(2) # Materialize inset target region with contour frame ax.plot(inset['xlims'], [inset['ylims'][0]] * 2, linestyle='-', color='k') ax.plot(inset['xlims'], [inset['ylims'][1]] * 2, linestyle='-', color='k') ax.plot([inset['xlims'][0]] * 2, inset['ylims'], linestyle='-', color='k') ax.plot([inset['xlims'][1]] * 2, inset['ylims'], linestyle='-', color='k') # Link target and inset with dashed lines if possible if inset['xcoords'][1] < inset['xlims'][0]: ax.plot([inset['xcoords'][1], inset['xlims'][0]], [inset['ycoords'][0], inset['ylims'][0]], linestyle='--', color='k') ax.plot([inset['xcoords'][1], inset['xlims'][0]], [inset['ycoords'][1], inset['ylims'][1]], linestyle='--', color='k') elif inset['xcoords'][0] > inset['xlims'][1]: ax.plot([inset['xcoords'][0], inset['xlims'][1]], [inset['ycoords'][0], inset['ylims'][0]], linestyle='--', color='k') ax.plot([inset['xcoords'][0], inset['xlims'][1]], [inset['ycoords'][1], inset['ylims'][1]], linestyle='--', color='k') else: logger.warning('Inset x-coordinates intersect with those of target region') def postProcess(self, *args, **kwargs): raise NotImplementedError @staticmethod def removeSpines(ax): for item in ['top', 'right']: ax.spines[item].set_visible(False) @staticmethod def setXTicks(ax, xticks=None): if xticks is not None: ax.set_xticks(xticks) @staticmethod def setYTicks(ax, yticks=None): if yticks is not None: ax.set_yticks(yticks) @staticmethod def setTickLabelsFontSize(ax, fs): for tick in ax.xaxis.get_major_ticks() + ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) @staticmethod def setXLabel(ax, xplt, fs): ax.set_xlabel('$\\rm {}\ ({})$'.format(xplt["label"], xplt["unit"]), fontsize=fs) @staticmethod def setYLabel(ax, yplt, fs): ax.set_ylabel('$\\rm {}\ ({})$'.format(yplt["label"], yplt.get("unit", "")), fontsize=fs) @classmethod def addCmap(cls, fig, cmap, handles, comp_values, comp_info, fs, prettify, zscale='lin'): + # Rescale comparison values and adjust unit + comp_values = np.asarray(comp_values) * comp_info.get('factor', 1.) + comp_factor, comp_prefix = getSIpair(comp_values, scale=zscale) + comp_values /= comp_factor + comp_info['unit'] = comp_prefix + comp_info['unit'] + # Create colormap and normalizer try: mymap = plt.get_cmap(cmap) except ValueError: mymap = plt.get_cmap(swapFirstLetterCase(cmap)) norm, sm = setNormalizer(mymap, (comp_values.min(), comp_values.max()), zscale) # Adjust line colors for lh, z in zip(handles, comp_values): if isIterable(lh): for item in lh: item.set_color(sm.to_rgba(z)) else: lh.set_color(sm.to_rgba(z)) # Add colorbar fig.subplots_adjust(left=0.1, right=0.8, bottom=0.15, top=0.95, hspace=0.5) cbarax = fig.add_axes([0.85, 0.15, 0.03, 0.8]) cbar = fig.colorbar(sm, cax=cbarax, orientation='vertical') desc_str = comp_info["desc"].replace(" ", "\ ") cbarax.set_ylabel('$\\rm {}\ ({})$'.format(desc_str, comp_info["unit"]), fontsize=fs) if prettify: cls.prettify(cbar) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) class ComparativePlot(GenericPlot): def __init__(self, outputs, varname): ''' Constructor. :param outputs: list /generator of simulation outputs to be compared. :param varname: name of variable to extract and compare. ''' super().__init__(outputs) self.varname = varname self.comp_ref_key = None self.meta_ref = None self.comp_info = None self.is_unique_comp = False @property def ncomps(self): return sum(1 for _ in self.outputs) def checkColors(self, colors): if colors is None: colors = [f'C{j}' for j in range(self.nouts)] return colors def checkLines(self, lines): if lines is None: lines = ['-'] * self.nouts return lines def checkLabels(self, labels): if labels is not None: nlabels, nfiles = len(labels), self.nouts if nlabels != nfiles: raise ValueError( f'Invalid labels ({nlabels}): not matching number of compared files ({nfiles})') if not all(isinstance(x, str) for x in labels): raise TypeError('Invalid labels: must be string typed') def checkSimType(self, meta): ''' Check consistency of sim types across files. ''' if meta['simkey'] != self.meta_ref['simkey']: raise ValueError('Invalid comparison: different simulation types') def checkCompValues(self, meta, comp_values): ''' Check consistency of differing values across files. ''' - differing = {k: meta[k] != self.meta_ref[k] for k in meta.keys()} - if sum(differing.values()) > 1: + # Get differing values across meta dictionaries + diffs = differing(self.meta_ref, meta, subdkey='meta') + + # Check that only one value differs + if len(diffs) > 1: logger.warning('More than one differing inputs') self.comp_ref_key = None return [] - zkey = (list(differing.keys())[list(differing.values()).index(True)]) + + # Get the key and differing values + zkey, refval, val = diffs[0] + + # If no comparison key yet, fill it up if self.comp_ref_key is None: self.comp_ref_key = zkey self.is_unique_comp = True - comp_values.append(self.meta_ref[self.comp_ref_key]) - comp_values.append(meta[self.comp_ref_key]) + comp_values += [refval, val] + # Otherwise, check that comparison matches the existing one else: if zkey != self.comp_ref_key: logger.warning('inconsistent differing inputs') self.comp_ref_key = None return [] else: - comp_values.append(meta[self.comp_ref_key]) + comp_values.append(val) return comp_values def checkConsistency(self, meta, comp_values): ''' Check consistency of sim types and check differing inputs. ''' if self.meta_ref is None: self.meta_ref = meta else: self.checkSimType(meta) comp_values = self.checkCompValues(meta, comp_values) if self.comp_ref_key is None: self.is_unique_comp = False return comp_values def getCompLabels(self, comp_values): if self.comp_info is not None: comp_values = np.array(comp_values) * self.comp_info.get('factor', 1) - comp_labels = ['$\\rm{} = {}\ {}$'.format(self.comp_info["label"], x, self.comp_info["unit"]) - for x in comp_values] + if 'unit' in self.comp_info: + p = self.comp_info.get('precision', 0) + comp_values = [f"{si_format(v, p)}{self.comp_info['unit']}".replace(' ', '\ ') + for v in comp_values] + comp_labels = ['$\\rm{} = {}$'.format(self.comp_info['label'], x) for x in comp_values] else: comp_labels = comp_values return comp_values, comp_labels def chooseLabels(self, labels, comp_labels, full_labels): if labels is not None: return labels else: if self.is_unique_comp: return comp_labels else: return full_labels @staticmethod def getCommonLabel(lbls, seps='_'): ''' Get a common label from a list of labels, by removing parts that differ across them. ''' # Split every label according to list of separator characters, and save splitters as well splt_lbls = [re.split(f'([{seps}])', x) for x in lbls] pieces = [x[::2] for x in splt_lbls] splitters = [x[1::2] for x in splt_lbls] ncomps = len(pieces[0]) # Assert that splitters are equivalent across all labels, and reduce them to a single array assert (x == x[0] for x in splitters), 'Inconsistent splitters' splitters = np.array(splitters[0]) # Transform pieces into 2D matrix, and evaluate equality of every piece across labels pieces = np.array(pieces).T all_identical = [np.all(x == x[0]) for x in pieces] if np.sum(all_identical) < ncomps - 1: logger.warning('More than one differing inputs') return '' # Discard differing pieces and remove associated splitters pieces = pieces[all_identical, 0] splitters = splitters[all_identical[:-1]] # Remove last splitter if the last pieces were discarded if splitters.size == pieces.size: splitters = splitters[:-1] # Join common pieces and associated splitters into a single label common_lbl = '' for p, s in zip(pieces, splitters): common_lbl += f'{p}{s}' common_lbl += pieces[-1] - return common_lbl + return common_lbl.replace('( ', '(') def addExcitationInset(ax, is_excited): ''' Add text inset on axis stating excitation status. ''' ax.text( 0.7, 0.7, f'{"" if is_excited else "not "}excited', transform=ax.transAxes, ha='center', va='center', size=30, bbox=dict( boxstyle='round', fc=(0.8, 1.0, 0.8) if is_excited else (1., 0.8, 0.8) )) diff --git a/PySONIC/plt/timeseries.py b/PySONIC/plt/timeseries.py index 8c50848..c9082d1 100644 --- a/PySONIC/plt/timeseries.py +++ b/PySONIC/plt/timeseries.py @@ -1,510 +1,511 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2018-09-25 16:18:45 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-28 18:01:09 +# @Last Modified time: 2020-04-29 12:25:50 import numpy as np import matplotlib.pyplot as plt from ..postpro import detectSpikes, convertPeaksProperties from ..utils import * from .pltutils import * class TimeSeriesPlot(GenericPlot): ''' Generic interface to build a plot displaying temporal profiles of model simulations. ''' @classmethod def setTimeLabel(cls, ax, tplt, fs): return super().setXLabel(ax, tplt, fs) @classmethod def setYLabel(cls, ax, yplt, fs, grouplabel=None): if grouplabel is not None: yplt['label'] = grouplabel return super().setYLabel(ax, yplt, fs) def checkInputs(self, *args, **kwargs): raise NotImplementedError @staticmethod def getStimStates(df): try: stimstate = df['stimstate'] except KeyError: stimstate = df['states'] return stimstate.values @classmethod def getStimPulses(cls, t, states): ''' Determine the onset and offset times of pulses from a stimulation vector. :param t: time vector (s). :param states: a vector of stimulation state (ON/OFF) at each instant in time. :return: list of 3-tuples start time, end time and value of each pulse. ''' # Compute states derivatives and identify transition indexes dstates = np.diff(states) itransitions = np.where(np.abs(dstates) > 0)[0] + 1 if states[0] != 0.: itransitions = np.hstack(([0], itransitions)) if states[-1] != 0: itransitions = np.hstack((itransitions, [t.size - 1])) pulses = list(zip(t[itransitions[:-1]], t[itransitions[1:]], states[itransitions[:-1]])) return list(filter(lambda x: x[2] != 0, pulses)) def addLegend(self, fig, ax, handles, labels, fs, color=None, ls=None): lh = ax.legend(handles, labels, loc=1, fontsize=fs, frameon=False) if color is not None: for l in lh.get_lines(): l.set_color(color) if ls: for l in lh.get_lines(): l.set_linestyle(ls) @classmethod def materializeSpikes(cls, ax, data, tplt, yplt, color, mode, add_to_legend=False): ispikes, properties = detectSpikes(data) t = data['t'].values Qm = data['Qm'].values if ispikes is not None: yoffset = 5 ax.plot(t[ispikes] * tplt['factor'], Qm[ispikes] * yplt['factor'] + yoffset, 'v', color=color, label='spikes' if add_to_legend else None) if mode == 'details': ileft = properties['left_bases'] iright = properties['right_bases'] properties = convertPeaksProperties(t, properties) ax.plot(t[ileft] * tplt['factor'], Qm[ileft] * yplt['factor'] - 5, '<', color=color, label='left-bases' if add_to_legend else None) ax.plot(t[iright] * tplt['factor'], Qm[iright] * yplt['factor'] - 10, '>', color=color, label='right-bases' if add_to_legend else None) ax.vlines( x=t[ispikes] * tplt['factor'], ymin=(Qm[ispikes] - properties['prominences']) * yplt['factor'], ymax=Qm[ispikes] * yplt['factor'], color=color, linestyles='dashed', label='prominences' if add_to_legend else '') ax.hlines( y=properties['width_heights'] * yplt['factor'], xmin=properties['left_ips'] * tplt['factor'], xmax=properties['right_ips'] * tplt['factor'], color=color, linestyles='dotted', label='half-widths' if add_to_legend else '') return add_to_legend @staticmethod def prepareTime(t, tplt): if tplt['onset'] > 0.0: tonset = t.min() - 0.05 * np.ptp(t) t = np.insert(t, 0, tonset) return t * tplt['factor'] @staticmethod def getPatchesColors(x): if np.all([xx == x[0] for xx in x]): return ['#8A8A8A'] * len(x) else: xabsmax = np.abs(x).max() _, sm = setNormalizer(plt.get_cmap('RdGy'), (-xabsmax, xabsmax), 'lin') return [sm.to_rgba(xx) for xx in x] @classmethod def addPatches(cls, ax, pulses, tplt, color=None): tstart, tend, x = zip(*pulses) if color is None: colors = cls.getPatchesColors(x) else: colors = [color] * len(x) for i in range(len(pulses)): ax.axvspan(tstart[i] * tplt['factor'], tend[i] * tplt['factor'], edgecolor='none', facecolor=colors[i], alpha=0.2) @staticmethod def plotInset(inset_ax, inset, t, y, tplt, yplt, line, color, lw): inset_ax.plot(t, y, linewidth=lw, linestyle=line, color=color) return inset_ax @staticmethod def addInsetPatches(ax, inset_ax, inset, pulses, tplt, color): tstart, tend, x = [np.array([z]) for z in zip(*pulses)] # colors = cls.getPatchesColors(x) tfactor = tplt['factor'] ybottom, ytop = ax.get_ylim() cond_start = np.logical_and(tstart > (inset['xlims'][0] / tfactor), tstart < (inset['xlims'][1] / tfactor)) cond_end = np.logical_and(tend > (inset['xlims'][0] / tfactor), tend < (inset['xlims'][1] / tfactor)) cond_glob = np.logical_and(tstart < (inset['xlims'][0] / tfactor), tend > (inset['xlims'][1] / tfactor)) cond_onoff = np.logical_or(cond_start, cond_end) cond = np.logical_or(cond_onoff, cond_glob) tstart, tend, x = [z[cond] for z in [tstart, tend, x]] colors = cls.getPatchesColors(x) npatches_inset = tstart.size for i in range(npatches_inset): inset_ax.add_patch(Rectangle( (tstart[i] * tfactor, ybottom), (tend[i] - tstart[i]) * tfactor, ytop - ybottom, color=colors[i], alpha=0.1)) class CompTimeSeries(ComparativePlot, TimeSeriesPlot): ''' Interface to build a comparative plot displaying profiles of a specific output variable across different model simulations. ''' def __init__(self, outputs, varname): ''' Constructor. :param outputs: list / generator of simulator outputs to be compared. :param varname: name of variable to extract and compare ''' ComparativePlot.__init__(self, outputs, varname) def checkPatches(self, patches): greypatch = False if patches == 'none': patches = [False] * self.nouts elif patches == 'all': patches = [True] * self.nouts elif patches == 'one': patches = [True] + [False] * (self.nouts - 1) greypatch = True elif isinstance(patches, list): npatches, nfiles = len(patches), self.nouts if npatches != nfiles: raise ValueError( f'Invalid patches ({npatches}): not matching number of files ({nfiles})') if not all(isinstance(p, bool) for p in patches): raise TypeError('Invalid patch sequence: all list items must be boolean typed') else: raise ValueError( 'Invalid patches: must be either "none", all", "one", or a boolean list') return patches, greypatch def checkInputs(self, lines, labels, colors, patches): self.checkLabels(labels) lines = self.checkLines(lines) colors = self.checkColors(colors) patches, greypatch = self.checkPatches(patches) return lines, labels, colors, patches, greypatch @staticmethod def createBackBone(figsize): fig, ax = plt.subplots(figsize=figsize) ax.set_zorder(0) return fig, ax @classmethod def postProcess(cls, ax, tplt, yplt, fs, meta, prettify): cls.removeSpines(ax) if 'bounds' in yplt: ymin, ymax = ax.get_ylim() ax.set_ylim(min(ymin, yplt['bounds'][0]), max(ymax, yplt['bounds'][1])) cls.setTimeLabel(ax, tplt, fs) cls.setYLabel(ax, yplt, fs) if prettify: cls.prettify(ax, xticks=(0, meta['tstim'] * tplt['factor'])) cls.setTickLabelsFontSize(ax, fs) def render(self, figsize=(11, 4), fs=10, lw=2, labels=None, colors=None, lines=None, patches='one', inset=None, frequency=1, spikes='none', cmap=None, cscale='lin', trange=None, prettify=False): ''' Render plot. :param figsize: figure size (x, y) :param fs: labels fontsize :param lw: linewidth :param labels: list of labels to use in the legend :param colors: list of colors to use for each curve :param lines: list of linestyles :param patches: string indicating whether/how to mark stimulation periods with rectangular patches :param inset: string indicating whether/how to mark an inset zooming on a particular region of the graph :param frequency: frequency at which to plot samples :param spikes: string indicating how to show spikes ("none", "marks" or "details") :param cmap: color map to use for colobar-based comparison (if not None) :param cscale: color scale to use for colobar-based comparison :param trange: optional lower and upper bounds to time axis :return: figure handle ''' lines, labels, colors, patches, greypatch = self.checkInputs( lines, labels, colors, patches) fcodes = [] fig, ax = self.createBackBone(figsize) if inset is not None: inset_ax = self.addInset(fig, ax, inset) # Loop through data files handles, comp_values, full_labels = [], [], [] tmin, tmax = np.inf, -np.inf for j, output in enumerate(self.outputs): # Load data try: data, meta = self.getData(output, frequency, trange) except ValueError: continue if 'tcomp' in meta: meta.pop('tcomp') # Extract model model = self.getModel(meta) fcodes.append(model.filecode(meta)) # Add label to list full_labels.append(self.figtitle(model, meta)) # Check consistency of sim types and check differing inputs comp_values = self.checkConsistency(meta, comp_values) # Extract time and stim pulses t = data['t'].values stimstate = self.getStimStates(data) pulses = self.getStimPulses(t, stimstate) tplt = self.getTimePltVar(model.tscale) t = self.prepareTime(t, tplt) # Extract y-variable pltvars = model.getPltVars() if self.varname not in pltvars: pltvars_str = ', '.join([f'"{p}"' for p in pltvars.keys()]) raise KeyError( f'Unknown plot variable: "{self.varname}". Candidates are: {pltvars_str}') yplt = pltvars[self.varname] y = extractPltVar(model, yplt, data, meta, t.size, self.varname) # Plot time series handles.append(ax.plot(t, y, linewidth=lw, linestyle=lines[j], color=colors[j])[0]) # Optional: add spikes if self.varname == 'Qm' and spikes != 'none': self.materializeSpikes(ax, data, tplt, yplt, colors[j], spikes) # Plot optional inset if inset is not None: inset_ax = self.plotInset( inset_ax, inset, t, y, tplt, yplt, lines[j], colors[j], lw) # Add optional STIM-ON patches if patches[j]: ybottom, ytop = ax.get_ylim() color = None if greypatch else handles[j].get_color() self.addPatches(ax, pulses, tplt, color=color) if inset is not None: self.addInsetPatches(ax, inset_ax, inset, pulses, tplt, color) tmin, tmax = min(tmin, t.min()), max(tmax, t.max()) - # Determine labels + # Get common label and add it as title + common_label = self.getCommonLabel(full_labels.copy(), seps=':@,()') + self.wraptitle(ax, common_label, fs=fs) + + # Get comp info if any if self.comp_ref_key is not None: self.comp_info = model.inputs().get(self.comp_ref_key, None) - comp_values, comp_labels = self.getCompLabels(comp_values) - labels = self.chooseLabels(labels, comp_labels, full_labels) - - # - split labels by space - common_label = self.getCommonLabel(full_labels.copy(), seps=':@,') - self.wraptitle(ax, common_label, fs=fs) # Post-process figure self.postProcess(ax, tplt, yplt, fs, meta, prettify) ax.set_xlim(tmin, tmax) fig.tight_layout() + # Materialize inset if any if inset is not None: self.materializeInset(ax, inset_ax, inset) # Add labels or colorbar legend if cmap is not None: if not self.is_unique_comp: raise ValueError('Colormap mode unavailable for multiple differing parameters') if self.comp_info is None: raise ValueError('Colormap mode unavailable for qualitative comparisons') self.addCmap( fig, cmap, handles, comp_values, self.comp_info, fs, prettify, zscale=cscale) else: + comp_values, comp_labels = self.getCompLabels(comp_values) + labels = self.chooseLabels(labels, comp_labels, full_labels) self.addLegend(fig, ax, handles, labels, fs) # Add window title based on common pattern common_fcode = self.getCommonLabel(fcodes.copy()) fig.canvas.set_window_title(common_fcode) return fig class GroupedTimeSeries(TimeSeriesPlot): ''' Interface to build a plot displaying profiles of several output variables arranged into specific schemes. ''' def __init__(self, outputs, pltscheme=None): ''' Constructor. :param outputs: list / generator of simulation outputs. :param varname: name of variable to extract and compare ''' super().__init__(outputs) self.pltscheme = pltscheme @staticmethod def createBackBone(pltscheme): naxes = len(pltscheme) if naxes == 1: fig, ax = plt.subplots(figsize=(11, 4)) axes = [ax] else: fig, axes = plt.subplots(naxes, 1, figsize=(11, min(3 * naxes, 9))) return fig, axes @classmethod def postProcess(cls, axes, tplt, fs, meta, prettify): for ax in axes: cls.removeSpines(ax) if prettify: cls.prettify(ax, xticks=(0, meta['pp'].tstim * tplt['factor']), yfmt=None) cls.setTickLabelsFontSize(ax, fs) for ax in axes[:-1]: ax.get_shared_x_axes().join(ax, axes[-1]) ax.set_xticklabels([]) cls.setTimeLabel(axes[-1], tplt, fs) def render(self, fs=10, lw=2, labels=None, colors=None, lines=None, patches='one', save=False, outputdir=None, fig_ext='png', frequency=1, spikes='none', trange=None, prettify=False): ''' Render plot. :param fs: labels fontsize :param lw: linewidth :param labels: list of labels to use in the legend :param colors: list of colors to use for each curve :param lines: list of linestyles :param patches: boolean indicating whether to mark stimulation periods with rectangular patches :param save: boolean indicating whether or not to save the figure(s) :param outputdir: path to output directory in which to save figure(s) :param fig_ext: string indcating figure extension ("png", "pdf", ...) :param frequency: frequency at which to plot samples :param spikes: string indicating how to show spikes ("none", "marks" or "details") :param trange: optional lower and upper bounds to time axis :return: figure handle(s) ''' if colors is None: colors = plt.get_cmap('tab10').colors figs = [] for output in self.outputs: # Load data and extract model try: data, meta = self.getData(output, frequency, trange) except ValueError: continue model = self.getModel(meta) # Extract time and stim pulses t = data['t'].values stimstate = self.getStimStates(data) pulses = self.getStimPulses(t, stimstate) tplt = self.getTimePltVar(model.tscale) t = self.prepareTime(t, tplt) # Check plot scheme if provided, otherwise generate it pltvars = model.getPltVars() if self.pltscheme is not None: for key in list(sum(list(self.pltscheme.values()), [])): if key not in pltvars: raise KeyError(f'Unknown plot variable: "{key}"') pltscheme = self.pltscheme else: pltscheme = model.pltScheme # Create figure fig, axes = self.createBackBone(pltscheme) # Loop through each subgraph for ax, (grouplabel, keys) in zip(axes, pltscheme.items()): ax_legend_spikes = False # Extract variables to plot nvars = len(keys) ax_pltvars = [pltvars[k] for k in keys] if nvars == 1: ax_pltvars[0]['color'] = 'k' ax_pltvars[0]['ls'] = '-' # Plot time series icolor = 0 for yplt, name in zip(ax_pltvars, pltscheme[grouplabel]): color = yplt.get('color', colors[icolor]) y = extractPltVar(model, yplt, data, meta, t.size, name) ax.plot(t, y, yplt.get('ls', '-'), c=color, lw=lw, label='$\\rm {}$'.format(yplt["label"])) if 'color' not in yplt: icolor += 1 # Optional: add spikes if name == 'Qm' and spikes != 'none': ax_legend_spikes = self.materializeSpikes( ax, data, tplt, yplt, color, spikes, add_to_legend=True) # Set y-axis unit and bounds self.setYLabel(ax, ax_pltvars[0].copy(), fs, grouplabel=grouplabel) if 'bounds' in ax_pltvars[0]: ymin, ymax = ax.get_ylim() ax_min = min(ymin, *[ap['bounds'][0] for ap in ax_pltvars]) ax_max = max(ymax, *[ap['bounds'][1] for ap in ax_pltvars]) ax.set_ylim(ax_min, ax_max) # Add legend if nvars > 1 or 'gate' in ax_pltvars[0]['desc'] or ax_legend_spikes: ax.legend(fontsize=fs, loc=7, ncol=nvars // 4 + 1, frameon=False) # Set x-limits and add optional patches for ax in axes: ax.set_xlim(t.min(), t.max()) if patches != 'none': self.addPatches(ax, pulses, tplt) # Post-process figure self.postProcess(axes, tplt, fs, meta, prettify) self.wraptitle(axes[0], self.figtitle(model, meta), fs=fs) fig.tight_layout() fig.canvas.set_window_title(model.filecode(meta)) # Save figure if needed (automatic or checked) if save: filecode = model.filecode(meta) if outputdir is None: raise ValueError('output directory not specified') plt_filename = f'{outputdir}/{filecode}.{fig_ext}' plt.savefig(plt_filename) logger.info(f'Saving figure as "{plt_filename}"') plt.close() figs.append(fig) return figs if __name__ == '__main__': # example of use filepaths = OpenFilesDialog('pkl')[0] comp_plot = CompTimeSeries(filepaths, 'Qm') fig = comp_plot.render( lines=['-', '--'], labels=['60 kPa', '80 kPa'], patches='one', colors=['r', 'g'], xticks=[0, 100], yticks=[-80, +50], inset={'xcoords': [5, 40], 'ycoords': [-35, 45], 'xlims': [57.5, 60.5], 'ylims': [10, 35]} ) scheme_plot = GroupedTimeSeries(filepaths) figs = scheme_plot.render() plt.show() diff --git a/PySONIC/utils.py b/PySONIC/utils.py index 05cee25..0cd872f 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,942 +1,984 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Email: theo.lemaire@epfl.ch # @Date: 2016-09-19 22:30:46 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2020-04-21 20:38:46 +# @Last Modified time: 2020-04-29 12:13:44 ''' Definition of generic utility functions used in other modules ''' import csv from functools import wraps import operator import time from inspect import signature import os from shutil import get_terminal_size import lockfile import math import pickle import json from tqdm import tqdm import logging import tkinter as tk from tkinter import filedialog import base64 import datetime import numpy as np from scipy.optimize import brentq from scipy import linalg import colorlog from pushbullet import Pushbullet # Package logger my_log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%') def setHandler(logger, handler): for h in logger.handlers: logger.removeHandler(h) logger.addHandler(handler) return logger def setLogger(name, formatter): handler = colorlog.StreamHandler() handler.setFormatter(formatter) logger = colorlog.getLogger(name) logger.addHandler(handler) return logger class TqdmHandler(logging.StreamHandler): def __init__(self, formatter): logging.StreamHandler.__init__(self) self.setFormatter(formatter) def emit(self, record): msg = self.format(record) tqdm.write(msg) logger = setLogger('PySONIC', my_log_formatter) def fillLine(text, char='-', totlength=None): ''' Surround a text with repetitions of a specific character in order to fill a line to a given total length. :param text: text to be surrounded :param char: surrounding character :param totlength: target number of characters in filled text line :return: filled text line ''' if totlength is None: totlength = get_terminal_size().columns - 1 ndashes = totlength - len(text) - 2 if ndashes < 2: return text else: nside = ndashes // 2 nleft, nright = nside, nside if ndashes % 2 == 1: nright += 1 return f'{char * nleft} {text} {char * nright}' # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } +sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) + + +def getSIpair(x, scale='lin'): + ''' Get the correct SI factor and prefix for a floating point number. ''' + if isIterable(x): + # If iterable, get a representative number of the distribution + x = np.asarray(x) + x = x.prod()**(1.0 / x.size) if scale == 'log' else np.mean(x) + if x == 0: + return 1e0, '' + else: + vals = [tmp[1] for tmp in sorted_si_prefixes] + ix = np.searchsorted(vals, np.abs(x)) - 1 + if np.abs(x) == vals[ix + 1]: + ix += 1 + return vals[ix], sorted_si_prefixes[ix][0] + def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): - if x == 0: - factor = 1e0 - prefix = '' - else: - sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) - vals = [tmp[1] for tmp in sorted_si_prefixes] - ix = np.searchsorted(vals, np.abs(x)) - 1 - if np.abs(x) == vals[ix + 1]: - ix += 1 - factor = vals[ix] - prefix = sorted_si_prefixes[ix][0] + factor, prefix = getSIpair(x) return f'{x / factor:.{precision}f}{space}{prefix}' elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: - print(type(x)) + raise ValueError(f'cannot si_format {type(x)} objects') def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' sci_string = f'{number:.{precision}e}' value, exponent = sci_string.split("e") value, exponent = float(value), int(exponent) val_str = f'{value} * ' if value != 1. else '' return f'{val_str}10^{{{exponent}}}' def rmse(x1, x2, axis=None): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean(axis=axis)) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def convertPKL2JSON(): for pkl_filepath in OpenFilesDialog('pkl')[0]: logger.info(f'Processing {pkl_filepath} ...') json_filepath = f'{os.path.splitext(pkl_filepath)[0]}.json' with open(pkl_filepath, 'rb') as fpkl, open(json_filepath, 'w') as fjson: data = pickle.load(fpkl) json.dump(data, fjson, ensure_ascii=False, sort_keys=True, indent=4) logger.info('All done!') def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames( filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname ) if len(filenames) == 0: raise ValueError('no input file selected') par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) return filenames, par_dir def selectDirDialog(title='Select directory'): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() directory = filedialog.askdirectory(title=title) if directory == '': raise ValueError('no directory selected') return directory def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. :param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) if len(filename_out) == 0: raise ValueError('no output filepath selected') return filename_out def loadData(fpath, frequency=1): ''' Load dataframe and metadata dictionary from pickle file. ''' logger.info('Loading data from "%s"', os.path.basename(fpath)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'].iloc[::frequency] meta = frame['meta'] return df, meta def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def expandRange(xmin, xmax, exp_factor=2): if xmin > xmax: raise ValueError('values must be provided in (min, max) order') xptp = xmax - xmin xmid = (xmin + xmax) / 2 xdev = xptp * exp_factor / 2 return (xmid - xdev, xmin + xdev) def isIterable(x): for t in [list, tuple, np.ndarray]: if isinstance(x, t): return True return False def isWithin(name, val, bounds, rel_tol=1e-9): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. :param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' if isIterable(val): return np.array([isWithin(name, v, bounds, rel_tol) for v in val]) if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError(f'{name} value ({val}) out of [{bounds[0]}, {bounds[1]}] interval') def getDistribution(xmin, xmax, nx, scale='lin'): if scale == 'log': xmin, xmax = np.log10(xmin), np.log10(xmax) return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx) def getDistFromList(xlist): if not isinstance(xlist, list): raise TypeError('Input must be a list') if len(xlist) != 4: raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])') scale = xlist[0] if scale not in ('log', 'lin'): raise ValueError('Unknown distribution type (must be "lin" or "log")') xmin, xmax = [float(x) for x in xlist[1:-1]] if xmin >= xmax: raise ValueError('Specified minimum higher or equal than specified maximum') nx = int(xlist[-1]) if nx < 2: raise ValueError('Specified number must be at least 2') return getDistribution(xmin, xmax, nx, scale=scale) def getIndex(container, value): ''' Return the index of a float / string value in a list / array :param container: list / 1D-array of elements :param value: value to search for :return: index of value (if found) ''' if isinstance(value, float): container = np.array(container) imatches = np.where(np.isclose(container, value, rtol=1e-9, atol=1e-16))[0] if len(imatches) == 0: raise ValueError(f'{value} not found in {container}') return imatches[0] elif isinstance(value, str): return container.index(value) def funcSig(func, args, kwargs): args_repr = [repr(a) for a in args] kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] return f'{func.__name__}({", ".join(args_repr + kwargs_repr)})' def debug(func): ''' Print the function signature and return value. ''' @wraps(func) def wrapper_debug(*args, **kwargs): print(f'Calling {funcSig(func, args, kwargs)}') value = func(*args, **kwargs) print(f"{func.__name__!r} returned {value!r}") return value return wrapper_debug def timer(func): ''' Monitor and return the runtime of the decorated function. ''' @wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() value = func(*args, **kwargs) end_time = time.perf_counter() run_time = end_time - start_time return value, run_time return wrapper def alignWithFuncDef(func, args, kwargs): ''' Align a set of provided positional and keyword arguments with the arguments signature in a specific function definition. :param func: function object :param args: list of provided positional arguments :param kwargs: dictionary of provided keyword arguments :return: 2-tuple with the modified arguments and ''' # Get positional and keyword arguments from function signature sig_params = {k: v for k, v in signature(func).parameters.items()} sig_args = list(filter(lambda x: x.default == x.empty, sig_params.values())) sig_kwargs = {k: v.default for k, v in sig_params.items() if v.default != v.empty} sig_nargs = len(sig_args) kwarg_keys = list(sig_kwargs.keys()) # Restrain provided positional arguments to those that are also positional in signature new_args = args[:sig_nargs] # Construct hybrid keyword arguments dictionary from: # - remaining positional arguments # - provided keyword arguments # - default keyword arguments new_kwargs = sig_kwargs for i, x in enumerate(args[sig_nargs:]): new_kwargs[kwarg_keys[i]] = x for k, v in kwargs.items(): new_kwargs[k] = v return new_args, new_kwargs def alignWithMethodDef(method, args, kwargs): args, kwargs = alignWithFuncDef(method, [None] + list(args), kwargs) return tuple(args[1:]), kwargs def logCache(fpath, delimiter='\t', out_type=float): ''' Add an extra IO memoization functionality to a function using file caching, to avoid repetitions of tedious computations with identical inputs. ''' def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # If function has history -> do not log if 'history' in kwargs: return func(*args, **kwargs) # Modify positional and keyword arguments to match function signature, if needed args, kwargs = alignWithFuncDef(func, args, kwargs) # Translate args and kwargs into string signature fsignature = funcSig(func, args, kwargs) # If entry present in log, return corresponding output if os.path.isfile(fpath): with open(fpath, 'r', newline='') as f: reader = csv.reader(f, delimiter=delimiter) for row in reader: if row[0] == fsignature: logger.debug(f'entry found in "{os.path.basename(fpath)}"') return out_type(row[1]) # Otherwise, compute output and log it into file before returning out = func(*args, **kwargs) lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'a', newline='') as csvfile: writer = csv.writer(csvfile, delimiter=delimiter) writer.writerow([fsignature, str(out)]) lock.release() return out return wrapper return wrapper_with_args def fileCache(root, fcode_func, ext='json'): def wrapper_with_args(func): @wraps(func) def wrapper(*args, **kwargs): # Get load and dump functions from file extension try: load_func = { 'json': json.load, 'pkl': pickle.load, 'csv': lambda f: np.loadtxt(f, delimiter=',') }[ext] dump_func = { 'json': json.dump, 'pkl': pickle.dump, 'csv': lambda x, f: np.savetxt(f, x, delimiter=',') }[ext] except KeyError: raise ValueError('Unknown file extension') # Get read and write mode (text or binary) from file extension mode = 'b' if ext == 'pkl' else '' # Get file path from root and function arguments, using fcode function if callable(fcode_func): fcode = fcode_func(*args) else: fcode = fcode_func fpath = os.path.join(os.path.abspath(root), f'{fcode}.{ext}') # If file exists, load output from it if os.path.isfile(fpath): logger.info(f'loading data from "{fpath}"') with open(fpath, 'r' + mode) as f: out = load_func(f) # Otherwise, execute function and create the file to dump the output else: logger.warning(f'reference data file not found: "{fpath}"') out = func(*args, **kwargs) logger.info(f'dumping data in "{fpath}"') lock = lockfile.FileLock(fpath) lock.acquire() with open(fpath, 'w' + mode) as f: dump_func(out, f) lock.release() return out return wrapper return wrapper_with_args def derivative(f, x, eps, method='central'): ''' Compute the difference formula for f'(x) with perturbation size eps. :param dfunc: derivatives function, taking an array of states and returning an array of derivatives :param x: states vector :param method: difference formula: 'forward', 'backward' or 'central' :param eps: perturbation vector (same size as states vector) :return: numerical approximation of the derivative around the fixed point ''' if isIterable(x): if not isIterable(eps) or len(eps) != len(x): raise ValueError('eps must be the same size as x') elif np.sum(eps != 0.) != 1: raise ValueError('eps must be zero-valued across all but one dimensions') eps_val = np.sum(eps) else: eps_val = eps if method == 'central': df = (f(x + eps) - f(x - eps)) / 2 elif method == 'forward': df = f(x + eps) - f(x) elif method == 'backward': df = f(x) - f(x - eps) else: raise ValueError("Method must be 'central', 'forward' or 'backward'.") return df / eps_val def jacobian(dfunc, x, rel_eps=None, abs_eps=None, method='central'): ''' Evaluate the Jacobian maatrix of a (time-invariant) system, given a states vector and derivatives function. :param dfunc: derivatives function, taking an array of n states and returning an array of n derivatives :param x: n-states vector :return: n-by-n square Jacobian matrix ''' if sum(e is not None for e in [abs_eps, rel_eps]) != 1: raise ValueError('one (and only one) of "rel_eps" or "abs_eps" parameters must be provided') # Determine vector size x = np.asarray(x) n = x.size # Initialize Jacobian matrix J = np.empty((n, n)) # Create epsilon vector if rel_eps is not None: mode = 'relative' eps_vec = rel_eps else: mode = 'absolute' eps_vec = abs_eps if not isIterable(eps_vec): eps_vec = np.array([eps_vec] * n) if mode == 'relative': eps = x * eps_vec else: eps = eps_vec # Perturb each state by epsilon on both sides, re-evaluate derivatives # and assemble Jacobian matrix ei = np.zeros(n) for i in range(n): ei[i] = 1 J[:, i] = derivative(dfunc, x, eps * ei, method=method) ei[i] = 0 return J def classifyFixedPoint(x, dfunc): ''' Characterize the stability of a fixed point by numerically evaluating its Jacobian matrix and evaluating the sign of the real part of its associated eigenvalues. :param x: n-states vector :param dfunc: derivatives function, taking an array of n states and returning an array of n derivatives ''' # Compute Jacobian numerically # print(f'x = {x}, dfunx(x) = {dfunc(x)}') eps_machine = np.sqrt(np.finfo(float).eps) J = jacobian(dfunc, x, rel_eps=eps_machine, method='forward') # Compute eigenvalues and eigenvectors eigvals, eigvecs = linalg.eig(J) logger.debug(f"eigenvalues = {[f'({x.real:.2e} + {x.imag:.2e}j)' for x in eigvals]}") # Determine fixed point stability based on eigenvalues is_neg_eigvals = eigvals.real < 0 if is_neg_eigvals.all(): # all real parts negative -> stable key = 'stable' elif is_neg_eigvals.any(): # both posivie and negative real parts -> saddle key = 'saddle' else: # all real parts positive -> unstable key = 'unstable' return eigvals, key def findModifiedEq(x0, dfunc, *args): ''' Find an equilibrium variable in a modified system by searching for its derivative root within an interval around its original equilibrium. :param x0: equilibrium value in original system. :param func: derivative function, taking the variable as first parameter. :param *args: remaining arguments needed for the derivative function. :return: variable equilibrium value in modified system. ''' is_iterable = [isIterable(arg) for arg in args] if any(is_iterable): if not all(is_iterable): raise ValueError('mix of iterables and non-iterables') lengths = [len(arg) for arg in args] if not all(n == lengths[0] for n in lengths): raise ValueError(f'inputs are not of the same size: {lengths}') n = lengths[0] res = [] for i in range(n): x = [arg[i] for arg in args] res.append(findModifiedEq(x0, dfunc, *x)) return np.array(res) else: return brentq(lambda x: dfunc(x, *args), x0 * 1e-4, x0 * 1e3, xtol=1e-16) def swapFirstLetterCase(s): if s[0].islower(): return s.capitalize() else: return s[0].lower() + s[1:] def getPow10(x, direction='up'): ''' Get the power of 10 that is closest to a number, in either direction("down" or "up"). ''' round_method = {'up': np.ceil, 'down': np.floor}[direction] return np.power(10, round_method(np.log10(x))) def rotAroundPoint2D(x, theta, p): ''' Rotate a 2D vector around a center point by a given angle. :param x: 2D coordinates vector :param theta: rotation angle (in rad) :param p: 2D center point coordinates :return: 2D rotated coordinates vector ''' n1, n2 = x.shape if n1 != 2: if n2 == 2: x = x.T else: raise ValueError('x should be a 2-by-n vector') # Rotation matrix R = np.array([ [np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)], ]) # Broadcast center point to input vector ptile = np.tile(p, (x.shape[1], 1)).T # Subtract, rotate and add return R.dot(x - ptile) + ptile def getKey(keyfile='pushbullet.key'): dir_path = os.path.dirname(os.path.realpath(__file__)) package_root = os.path.abspath(os.path.join(dir_path, os.pardir)) fpath = os.path.join(package_root, keyfile) if not os.path.isfile(fpath): raise FileNotFoundError('pushbullet API key file not found') with open(fpath) as f: encoded_key = f.readlines()[0] return base64.b64decode(str.encode(encoded_key)).decode() def sendPushNotification(msg): try: key = getKey() pb = Pushbullet(key) dt = datetime.datetime.now() s = dt.strftime('%Y-%m-%d %H:%M:%S') pb.push_note('Code Messenger', f'{s}\n{msg}') except FileNotFoundError: logger.error(f'Could not send push notification: "{msg}"') def alert(func): ''' Run a function, and send a push notification upon completion, or if an error is raised during its execution. ''' @wraps(func) def wrapper(*args, **kwargs): try: out = func(*args, **kwargs) sendPushNotification(f'completed "{func.__name__}" execution successfully') return out except BaseException as e: sendPushNotification(f'error during "{func.__name__}" execution: {e}') raise e return wrapper def sunflower(n, radius=1, alpha=1): ''' Generate a population of uniformly distributed 2D data points in a unit circle. :param n: number of data points :param alpha: coefficient determining evenness of the boundary :return: 2D matrix of Cartesian (x, y) positions ''' nbounds = np.round(alpha * np.sqrt(n)) # number of boundary points phi = (np.sqrt(5) + 1) / 2 # golden ratio k = np.arange(1, n + 1) # index vector theta = 2 * np.pi * k / phi**2 # angle vector r = np.sqrt((k - 1) / (n - nbounds - 1)) # radius vector r[r > 1] = 1 x = r * np.cos(theta) y = r * np.sin(theta) return radius * np.vstack((x, y)) def filecode(model, *args): ''' Generate file code given a specific combination of model input parameters. ''' # If meta dictionary was passed, generate inputs list from it if len(args) == 1 and isinstance(args[0], dict): meta = args[0].copy() if meta['simkey'] == 'ASTIM' and 'fs' not in meta: meta['fs'] = meta['model']['fs'] meta['method'] = meta['model']['method'] meta['qss_vars'] = None for k in ['simkey', 'model', 'tcomp', 'dt', 'atol']: if k in meta: del meta[k] args = list(meta.values()) # Otherwise, transform args tuple into list else: args = list(args) # If any argument is an iterable -> transform it to a continous string for i in range(len(args)): if isIterable(args[i]): args[i] = ''.join([str(x) for x in args[i]]) # Create file code by joining string-encoded inputs with underscores codes = model.filecodes(*args).values() return '_'.join([x for x in codes if x is not None]) def simAndSave(model, *args, **kwargs): ''' Simulate the model and save the results in a specific output directory. :param *args: list of arguments provided to the simulation function :param **kwargs: optional arguments dictionary :return: output filepath ''' # Extract output directory and overwrite boolean from keyword arguments. outputdir = kwargs.pop('outputdir', '.') overwrite = kwargs.pop('overwrite', True) # Set data and meta to None data, meta = None, None # Extract drive object from args drive, *other_args = args # If drive is searchable and not fully resolved if drive.is_searchable: if not drive.is_resolved: # Call simulate to perform titration out = model.simulate(*args) # If titration yields nothing -> no file produced -> return None if out is None: logger.warning('returning None') return None # Store data and meta data, meta = out # Update args list with resovled drive try: args = (meta['drive'], *other_args) except KeyError: args = (meta['source'], *other_args) # Check if a output file corresponding to sim inputs is found in the output directory # That check is performed prior to running the simulation, such that # it is not run if the file is present and overwrite is set ot false. fname = f'{model.filecode(*args)}.pkl' fpath = os.path.join(outputdir, fname) existing_file_msg = f'File "{fname}" already present in directory "{outputdir}"' existing_file = os.path.isfile(fpath) # If file exists and overwrite is set ot false -> return if existing_file and not overwrite: logger.warning(f'{existing_file_msg} -> preserving') return fpath # Run simulation if not already done (for titration cases) if data is None: data, meta = model.simulate(*args) # Raise warning if an existing file is overwritten if existing_file: logger.warning(f'{existing_file_msg} -> overwriting') # Save output file and return output filepath with open(fpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': data}, fh) logger.debug('simulation data exported to "%s"', fpath) return fpath def moveItem(l, value, itarget): ''' Move a list item to a specific target index. :param l: list object :param value: value of the item to move :param itarget: target index :return: re-ordered list. ''' # Get absolute target index if itarget < 0: itarget += len(l) assert itarget < len(l), f'target index {itarget} exceeds list size ({len(l)})' # Get index corresponding to element and delete entry from list iref = l.index(value) new_l = l.copy() del new_l[iref] # Return re-organized list return new_l[:itarget] + [value] + new_l[itarget:] def gaussian(x, mu=0., sigma=1., A=1.): return A * np.exp(-((x - mu) / sigma)**2) def isPickable(obj): try: pickle.dumps(obj) except Exception: return False return True def resolveFuncArgs(func, *args, **kwargs): ''' Return a dictionary of positional and keyword arguments upon function call, adding defaults from simfunc signature if not provided at call time. ''' bound_args = signature(func).bind(*args, **kwargs) bound_args.apply_defaults() return dict(bound_args.arguments) def getMeta(model, simfunc, *args, **kwargs): ''' Construct an informative dictionary about the model and simulation parameters. ''' # Retrieve function call arguments args_dict = resolveFuncArgs(simfunc, model, *args, **kwargs) # Construct meta dictionary meta = {'simkey': model.simkey} for k, v in args_dict.items(): if k == 'self': meta['model'] = v.meta else: meta[k] = v return meta def bounds(arr): ''' Return the bounds or a numpy array / list. ''' return (np.nanmin(arr), np.nanmax(arr)) def addColumn(df, key, arr, preceding_key=None): ''' Add a new column to a dataframe, right after a specific column. ''' df[key] = arr if preceding_key is not None: cols = df.columns.tolist()[:-1] preceding_index = cols.index(preceding_key) df = df[cols[:preceding_index + 1] + [key] + cols[preceding_index + 1:]] return df def integerSuffix(n): return 'th' if 4 <= n % 100 <= 20 else {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th') def customStrftime(fmt, dt_obj): return dt_obj.strftime(fmt).replace('{S}', str(dt_obj.day) + integerSuffix(dt_obj.day)) def friendlyLogspace(xmin, xmax, bases=None): ''' Define a "friendly" logspace between two bounds. ''' if bases is None: bases = [1, 2, 5] bases = np.asarray(bases) bounds = np.array([xmin, xmax]) logbounds = np.log10(bounds) bounds_orders = np.floor(logbounds) orders = np.arange(bounds_orders[0], bounds_orders[1] + 1) factors = np.power(10., np.floor(orders)) seq = np.hstack([bases * f for f in factors]) if xmax > seq.max(): seq = np.hstack((seq, xmax)) seq = seq[np.logical_and(seq >= xmin, seq <= xmax)] if xmin not in seq: seq = np.hstack((xmin, seq)) if xmax not in seq: seq = np.hstack((seq, xmax)) return seq + + +def differing(d1, d2, subdkey=None, diff=None): + ''' Find differences in values across two dictionaries (recursively). + + :param d1: first dictionary + :param d2: second dictionary + :param subdkey: specific sub-dictionary attribute key for objects + :param diff: existing diff list to append to + :return: list of (key, value1, value2) tuples for each differing values + ''' + # Initilize diff list + if diff is None: + diff = [] + # Check that the two dicts have the same structure + if sorted(list(d1.keys())) != sorted(list(d2.keys())): + raise ValueError('inconsistent inputs') + # For each key - values triplet + for k in d1.keys(): + # If values are dicts themselves, loop recursively through them + if isinstance(d1[k], dict): + diff = differing(d1[k], d2[k], subdkey=subdkey, diff=diff) + # If values are objects with a specific sub-dictionary attribute, + # loop recursively through them + elif hasattr(d1[k], subdkey): + diff = differing(getattr(d1[k], subdkey), getattr(d2[k], subdkey), + subdkey=subdkey, diff=diff) + # Otherwise + else: + # If values differ, add the key - values triplet to the diff list + if d1[k] != d2[k]: + diff.append((k, d1[k], d2[k])) + # Return the diff list + return diff