diff --git a/PySONIC/core/bls.py b/PySONIC/core/bls.py index dd26bf2..3084f5f 100644 --- a/PySONIC/core/bls.py +++ b/PySONIC/core/bls.py @@ -1,869 +1,904 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-02-28 17:18:31 +# @Last Modified time: 2019-03-14 18:47:45 from enum import Enum import time import os import json import inspect import pickle import numpy as np import pandas as pd import scipy.integrate as integrate from scipy.optimize import brentq, curve_fit from ..utils import logger, rmse, si_format, MECH_filecode from ..constants import * from ..batches import xlslog class PmCompMethod(Enum): ''' Enum: types of computation method for the intermolecular pressure ''' direct = 1 predict = 2 def LennardJones(x, beta, alpha, C, m, n): ''' Generic expression of a Lennard-Jones function, adapted for the context of symmetric deflection (distance = 2x). :param x: deflection (i.e. half-distance) :param beta: x-shifting factor :param alpha: x-scaling factor :param C: y-scaling factor :param m: exponent of the repulsion term :param n: exponent of the attraction term :return: Lennard-Jones potential at given distance (2x) ''' return C * (np.power((alpha / (2 * x + beta)), m) - np.power((alpha / (2 * x + beta)), n)) class BilayerSonophore: ''' This class contains the geometric and mechanical parameters of the Bilayer Sonophore Model, as well as all the core functions needed to compute the dynamics (kinetics and kinematics) of the bilayer membrane cavitation, and run dynamic BLS simulations. ''' # BIOMECHANICAL PARAMETERS T = 309.15 # Temperature (K) delta0 = 2.0e-9 # Thickness of the leaflet (m) Delta_ = 1.4e-9 # Initial gap between the two leaflets on a non-charged membrane at equil. (m) pDelta = 1.0e5 # Attraction/repulsion pressure coefficient (Pa) m = 5.0 # Exponent in the repulsion term (dimensionless) n = 3.3 # Exponent in the attraction term (dimensionless) rhoL = 1075.0 # Density of the surrounding fluid (kg/m^3) muL = 7.0e-4 # Dynamic viscosity of the surrounding fluid (Pa.s) muS = 0.035 # Dynamic viscosity of the leaflet (Pa.s) kA = 0.24 # Area compression modulus of the leaflet (N/m) alpha = 7.56 # Tissue shear loss modulus frequency coefficient (Pa.s) C0 = 0.62 # Initial gas molar concentration in the surrounding fluid (mol/m^3) kH = 1.613e5 # Henry's constant (Pa.m^3/mol) P0 = 1.0e5 # Static pressure in the surrounding fluid (Pa) Dgl = 3.68e-9 # Diffusion coefficient of gas in the fluid (m^2/s) xi = 0.5e-9 # Boundary layer thickness for gas transport across leaflet (m) c = 1515.0 # Speed of sound in medium (m/s) # BIOPHYSICAL PARAMETERS epsilon0 = 8.854e-12 # Vacuum permittivity (F/m) epsilonR = 1.0 # Relative permittivity of intramembrane cavity (dimensionless) + tscale = 'us' # relevant temporal scale of the model + def __init__(self, a, Cm0, Qm0, Fdrive=None, embedding_depth=0.0): ''' Constructor of the class. :param a: in-plane radius of the sonophore structure within the membrane (m) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) ''' # Extract resting constants and geometry self.Cm0 = Cm0 self.Qm0 = Qm0 self.a = a self.d = embedding_depth self.S0 = np.pi * self.a**2 # Derive frequency-dependent tissue elastic modulus if Fdrive is not None: G_tissue = self.alpha * Fdrive # G'' (Pa) self.kA_tissue = 2 * G_tissue * self.d # kA of the tissue layer (N/m) else: self.kA_tissue = 0. # Check existence of lookups for derived parameters lookups = self.getLookups() akey = '{:.1f}'.format(a * 1e9) Qkey = '{:.2f}'.format(Qm0 * 1e5) # If no lookup, compute parameters and store them in lookup if akey not in lookups or Qkey not in lookups[akey]: + # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: D_eq = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Find optimal Lennard-Jones parameters to approximate PMavg (LJ_approx, std_err, _) = self.LJfitPMavg() assert std_err < PMAVG_STD_ERR_MAX, 'High error in PmAvg nonlinear fit:'\ ' std_err = %.2f Pa' % std_err self.LJ_approx = LJ_approx if akey not in lookups: lookups[akey] = {Qkey: {'LJ_approx': LJ_approx, 'Delta_eq': D_eq}} else: lookups[akey][Qkey] = {'LJ_approx': LJ_approx, 'Delta_eq': D_eq} logger.debug('Saving BLS derived parameters to lookup file') self.saveLookups(lookups) # If lookup exists, load parameters from it else: logger.debug('Loading BLS derived parameters from lookup file') self.LJ_approx = lookups[akey][Qkey]['LJ_approx'] self.Delta = lookups[akey][Qkey]['Delta_eq'] # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def __repr__(self): return 'BilayerSonophore({}m, {}F/cm2, {}C/cm2, embedding_depth={}m'.format( si_format([self.a, self.Cm0 * 1e-4, self.Qm0 * 1e-4, self.embedding_depth], precision=1, space=' ')) def pprint(self): return '{}m radius BilayerSonophore'.format( si_format(self.a, precision=0, space=' ')) def getLookupsPath(self): return os.path.join(os.path.split(__file__)[0], 'bls_lookups.json') def getLookups(self): try: with open(self.getLookupsPath()) as fh: sample = json.load(fh) return sample except FileNotFoundError: return {} def saveLookups(self, lookups): with open(self.getLookupsPath(), 'w') as fh: json.dump(lookups, fh, indent=2) def pparams(self): s = '-------- Bilayer Sonophore --------\n' s += 'class attributes:\n' class_attrs = inspect.getmembers(self.__class__, lambda a: not(inspect.isroutine(a))) class_attrs = [a for a in class_attrs if not(a[0].startswith('__') and a[0].endswith('__'))] for ca in class_attrs: s += '{} = {}\n'.format(ca[0], ca[1]) s += 'instance attributes:\n' inst_attrs = inspect.getmembers(self, lambda a: not(inspect.isroutine(a))) inst_attrs = [a for a in inst_attrs if not(a[0].startswith('__') and a[0].endswith('__')) and a not in class_attrs] for ia in inst_attrs: s += '{} = {}\n'.format(ia[0], ia[1]) return s def reinit(self): - logger.debug('Re-initializing BLS object') # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: D_eq = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) + def getPltScheme(self): + return { + 'P_{AC}': ['Pac'], + 'Z': ['Z'], + 'n_g': ['ng'] + } + + + def getPltVars(self): + ''' Return a dictionary containing information about all plot variables + related to the model (description, label, unit, factor, possible alias). ''' + return { + 'Pac': { + 'desc': 'acoustic pressure', + 'label': 'P_{AC}', + 'unit': 'kPa', + 'factor': 1e-3, + 'alias': 'obj.Pacoustic(t, meta["Adrive"] * states, meta["Fdrive"])' + }, + + 'Z': { + 'desc': 'leaflets deflection', + 'label': 'Z', + 'unit': 'nm', + 'factor': 1e9, + 'bounds': (-1.0, 10.0) + }, + + 'ng': { + 'desc': 'gas content', + 'label': 'n_g', + 'unit': '10^{-22}\ mol', + 'factor': 1e22, + 'bounds': (1.0, 15.0) + }, + + 'Pmavg': { + 'desc': 'average intermolecular pressure', + 'label': 'P_M', + 'unit': 'kPa', + 'factor': 1e-3, + 'alias': 'obj.PMavgpred(df["Z"])' + }, + + 'Telastic': { + 'desc': 'leaflet elastic tension', + 'label': 'T_E', + 'unit': 'mN/m', + 'factor': 1e3, + 'alias': 'obj.TEleaflet(df["Z"])' + }, + + 'Cm': { + 'desc': 'membrane capacitance', + 'label': 'C_m', + 'unit': 'uF/cm^2', + 'factor': 1e2, + 'bounds': (0.0, 1.5), + 'alias': 'obj.v_Capct(df["Z"])' + } + } + + def curvrad(self, Z): - ''' Return the (signed) instantaneous curvature radius of the leaflet. + ''' Leaflet curvature radius + (signed variable) - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :return: leaflet curvature radius (m) ''' if Z == 0.0: return np.inf else: return (self.a**2 + Z**2) / (2 * Z) def v_curvrad(self, Z): ''' Vectorized curvrad function ''' return np.array(list(map(self.curvrad, Z))) def surface(self, Z): - ''' Return the surface area of the stretched leaflet (spherical cap). + ''' Surface area of the stretched leaflet + (spherical cap formula) - :param Z: leaflet apex outward deflection value (m) - :return: surface of the stretched leaflet (m^2) + :param Z: leaflet apex deflection (m) + :return: stretched leaflet surface (m^2) ''' return np.pi * (self.a**2 + Z**2) def volume(self, Z): - ''' Return the total volume of the inter-leaflet space (cylinder +/- - spherical cap). + ''' Volume of the inter-leaflet space + (cylinder +/- 2 spherical caps) - :param Z: leaflet apex outward deflection value (m) - :return: inner volume of the bilayer sonophore structure (m^3) + :param Z: leaflet apex deflection (m) + :return: bilayer sonophore inner volume (m^3) ''' return np.pi * self.a**2 * self.Delta\ * (1 + (Z / (3 * self.Delta) * (3 + Z**2 / self.a**2))) def arealstrain(self, Z): - ''' Compute the areal strain of the stretched leaflet. + ''' Areal strain of the stretched leaflet epsilon = (S - S0)/S0 = (Z/a)^2 - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :return: areal strain (dimensionless) ''' return (Z / self.a)**2 def Capct(self, Z): - ''' Compute the membrane capacitance per unit area, - under the assumption of parallel-plate capacitor - with average inter-layer distance. + ''' Membrane capacitance + (parallel-plate capacitor evaluated at average inter-layer distance) - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :return: capacitance per unit area (F/m2) ''' if Z == 0.0: return self.Cm0 else: return ((self.Cm0 * self.Delta / self.a**2) * (Z + (self.a**2 - Z**2 - Z * self.Delta) / (2 * Z) * np.log((2 * Z + self.Delta) / self.Delta))) def v_Capct(self, Z): ''' Vectorized Capct function ''' return np.array(list(map(self.Capct, Z))) def derCapct(self, Z, U): - ''' Compute the derivative of the membrane capacitance per unit area - with respect to time, under the assumption of parallel-plate capacitor. + ''' Evolution of membrane capacitance - :param Z: leaflet apex outward deflection value (m) - :param U: leaflet apex outward deflection velocity (m/s) - :return: derivative of capacitance per unit area (F/m2.s) + :param Z: leaflet apex deflection (m) + :param U: leaflet apex deflection velocity (m/s) + :return: time derivative of capacitance per unit area (F/m2.s) ''' dCmdZ = ((self.Cm0 * self.Delta / self.a**2) * ((Z**2 + self.a**2) / (Z * (2 * Z + self.Delta)) - ((Z**2 + self.a**2) * np.log((2 * Z + self.Delta) / self.Delta)) / (2 * Z**2))) return dCmdZ * U def localdef(self, r, Z, R): - ''' Compute the (signed) local transverse leaflet deviation at a distance - r from the center of the dome. + ''' Local leaflet deflection at specific radial distance + (signed) :param r: in-plane distance from center of the sonophore (m) - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local transverse leaflet deviation (m) ''' if np.abs(Z) == 0.0: return 0.0 else: return np.sign(Z) * (np.sqrt(R**2 - r**2) - np.abs(R) + np.abs(Z)) def Pacoustic(self, t, Adrive, Fdrive, phi=np.pi): - ''' Compute the acoustic pressure at a specific time, given - the amplitude, frequency and phase of the acoustic stimulus. + ''' Time-varying acoustic pressure - :param t: time of interest + :param t: time (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) ''' return Adrive * np.sin(2 * np.pi * Fdrive * t - phi) def PMlocal(self, r, Z, R): - ''' Compute the local intermolecular pressure. + ''' Local intermolecular pressure :param r: in-plane distance from center of the sonophore (m) - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: local intermolecular pressure (Pa) ''' z = self.localdef(r, Z, R) relgap = (2 * z + self.Delta) / self.Delta_ return self.pDelta * ((1 / relgap)**self.m - (1 / relgap)**self.n) def PMavg(self, Z, R, S): - ''' Compute the average intermolecular pressure felt across the leaflet - by quadratic integration. + ''' Average intermolecular pressure across the leaflet + (computed by quadratic integration) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :param S: surface of the stretched leaflet (m^2) - :return: averaged intermolecular resultant pressure across the leaflet (Pa) + :return: averaged intermolecular resultant pressure (Pa) .. warning:: quadratic integration is computationally expensive. ''' - # Integrate intermolecular force over an infinitely thin ring of radius r from 0 to a fTotal, _ = integrate.quad(lambda r, Z, R: 2 * np.pi * r * self.PMlocal(r, Z, R), 0, self.a, args=(Z, R)) return fTotal / S def v_PMavg(self, Z, R, S): ''' Vectorized PMavg function ''' return np.array(list(map(self.PMavg, Z, R, S))) def LJfitPMavg(self): ''' Determine optimal parameters of a Lennard-Jones expression approximating the average intermolecular pressure. These parameters are obtained by a nonlinear fit of the Lennard-Jones function for a range of deflection values between predetermined Zmin and Zmax. :return: 3-tuple with optimized LJ parameters for PmAvg prediction (Map) and the standard and max errors of the prediction in the fitting range (in Pascals) ''' # Determine lower bound of deflection range: when Pm = Pmmax PMmax = LJFIT_PM_MAX # Pa Zminlb = -0.49 * self.Delta Zminub = 0.0 Zmin = brentq(lambda Z, Pmmax: self.PMavg(Z, self.curvrad(Z), self.surface(Z)) - PMmax, Zminlb, Zminub, args=(PMmax), xtol=1e-16) # Create vectors for geometric variables Zmax = 2 * self.a Z = np.arange(Zmin, Zmax, 1e-11) Pmavg = self.v_PMavg(Z, self.v_curvrad(Z), self.surface(Z)) # Compute optimal nonlinear fit of custom LJ function with initial guess x0_guess = self.delta0 C_guess = 0.1 * self.pDelta nrep_guess = self.m nattr_guess = self.n pguess = (x0_guess, C_guess, nrep_guess, nattr_guess) popt, _ = curve_fit(lambda x, x0, C, nrep, nattr: LennardJones(x, self.Delta, x0, C, nrep, nattr), Z, Pmavg, p0=pguess, maxfev=10000) (x0_opt, C_opt, nrep_opt, nattr_opt) = popt Pmavg_fit = LennardJones(Z, self.Delta, x0_opt, C_opt, nrep_opt, nattr_opt) # Compute prediction error residuals = Pmavg - Pmavg_fit ss_res = np.sum(residuals**2) N = residuals.size std_err = np.sqrt(ss_res / N) max_err = max(np.abs(residuals)) logger.debug('LJ approx: x0 = %.2f nm, C = %.2f kPa, m = %.2f, n = %.2f', x0_opt * 1e9, C_opt * 1e-3, nrep_opt, nattr_opt) LJ_approx = {"x0": x0_opt, "C": C_opt, "nrep": nrep_opt, "nattr": nattr_opt} return (LJ_approx, std_err, max_err) def PMavgpred(self, Z): - ''' Return the predicted intermolecular pressure based on a specific Lennard-Jones - function fitted on the deflection physiological range. + ''' Approximated average intermolecular pressure + (using nonlinearly fitted Lennard-Jones function) - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :return: predicted average intermolecular pressure (Pa) ''' return LennardJones(Z, self.Delta, self.LJ_approx['x0'], self.LJ_approx['C'], self.LJ_approx['nrep'], self.LJ_approx['nattr']) def Pelec(self, Z, Qm): - ''' Compute the electric equivalent pressure term. + ''' Electrical pressure term - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :param Qm: membrane charge density (C/m2) - :return: electric equivalent pressure (Pa) + :return: electrical pressure (Pa) ''' relS = self.S0 / self.surface(Z) abs_perm = self.epsilon0 * self.epsilonR # F/m return - relS * Qm**2 / (2 * abs_perm) # Pa def findDeltaEq(self, Qm): ''' Compute the Delta that cancels out the (Pm + Pec) equation at Z = 0 for a given membrane charge density, using the Brent method to refine the pressure root iteratively. :param Qm: membrane charge density (C/m2) :return: equilibrium value (m) and associated pressure (Pa) ''' - f = lambda Delta: (self.pDelta * ( - (self.Delta_ / Delta)**self.m - (self.Delta_ / Delta)**self.n) + self.Pelec(0.0, Qm) - ) - - Delta_lb = 0.1 * self.Delta_ - Delta_ub = 2.0 * self.Delta_ - - Delta_eq = brentq(f, Delta_lb, Delta_ub, xtol=1e-16) + (self.Delta_ / Delta)**self.m - (self.Delta_ / Delta)**self.n) + self.Pelec(0.0, Qm)) + Delta_eq = brentq(f, 0.1 * self.Delta_, 2.0 * self.Delta_, xtol=1e-16) logger.debug('∆eq = %.2f nm', Delta_eq * 1e9) return (Delta_eq, f(Delta_eq)) - def gasflux(self, Z, P): - ''' Compute the gas molar flux through the BLS boundary layer for - an unsteady system. + def gasFlux(self, Z, P): + ''' Gas molar flux through the sonophore boundary layers - :param Z: leaflet apex outward deflection value (m) - :param P: internal gas pressure in the inter-leaflet space (Pa) + :param Z: leaflet apex deflection (m) + :param P: internal gas pressure (Pa) :return: gas molar flux (mol/s) ''' dC = self.C0 - P / self.kH return 2 * self.surface(Z) * self.Dgl * dC / self.xi def gasmol2Pa(self, ng, V): - ''' Compute the gas pressure in the inter-leaflet space for an - unsteady system, from the value of gas molar content. + ''' Internal gas pressure for a given molar content :param ng: internal molar content (mol) - :param V: inner volume of the bilayer sonophore structure (m^3) + :param V: sonophore inner volume (m^3) :return: internal gas pressure (Pa) ''' return ng * Rg * self.T / V def gasPa2mol(self, P, V): - ''' Compute the gas molar content in the inter-leaflet space for - an unsteady system, from the value of internal gas pressure. + ''' Internal gas molar content for a given pressure - :param P: internal gas pressure in the inter-leaflet space (Pa) - :param V: inner volume of the bilayer sonophore structure (m^3) + :param P: internal gas pressure (Pa) + :param V: sonophore inner volume (m^3) :return: internal gas molar content (mol) ''' return P * V / (Rg * self.T) def PtotQS(self, Z, ng, Qm, Pac, Pm_comp_method): - ''' Compute the balance pressure of the quasi-steady system, upon application - of an external perturbation on a charged membrane: - Ptot = Pm + Pg + Pec - P0 - Pac. + ''' Net quasi-steady pressure for a given acoustic pressure + (Ptot = Pm + Pg + Pec - P0 - Pac) - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) - :param Pac: external acoustic perturbation (Pa) - :param Pm_comp_method: type of method used to compute average intermolecular pressure + :param Pac: acoustic pressure (Pa) + :param Pm_comp_method: computation method for average intermolecular pressure :return: total balance pressure (Pa) - ''' if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) return Pm + self.gasmol2Pa(ng, self.volume(Z)) - self.P0 - Pac + self.Pelec(Z, Qm) def balancedefQS(self, ng, Qm, Pac=0.0, Pm_comp_method=PmCompMethod.predict): - ''' Compute the leaflet deflection upon application of an external - perturbation to a quasi-steady system with a charged membrane. - - This function uses the Brent method (progressive approximation of - function root) to solve the following transcendental equation for Z: - Pm + Pg + Pec - P0 - Pac = 0. + ''' Quasi-steady equilibrium deflection for a given acoustic pressure + (computed by approximating the root of quasi-steady pressure) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) - :param Pm_comp_method: type of method used to compute average intermolecular pressure - :return: leaflet deflection (Z) canceling out the balance equation + :param Pm_comp_method: computation method for average intermolecular pressure + :return: leaflet deflection canceling quasi-steady pressure (m) ''' lb = -0.49 * self.Delta ub = self.a Plb = self.PtotQS(lb, ng, Qm, Pac, Pm_comp_method) Pub = self.PtotQS(ub, ng, Qm, Pac, Pm_comp_method) assert (Plb > 0 > Pub), '[%d, %d] is not a sign changing interval for PtotQS' % (lb, ub) return brentq(self.PtotQS, lb, ub, args=(ng, Qm, Pac, Pm_comp_method), xtol=1e-16) def TEleaflet(self, Z): - ''' Compute the circumferential elastic tension felt across the - entire leaflet upon stretching. + ''' Elastic tension in leaflet - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA * self.arealstrain(Z) def TEtissue(self, Z): - ''' Compute the circumferential elastic tension felt across the - embedding viscoelastic tissue layer upon stretching. + ''' Elastic tension in surrounding viscoelastic layer - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.kA_tissue * self.arealstrain(Z) def TEtot(self, Z): - ''' Compute the total circumferential elastic tension (leaflet - and embedding tissue) felt upon stretching. + ''' Total elastic tension (leaflet + surrounding viscoelastic layer) - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :return: circumferential elastic tension (N/m) ''' return self.TEleaflet(Z) + self.TEtissue(Z) def PEtot(self, Z, R): - ''' Compute the total elastic tension pressure (leaflet + embedding - tissue) felt upon stretching. + ''' Total elastic tension pressure (leaflet + surrounding viscoelastic layer) - :param Z: leaflet apex outward deflection value (m) + :param Z: leaflet apex deflection (m) :param R: leaflet curvature radius (m) :return: elastic tension pressure (Pa) ''' return - self.TEtot(Z) / R def PVleaflet(self, U, R): - ''' Compute the viscous stress felt across the entire leaflet - upon stretching. + ''' Viscous stress pressure in leaflet - :param U: leaflet apex outward deflection velocity (m/s) + :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) - :return: leaflet viscous stress (Pa) + :return: leaflet viscous stress pressure (Pa) ''' return - 12 * U * self.delta0 * self.muS / R**2 def PVfluid(self, U, R): - ''' Compute the viscous stress felt across the entire fluid - upon stretching. + ''' Viscous stress pressure in surrounding medium - :param U: leaflet apex outward deflection velocity (m/s) + :param U: leaflet apex deflection velocity (m/s) :param R: leaflet curvature radius (m) - :return: fluid viscous stress (Pa) + :return: fluid viscous stress pressure (Pa) ''' return - 4 * U * self.muL / np.abs(R) - def accP(self, Pres, R): - ''' Compute the pressure-driven acceleration of the leaflet in the - unsteady system, upon application of an external perturbation. + def accP(self, Ptot, R): + ''' Leaflet transverse acceleration resulting from pressure imbalance - :param Pres: net resultant pressure (Pa) + :param Ptot: net pressure (Pa) :param R: leaflet curvature radius (m) :return: pressure-driven acceleration (m/s^2) ''' - return Pres / (self.rhoL * np.abs(R)) + return Ptot / (self.rhoL * np.abs(R)) def accNL(self, U, R): - ''' Compute the non-linear term of the leaflet acceleration in the - unsteady system, upon application of an external perturbation. + ''' Leaflet transverse nonlinear acceleration - :param U: leaflet apex outward deflection velocity (m/s) - :param R: leaflet curvature radius (m) - :return: nonlinear acceleration (m/s^2) - - .. note:: A simplified version of nonlinear acceleration (neglecting - dR/dH) is used here. + :param U: leaflet apex deflection velocity (m/s) + :param R: leaflet curvature radius (m) + :return: nonlinear acceleration term (m/s^2) - ''' + .. note:: A simplified version of nonlinear acceleration (neglecting + dR/dH) is used here. + ''' # return - (3/2 - 2*R/H) * U**2 / R return -(3 * U**2) / (2 * R) def derivatives(self, y, t, Adrive, Fdrive, Qm, phi, Pm_comp_method=PmCompMethod.predict): - ''' Compute the derivatives of the 3-ODE mechanical system variables, - with an imposed constant charge density. + ''' Evolution of the mechanical system :param y: vector of HH system variables at time t - :param t: specific instant in time (s) + :param t: time instant (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param Qm: membrane charge density (F/m2) :param phi: acoustic drive phase (rad) - :param Pm_comp_method: type of method used to compute average intermolecular pressure + :param Pm_comp_method: computation method for average intermolecular pressure :return: vector of mechanical system derivatives at time t ''' # Split input vector explicitly (U, Z, ng) = y # Correct deflection value is below critical compression if Z < -0.5 * self.Delta: logger.warning('Deflection out of range: Z = %.2f nm', Z * 1e9) Z = -0.49 * self.Delta # Compute curvature radius R = self.curvrad(Z) # Compute total pressure Pg = self.gasmol2Pa(ng, self.volume(Z)) if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) Ptot = (Pm + Pg - self.P0 - self.Pacoustic(t, Adrive, Fdrive, phi) + self.PEtot(Z, R) + self.PVleaflet(U, R) + self.PVfluid(U, R) + self.Pelec(Z, Qm)) # Compute derivatives dUdt = self.accP(Ptot, R) + self.accNL(U, R) dZdt = U - dngdt = self.gasflux(Z, Pg) + dngdt = self.gasFlux(Z, Pg) # Return derivatives vector return [dUdt, dZdt, dngdt] def checkInputs(self, Fdrive, Adrive, Qm, phi): - ''' Check validity of stimulation parameters. + ''' Check validity of stimulation parameters :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param phi: acoustic drive phase (rad) :param Qm: imposed membrane charge density (C/m2) ''' if not all(isinstance(param, float) for param in [Fdrive, Adrive, Qm, phi]): raise TypeError('Invalid stimulation parameters (must be float typed)') if Fdrive <= 0: raise ValueError('Invalid US driving frequency: {} kHz (must be strictly positive)' .format(Fdrive * 1e-3)) if Adrive < 0: raise ValueError('Invalid US pressure amplitude: {} kPa (must be positive or null)' .format(Adrive * 1e-3)) if Qm < CHARGE_RANGE[0] or Qm > CHARGE_RANGE[1]: raise ValueError('Invalid applied charge: {} nC/cm2 (must be within [{}, {}] interval' .format(Qm * 1e5, CHARGE_RANGE[0] * 1e5, CHARGE_RANGE[1] * 1e5)) if phi < 0 or phi >= 2 * np.pi: raise ValueError('Invalid US pressure phase: {:.2f} rad (must be within [0, 2 PI[ rad' .format(phi)) def simulate(self, Fdrive, Adrive, Qm, phi=np.pi, Pm_comp_method=PmCompMethod.predict): - ''' Compute short solutions of the mechanical system for specific - US stimulation parameters and with an imposed membrane charge density. + ''' Simulate mechanical system until prediodic stabilization :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param phi: acoustic drive phase (rad) :param Qm: imposed membrane charge density (C/m2) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: 3-tuple with the time profile, the solution matrix and a state vector ''' # Check validity of stimulation parameters self.checkInputs(Fdrive, Adrive, Qm, phi) # Determine mechanical system time step Tdrive = 1 / Fdrive dt_mech = Tdrive / NPC_FULL t_mech_cycle = np.linspace(0, Tdrive - dt_mech, NPC_FULL) # Initialize system variables t0 = 0.0 Z0 = 0.0 U0 = 0.0 ng0 = self.ng0 # Solve quasi-steady equation to compute first deflection value Pac1 = self.Pacoustic(t0 + dt_mech, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm, Pac1, Pm_comp_method) U1 = (Z1 - Z0) / dt_mech # Construct arrays to hold system variables states = np.array([1, 1]) t = np.array([t0, t0 + dt_mech]) y = np.array([[U0, U1], [Z0, Z1], [ng0, ng0]]) # Integrate mechanical system for a few acoustic cycles until stabilization j = 0 ng_last = None Z_last = None periodic_conv = False while not periodic_conv and j < NCYCLES_MAX: t_mech = t_mech_cycle + t[-1] + dt_mech y_mech = integrate.odeint(self.derivatives, y[:, -1], t_mech, args=(Adrive, Fdrive, Qm, phi, Pm_comp_method)).T # Compare Z and ng signals over the last 2 acoustic periods if j > 0: Z_rmse = rmse(Z_last, y_mech[1, :]) ng_rmse = rmse(ng_last, y_mech[2, :]) logger.debug('step %u: Z_rmse = %.2e m, ng_rmse = %.2e mol', j, Z_rmse, ng_rmse) if Z_rmse < Z_ERR_MAX and ng_rmse < NG_ERR_MAX: periodic_conv = True # Update last vectors for next comparison Z_last = y_mech[1, :] ng_last = y_mech[2, :] # Concatenate time and solutions to global vectors states = np.concatenate([states, np.ones(NPC_FULL)], axis=0) t = np.concatenate([t, t_mech], axis=0) y = np.concatenate([y, y_mech], axis=1) # Increment loop index j += 1 if j == NCYCLES_MAX: logger.warning('No convergence: stopping after %u cycles', j) else: logger.debug('Periodic convergence after %u cycles', j) states[-1] = 0 # return output variables return (t, y[1:, :], states) def runAndSave(self, outdir, Fdrive, Adrive, Qm): - ''' Run a simulation of the mechanical system with specific stimulation parameters - and an imposed value of charge density, and save the results in a PKL file. + ''' Simulate mechanical system and save results in a PKL file. :param outdir: full path to output directory :param Fdrive: US frequency (Hz) :param Adrive: acoustic pressure amplitude (Pa) :param Qm: applided membrane charge density (C/m2) ''' # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") logger.info('%s: simulation @ f = %sHz, A = %sPa, Q = %sC/cm2', self.pprint(), *si_format([Fdrive, Adrive, Qm * 1e-4], 2, space=' ')) # Run simulation tstart = time.time() (t, y, states) = self.simulate(Fdrive, Adrive, Qm) (Z, ng) = y tcomp = time.time() - tstart logger.debug('completed in %s', si_format(tcomp, 1)) U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) # Store dataframe and metadata df = pd.DataFrame({ 't': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng }) meta = { 'a': self.a, 'd': self.d, 'Cm0': self.Cm0, 'Qm0': self.Qm0, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'Qm': Qm, 'tcomp': tcomp } # Export into to PKL file simcode = MECH_filecode(self.a, Fdrive, Adrive, Qm) outpath = '{}/{}.pkl'.format(outdir, simcode) with open(outpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', outpath) # Compute key output metrics Zmax = np.amax(Z) Zmin = np.amin(Z) Zabs_max = np.amax(np.abs([Zmin, Zmax])) eAmax = self.arealstrain(Zabs_max) Tmax = self.TEtot(Zabs_max) Pmmax = self.PMavgpred(Zmin) ngmax = np.amax(ng) dUdtmax = np.amax(np.abs(np.diff(U) / np.diff(t)**2)) # Export key metrics to log file logpath = os.path.join(outdir, 'log_MECH.xlsx') logentry = { 'Date': date_str, 'Time': daytime_str, 'Radius (nm)': self.a * 1e9, 'Thickness (um)': self.d * 1e6, 'Fdrive (kHz)': Fdrive * 1e-3, 'Adrive (kPa)': Adrive * 1e-3, 'Charge (nC/cm2)': Qm * 1e5, '# samples': t.size, 'Comp. time (s)': round(tcomp, 2), 'kA total (N/m)': self.kA + self.kA_tissue, 'Max Z (nm)': Zmax * 1e9, 'Max eA (-)': eAmax, 'Max Te (mN/m)': Tmax * 1e3, 'Max rel. ng increase (-)': (ngmax - self.ng0) / self.ng0, 'Max Pm (kPa)': Pmmax * 1e-3, 'Max acc. (m/s2)': dUdtmax } if xlslog(logpath, logentry) == 1: logger.debug('log exported to "%s"', logpath) else: logger.error('log export to "%s" aborted', logpath) return outpath def getCycleProfiles(self, Fdrive, Adrive, Qm): - ''' Run a mechanical simulation until periodic stabilization, and compute pressure profiles - over the last acoustic cycle. + ''' Simulate mechanical system and compute pressures over the last acoustic cycle :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed membrane charge density (C/m2) :return: dataframe with the time, kinematic and pressure profiles over the last cycle. ''' # Run default simulation and compute relevant profiles logger.info('Running mechanical simulation (a = %sm, f = %sHz, A = %sPa)', si_format(self.a, 1), si_format(Fdrive, 1), si_format(Adrive, 1)) t, y, _ = self.simulate(Fdrive, Adrive, Qm, Pm_comp_method=PmCompMethod.direct) dt = (t[-1] - t[0]) / (t.size - 1) Z, ng = y[:, -NPC_FULL:] t = t[-NPC_FULL:] t -= t[0] logger.info('Computing pressure cyclic profiles') R = self.v_curvrad(Z) U = np.diff(Z) / dt U = np.hstack((U, U[-1])) data = { 't': t, 'Z': Z, 'Cm': self.v_Capct(Z), 'P_M': self.v_PMavg(Z, R, self.surface(Z)), 'P_Q': self.Pelec(Z, Qm), 'P_{VE}': self.PEtot(Z, R) + self.PVleaflet(U, R), 'P_V': self.PVfluid(U, R), 'P_G': self.gasmol2Pa(ng, self.volume(Z)), 'P_0': - np.ones(Z.size) * self.P0 } return pd.DataFrame(data, columns=data.keys()) diff --git a/PySONIC/core/pneuron.py b/PySONIC/core/pneuron.py index db34e4e..964ddb0 100644 --- a/PySONIC/core/pneuron.py +++ b/PySONIC/core/pneuron.py @@ -1,562 +1,584 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-03 11:53:04 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-03-13 20:08:49 +# @Last Modified time: 2019-03-14 18:46:36 import os import time import pickle import abc import inspect import re import numpy as np from scipy.integrate import odeint import pandas as pd from ..postpro import findPeaks from ..constants import * from ..utils import si_format, logger, ESTIM_filecode from ..batches import xlslog class PointNeuron(metaclass=abc.ABCMeta): ''' Abstract class defining the common API (i.e. mandatory attributes and methods) of all subclasses implementing the channels mechanisms of specific point neurons. The mandatory attributes are: - **name**: a string defining the name of the mechanism. - **Cm0**: a float defining the membrane resting capacitance (in F/m2) - **Vm0**: a float defining the membrane resting potential (in mV) - **states_names**: a list of strings defining the names of the different state probabilities governing the channels behaviour (i.e. the differential HH variables). - **states0**: a 1D array of floats (NOT integers !!!) defining the initial values of the different state probabilities. - **coeff_names**: a list of strings defining the names of the different coefficients to be used in effective simulations. The mandatory methods are: - **iNet**: compute the net ionic current density (in mA/m2) across the membrane, given a specific membrane potential (in mV) and channel states. - **steadyStates**: compute the channels steady-state values for a specific membrane potential value (in mV). - **derStates**: compute the derivatives of channel states, given a specific membrane potential (in mV) and channel states. This method must return a list of derivatives ordered identically as in the states0 attribute. - **getEffRates**: get the effective rate constants of ion channels to be used in effective simulations. This method must return an array of effective rates ordered identically as in the coeff_names attribute. - **derStatesEff**: compute the effective derivatives of channel states, based on 1-dimensional linear interpolators of "effective" coefficients. This method must return a list of derivatives ordered identically as in the states0 attribute. ''' + tscale = 'ms' # relevant temporal scale of the model + def __repr__(self): return self.__class__.__name__ def pprint(self): return '{} neuron'.format(self.__class__.__name__) @property @abc.abstractmethod def name(self): return 'Should never reach here' @property @abc.abstractmethod def Cm0(self): return 'Should never reach here' @property @abc.abstractmethod def Vm0(self): return 'Should never reach here' @abc.abstractmethod def currents(self, Vm, states): ''' Compute all ionic currents per unit area. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: dictionary of ionic currents per unit area (mA/m2) ''' def iNet(self, Vm, states): ''' Net membrane current :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: current per unit area (mA/m2) ''' return sum(self.currents(Vm, states).values()) def currentToConcentrationRate(self, z_ion, depth): ''' Compute the conversion factor from a specific ionic current (in mA/m2) into a variation rate of submembrane ion concentration (in M/s). :param: z_ion: ion valence :param depth: submembrane depth (m) :return: time derivative of submembrane ion concentration (M/s) ''' return 1e-6 / (z_ion * depth * FARADAY) + def getCurrentsNames(self): + return list(self.currents(np.nan, [np.nan] * len(self.states_names)).keys()) + + + def getPltScheme(self): + + pltscheme = { + 'Q_m': ['Qm'], + 'V_m': ['Vm'], + } + + for cname in self.getCurrentsNames(): + if cname != 'iLeak': + key = 'I_{{{}}}\ kin.'.format(cname[1:]) + cargs = inspect.getargspec(getattr(self, cname))[0][1:] + pltscheme[key] = [var for var in cargs if var not in ['Vm', 'Cai']] + pltscheme['I'] = self.getCurrentsNames() + ['iNet'] + + return pltscheme + + def getPltVars(self): ''' Return a dictionary containing information about all plot variables related to the neuron (description, label, unit, factor, possible alias). ''' - all_pltvars = { + pltvars = { 'Qm': { 'desc': 'charge density', 'label': 'Q_m', 'unit': 'nC/cm^2', 'factor': 1e5, - 'min': -100, - 'max': 50 + 'bounds': (-100, 50) }, 'Vm': { 'desc': 'membrane potential', 'label': 'V_m', 'unit': 'mV', - 'factor': 1, + 'y0': self.Vm0 }, 'ELeak': { - 'constant': 'neuron.ELeak', + 'constant': 'obj.ELeak', 'desc': 'non-specific leakage current resting potential', 'label': 'V_{leak}', 'unit': 'mV', - 'factor': 1e0 + 'ls': '--', + 'color': 'k' } } - for cname in self.currents(np.nan, [np.nan] * len(self.states_names)).keys(): + for cname in self.getCurrentsNames(): cfunc = getattr(self, cname) cargs = inspect.getargspec(cfunc)[0][1:] - all_pltvars[cname] = dict( - desc=inspect.getdoc(cfunc).splitlines()[0], - label='I_{{{}}}'.format(cname[1:]), - unit='A/m^2', - factor=1e-3, - alias='neuron.{}({})'.format(cname, ', '.join(['df["{}"]'.format(a) for a in cargs])) - ) + pltvars[cname] = { + 'desc': inspect.getdoc(cfunc).splitlines()[0], + 'label': 'I_{{{}}}'.format(cname[1:]), + 'unit': 'A/m^2', + 'factor': 1e-3, + 'alias': 'obj.{}({})'.format(cname, ', '.join(['df["{}"]'.format(a) for a in cargs])) + } for var in cargs: if var not in ['Vm', 'Cai']: vfunc = getattr(self, 'der{}{}'.format(var[0].upper(), var[1:])) desc = cname + re.sub('^Evolution of', '', inspect.getdoc(vfunc).splitlines()[0]) - all_pltvars[var] = dict( - desc=desc, - label=var, - unit=None, - factor=1, - min=-0.1, - max=1.1 - ) - - all_pltvars['iNet'] = dict( - desc=inspect.getdoc(getattr(self, 'iNet')).splitlines()[0], - label='I_{net}', - unit='A/m^2', - factor=1e-3, - alias='neuron.iNet(df["Vm"], neuron_states)' - ) - - return all_pltvars + pltvars[var] = { + 'desc': desc, + 'label': var, + 'bounds': (-0.1, 1.1) + } + + pltvars['iNet'] = { + 'desc': inspect.getdoc(getattr(self, 'iNet')).splitlines()[0], + 'label': 'I_{net}', + 'unit': 'A/m^2', + 'factor': 1e-3, + 'alias': 'obj.iNet(df["Vm"], df[obj.states_names].values.T)', + 'ls': '--', + 'color': 'k' + } + + return pltvars @abc.abstractmethod def steadyStates(self, Vm): ''' Compute the channels steady-state values for a specific membrane potential value. :param Vm: membrane potential (mV) :return: array of steady-states ''' @abc.abstractmethod def derStates(self, Vm, states): ''' Compute the derivatives of channel states. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: current per unit area (mA/m2) ''' @abc.abstractmethod def getEffRates(self, Vm): ''' Get the effective rate constants of ion channels, averaged along an acoustic cycle, for future use in effective simulations. :param Vm: array of membrane potential values for an acoustic cycle (mV) :return: an array of rate average constants (s-1) ''' @abc.abstractmethod def derStatesEff(self, Qm, states, interp_data): ''' Compute the effective derivatives of channel states, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param Qm: membrane charge density (C/m2) :states: state probabilities of the ion channels :param interp_data: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. ''' def Qbounds(self): ''' Determine bounds of membrane charge physiological range for a given neuron. ''' return np.array([np.round(self.Vm0 - 25.0), 50.0]) * self.Cm0 * 1e-3 # C/m2 def getGates(self): ''' Retrieve the names of the neuron's states that match an ion channel gating. ''' gates = [] for x in self.states_names: if 'alpha{}'.format(x.lower()) in self.coeff_names: gates.append(x) return gates def getRates(self, Vm): ''' Compute the ion channels rate constants for a given membrane potential. :param Vm: membrane potential (mV) :return: a dictionary of rate constants and their values at the given potential. ''' rates = {} for x in self.getGates(): x = x.lower() alpha_str, beta_str = ['{}{}'.format(s, x.lower()) for s in ['alpha', 'beta']] inf_str, tau_str = ['{}inf'.format(x.lower()), 'tau{}'.format(x.lower())] if hasattr(self, 'alpha{}'.format(x)): alphax = getattr(self, alpha_str)(Vm) betax = getattr(self, beta_str)(Vm) elif hasattr(self, '{}inf'.format(x)): xinf = getattr(self, inf_str)(Vm) taux = getattr(self, tau_str)(Vm) alphax = xinf / taux betax = 1 / taux - alphax rates[alpha_str] = alphax rates[beta_str] = betax return rates def Vderivatives(self, y, t, Iinj): ''' Compute the derivatives of a V-cast HH system for a specific value of injected current. :param y: vector of HH system variables at time t :param t: time value (s, unused) :param Iinj: injected current (mA/m2) :return: vector of HH system derivatives at time t ''' Vm, *states = y Iionic = self.iNet(Vm, states) # mA/m2 dVmdt = (- Iionic + Iinj) / self.Cm0 # mV/s dstates = self.derStates(Vm, states) return [dVmdt, *dstates] def Qderivatives(self, y, t, Cm=None): ''' Compute the derivatives of the n-ODE HH system variables, based on a value of membrane capacitance. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param Cm: membrane capacitance (F/m2) :return: vector of HH system derivatives at time t ''' if Cm is None: Cm = self.Cm0 Qm, *states = y Vm = Qm / Cm * 1e3 # mV dQm = - self.iNet(Vm, states) * 1e-3 # A/m2 dstates = self.derStates(Vm, states) return [dQm, *dstates] def checkInputs(self, Astim, tstim, toffset, PRF, DC): ''' Check validity of electrical stimulation parameters. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) ''' # Check validity of stimulation parameters if not all(isinstance(param, float) for param in [Astim, tstim, toffset, DC]): raise TypeError('Invalid stimulation parameters (must be float typed)') if tstim <= 0: raise ValueError('Invalid stimulus duration: {} ms (must be strictly positive)' .format(tstim * 1e3)) if toffset < 0: raise ValueError('Invalid stimulus offset: {} ms (must be positive or null)' .format(toffset * 1e3)) if DC <= 0.0 or DC > 1.0: raise ValueError('Invalid duty cycle: {} (must be within ]0; 1])'.format(DC)) if DC < 1.0: if not isinstance(PRF, float): raise TypeError('Invalid PRF value (must be float typed)') if PRF is None: raise AttributeError('Missing PRF value (must be provided when DC < 1)') if PRF < 1 / tstim: raise ValueError('Invalid PRF: {} Hz (PR interval exceeds stimulus duration)' .format(PRF)) def simulate(self, Astim, tstim, toffset, PRF=None, DC=1.0): ''' Compute solutions of a neuron's HH system for a specific set of electrical stimulation parameters, using a classic integration scheme. :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 3-tuple with the time profile and solution matrix and a state vector ''' # Check validity of stimulation parameters self.checkInputs(Astim, tstim, toffset, PRF, DC) # Determine system time step dt = DT_ESTIM # if CW stimulus: divide integration during stimulus into single interval if DC == 1.0: PRF = 1 / tstim # Compute vector sizes npulses = int(np.round(PRF * tstim)) Tpulse_on = DC / PRF Tpulse_off = (1 - DC) / PRF # For high-PRF pulsed protocols: adapt time step to ensure minimal # number of samples during TON or TOFF dt_warning_msg = 'high-PRF protocol: lowering time step to %.2e s to properly integrate %s' for key, Tpulse in {'TON': Tpulse_on, 'TOFF': Tpulse_off}.items(): if Tpulse > 0 and Tpulse / dt < MIN_SAMPLES_PER_PULSE_INT: dt = Tpulse / MIN_SAMPLES_PER_PULSE_INT logger.warning(dt_warning_msg, dt, key) n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) # Compute offset size n_off = int(np.round(toffset / dt)) # Set initial conditions y0 = [self.Vm0, *self.states0] nvar = len(y0) # Initialize global arrays t = np.array([0.]) states = np.array([1]) y = np.array([y0]).T # Initialize pulse time and states vectors t_pulse0 = np.linspace(0, Tpulse_on + Tpulse_off, n_pulse_on + n_pulse_off) states_pulse = np.concatenate((np.ones(n_pulse_on), np.zeros(n_pulse_off))) # Loop through all pulse (ON and OFF) intervals for i in range(npulses): # Construct and initialize arrays t_pulse = t_pulse0 + t[-1] y_pulse = np.empty((nvar, n_pulse_on + n_pulse_off)) # Integrate ON system y_pulse[:, :n_pulse_on] = odeint( self.Vderivatives, y[:, -1], t_pulse[:n_pulse_on], args=(Astim,)).T # Integrate OFF system if n_pulse_off > 0: y_pulse[:, n_pulse_on:] = odeint( self.Vderivatives, y_pulse[:, n_pulse_on - 1], t_pulse[n_pulse_on:], args=(0.0,)).T # Append pulse arrays to global arrays states = np.concatenate([states, states_pulse[1:]]) t = np.concatenate([t, t_pulse[1:]]) y = np.concatenate([y, y_pulse[:, 1:]], axis=1) # Integrate offset interval if n_off > 0: t_off = np.linspace(0, toffset, n_off) + t[-1] states_off = np.zeros(n_off) y_off = odeint(self.Vderivatives, y[:, -1], t_off, args=(0.0, )).T # Concatenate offset arrays to global arrays states = np.concatenate([states, states_off[1:]]) t = np.concatenate([t, t_off[1:]]) y = np.concatenate([y, y_off[:, 1:]], axis=1) # Return output variables return (t, y, states) def titrate(self, tstim, toffset, PRF=None, DC=1.0, Arange=(0., 2 * TITRATION_ESTIM_A_MAX)): ''' Use a dichotomic recursive search to determine the threshold amplitude needed to obtain neural excitation for a given duration, PRF and duty cycle. :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param Arange: search interval for Astim, iteratively refined :return: 5-tuple with the determined threshold, time profile, solution matrix, state vector and response latency ''' Astim = (Arange[0] + Arange[1]) / 2 # Run simulation and detect spikes t0 = time.time() (t, y, states) = self.simulate(Astim, tstim, toffset, PRF, DC) tcomp = time.time() - t0 dt = t[1] - t[0] ipeaks, *_ = findPeaks(y[0, :], SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) nspikes = ipeaks.size latency = t[ipeaks[0]] if nspikes > 0 else None logger.debug('A = %sA/m2 ---> %s spike%s detected', si_format(Astim * 1e-3, 2, space=' '), nspikes, "s" if nspikes > 1 else "") # If accurate threshold is found, return simulation results if (Arange[1] - Arange[0]) <= TITRATION_ESTIM_DA_MAX and nspikes == 1: return (Astim, t, y, states, latency, tcomp) # Otherwise, refine titration interval and iterate recursively else: if nspikes == 0: # if Astim too close to max then stop if (TITRATION_ESTIM_A_MAX - Astim) <= TITRATION_ESTIM_DA_MAX: return (np.nan, t, y, states, latency, tcomp) Arange = (Astim, Arange[1]) else: Arange = (Arange[0], Astim) return self.titrate(tstim, toffset, PRF, DC, Arange=Arange) def runAndSave(self, outdir, tstim, toffset, PRF=None, DC=1.0, Astim=None): ''' Run a simulation of the point-neuron Hodgkin-Huxley system with specific parameters, and save the results in a PKL file. :param outdir: full path to output directory :param tstim: stimulus duration (s) :param toffset: stimulus offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: stimulus duty cycle (-) :param Astim: stimulus amplitude (mA/m2) ''' # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") if Astim is not None: logger.info('%s: simulation @ A = %sA/m2, t = %ss (%ss offset)%s', self, si_format(Astim * 1e-3, 2, space=' '), *si_format([tstim, toffset], 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format(si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # Run simulation tstart = time.time() t, y, states = self.simulate(Astim, tstim, toffset, PRF, DC) Vm, *channels = y tcomp = time.time() - tstart # Detect spikes on Vm signal dt = t[1] - t[0] ipeaks, *_ = findPeaks(Vm, SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) nspikes = ipeaks.size lat = t[ipeaks[0]] if nspikes > 0 else 'N/A' outstr = '{} spike{} detected'.format(nspikes, 's' if nspikes > 1 else '') else: logger.info('%s: titration @ t = %ss%s', self, si_format(tstim, 1, space=' '), (', PRF = {}Hz, DC = {:.2f}%'.format(si_format(PRF, 2, space=' '), DC * 1e2) if DC < 1.0 else '')) # Run titration Astim, t, y, states, lat, tcomp = self.titrate(tstim, toffset, PRF, DC) Vm, *channels = y nspikes = 1 if Astim is np.nan: outstr = 'no spikes detected within titration interval' nspikes = 0 else: nspikes = 1 outstr = 'Athr = {}A/m2'.format(si_format(Astim * 1e-3, 2, space=' ')) logger.debug('completed in %s, %s', si_format(tcomp, 1), outstr) sr = np.mean(1 / np.diff(t[ipeaks])) if nspikes > 1 else None # Store dataframe and metadata df = pd.DataFrame({ 't': t, 'states': states, 'Vm': Vm, 'Qm': Vm * self.Cm0 * 1e-3 }) for j in range(len(self.states_names)): df[self.states_names[j]] = channels[j] meta = { 'neuron': self.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp } # Export into to PKL file simcode = ESTIM_filecode(self.name, Astim, tstim, PRF, DC) outpath = '{}/{}.pkl'.format(outdir, simcode) with open(outpath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', outpath) # Export key metrics to log file logpath = os.path.join(outdir, 'log_ESTIM.xlsx') logentry = { 'Date': date_str, 'Time': daytime_str, 'Neuron Type': self.name, 'Astim (mA/m2)': Astim, 'Tstim (ms)': tstim * 1e3, 'PRF (kHz)': PRF * 1e-3 if DC < 1 else 'N/A', 'Duty factor': DC, '# samples': t.size, 'Comp. time (s)': round(tcomp, 2), '# spikes': nspikes, 'Latency (ms)': lat * 1e3 if isinstance(lat, float) else 'N/A', 'Spike rate (sp/ms)': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(logpath, logentry) == 1: logger.debug('log exported to "%s"', logpath) else: logger.error('log export to "%s" aborted', self.logpath) return outpath def findRheobaseAmps(self, DCs, Vthr, curr='net'): ''' Find the rheobase amplitudes (i.e. threshold amplitudes of infinite duration that would result in excitation) of a specific neuron for various stimulation duty cycles. :param DCs: duty cycles vector (-) :param Vthr: threshold membrane potential above which the neuron necessarily fires (mV) :return: rheobase amplitudes vector (mA/m2) ''' # Compute the pulse average net (or leakage) current along the amplitude space if curr == 'net': iNet = self.iNet(Vthr, self.steadyStates(Vthr)) elif curr == 'leak': iNet = self.iLeak(Vthr) # Compute rheobase amplitudes return iNet / np.array(DCs) diff --git a/PySONIC/neurons/thalamic.py b/PySONIC/neurons/thalamic.py index 69325f0..b5fe34b 100644 --- a/PySONIC/neurons/thalamic.py +++ b/PySONIC/neurons/thalamic.py @@ -1,696 +1,690 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-07-31 15:20:54 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-03-13 20:02:02 +# @Last Modified time: 2019-03-14 18:48:15 import numpy as np from ..core import PointNeuron from ..constants import Z_Ca from ..utils import vtrap class Thalamic(PointNeuron): ''' Class defining the generic membrane channel dynamics of a thalamic neuron with 4 different current types: - Inward Sodium current - Outward Potassium current - Inward Calcium current - Non-specific leakage current This generic class cannot be used directly as it does not contain any specific parameters. Reference: *Plaksin, M., Kimmel, E., and Shoham, S. (2016). Cell-Type-Selective Effects of Intramembrane Cavitation as a Unifying Theoretical Framework for Ultrasonic Neuromodulation. eNeuro 3.* ''' # Generic biophysical parameters of thalamic cells Cm0 = 1e-2 # Cell membrane resting capacitance (F/m2) Vm0 = 0.0 # Dummy value for membrane potential (mV) ENa = 50.0 # Sodium Nernst potential (mV) EK = -90.0 # Potassium Nernst potential (mV) ECa = 120.0 # Calcium Nernst potential (mV) def __init__(self): self.states_names = ['m', 'h', 'n', 's', 'u'] self.coeff_names = ['alpham', 'betam', 'alphah', 'betah', 'alphan', 'betan', 'alphas', 'betas', 'alphau', 'betau'] self.states0 = np.array([]) def alpham(self, Vm): ''' Voltage-dependent activation rate of m-gate :param Vm: membrane potential (mV) :return: rate (s-1) ''' Vdiff = Vm - self.VT alpha = 0.32 * vtrap(13 - Vdiff, 4) # ms-1 return alpha * 1e3 # s-1 def betam(self, Vm): ''' Voltage-dependent inactivation rate of m-gate :param Vm: membrane potential (mV) :return: rate (s-1) ''' Vdiff = Vm - self.VT beta = 0.28 * vtrap(Vdiff - 40, 5) # ms-1 return beta * 1e3 # s-1 def alphah(self, Vm): ''' Voltage-dependent activation rate of h-gate :param Vm: membrane potential (mV) :return: rate (s-1) ''' Vdiff = Vm - self.VT alpha = (0.128 * np.exp(-(Vdiff - 17) / 18)) # ms-1 return alpha * 1e3 # s-1 def betah(self, Vm): ''' Voltage-dependent inactivation rate of h-gate :param Vm: membrane potential (mV) :return: rate (s-1) ''' Vdiff = Vm - self.VT beta = (4 / (1 + np.exp(-(Vdiff - 40) / 5))) # ms-1 return beta * 1e3 # s-1 def alphan(self, Vm): ''' Voltage-dependent activation rate of n-gate :param Vm: membrane potential (mV) :return: rate (s-1) ''' Vdiff = Vm - self.VT alpha = 0.032 * vtrap(15 - Vdiff, 5) # ms-1 return alpha * 1e3 # s-1 def betan(self, Vm): ''' Voltage-dependent inactivation rate of n-gate :param Vm: membrane potential (mV) :return: rate (s-1) ''' Vdiff = Vm - self.VT beta = (0.5 * np.exp(-(Vdiff - 10) / 40)) # ms-1 return beta * 1e3 # s-1 def derM(self, Vm, m): ''' Evolution of m-gate open-probability :param Vm: membrane potential (mV) :param m: open-probability of m-gate (-) :return: time derivative of m-gate open-probability (s-1) ''' return self.alpham(Vm) * (1 - m) - self.betam(Vm) * m def derH(self, Vm, h): ''' Evolution of h-gate open-probability :param Vm: membrane potential (mV) :param h: open-probability of h-gate (-) :return: time derivative of h-gate open-probability (s-1) ''' return self.alphah(Vm) * (1 - h) - self.betah(Vm) * h def derN(self, Vm, n): ''' Evolution of n-gate open-probability :param Vm: membrane potential (mV) :param n: open-probability of n-gate (-) :return: time derivative of n-gate open-probability (s-1) ''' return self.alphan(Vm) * (1 - n) - self.betan(Vm) * n def derS(self, Vm, s): ''' Evolution of s-gate open-probability :param Vm: membrane potential (mV) :param s: open-probability of s-gate (-) :return: time derivative of s-gate open-probability (s-1) ''' return (self.sinf(Vm) - s) / self.taus(Vm) def derU(self, Vm, u): ''' Evolution of u-gate open-probability :param Vm: membrane potential (mV) :param u: open-probability of u-gate (-) :return: time derivative of u-gate open-probability (s-1) ''' return (self.uinf(Vm) - u) / self.tauu(Vm) def iNa(self, m, h, Vm): ''' Sodium current :param m: open-probability of m-gate (-) :param h: open-probability of h-gate (-) :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' return self.gNabar * m**3 * h * (Vm - self.ENa) def iKd(self, n, Vm): ''' Delayed-rectifier Potassium current :param n: open-probability of n-gate (-) :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' return self.gKbar * n**4 * (Vm - self.EK) def iCaT(self, s, u, Vm): ''' Low-threshold (Ts-type) Calcium current :param s: open-probability of s-gate (-) :param u: open-probability of u-gate (-) :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' return self.gCaTbar * s**2 * u * (Vm - self.ECa) def iLeak(self, Vm): ''' Non-specific leakage current :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' return self.gLeak * (Vm - self.ELeak) def currents(self, Vm, states): ''' Overriding of abstract parent method. ''' m, h, n, s, u = states return { 'iNa': self.iNa(m, h, Vm), 'iKd': self.iKd(n, Vm), 'iCaT': self.iCaT(s, u, Vm), 'iLeak': self.iLeak(Vm) } # mA/m2 def steadyStates(self, Vm): ''' Overriding of abstract parent method. ''' # Solve the equation dx/dt = 0 at Vm for each x-state meq = self.alpham(Vm) / (self.alpham(Vm) + self.betam(Vm)) heq = self.alphah(Vm) / (self.alphah(Vm) + self.betah(Vm)) neq = self.alphan(Vm) / (self.alphan(Vm) + self.betan(Vm)) seq = self.sinf(Vm) ueq = self.uinf(Vm) return np.array([meq, heq, neq, seq, ueq]) def derStates(self, Vm, states): ''' Overriding of abstract parent method. ''' m, h, n, s, u = states dmdt = self.derM(Vm, m) dhdt = self.derH(Vm, h) dndt = self.derN(Vm, n) dsdt = self.derS(Vm, s) dudt = self.derU(Vm, u) return [dmdt, dhdt, dndt, dsdt, dudt] def getEffRates(self, Vm): ''' Overriding of abstract parent method. ''' # Compute average cycle value for rate constants am_avg = np.mean(self.alpham(Vm)) bm_avg = np.mean(self.betam(Vm)) ah_avg = np.mean(self.alphah(Vm)) bh_avg = np.mean(self.betah(Vm)) an_avg = np.mean(self.alphan(Vm)) bn_avg = np.mean(self.betan(Vm)) Ts = self.taus(Vm) as_avg = np.mean(self.sinf(Vm) / Ts) bs_avg = np.mean(1 / Ts) - as_avg Tu = np.array([self.tauu(v) for v in Vm]) au_avg = np.mean(self.uinf(Vm) / Tu) bu_avg = np.mean(1 / Tu) - au_avg # Return array of coefficients return np.array([am_avg, bm_avg, ah_avg, bh_avg, an_avg, bn_avg, as_avg, bs_avg, au_avg, bu_avg]) def derStatesEff(self, Qm, states, interp_data): ''' Overriding of abstract parent method. ''' rates = np.array([np.interp(Qm, interp_data['Q'], interp_data[rn]) for rn in self.coeff_names]) m, h, n, s, u = states dmdt = rates[0] * (1 - m) - rates[1] * m dhdt = rates[2] * (1 - h) - rates[3] * h dndt = rates[4] * (1 - n) - rates[5] * n dsdt = rates[6] * (1 - s) - rates[7] * s dudt = rates[8] * (1 - u) - rates[9] * u return [dmdt, dhdt, dndt, dsdt, dudt] class ThalamicRE(Thalamic): ''' Specific membrane channel dynamics of a thalamic reticular neuron. References: *Destexhe, A., Contreras, D., Steriade, M., Sejnowski, T.J., and Huguenard, J.R. (1996). In vivo, in vitro, and computational analysis of dendritic calcium currents in thalamic reticular neurons. J. Neurosci. 16, 169–185.* *Huguenard, J.R., and Prince, D.A. (1992). A novel T-type current underlies prolonged Ca(2+)-dependent burst firing in GABAergic neurons of rat thalamic reticular nucleus. J. Neurosci. 12, 3804–3817.* ''' # Name of channel mechanism name = 'RE' # Cell-specific biophysical parameters Vm0 = -89.5 # Cell membrane resting potential (mV) gNabar = 2000.0 # Max. conductance of Sodium current (S/m^2) gKbar = 200.0 # Max. conductance of Potassium current (S/m^2) gCaTbar = 30.0 # Max. conductance of low-threshold Calcium current (S/m^2) gLeak = 0.5 # Conductance of non-specific leakage current (S/m^2) ELeak = -90.0 # Non-specific leakage Nernst potential (mV) VT = -67.0 # Spike threshold adjustment parameter (mV) # Default plotting scheme pltvars_scheme = { 'i_{Na}\ kin.': ['m', 'h'], 'i_{Kd}\ kin.': ['n'], 'i_{TS}\ kin.': ['s', 'u'] } def __init__(self): super().__init__() self.states0 = self.steadyStates(self.Vm0) def sinf(self, Vm): ''' Voltage-dependent steady-state opening of s-gate :param Vm: membrane potential (mV) :return: steady-state opening (-) ''' return 1.0 / (1.0 + np.exp(-(Vm + 52.0) / 7.4)) # prob def taus(self, Vm): ''' Voltage-dependent adaptation time for adaptation of s-gate :param Vm: membrane potential (mV) :return: adaptation time (s) ''' return (1 + 0.33 / (np.exp((Vm + 27.0) / 10.0) + np.exp(-(Vm + 102.0) / 15.0))) * 1e-3 # s def uinf(self, Vm): ''' Voltage-dependent steady-state opening of u-gate :param Vm: membrane potential (mV) :return: steady-state opening (-) ''' return 1.0 / (1.0 + np.exp((Vm + 80.0) / 5.0)) # prob def tauu(self, Vm): ''' Voltage-dependent adaptation time for adaptation of u-gate :param Vm: membrane potential (mV) :return: adaptation time (s) ''' return (28.3 + 0.33 / (np.exp((Vm + 48.0) / 4.0) + np.exp(-(Vm + 407.0) / 50.0))) * 1e-3 # s class ThalamoCortical(Thalamic): ''' Specific membrane channel dynamics of a thalamo-cortical neuron, with a specific hyperpolarization-activated, mixed cationic current and a leakage Potassium current. References: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* *Destexhe, A., Bal, T., McCormick, D.A., and Sejnowski, T.J. (1996). Ionic mechanisms underlying synchronized oscillations and propagating waves in a model of ferret thalamic slices. J. Neurophysiol. 76, 2049–2070.* *McCormick, D.A., and Huguenard, J.R. (1992). A model of the electrophysiological properties of thalamocortical relay neurons. J. Neurophysiol. 68, 1384–1400.* ''' # Name of channel mechanism name = 'TC' # Cell-specific biophysical parameters # Vm0 = -63.4 # Cell membrane resting potential (mV) Vm0 = -61.93 # Cell membrane resting potential (mV) gNabar = 900.0 # bar. conductance of Sodium current (S/m^2) gKbar = 100.0 # bar. conductance of Potassium current (S/m^2) gCaTbar = 20.0 # Max. conductance of low-threshold Calcium current (S/m^2) gKL = 0.138 # Conductance of leakage Potassium current (S/m^2) gHbar = 0.175 # Max. conductance of mixed cationic current (S/m^2) gLeak = 0.1 # Conductance of non-specific leakage current (S/m^2) EH = -40.0 # Mixed cationic current reversal potential (mV) ELeak = -70.0 # Non-specific leakage Nernst potential (mV) VT = -52.0 # Spike threshold adjustment parameter (mV) Vx = 0.0 # Voltage-dependence uniform shift factor at 36°C (mV) taur_Cai = 5e-3 # decay time constant for intracellular Ca2+ dissolution (s) Cai_min = 50e-9 # minimal intracellular Calcium concentration (M) deff = 100e-9 # effective depth beneath membrane for intracellular [Ca2+] calculation nCa = 4 # number of Calcium binding sites on regulating factor k1 = 2.5e22 # intracellular Ca2+ regulation factor (M-4 s-1) k2 = 0.4 # intracellular Ca2+ regulation factor (s-1) k3 = 100.0 # intracellular Ca2+ regulation factor (s-1) k4 = 1.0 # intracellular Ca2+ regulation factor (s-1) # Default plotting scheme pltvars_scheme = { 'i_{Na}\ kin.': ['m', 'h'], 'i_{Kd}\ kin.': ['n'], 'i_{T}\ kin.': ['s', 'u'], 'i_{H}\ kin.': ['O', 'OL'] } def __init__(self): super().__init__() self.iCa_to_Cai_rate = self.currentToConcentrationRate(Z_Ca, self.deff) self.states_names += ['O', 'C', 'P0', 'Cai'] self.coeff_names += ['alphao', 'betao'] self.states0 = self.steadyStates(self.Vm0) def getPltVars(self): all_pltvars = super().getPltVars() all_pltvars.update({ 'Cai': { 'desc': 'sumbmembrane Ca2+ concentration', 'label': '[Ca^{2+}]_i', 'unit': 'uM', 'factor': 1e6 }, 'OL': { 'desc': 'iH O-gate locked-opening', 'label': 'O_L', - 'unit': None, - 'factor': 1, - 'min': 0.1, - 'max:': 1.1, + 'bounds': (-0.1, 1.1), 'alias': '1 - df["O"] - df["C"]' }, 'P0': { 'desc': 'iH regulating factor activation', 'label': 'P_0', - 'unit': None, - 'factor': 1, - 'min': -0.1, - 'max': 1.1 + 'bounds': (-0.1, 1.1) } }) return all_pltvars def sinf(self, Vm): ''' Voltage-dependent steady-state opening of s-gate :param Vm: membrane potential (mV) :return: steady-state opening (-) ''' return 1.0 / (1.0 + np.exp(-(Vm + self.Vx + 57.0) / 6.2)) # prob def taus(self, Vm): ''' Voltage-dependent adaptation time for adaptation of s-gate :param Vm: membrane potential (mV) :return: adaptation time (s) ''' x = np.exp(-(Vm + self.Vx + 132.0) / 16.7) + np.exp((Vm + self.Vx + 16.8) / 18.2) return 1.0 / 3.7 * (0.612 + 1.0 / x) * 1e-3 # s def uinf(self, Vm): ''' Voltage-dependent steady-state opening of u-gate :param Vm: membrane potential (mV) :return: steady-state opening (-) ''' return 1.0 / (1.0 + np.exp((Vm + self.Vx + 81.0) / 4.0)) # prob def tauu(self, Vm): ''' Voltage-dependent adaptation time for adaptation of u-gate :param Vm: membrane potential (mV) :return: adaptation time (s) ''' if Vm + self.Vx < -80.0: return 1.0 / 3.7 * np.exp((Vm + self.Vx + 467.0) / 66.6) * 1e-3 # s else: return 1 / 3.7 * (np.exp(-(Vm + self.Vx + 22) / 10.5) + 28.0) * 1e-3 # s def derS(self, Vm, s): ''' Evolution of s-gate open-probability :param Vm: membrane potential (mV) :param s: open-probability of s-gate (-) :return: time derivative of s-gate open-probability (s-1) ''' return (self.sinf(Vm) - s) / self.taus(Vm) def derU(self, Vm, u): ''' Evolution of u-gate open-probability :param Vm: membrane potential (mV) :param u: open-probability of u-gate (-) :return: time derivative of u-gate open-probability (s-1) ''' return (self.uinf(Vm) - u) / self.tauu(Vm) def oinf(self, Vm): ''' Voltage-dependent steady-state opening of O-gate :param Vm: membrane potential (mV) :return: steady-state opening (-) ''' return 1.0 / (1.0 + np.exp((Vm + 75.0) / 5.5)) def tauo(self, Vm): ''' Voltage-dependent adaptation time for adaptation of O-gate :param Vm: membrane potential (mV) :return: adaptation time (s) ''' return 1 / (np.exp(-14.59 - 0.086 * Vm) + np.exp(-1.87 + 0.0701 * Vm)) * 1e-3 def alphao(self, Vm): ''' Voltage-dependent transition rate between closed and open forms of O-gate :param Vm: membrane potential (mV) :return: rate (s-1) ''' return self.oinf(Vm) / self.tauo(Vm) def betao(self, Vm): ''' Voltage-dependent transition rate between open and closed forms of O-gate :param Vm: membrane potential (mV) :return: rate (s-1) ''' return (1 - self.oinf(Vm)) / self.tauo(Vm) def derC(self, C, O, Vm): ''' Evolution of O-gate closed-probability :param C: closed-probability of O-gate (-) :param O: open-probability of O-gate (-) :param Vm: membrane potential (mV) :return: time derivative of O-gate closed-probability (s-1) ''' return self.betao(Vm) * O - self.alphao(Vm) * C def derO(self, C, O, P0, Vm): ''' Evolution of O-gate open-probability :param C: closed-probability of O-gate (-) :param O: open-probability of O-gate (-) :param P0: proportion of Ih channels regulating factor in unbound state (-) :param Vm: membrane potential (mV) :return: time derivative of O-gate open-probability (s-1) ''' return - self.derC(C, O, Vm) - self.k3 * O * (1 - P0) + self.k4 * (1 - O - C) def derP0(self, P0, Cai): ''' Evolution of unbound probability of Ih regulating factor. :param P0: unbound probability of Ih regulating factor (-) :param Cai: submembrane Calcium concentration (M) :return: time derivative of ubnound probability (s-1) ''' return self.k2 * (1 - P0) - self.k1 * P0 * Cai**self.nCa def derCai(self, Cai, s, u, Vm): ''' Evolution of submembrane Calcium concentration. Model of Ca2+ buffering and contribution from iCaT derived from: *McCormick, D.A., and Huguenard, J.R. (1992). A model of the electrophysiological properties of thalamocortical relay neurons. J. Neurophysiol. 68, 1384–1400.* :param Cai: submembrane Calcium concentration (M) :param s: open-probability of s-gate (-) :param u: open-probability of u-gate (-) :param Vm: membrane potential (mV) :return: time derivative of submembrane Calcium concentration (M/s) ''' return (self.Cai_min - Cai) / self.taur_Cai - self.iCa_to_Cai_rate * self.iCaT(s, u, Vm) def iKLeak(self, Vm): ''' Potassium leakage current :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' return self.gKL * (Vm - self.EK) def iH(self, O, C, Vm): ''' Outward mixed cationic current :param C: closed-probability of O-gate (-) :param O: open-probability of O-gate (-) :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' OL = 1 - O - C # proportion of channels in locked-open form return self.gHbar * (O + 2 * OL) * (Vm - self.EH) def currents(self, Vm, states): ''' Overriding of abstract parent method. ''' m, h, n, s, u, O, C, _, _ = states currents = super().currents(Vm, [m, h, n, s, u]) currents['iKLeak'] = self.iKLeak(Vm) # mA/m2 currents['iH'] = self.iH(O, C, Vm) # mA/m2 return currents def steadyStates(self, Vm): ''' Overriding of abstract parent method. ''' # Call parent method to compute Sodium, Potassium and Calcium channels gates steady-states NaKCa_eqstates = super().steadyStates(Vm) # Compute steady-state Calcium current seq = NaKCa_eqstates[3] ueq = NaKCa_eqstates[4] iCaTeq = self.iCaT(seq, ueq, Vm) # Compute steady-state variables for the kinetics system of Ih Cai_eq = self.Cai_min - self.taur_Cai * self.iCa_to_Cai_rate * iCaTeq P0_eq = self.k2 / (self.k2 + self.k1 * Cai_eq**self.nCa) BA = self.betao(Vm) / self.alphao(Vm) O_eq = self.k4 / (self.k3 * (1 - P0_eq) + self.k4 * (1 + BA)) C_eq = BA * O_eq kin_eqstates = np.array([O_eq, C_eq, P0_eq, Cai_eq]) # Merge all steady-states and return return np.concatenate((NaKCa_eqstates, kin_eqstates)) def derStates(self, Vm, states): ''' Overriding of abstract parent method. ''' m, h, n, s, u, O, C, P0, Cai = states NaKCa_states = [m, h, n, s, u] NaKCa_derstates = super().derStates(Vm, NaKCa_states) dOdt = self.derO(C, O, P0, Vm) dCdt = self.derC(C, O, Vm) dP0dt = self.derP0(P0, Cai) dCaidt = self.derCai(Cai, s, u, Vm) return NaKCa_derstates + [dOdt, dCdt, dP0dt, dCaidt] def getEffRates(self, Vm): ''' Overriding of abstract parent method. ''' # Compute effective coefficients for Sodium, Potassium and Calcium conductances NaKCa_effrates = super().getEffRates(Vm) # Compute effective coefficients for Ih conductance ao_avg = np.mean(self.alphao(Vm)) bo_avg = np.mean(self.betao(Vm)) iH_effrates = np.array([ao_avg, bo_avg]) # Return array of coefficients return np.concatenate((NaKCa_effrates, iH_effrates)) def derStatesEff(self, Qm, states, interp_data): ''' Overriding of abstract parent method. ''' rates = np.array([np.interp(Qm, interp_data['Q'], interp_data[rn]) for rn in self.coeff_names]) Vmeff = np.interp(Qm, interp_data['Q'], interp_data['V']) # Unpack states m, h, n, s, u, O, C, P0, Cai = states # INa, IK, iCaT effective states derivatives dmdt = rates[0] * (1 - m) - rates[1] * m dhdt = rates[2] * (1 - h) - rates[3] * h dndt = rates[4] * (1 - n) - rates[5] * n dsdt = rates[6] * (1 - s) - rates[7] * s dudt = rates[8] * (1 - u) - rates[9] * u # Ih effective states derivatives dCdt = rates[11] * O - rates[10] * C dOdt = - dCdt - self.k3 * O * (1 - P0) + self.k4 * (1 - O - C) dP0dt = self.derP0(P0, Cai) dCaidt = self.derCai(Cai, s, u, Vmeff) # Merge derivatives and return return [dmdt, dhdt, dndt, dsdt, dudt, dOdt, dCdt, dP0dt, dCaidt] diff --git a/PySONIC/plt/batch.py b/PySONIC/plt/batch.py index 01a018e..0a9b73e 100644 --- a/PySONIC/plt/batch.py +++ b/PySONIC/plt/batch.py @@ -1,229 +1,155 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-09-25 16:19:19 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-03-13 19:14:10 +# @Last Modified time: 2019-03-14 19:07:52 -import sys import pickle import ntpath import numpy as np import matplotlib.pyplot as plt from ..utils import * from ..core import BilayerSonophore -from .pltvars import pltvars from ..neurons import getNeuronsDict -def plotBatch(filepaths, vars_dict=None, plt_save=False, directory=None, +def plotBatch(filepaths, pltscheme=None, plt_save=False, directory=None, ask_before_save=True, fig_ext='png', tag='fig', fs=15, lw=2, title=True, show_patches=True, frequency=1): ''' Plot a figure with profiles of several specific NICE output variables, for several NICE simulations. :param filepaths: list of full paths to output data files to be compared - :param vars_dict: dict of lists of variables names to extract and plot together + :param pltscheme: dict of lists of variables names to extract and plot together :param plt_save: boolean stating whether to save the created figures :param directory: directory where to save figures :param ask_before_save: boolean stating whether to show the created figures :param fig_ext: file extension for the saved figures :param tag: suffix added to the end of the figures name :param fs: labels font size :param lw: curves line width :param title: boolean stating whether to display a general title on the figures :param show_patches: boolean indicating whether to indicate periods of stimulation with colored rectangular patches + :param frequency: downsampling frequency for time series + :return: list of figure handles ''' - # Check validity of plot variables - if vars_dict: - yvars = list(sum(list(vars_dict.values()), [])) - for key in yvars: - if key not in pltvars: - raise KeyError('Unknown plot variable: "{}"'.format(key)) - - # Dictionary of neurons - neurons_dict = getNeuronsDict() + figs = [] # Loop through data files - figs = [] for filepath in filepaths: - # Get code from file name + # Retrieve file code and sim type from file name pkl_filename = ntpath.basename(filepath) filecode = pkl_filename[0:-4] - - # Retrieve sim type - mo1 = rgxp.fullmatch(pkl_filename) - mo2 = rgxp_mech.fullmatch(pkl_filename) - if mo1: - mo = mo1 - elif mo2: - mo = mo2 - else: - logger.error('Error: "%s" file does not match regexp pattern', pkl_filename) - sys.exit(1) - sim_type = mo.group(1) - if sim_type not in ('MECH', 'ASTIM', 'ESTIM'): - raise ValueError('Invalid simulation type: {}'.format(sim_type)) + sim_type = getSimType(pkl_filename) # Load data logger.info('Loading data from "%s"', pkl_filename) with open(filepath, 'rb') as fh: frame = pickle.load(fh) - df = frame['data'] + df = frame['data'].iloc[::frequency] meta = frame['meta'] # Extract variables logger.info('Extracting variables') - t = df['t'].values[::frequency] - states = df['states'].values[::frequency] - nsamples = t.size + t = df['t'].values + states = df['states'].values + _, tpatch_on, tpatch_off = getStimPulses(t, states) - # Initialize channel mechanism + # Initialize appropriate object if sim_type in ['ASTIM', 'ESTIM']: - neuron_name = mo.group(2) - global neuron - neuron = neurons_dict[neuron_name]() - neuron_states = [df[sn].values[::frequency] for sn in neuron.states_names] - Cm0 = neuron.Cm0 - Qm0 = Cm0 * neuron.Vm0 * 1e-3 - t_plt = pltvars['t_ms'] + obj = getNeuronsDict()[getNeuronType(pkl_filename)]() else: - Cm0 = meta['Cm0'] - Qm0 = meta['Qm0'] - t_plt = pltvars['t_us'] - - # Initialize BLS - if sim_type in ['MECH', 'ASTIM']: - global bls - Fdrive = meta['Fdrive'] - a = meta['a'] - bls = BilayerSonophore(a, Cm0, Qm0) - - # Determine patches location - npatches, tpatch_on, tpatch_off = getStimPulses(t, states) - - # Adding onset to time vector - if t_plt['onset'] > 0.0: - tonset = np.array([-t_plt['onset'], -t[0] - t[1]]) + obj = BilayerSonophore(meta['a'], meta['Cm0'], meta['Qm0']) + + # Retrieve plot variables + tvar, pltvars = getTimePltVar(obj.tscale), obj.getPltVars() + + # Check plot scheme if provided, otherwise generate it + if pltscheme: + for key in list(sum(list(pltscheme.values()), [])): + if key not in pltvars: + raise KeyError('Unknown plot variable: "{}"'.format(key)) + else: + pltscheme = obj.getPltScheme() + + # Preset and rescale time vector + if tvar['onset'] > 0.0: + tonset = np.array([-tvar['onset'], -t[0] - t[1]]) t = np.hstack((tonset, t)) states = np.hstack((states, np.zeros(2))) + t *= tvar['factor'] - # Determine variables to plot if not provided - if not vars_dict: - if sim_type == 'ASTIM': - vars_dict = {'Q_m': ['Qm'], 'V_m': ['Vm']} - elif sim_type == 'ESTIM': - vars_dict = {'V_m': ['Vm']} - elif sim_type == 'MECH': - vars_dict = {'P_{AC}': ['Pac'], 'Z': ['Z'], 'n_g': ['ng']} - if sim_type in ['ASTIM', 'ESTIM'] and hasattr(neuron, 'pltvars_scheme'): - vars_dict.update(neuron.pltvars_scheme) - pltvars.update(neuron.getPltVars()) - vars_dict['I'] = list(neuron.getPltVars().keys()) - labels = list(vars_dict.keys()) - naxes = len(vars_dict) - - # Plotting + # Create figure + naxes = len(pltscheme) if naxes == 1: fig, ax = plt.subplots(figsize=(11, 4)) axes = [ax] else: fig, axes = plt.subplots(naxes, 1, figsize=(11, min(3 * naxes, 9))) - for i in range(naxes): - - ax = axes[i] - for item in ['top', 'right']: - ax.spines[item].set_visible(False) - ax_pltvars = [pltvars[j] for j in vars_dict[labels[i]]] - nvars = len(ax_pltvars) - - # X-axis - if i < naxes - 1: - ax.get_xaxis().set_ticklabels([]) - else: - ax.set_xlabel('${}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) - for tick in ax.xaxis.get_major_ticks(): - tick.label.set_fontsize(fs) - - # Y-axis - if ax_pltvars[0]['unit']: - ax.set_ylabel('${}\ ({})$'.format(labels[i], ax_pltvars[0]['unit']), - fontsize=fs) - else: - ax.set_ylabel('${}$'.format(labels[i]), fontsize=fs) - if 'min' in ax_pltvars[0] and 'max' in ax_pltvars[0]: - ax_min = min([ap['min'] for ap in ax_pltvars]) - ax_max = max([ap['max'] for ap in ax_pltvars]) + # Loop through each subgraph and plot appropriate variables + for ax, (grouplabel, keys) in zip(axes, pltscheme.items()): + nvars = len(keys) + ax_pltvars = [pltvars[k] for k in keys] + + # Set y-axis unit and bounds + ax.set_ylabel('$\\rm {}\ ({})$'.format(grouplabel, ax_pltvars[0].get('unit', '-')), + fontsize=fs) + if 'bounds' in ax_pltvars[0]: + ax_min = min([ap['bounds'][0] for ap in ax_pltvars]) + ax_max = max([ap['bounds'][1] for ap in ax_pltvars]) ax.set_ylim(ax_min, ax_max) - ax.locator_params(axis='y', nbins=2) - for tick in ax.yaxis.get_major_ticks(): - tick.label.set_fontsize(fs) - # Time series + # Plot time series icolor = 0 - for j in range(nvars): - - # Extract variable - pltvar = ax_pltvars[j] - if 'alias' in pltvar: - var = eval(pltvar['alias']) - elif 'key' in pltvar: - var = df[pltvar['key']].values[::frequency] - elif 'constant' in pltvar: - var = eval(pltvar['constant']) * np.ones(nsamples) - else: - var = df[vars_dict[labels[i]][j]].values[::frequency] - if var.size == t.size - 2: - if pltvar['desc'] == 'membrane potential': - var = np.hstack((np.array([neuron.Vm0] * 2), var)) - else: - var = np.hstack((np.array([var[0]] * 2), var)) - # var = np.insert(var, 0, var[0]) - - # Plot variable - if 'constant' in pltvar or pltvar['label'] in ['I_{net}']: - ax.plot(t * t_plt['factor'], var * pltvar['factor'], '--', c='black', lw=lw, - label='${}$'.format(pltvar['label'])) - else: - ax.plot(t * t_plt['factor'], var * pltvar['factor'], - c='C{}'.format(icolor), lw=lw, label='${}$'.format(pltvar['label'])) + for pltvar, name in zip(ax_pltvars, pltscheme[grouplabel]): + var = extractPltVar(obj, pltvar, df, t.size, name) + ax.plot(t, var, pltvar.get('ls', '-'), c=pltvar.get('color', 'C{}'.format(icolor)), + lw=lw, label='$\\rm {}$'.format(pltvar['label'])) + if 'color' not in pltvar: icolor += 1 - # Patches - if show_patches == 1: - (ybottom, ytop) = ax.get_ylim() - for j in range(npatches): - ax.axvspan(tpatch_on[j] * t_plt['factor'], tpatch_off[j] * t_plt['factor'], - edgecolor='none', facecolor='#8A8A8A', alpha=0.2) - # Legend + # Add legend if nvars > 1 or 'gate' in ax_pltvars[0]['desc']: - ax.legend(fontsize=fs, loc=7, ncol=nvars // 4 + 1) + ax.legend(fontsize=fs, loc=7, ncol=nvars // 4 + 1, frameon=False) - - # Title + # Post-process figure + for ax in axes: + for item in ['top', 'right']: + ax.spines[item].set_visible(False) + ax.locator_params(axis='y', nbins=2) + for item in ax.get_yticklabels(): + item.set_fontsize(fs) + for ax in axes[:-1]: + ax.set_xticklabels([]) + for item in axes[-1].get_xticklabels(): + item.set_fontsize(fs) + axes[-1].set_xlabel('$\\rm {}\ ({})$'.format(tvar['label'], tvar['unit']), fontsize=fs) + if show_patches == 1: + for ax in axes: + plotStimPatches(ax, tpatch_on, tpatch_off, tvar['factor']) if title: axes[0].set_title(figtitle(meta), fontsize=fs) - fig.tight_layout() # Save figure if needed (automatic or checked) if plt_save: if directory is None: directory = os.path.split(filepath)[0] if ask_before_save: plt_filename = SaveFileDialog( '{}_{}.{}'.format(filecode, tag, fig_ext), dirname=directory, ext=fig_ext) else: plt_filename = '{}/{}_{}.{}'.format(directory, filecode, tag, fig_ext) if plt_filename: plt.savefig(plt_filename) logger.info('Saving figure as "{}"'.format(plt_filename)) plt.close() figs.append(fig) return figs diff --git a/PySONIC/plt/comp.py b/PySONIC/plt/comp.py index f6cdd88..4619973 100644 --- a/PySONIC/plt/comp.py +++ b/PySONIC/plt/comp.py @@ -1,361 +1,361 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-09-25 16:18:45 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-10-25 14:27:17 +# @Last Modified time: 2019-03-14 16:42:09 import sys import pickle import ntpath import numpy as np import matplotlib.pyplot as plt from matplotlib.patches import Rectangle from matplotlib.ticker import FormatStrFormatter from ..utils import * -from .pltvars import pltvars +# from .pltvars import pltvars from ..core import BilayerSonophore from ..neurons import getNeuronsDict class InteractiveLegend(object): ''' Class defining an interactive matplotlib legend, where lines visibility can be toggled by simply clicking on the corresponding legend label. Other graphic objects can also be associated to the toggle of a specific line Adapted from: http://stackoverflow.com/questions/31410043/hiding-lines-after-showing-a-pyplot-figure ''' def __init__(self, legend, aliases): self.legend = legend self.fig = legend.axes.figure self.lookup_artist, self.lookup_handle = self._build_lookups(legend) self._setup_connections() self.handles_aliases = aliases self.update() def _setup_connections(self): for artist in self.legend.texts + self.legend.legendHandles: artist.set_picker(10) # 10 points tolerance self.fig.canvas.mpl_connect('pick_event', self.on_pick) def _build_lookups(self, legend): ''' Method of the InteractiveLegend class building the legend lookups. ''' labels = [t.get_text() for t in legend.texts] handles = legend.legendHandles label2handle = dict(zip(labels, handles)) handle2text = dict(zip(handles, legend.texts)) lookup_artist = {} lookup_handle = {} for artist in legend.axes.get_children(): if artist.get_label() in labels: handle = label2handle[artist.get_label()] lookup_handle[artist] = handle lookup_artist[handle] = artist lookup_artist[handle2text[handle]] = artist lookup_handle.update(zip(handles, handles)) lookup_handle.update(zip(legend.texts, handles)) return lookup_artist, lookup_handle def on_pick(self, event): handle = event.artist if handle in self.lookup_artist: artist = self.lookup_artist[handle] artist.set_visible(not artist.get_visible()) self.update() def update(self): for artist in self.lookup_artist.values(): handle = self.lookup_handle[artist] if artist.get_visible(): handle.set_visible(True) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(True) else: handle.set_visible(False) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(False) self.fig.canvas.draw() def show(self): ''' showing the interactive legend ''' plt.show() def plotComp(filepaths, varname, labels=None, fs=15, lw=2, colors=None, lines=None, patches='one', xticks=None, yticks=None, blacklegend=False, straightlegend=False, inset=None, figsize=(11, 4)): ''' Compare profiles of several specific output variables of NICE simulations. :param filepaths: list of full paths to output data files to be compared :param varname: name of variable to extract and compare :param labels: list of labels to use in the legend :param fs: labels fontsize :param patches: string indicating whether to indicate periods of stimulation with colored rectangular patches ''' # Input check 1: variable name if varname not in pltvars: raise KeyError('Unknown plot variable: "{}"'.format(varname)) pltvar = pltvars[varname] # Input check 2: labels if labels is not None: if len(labels) != len(filepaths): raise AssertionError('Invalid labels ({}): not matching number of compared files ({})' .format(len(labels), len(filepaths))) if not all(isinstance(x, str) for x in labels): raise TypeError('Invalid labels: must be string typed') # Input check 3: line styles and colors if colors is None: colors = ['C{}'.format(j) for j in range(len(filepaths))] if lines is None: lines = ['-'] * len(filepaths) # Input check 4: STIM-ON patches greypatch = False if patches == 'none': patches = [False] * len(filepaths) elif patches == 'all': patches = [True] * len(filepaths) elif patches == 'one': patches = [True] + [False] * (len(filepaths) - 1) greypatch = True elif isinstance(patches, list): if len(patches) != len(filepaths): raise AssertionError('Invalid patches ({}): not matching number of compared files ({})' .format(len(patches), len(filepaths))) if not all(isinstance(p, bool) for p in patches): raise TypeError('Invalid patch sequence: all list items must be boolean typed') else: raise ValueError('Invalid patches: must be either "none", all", "one", or a boolean list') # Initialize figure and axis fig, ax = plt.subplots(figsize=figsize) ax.set_zorder(0) for item in ['top', 'right']: ax.spines[item].set_visible(False) if 'min' in pltvar and 'max' in pltvar: # optional min and max on y-axis ax.set_ylim(pltvar['min'], pltvar['max']) if pltvar['unit']: # y-label with optional unit ax.set_ylabel('$\\rm {}\ ({})$'.format(pltvar['label'], pltvar['unit']), fontsize=fs) else: ax.set_ylabel('$\\rm {}$'.format(pltvar['label']), fontsize=fs) if xticks is not None: # optional x-ticks ax.set_xticks(xticks) if yticks is not None: # optional y-ticks ax.set_yticks(yticks) else: ax.locator_params(axis='y', nbins=2) if any(ax.get_yticks() < 0): ax.yaxis.set_major_formatter(FormatStrFormatter('%+.0f')) for tick in ax.xaxis.get_major_ticks() + ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Optional inset axis if inset is not None: inset_ax = fig.add_axes(ax.get_position()) inset_ax.set_xlim(inset['xlims'][0], inset['xlims'][1]) inset_ax.set_ylim(inset['ylims'][0], inset['ylims'][1]) inset_ax.set_xticks([]) inset_ax.set_yticks([]) # inset_ax.patch.set_alpha(1.0) inset_ax.set_zorder(1) inset_ax.add_patch(Rectangle((inset['xlims'][0], inset['ylims'][0]), inset['xlims'][1] - inset['xlims'][0], inset['ylims'][1] - inset['ylims'][0], color='w')) # Retrieve neurons dictionary neurons_dict = getNeuronsDict() # Loop through data files aliases = {} for j, filepath in enumerate(filepaths): # Retrieve sim type pkl_filename = ntpath.basename(filepath) mo1 = rgxp.fullmatch(pkl_filename) mo2 = rgxp_mech.fullmatch(pkl_filename) if mo1: mo = mo1 elif mo2: mo = mo2 else: logger.error('Error: "%s" file does not match regexp pattern', pkl_filename) sys.exit(1) sim_type = mo.group(1) if sim_type not in ('MECH', 'ASTIM', 'ESTIM'): raise ValueError('Invalid simulation type: {}'.format(sim_type)) if j == 0: sim_type_ref = sim_type t_plt = pltvars[timeunits[sim_type]] elif sim_type != sim_type_ref: raise ValueError('Invalid comparison: different simulation types') # Load data logger.info('Loading data from "%s"', pkl_filename) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] # Extract variables t = df['t'].values states = df['states'].values nsamples = t.size # Initialize neuron object if ESTIM or ASTIM sim type if sim_type in ['ASTIM', 'ESTIM']: neuron_name = mo.group(2) global neuron neuron = neurons_dict[neuron_name]() Cm0 = neuron.Cm0 Qm0 = Cm0 * neuron.Vm0 * 1e-3 # Extract neuron states if needed if 'alias' in pltvar and 'neuron_states' in pltvar['alias']: neuron_states = [df[sn].values for sn in neuron.states_names] else: Cm0 = meta['Cm0'] Qm0 = meta['Qm0'] # Initialize BLS if needed if sim_type in ['MECH', 'ASTIM'] and 'alias' in pltvar and 'bls' in pltvar['alias']: global bls bls = BilayerSonophore(meta['a'], Cm0, Qm0) # Determine patches location npatches, tpatch_on, tpatch_off = getStimPulses(t, states) # Add onset to time vectors if t_plt['onset'] > 0.0: tonset = np.array([-t_plt['onset'], -t[0] - t[1]]) t = np.hstack((tonset, t)) states = np.hstack((states, np.zeros(2))) # Set x-axis label ax.set_xlabel('$\\rm {}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) # Extract variable to plot if 'alias' in pltvar: var = eval(pltvar['alias']) elif 'key' in pltvar: var = df[pltvar['key']].values elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = df[varname].values if var.size == t.size - 2: if varname is 'Vm': var = np.hstack((np.array([neuron.Vm0] * 2), var)) else: var = np.hstack((np.array([var[0]] * 2), var)) # var = np.insert(var, 0, var[0]) # Determine legend label if labels is not None: label = labels[j] else: label = figtitle(meta) # Plot trace handle = ax.plot(t * t_plt['factor'], var * pltvar['factor'], linewidth=lw, linestyle=lines[j], color=colors[j], label=label) if inset is not None: inset_window = np.logical_and(t > (inset['xlims'][0] / t_plt['factor']), t < (inset['xlims'][1] / t_plt['factor'])) inset_ax.plot(t[inset_window] * t_plt['factor'], var[inset_window] * pltvar['factor'], linewidth=lw, linestyle=lines[j], color=colors[j]) # Add optional STIM-ON patches if patches[j]: (ybottom, ytop) = ax.get_ylim() la = [] color = '#8A8A8A' if greypatch else handle[0].get_color() for i in range(npatches): la.append(ax.axvspan(tpatch_on[i] * t_plt['factor'], tpatch_off[i] * t_plt['factor'], edgecolor='none', facecolor=color, alpha=0.2)) aliases[handle[0]] = la if inset is not None: cond_on = np.logical_and(tpatch_on > (inset['xlims'][0] / t_plt['factor']), tpatch_on < (inset['xlims'][1] / t_plt['factor'])) cond_off = np.logical_and(tpatch_off > (inset['xlims'][0] / t_plt['factor']), tpatch_off < (inset['xlims'][1] / t_plt['factor'])) cond_glob = np.logical_and(tpatch_on < (inset['xlims'][0] / t_plt['factor']), tpatch_off > (inset['xlims'][1] / t_plt['factor'])) cond_onoff = np.logical_or(cond_on, cond_off) cond = np.logical_or(cond_onoff, cond_glob) npatches_inset = np.sum(cond) for i in range(npatches_inset): inset_ax.add_patch(Rectangle((tpatch_on[cond][i] * t_plt['factor'], ybottom), (tpatch_off[cond][i] - tpatch_on[cond][i]) * t_plt['factor'], ytop - ybottom, color=color, alpha=0.1)) fig.tight_layout() # Optional operations on inset: if inset is not None: # Re-position inset axis axpos = ax.get_position() left, right, = rescale(inset['xcoords'], ax.get_xlim()[0], ax.get_xlim()[1], axpos.x0, axpos.x0 + axpos.width) bottom, top, = rescale(inset['ycoords'], ax.get_ylim()[0], ax.get_ylim()[1], axpos.y0, axpos.y0 + axpos.height) inset_ax.set_position([left, bottom, right - left, top - bottom]) for i in inset_ax.spines.values(): i.set_linewidth(2) # Materialize inset target region with contour frame ax.plot(inset['xlims'], [inset['ylims'][0]] * 2, linestyle='-', color='k') ax.plot(inset['xlims'], [inset['ylims'][1]] * 2, linestyle='-', color='k') ax.plot([inset['xlims'][0]] * 2, inset['ylims'], linestyle='-', color='k') ax.plot([inset['xlims'][1]] * 2, inset['ylims'], linestyle='-', color='k') # Link target and inset with dashed lines if possible if inset['xcoords'][1] < inset['xlims'][0]: ax.plot([inset['xcoords'][1], inset['xlims'][0]], [inset['ycoords'][0], inset['ylims'][0]], linestyle='--', color='k') ax.plot([inset['xcoords'][1], inset['xlims'][0]], [inset['ycoords'][1], inset['ylims'][1]], linestyle='--', color='k') elif inset['xcoords'][0] > inset['xlims'][1]: ax.plot([inset['xcoords'][0], inset['xlims'][1]], [inset['ycoords'][0], inset['ylims'][0]], linestyle='--', color='k') ax.plot([inset['xcoords'][0], inset['xlims'][1]], [inset['ycoords'][1], inset['ylims'][1]], linestyle='--', color='k') else: logger.warning('Inset x-coordinates intersect with those of target region') # Create interactive legend leg = ax.legend(loc=1, fontsize=fs, frameon=False) if blacklegend: for l in leg.get_lines(): l.set_color('k') if straightlegend: for l in leg.get_lines(): l.set_linestyle('-') interactive_legend = InteractiveLegend(ax.legend_, aliases) return fig \ No newline at end of file diff --git a/PySONIC/plt/pltvars.py b/PySONIC/plt/pltvars.py index 6ea0d77..c7d7e87 100644 --- a/PySONIC/plt/pltvars.py +++ b/PySONIC/plt/pltvars.py @@ -1,119 +1,53 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-21 14:33:36 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-03-13 20:09:07 +# @Last Modified time: 2019-03-14 16:37:23 ''' Dictionary of plotting settings for output variables of the model. ''' -pltvars = { - 't_ms': { - 'desc': 'time', - 'label': 'time', - 'unit': 'ms', - 'factor': 1e3, - 'onset': 1e-3 - }, - - 't_us': { - 'desc': 'time', - 'label': 'time', - 'unit': 'us', - 'factor': 1e6, - 'onset': 1e-6 - }, - - 'Z': { - 'desc': 'leaflets deflection', - 'label': 'Z', - 'unit': 'nm', - 'factor': 1e9, - 'min': -1.0, - 'max': 10.0 - }, - - 'ng': { - 'desc': 'gas content', - 'label': 'n_g', - 'unit': '10^{-22}\ mol', - 'factor': 1e22, - 'min': 1.0, - 'max': 15.0 - }, - 'Pac': { - 'desc': 'acoustic pressure', - 'label': 'P_{AC}', - 'unit': 'kPa', - 'factor': 1e-3, - 'alias': 'bls.Pacoustic(t, meta["Adrive"] * states, Fdrive)' - }, - - 'Pmavg': { - 'desc': 'average intermolecular pressure', - 'label': 'P_M', - 'unit': 'kPa', - 'factor': 1e-3, - 'alias': 'bls.PMavgpred(df["Z"])' - }, - - 'Telastic': { - 'desc': 'leaflet elastic tension', - 'label': 'T_E', - 'unit': 'mN/m', - 'factor': 1e3, - 'alias': 'bls.TEleaflet(df["Z"])' - }, - - 'Cm': { - 'desc': 'membrane capacitance', - 'label': 'C_m', - 'unit': 'uF/cm^2', - 'factor': 1e2, - 'min': 0.0, - 'max': 1.5, - 'alias': 'bls.v_Capct(df["Z"])' - }, +tmp = { 'Nai': { 'desc': 'sumbmembrane Na+ concentration', 'label': '[Na^+]_i', 'unit': 'uM', 'factor': 1e6 }, 'Nai_arb': { 'key': 'Nai', 'desc': 'submembrane Na+ concentration', 'label': '[Na^+]', 'unit': 'arb.', 'factor': 1 }, 'C_Na_arb_activation': { 'key': 'A_Na', 'desc': 'Na+ dependent PumpNa current activation', 'label': 'A_{Na^+}', 'unit': 'arb', 'factor': 1 }, 'C_Ca_arb': { 'key': 'C_Ca', 'desc': 'submembrane Ca2+ concentration', 'label': '[Ca^{2+}]', 'unit': 'arb.', 'factor': 1 }, 'C_Ca_arb_activation': { 'key': 'A_Ca', 'desc': 'Ca2+ dependent Potassium current activation', 'label': 'A_{Ca^{2+}}', 'unit': 'arb', 'factor': 1 }, } diff --git a/PySONIC/utils.py b/PySONIC/utils.py index 7819452..f47db7f 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,594 +1,649 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-19 22:30:46 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-03-11 13:45:33 +# @Last Modified time: 2019-03-14 18:33:59 ''' Definition of generic utility functions used in other modules ''' import operator import os import math import pickle import re import tkinter as tk from tkinter import filedialog import numpy as np import colorlog from scipy.interpolate import interp1d import matplotlib from .constants import FARADAY, Rg # Matplotlib parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' # Package logger def setLogger(): log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%' ) log_handler = colorlog.StreamHandler() log_handler.setFormatter(log_formatter) color_logger = colorlog.getLogger('PySONIC') color_logger.addHandler(log_handler) return color_logger logger = setLogger() # File naming conventions def ESTIM_filecode(neuron, Astim, tstim, PRF, DC): return 'ESTIM_{}_{}_{:.1f}mA_per_m2_{:.0f}ms{}'.format( neuron, 'CW' if DC == 1 else 'PW', Astim, tstim * 1e3, '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1. else '') def ASTIM_filecode(neuron, a, Fdrive, Adrive, tstim, PRF, DC, method): return 'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.2f}kPa_{:.0f}ms_{}{}'.format( neuron, 'CW' if DC == 1 else 'PW', a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(PRF, DC * 1e2) if DC < 1. else '', method) def MECH_filecode(a, Fdrive, Adrive, Qm): return 'MECH_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.1f}nCcm2'.format( a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, Qm * 1e5) rgxp = re.compile('(ESTIM|ASTIM)_([A-Za-z]*)_(.*).pkl') rgxp_mech = re.compile('(MECH)_(.*).pkl') # Figure naming conventions def figtitle(meta): ''' Return appropriate title based on simulation metadata. ''' if 'Cm0' in meta: return '{:.0f}nm radius BLS structure: MECH-STIM {:.0f}kHz, {:.2f}kPa, {:.1f}nC/cm2'.format( meta['a'] * 1e9, meta['Fdrive'] * 1e-3, meta['Adrive'] * 1e-3, meta['Qm'] * 1e5) else: if meta['DC'] < 1: wavetype = 'PW' suffix = ', {:.2f}Hz PRF, {:.0f}% DC'.format(meta['PRF'], meta['DC'] * 1e2) else: wavetype = 'CW' suffix = '' if 'Astim' in meta: return '{} neuron: {} E-STIM {:.2f}mA/m2, {:.0f}ms{}'.format( meta['neuron'], wavetype, meta['Astim'], meta['tstim'] * 1e3, suffix) else: return '{} neuron ({:.1f}nm): {} A-STIM {:.0f}kHz {:.2f}kPa, {:.0f}ms{} - {} model'.format( meta['neuron'], meta['a'] * 1e9, wavetype, meta['Fdrive'] * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3, suffix, meta['method']) timeunits = { 'ASTIM': 't_ms', 'ESTIM': 't_ms', 'MECH': 't_us' } +def getTimePltVar(tscale): + ''' Return time plot variable for a given temporal scale. ''' + return { + 'desc': 'time', + 'label': 'time', + 'unit': tscale, + 'factor': {'ms': 1e3, 'us': 1e6}[tscale], + 'onset': {'ms': 1e-3, 'us': 1e-6}[tscale] + } + + +def getSimType(fname): + ''' Get sim type from filename. ''' + for exp in [rgxp, rgxp_mech]: + mo = exp.fullmatch(fname) + if mo: + sim_type = mo.group(1) + if sim_type not in ('MECH', 'ASTIM', 'ESTIM'): + raise ValueError('Invalid simulation type: {}'.format(sim_type)) + return sim_type + raise ValueError('Error: "{}" file does not match regexp pattern'.format(fname)) + + +def getNeuronType(fname): + ''' Get neuron type from filename. ''' + mo = rgxp.fullmatch(fname) + if mo: + return mo.group(2) + raise ValueError('Error: "{}" file does not match regexp pattern'.format(fname)) + + def cm2inch(*tupl): inch = 2.54 if isinstance(tupl[0], tuple): return tuple(i / inch for i in tupl[0]) else: return tuple(i / inch for i in tupl) # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): if x == 0: factor = 1e0 prefix = '' else: sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) vals = [tmp[1] for tmp in sorted_si_prefixes] # vals = list(si_prefixes.values()) ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 factor = vals[ix] prefix = sorted_si_prefixes[ix][0] # prefix = list(si_prefixes.keys())[ix] return '{{:.{}f}}{}{}'.format(precision, space, prefix).format(x / factor) elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: print(type(x)) def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' ret_string = '{0:.{1:d}e}'.format(number, precision) a, b = ret_string.split("e") a = float(a) b = int(b) return '{}10^{{{}}}'.format('{} * '.format(a) if a != 1. else '', b) def rmse(x1, x2): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean()) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames(filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname) if filenames: par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) else: par_dir = None return (filenames, par_dir) def selectDirDialog(): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() return filedialog.askdirectory() def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. :param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) return filename_out def downsample(t_dense, y, nsparse): ''' Decimate periodic signals to a specified number of samples.''' if(y.ndim) > 1: nsignals = y.shape[0] else: nsignals = 1 y = np.array([y]) # determine time step and period of input signal T = t_dense[-1] - t_dense[0] dt_dense = t_dense[1] - t_dense[0] # resample time vector linearly t_ds = np.linspace(t_dense[0], t_dense[-1], nsparse) # create MAV window nmav = int(0.03 * T / dt_dense) if nmav % 2 == 0: nmav += 1 mav = np.ones(nmav) / nmav # determine signals padding npad = int((nmav - 1) / 2) # determine indexes of sampling on convolved signals ids = np.round(np.linspace(0, t_dense.size - 1, nsparse)).astype(int) y_ds = np.empty((nsignals, nsparse)) # loop through signals for i in range(nsignals): # pad, convolve and resample pad_left = y[i, -(npad + 2):-2] pad_right = y[i, 1:npad + 1] y_ext = np.concatenate((pad_left, y[i, :], pad_right), axis=0) y_mav = np.convolve(y_ext, mav, mode='valid') y_ds[i, :] = y_mav[ids] if nsignals == 1: y_ds = y_ds[0, :] return (t_ds, y_ds) def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def getStimPulses(t, states): ''' Determine the onset and offset times of pulses from a stimulation vector. :param t: time vector (s). :param states: a vector of stimulation state (ON/OFF) at each instant in time. :return: 3-tuple with number of patches, timing of STIM-ON an STIM-OFF instants. ''' # Compute states derivatives and identify bounds indexes of pulses dstates = np.diff(states) ipulse_on = np.insert(np.where(dstates > 0.0)[0] + 1, 0, 0) ipulse_off = np.where(dstates < 0.0)[0] + 1 if ipulse_off.size < ipulse_on.size: ioff = t.size - 1 if ipulse_off.size == 0: ipulse_off = np.array([ioff]) else: ipulse_off = np.insert(ipulse_off, ipulse_off.size - 1, ioff) # Get time instants for pulses ON and OFF npulses = ipulse_on.size tpulse_on = t[ipulse_on] tpulse_off = t[ipulse_off] # return 3-tuple with #pulses, pulse ON and pulse OFF instants return npulses, tpulse_on, tpulse_off +def plotStimPatches(ax, tpatch_on, tpatch_off, tfactor): + for j in range(tpatch_on.size): + ax.axvspan(tpatch_on[j] * tfactor, tpatch_off[j] * tfactor, + edgecolor='none', facecolor='#8A8A8A', alpha=0.2) + + +def extractPltVar(obj, pltvar, df, nsamples, name): + if 'alias' in pltvar: + var = eval(pltvar['alias']) + elif 'key' in pltvar: + var = df[pltvar['key']] + elif 'constant' in pltvar: + var = eval(pltvar['constant']) * np.ones(nsamples) + else: + var = df[name] + + if var.size == nsamples - 2: + var = np.hstack((np.array([pltvar.get('y0', var[0])] * 2), var)) + var *= pltvar.get('factor', 1) + + return var + + + def getNeuronLookupsFile(mechname, a=None, Fdrive=None, Adrive=None, fs=False): fpath = os.path.join( os.path.split(__file__)[0], 'neurons', '{}_lookups'.format(mechname) ) if a is not None: fpath += '_{:.0f}nm'.format(a * 1e9) if Fdrive is not None: fpath += '_{:.0f}kHz'.format(Fdrive * 1e-3) if Adrive is not None: fpath += '_{:.0f}kPa'.format(Adrive * 1e-3) if fs is True: fpath += '_fs' return '{}.pkl'.format(fpath) def getLookups4D(mechname): ''' Retrieve 4D lookup tables and reference vectors for a given membrane mechanism. :param mechname: name of membrane density mechanism :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and one other variable), a dictionary of associated 2D lookup numpy arrays, and a dictionnary with information about the other variable. ''' # Check lookup file existence lookup_path = getNeuronLookupsFile(mechname) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) # Load lookups dictionary logger.debug('Loading lookup table') with open(lookup_path, 'rb') as fh: df = pickle.load(fh) inputs = df['input'] lookups4D = df['lookup'] # Retrieve 1D inputs from lookups dictionary aref = inputs['a'] Fref = inputs['f'] Aref = inputs['A'] Qref = inputs['Q'] return aref, Fref, Aref, Qref, lookups4D def getLookupsOff(mechname): ''' Retrieve appropriate US-OFF lookup tables and reference vectors for a given membrane mechanism. :param mechname: name of membrane density mechanism :return: 2-tuple with 1D numpy array of reference charge density and dictionary of associated 1D lookup numpy arrays. ''' # Get 4D lookups and input vectors aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) # Perform 2D projection in appropriate dimensions logger.debug('Interpolating lookups at A = 0') lookups_off = {key: y4D[0, 0, 0, :] for key, y4D in lookups4D.items()} return Qref, lookups_off def getLookups2D(mechname, a=None, Fdrive=None, Adrive=None): ''' Retrieve appropriate 2D lookup tables and reference vectors for a given membrane mechanism, projected at a specific combination of sonophore radius, US frequency and/or acoustic pressure amplitude. :param mechname: name of membrane density mechanism :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param Adrive: Acoustic peak pressure ampplitude (Hz) :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and one other variable), a dictionary of associated 2D lookup numpy arrays, and a dictionnary with information about the other variable. ''' # Get 4D lookups and input vectors aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) # Check that inputs are within lookup range if a is not None: a = isWithin('radius', a, (aref.min(), aref.max())) if Fdrive is not None: Fdrive = isWithin('frequency', Fdrive, (Fref.min(), Fref.max())) if Adrive is not None: Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) # Determine projection dimensions based on inputs var_a = {'name': 'a', 'label': 'sonophore radius', 'val': a, 'unit': 'm', 'factor': 1e9, 'ref': aref, 'axis': 0} var_Fdrive = {'name': 'f', 'label': 'frequency', 'val': Fdrive, 'unit': 'Hz', 'factor': 1e-3, 'ref': Fref, 'axis': 1} var_Adrive = {'name': 'A', 'label': 'amplitude', 'val': Adrive, 'unit': 'Pa', 'factor': 1e-3, 'ref': Aref, 'axis': 2} if not isinstance(Adrive, float): var1 = var_a var2 = var_Fdrive var3 = var_Adrive elif not isinstance(Fdrive, float): var1 = var_a var2 = var_Adrive var3 = var_Fdrive elif not isinstance(a, float): var1 = var_Fdrive var2 = var_Adrive var3 = var_a # Perform 2D projection in appropriate dimensions logger.debug('Interpolating lookups at (%s = %s%s, %s = %s%s)', var1['name'], si_format(var1['val'], space=' '), var1['unit'], var2['name'], si_format(var2['val'], space=' '), var2['unit']) lookups3D = {key: interp1d(var1['ref'], y4D, axis=var1['axis'])(var1['val']) for key, y4D in lookups4D.items()} if var2['axis'] > var1['axis']: var2['axis'] -= 1 lookups2D = {key: interp1d(var2['ref'], y3D, axis=var2['axis'])(var2['val']) for key, y3D in lookups3D.items()} if var3['val'] is not None: logger.debug('Interpolating lookups at %d new %s values between %s%s and %s%s', len(var3['val']), var3['name'], si_format(min(var3['val']), space=' '), var3['unit'], si_format(max(var3['val']), space=' '), var3['unit']) lookups2D = {key: interp1d(var3['ref'], y2D, axis=0)(var3['val']) for key, y2D in lookups2D.items()} var3['ref'] = np.array(var3['val']) return var3['ref'], Qref, lookups2D, var3 def getLookups2Dfs(mechname, a, Fdrive, fs): # Check lookup file existence lookup_path = getNeuronLookupsFile(mechname, a=a, Fdrive=Fdrive, fs=True) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) # Load lookups dictionary logger.debug('Loading lookup table') with open(lookup_path, 'rb') as fh: df = pickle.load(fh) inputs = df['input'] lookups3D = df['lookup'] # Retrieve 1D inputs from lookups dictionary fsref = inputs['fs'] Aref = inputs['A'] Qref = inputs['Q'] # Check that fs is within lookup range fs = isWithin('coverage', fs, (fsref.min(), fsref.max())) # Perform projection at fs logger.debug('Interpolating lookups at fs = %s%%', fs * 1e2) lookups2D = {key: interp1d(fsref, y3D, axis=2)(fs) for key, y3D in lookups3D.items()} return Aref, Qref, lookups2D def isWithin(name, val, bounds, rel_tol=1e-9): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. :param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' if isinstance(val, list) or isinstance(val, np.ndarray) or isinstance(val, tuple): return [isWithin(name, v, bounds, rel_tol) for v in val] if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError('{} value ({}) out of [{}, {}] interval'.format( name, val, bounds[0], bounds[1])) def getLookupsCompTime(mechname): # Check lookup file existence lookup_path = getNeuronLookupsFile(mechname) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) # Load lookups dictionary logger.debug('Loading comp times') with open(lookup_path, 'rb') as fh: df = pickle.load(fh) tcomps4D = df['tcomp'] return np.sum(tcomps4D) def nernst(z_ion, Cion_in, Cion_out, T): ''' Return the Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 def vtrap(x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) def efun(x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) def ghkDrive(Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * efun(-x) # M eCout = Cion_out * efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 def getLowIntensitiesSTN(): ''' Return an array of acoustic intensities (W/m2) used to study the STN neuron in Tarnaud, T., Joseph, W., Martens, L., and Tanghe, E. (2018). Computational Modeling of Ultrasonic Subthalamic Nucleus Stimulation. IEEE Trans Biomed Eng. ''' return np.hstack(( np.arange(10, 101, 10), np.arange(101, 131, 1), np.array([140]) )) # W/m2 diff --git a/paper figures/fig9.py b/paper figures/fig9.py index 53a9800..267ec01 100644 --- a/paper figures/fig9.py +++ b/paper figures/fig9.py @@ -1,102 +1,102 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-12-09 12:06:01 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-03-11 14:31:22 +# @Last Modified time: 2019-03-14 17:25:19 ''' Sub-panels of SONIC model validation on an STN neuron (response to CW sonication). ''' import os import logging import numpy as np import matplotlib import matplotlib.pyplot as plt from argparse import ArgumentParser from PySONIC.utils import logger, selectDirDialog, ASTIM_filecode, getLowIntensitiesSTN, Intensity2Pressure from PySONIC.plt import plotFRProfile, plotBatch # Plot parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' # Figure basename figbase = os.path.splitext(__file__)[0] def main(): ap = ArgumentParser() # Runtime options ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-i', '--inputdir', type=str, help='Input directory') ap.add_argument('-f', '--figset', type=str, nargs='+', help='Figure set', default='all') ap.add_argument('-s', '--save', default=False, action='store_true', help='Save output figures as pdf') args = ap.parse_args() loglevel = logging.DEBUG if args.verbose is True else logging.INFO logger.setLevel(loglevel) inputdir = selectDirDialog() if args.inputdir is None else args.inputdir if inputdir == '': logger.error('No input directory chosen') return figset = args.figset if figset is 'all': figset = ['a', 'b'] logger.info('Generating panels {} of {}'.format(figset, figbase)) # Parameters neuron = 'STN' a = 32e-9 # m Fdrive = 500e3 # Hz tstim = 1 # s PRF = 1e2 DC = 1. # Range of intensities intensities = getLowIntensitiesSTN() # W/m2 # Levels depicted with individual traces subset_intensities = [112, 114, 123] # W/m2 # convert to amplitudes and get filepaths amplitudes = Intensity2Pressure(intensities) # Pa fnames = ['{}.pkl'.format(ASTIM_filecode(neuron, a, Fdrive, A, tstim, PRF, DC, 'sonic')) for A in amplitudes] fpaths = [os.path.join(inputdir, 'STN', fn) for fn in fnames] # Generate figures figs = [] if 'a' in figset: fig = plotFRProfile(fpaths, 'Qm', no_offset=True, no_first=False, zref='A', zscale='lin', cmap='Oranges') fig.canvas.set_window_title(figbase + 'a') figs.append(fig) if 'b' in figset: isubset = [np.argwhere(intensities == x)[0][0] for x in subset_intensities] subset_amplitudes = amplitudes[isubset] titles = ['{:.2f} kPa ({:.0f} W/m2)'.format(A * 1e-3, I) for A, I in zip(subset_amplitudes, subset_intensities)] print(titles) - figtraces = plotBatch([fpaths[i] for i in isubset], vars_dict={'Q_m': ['Qm']}) + figtraces = plotBatch([fpaths[i] for i in isubset], pltscheme={'Q_m': ['Qm']}) for fig, title in zip(figtraces, titles): fig.axes[0].set_title(title) fig.canvas.set_window_title(figbase + 'b {}'.format(title)) figs.append(fig) if args.save: for fig in figs: s = fig.canvas.get_window_title() s = s.replace('(', '- ').replace('/', '_').replace(')', '') figname = '{}.pdf'.format(s) fig.savefig(os.path.join(inputdir, figname), transparent=True) else: plt.show() if __name__ == '__main__': main() diff --git a/scripts/plot_tprofiles.py b/scripts/plot_tprofiles.py index d61166d..2d4f04c 100644 --- a/scripts/plot_tprofiles.py +++ b/scripts/plot_tprofiles.py @@ -1,64 +1,64 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 12:41:26 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-12-27 18:57:43 +# @Last Modified time: 2019-03-14 17:25:40 ''' Plot temporal profiles of specific simulation output variables. ''' import logging from argparse import ArgumentParser import matplotlib.pyplot as plt from PySONIC.utils import logger, OpenFilesDialog from PySONIC.plt import plotComp, plotBatch # Set logging level logger.setLevel(logging.INFO) default_comp = 'Qm' defaults_batch = {'Q_m': ['Qm'], 'V_m': ['Vm']} # defaults_batch = {'Pac': ['Pac'], 'Z': ['Z'], 'Cm': ['Cm'], 'Vm': ['Vm'], 'Qm': ['Qm']} def main(): ap = ArgumentParser() # Runtime options ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('--hide', default=False, action='store_true', help='Hide output') ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') ap.add_argument('-c', '--compare', default=False, action='store_true', help='Comparative graph') ap.add_argument('-s', '--save', default=False, action='store_true', help='Save output') ap.add_argument('--vars', type=str, nargs='+', default=None, help='Variables to plot') ap.add_argument('-f', '--frequency', type=int, default=1, help='Sampling frequency for plot') # Parse arguments args = ap.parse_args() loglevel = logging.DEBUG if args.verbose is True else logging.INFO logger.setLevel(loglevel) # Select data files pkl_filepaths, _ = OpenFilesDialog('pkl') if not pkl_filepaths: logger.error('No input file') return # Comparative plot if args.compare: varname = default_comp if args.vars is None else args.vars[0] plotComp(pkl_filepaths, varname=varname) else: - vars_dict = defaults_batch if args.vars is None else {key: [key] for key in args.vars} - plotBatch(pkl_filepaths, title=True, vars_dict=vars_dict, directory=args.outputdir, + pltscheme = defaults_batch if args.vars is None else {key: [key] for key in args.vars} + plotBatch(pkl_filepaths, title=True, pltscheme=pltscheme, directory=args.outputdir, plt_save=args.save, ask_before_save=not args.save) if not args.hide: plt.show() if __name__ == '__main__': main() diff --git a/scripts/run_astim.py b/scripts/run_astim.py index 542ad29..e65ba54 100644 --- a/scripts/run_astim.py +++ b/scripts/run_astim.py @@ -1,161 +1,157 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-03-13 11:03:01 +# @Last Modified time: 2019-03-14 17:29:52 ''' Run A-STIM simulations of a specific point-neuron. ''' import os import logging import numpy as np import matplotlib.pyplot as plt from argparse import ArgumentParser from PySONIC.core import NeuronalBilayerSonophore from PySONIC.utils import logger, selectDirDialog, Intensity2Pressure, getLowIntensitiesSTN from PySONIC.neurons import getNeuronsDict from PySONIC.batches import createAStimQueue, runBatch from PySONIC.plt import plotBatch # Default parameters defaults = dict( neuron='RS', radius=[32.0], # nm freq=[500.0], # kHz amp=[100.0], # kPa duration=[100.0], # ms PRF=[100.0], # Hz DC=[100.0], # % offset=[50.], # ms method='sonic' ) def runAStimBatch(outdir, nbls, stim_params, method, mpi=False): ''' Run batch A-STIM simulations of the system for various neuron types and stimulation parameters. :param outdir: full path to output directory :param stim_params: dictionary containing sweeps for all stimulation parameters :param method: numerical integration method ("classic", "hybrid" or "sonic") :param mpi: boolean statting wether or not to use multiprocessing :return: list of full paths to the output files ''' mandatory_params = ['freqs', 'durations', 'offsets', 'PRFs', 'DCs'] for mparam in mandatory_params: if mparam not in stim_params: raise ValueError('Missing stimulation parameter field: "{}"'.format(mparam)) logger.info("Starting A-STIM simulation batch") # Generate queue queue = createAStimQueue( stim_params['freqs'], stim_params.get('amps', [None]), stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DCs'], method ) # Run batch return runBatch(nbls, 'runAndSave', queue, extra_params=[outdir], mpi=mpi) def main(): ap = ArgumentParser() # Runtime options ap.add_argument('--mpi', default=False, action='store_true', help='Use multiprocessing') ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') - ap.add_argument('-p', '--plotQ', default=False, action='store_true', help='Plot Qm') - ap.add_argument('--plotV', default=False, action='store_true', help='Plot Vm') - ap.add_argument('--plotall', default=False, action='store_true', help='Plot all variables') + ap.add_argument('-p', '--plot', type=str, default=None, help='Variables to plot') ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') ap.add_argument('-t', '--titrate', default=False, action='store_true', help='Perform titration') ap.add_argument('-m', '--method', type=str, default=defaults['method'], help='Numerical integration method ("classic", "hybrid" or "sonic")') # Stimulation parameters ap.add_argument('-n', '--neuron', type=str, default=defaults['neuron'], help='Neuron name (string)') ap.add_argument('-a', '--radius', nargs='+', type=float, help='Sonophore radius (nm)') ap.add_argument('-f', '--freq', nargs='+', type=float, help='US frequency (kHz)') ap.add_argument('-A', '--amp', nargs='+', type=float, help='Acoustic pressure amplitude (kPa)') ap.add_argument('-I', '--intensity', nargs='+', type=float, help='Acoustic intensity (W/cm2)') ap.add_argument('--spanI', default=False, action='store_true', help='Span acoustic intensities from 10 to 140 W/m2') ap.add_argument('-d', '--duration', nargs='+', type=float, help='Stimulus duration (ms)') ap.add_argument('--offset', nargs='+', type=float, help='Offset duration (ms)') ap.add_argument('--PRF', nargs='+', type=float, help='PRF (Hz)') ap.add_argument('--DC', nargs='+', type=float, help='Duty cycle (%%)') ap.add_argument('--spanDC', default=False, action='store_true', help='Span DC from 1 to 100%') # Parse arguments args = {key: value for key, value in vars(ap.parse_args()).items() if value is not None} loglevel = logging.DEBUG if args['verbose'] is True else logging.INFO logger.setLevel(loglevel) outdir = args['outputdir'] if 'outputdir' in args else selectDirDialog() - plot = True if (args['plotQ'] or args['plotV'] or args['plotall']) else False mpi = args['mpi'] titrate = args['titrate'] method = args['method'] neuron_str = args['neuron'] radii = np.array(args.get('radius', defaults['radius'])) * 1e-9 # m if args['spanI']: amps = Intensity2Pressure(getLowIntensitiesSTN()) # Pa elif 'amp' in args: amps = np.array(args['amp']) * 1e3 # Pa elif 'intensity' in args: amps = Intensity2Pressure(np.array(args['intensity']) * 1e4) # Pa else: amps = np.array(defaults['amp']) * 1e3 # Pa if args['spanDC']: DCs = np.arange(1, 101) # % else: DCs = np.array(args.get('DC', defaults['DC'])) # % stim_params = dict( freqs=np.array(args.get('freq', defaults['freq'])) * 1e3, # Hz amps=amps, # Pa durations=np.array(args.get('duration', defaults['duration'])) * 1e-3, # s PRFs=np.array(args.get('PRF', defaults['PRF'])), # Hz DCs=DCs * 1e-2, # (-) offsets=np.array(args.get('offset', defaults['offset'])) * 1e-3 # s ) if titrate: stim_params['amps'] = [None] # Run A-STIM batch if neuron_str not in getNeuronsDict(): logger.error('Unknown neuron type: "%s"', neuron_str) return neuron = getNeuronsDict()[neuron_str]() pkl_filepaths = [] for a in radii: nbls = NeuronalBilayerSonophore(a, neuron) pkl_filepaths += runAStimBatch(outdir, nbls, stim_params, method, mpi=mpi) pkl_dir, _ = os.path.split(pkl_filepaths[0]) # Plot resulting profiles - if plot: - if args['plotQ']: - vars_dict = {'Q_m': ['Qm']} - elif args['plotV']: - vars_dict = {'V_m': ['Vm']} - elif args['plotall']: - vars_dict = None - plotBatch(pkl_filepaths, vars_dict=vars_dict, fs=10) + if args['plot']: + pltscheme = { + 'Q': {'Q_m': ['Qm']}, + 'V': {'V_m': ['Vm']}, + 'all': None + }[args['plot']] + plotBatch(pkl_filepaths, pltscheme=pltscheme, fs=10) plt.show() if __name__ == '__main__': main() diff --git a/scripts/run_mech.py b/scripts/run_mech.py index b287521..f600e62 100644 --- a/scripts/run_mech.py +++ b/scripts/run_mech.py @@ -1,122 +1,122 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-11-21 10:46:56 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-11-22 17:55:11 +# @Last Modified time: 2019-03-14 16:31:50 ''' Run simulations of the NICE mechanical model. ''' import os import logging import numpy as np import matplotlib.pyplot as plt from argparse import ArgumentParser from PySONIC.core import BilayerSonophore from PySONIC.utils import logger, selectDirDialog, Intensity2Pressure from PySONIC.neurons import CorticalRS from PySONIC.batches import createQueue, runBatch from PySONIC.plt import plotBatch # Default parameters defaults = dict( Cm0=CorticalRS().Cm0 * 1e2, # uF/m2 Qm0=CorticalRS().Vm0, # nC/m2 radius=[32.0], # nm embedding=[0.], # um freq=[500.0], # kHz amp=[100.0], # kPa charge=[0.] # nC/cm2 ) def runMechBatch(outdir, bls, stim_params, mpi=False): ''' Run batch simulations of the mechanical system with imposed values of charge density. :param outdir: full path to output directory :param bls: BilayerSonophore object :param stim_params: dictionary containing sweeps for all stimulation parameters :param mpi: boolean stating whether or not to use multiprocessing :return: list of full paths to the output files ''' # Checking validity of stimulation parameters mandatory_params = ['freqs', 'amps', 'charges'] for mparam in mandatory_params: if mparam not in stim_params: raise ValueError('Missing stimulation parameter field: "{}"'.format(mparam)) logger.info("Starting mechanical simulation batch") # Unpack stimulation parameters freqs = np.array(stim_params['freqs']) amps = np.array(stim_params['amps']) charges = np.array(stim_params['charges']) # Generate simulations queue and run batch queue = createQueue((freqs, amps, charges)) return runBatch(bls, 'runAndSave', queue, extra_params=[outdir], mpi=mpi) def main(): ap = ArgumentParser() # Runtime options ap.add_argument('--mpi', default=False, action='store_true', help='Use multiprocessing') ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-p', '--plot', default=False, action='store_true', help='Plot results') ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') # Stimulation parameters ap.add_argument('-a', '--radius', nargs='+', type=float, help='Sonophore radius (nm)') ap.add_argument('--Cm0', type=float, default=defaults['Cm0'], help='Resting membrane capacitance (uF/cm2)') ap.add_argument('--Qm0', type=float, default=defaults['Qm0'], help='Resting membrane charge density (nC/cm2)') ap.add_argument('-d', '--embedding', nargs='+', type=float, help='Embedding depth (um)') ap.add_argument('-f', '--freq', nargs='+', type=float, help='US frequency (kHz)') ap.add_argument('-A', '--amp', nargs='+', type=float, help='Acoustic pressure amplitude (kPa)') ap.add_argument('-I', '--intensity', nargs='+', type=float, help='Acoustic intensity (W/cm2)') ap.add_argument('-Q', '--charge', nargs='+', type=float, help='Membrane charge density (nC/cm2)') # Parse arguments args = {key: value for key, value in vars(ap.parse_args()).items() if value is not None} loglevel = logging.DEBUG if args['verbose'] is True else logging.INFO logger.setLevel(loglevel) outdir = args['outputdir'] if 'outputdir' in args else selectDirDialog() mpi = args['mpi'] plot = args['plot'] Cm0 = args['Cm0'] * 1e-2 # F/m2 Qm0 = args['Qm0'] * 1e-5 # C/m2 radii = np.array(args.get('radius', defaults['radius'])) * 1e-9 # m embeddings = np.array(args.get('embedding', defaults['embedding'])) * 1e-6 # m if 'amp' in args: amps = np.array(args['amp']) * 1e3 # Pa elif 'intensity' in args: amps = Intensity2Pressure(np.array(args['intensity']) * 1e4) # Pa else: amps = np.array(defaults['amp']) * 1e3 # Pa stim_params = dict( freqs=np.array(args.get('freq', defaults['freq'])) * 1e3, # Hz amps=amps, # Pa charges=np.array(args.get('charge', defaults['charge'])) * 1e-5 # C/m2 ) # Run MECH batch pkl_filepaths = [] for a in radii: for d in embeddings: bls = BilayerSonophore(a, Cm0, Qm0, embedding_depth=d) pkl_filepaths += runMechBatch(outdir, bls, stim_params, mpi=mpi) pkl_dir, _ = os.path.split(pkl_filepaths[0]) # Plot resulting profiles if plot: - plotBatch(pkl_filepaths, vars_dict={'Z': ['Z']}) + plotBatch(pkl_filepaths) plt.show() if __name__ == '__main__': main()