diff --git a/PointNICE/bls.py b/PointNICE/bls.py index fcff0ea..c26a56a 100644 --- a/PointNICE/bls.py +++ b/PointNICE/bls.py @@ -1,685 +1,685 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-05-15 11:23:57 +# @Last Modified time: 2018-07-23 14:03:27 import inspect import logging import warnings import numpy as np import scipy.integrate as integrate from scipy.optimize import brentq, curve_fit from .utils import * from .constants import * # Get package logger logger = logging.getLogger('PointNICE') class BilayerSonophore: """ This class contains the geometric and mechanical parameters of the Bilayer Sonophore Model, as well as all the core functions needed to compute the dynamics (kinetics and kinematics) of the bilayer membrane cavitation, and run dynamic BLS simulations. """ # BIOMECHANICAL PARAMETERS T = 309.15 # Temperature (K) Rg = 8.314 # Universal gas constant (Pa.m^3.mol^-1.K^-1) delta0 = 2.0e-9 # Thickness of the leaflet (m) Delta_ = 1.4e-9 # Initial gap between the two leaflets on a non-charged membrane at equil. (m) pDelta = 1.0e5 # Attraction/repulsion pressure coefficient (Pa) m = 5.0 # Exponent in the repulsion term (dimensionless) n = 3.3 # Exponent in the attraction term (dimensionless) rhoL = 1075.0 # Density of the surrounding fluid (kg/m^3) muL = 7.0e-4 # Dynamic viscosity of the surrounding fluid (Pa.s) muS = 0.035 # Dynamic viscosity of the leaflet (Pa.s) kA = 0.24 # Area compression modulus of the leaflet (N/m) alpha = 7.56 # Tissue shear loss modulus frequency coefficient (Pa.s) C0 = 0.62 # Initial gas molar concentration in the surrounding fluid (mol/m^3) kH = 1.613e5 # Henry's constant (Pa.m^3/mol) P0 = 1.0e5 # Static pressure in the surrounding fluid (Pa) Dgl = 3.68e-9 # Diffusion coefficient of gas in the fluid (m^2/s) xi = 0.5e-9 # Boundary layer thickness for gas transport across leaflet (m) c = 1515.0 # Speed of sound in medium (m/s) # BIOPHYSICAL PARAMETERS epsilon0 = 8.854e-12 # Vacuum permittivity (F/m) epsilonR = 1.0 # Relative permittivity of intramembrane cavity (dimensionless) def __init__(self, diameter, Fdrive, Cm0, Qm0, embedding_depth=0.0): """ Constructor of the class. 
:param diameter: in-plane diameter of the sonophore structure within the membrane (m) :param Fdrive: frequency of acoustic perturbation (Hz) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param embedding_depth: depth of the embedding tissue around the membrane (m) """ logger.debug('%.1f nm BLS initialization at %.2f kHz, %.2f nC/cm2', diameter * 1e9, Fdrive * 1e-3, Qm0 * 1e5) # Extract resting constants and geometry self.Cm0 = Cm0 self.Qm0 = Qm0 self.a = diameter self.d = embedding_depth self.S0 = np.pi * self.a**2 # Derive frequency-dependent tissue elastic modulus G_tissue = self.alpha * Fdrive # G'' (Pa) self.kA_tissue = 2 * G_tissue * self.d # kA of the tissue layer (N/m) # Check existence of lookups for derived parameters lookups = get_BLS_lookups(self.a) Qkey = '{:.2f}'.format(Qm0 * 1e5) # If no lookup, compute parameters and store them in lookup if not lookups or Qkey not in lookups: # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: D_eq = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Find optimal Lennard-Jones parameters to approximate PMavg (LJ_approx, std_err, _) = self.LJfitPMavg() assert std_err < PMAVG_STD_ERR_MAX, 'High error in PmAvg nonlinear fit:'\ ' std_err = %.2f Pa' % std_err self.LJ_approx = LJ_approx lookups[Qkey] = {'LJ_approx': LJ_approx, 'Delta_eq': D_eq} logger.debug('Saving BLS derived parameters to lookup file') save_BLS_lookups(self.a, lookups) # If lookup exists, load parameters from it else: logger.debug('Loading BLS derived parameters from lookup file') self.LJ_approx = lookups[Qkey]['LJ_approx'] self.Delta = lookups[Qkey]['Delta_eq'] # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def __str__(self): s = '-------- Bilayer Sonophore --------\n' s += 'class attributes:\n' class_attrs = inspect.getmembers(self.__class__, lambda a: not(inspect.isroutine(a))) class_attrs = [a for a in class_attrs if not(a[0].startswith('__') and a[0].endswith('__'))] for ca in class_attrs: s += '{} = {}\n'.format(ca[0], ca[1]) s += 'instance attributes:\n' inst_attrs = inspect.getmembers(self, lambda a: not(inspect.isroutine(a))) inst_attrs = [a for a in inst_attrs if not(a[0].startswith('__') and a[0].endswith('__')) and a not in class_attrs] for ia in inst_attrs: s += '{} = {}\n'.format(ia[0], ia[1]) return s def reinit(self): logger.debug('Re-initializing BLS object') # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: D_eq = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def curvrad(self, Z): """ Return the (signed) instantaneous curvature radius of the leaflet. :param Z: leaflet apex outward deflection value (m) :return: leaflet curvature radius (m) """ if Z == 0.0: return np.inf else: return (self.a**2 + Z**2) / (2 * Z) def v_curvrad(self, Z): ''' Vectorized curvrad function ''' return np.array([self.curvrad(z) for z in Z]) def surface(self, Z): """ Return the surface area of the stretched leaflet (spherical cap). 
:param Z: leaflet apex outward deflection value (m) :return: surface of the stretched leaflet (m^2) """ return np.pi * (self.a**2 + Z**2) def volume(self, Z): """ Return the total volume of the inter-leaflet space (cylinder +/- spherical cap). :param Z: leaflet apex outward deflection value (m) :return: inner volume of the bilayer sonophore structure (m^3) """ return np.pi * self.a**2 * self.Delta\ * (1 + (Z / (3 * self.Delta) * (3 + Z**2 / self.a**2))) def arealstrain(self, Z): """ Compute the areal strain of the stretched leaflet. epsilon = (S - S0)/S0 = (Z/a)^2 :param Z: leaflet apex outward deflection value (m) :return: areal strain (dimensionless) """ return (Z / self.a)**2 def Capct(self, Z): """ Compute the membrane capacitance per unit area, under the assumption of parallel-plate capacitor with average inter-layer distance. :param Z: leaflet apex outward deflection value (m) :return: capacitance per unit area (F/m2) """ if Z == 0.0: return self.Cm0 else: return ((self.Cm0 * self.Delta / self.a**2) * (Z + (self.a**2 - Z**2 - Z * self.Delta) / (2 * Z) * np.log((2 * Z + self.Delta) / self.Delta))) def v_Capct(self, Z): ''' Vectorized Capct function ''' return np.array([self.Capct(z) for z in Z]) def derCapct(self, Z, U): """ Compute the derivative of the membrane capacitance per unit area with respect to time, under the assumption of parallel-plate capacitor. :param Z: leaflet apex outward deflection value (m) :param U: leaflet apex outward deflection velocity (m/s) :return: derivative of capacitance per unit area (F/m2.s) """ dCmdZ = ((self.Cm0 * self.Delta / self.a**2) * ((Z**2 + self.a**2) / (Z * (2 * Z + self.Delta)) - ((Z**2 + self.a**2) * np.log((2 * Z + self.Delta) / self.Delta)) / (2 * Z**2))) return dCmdZ * U def localdef(self, r, Z, R): """ Compute the (signed) local transverse leaflet deviation at a distance r from the center of the dome. :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :return: local transverse leaflet deviation (m) """ if np.abs(Z) == 0.0: return 0.0 else: return np.sign(Z) * (np.sqrt(R**2 - r**2) - np.abs(R) + np.abs(Z)) def Pacoustic(self, t, Adrive, Fdrive, phi=np.pi): """ Compute the acoustic pressure at a specific time, given the amplitude, frequency and phase of the acoustic stimulus. :param t: time of interest :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) """ return Adrive * np.sin(2 * np.pi * Fdrive * t - phi) def PMlocal(self, r, Z, R): """ Compute the local intermolecular pressure. :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :return: local intermolecular pressure (Pa) """ z = self.localdef(r, Z, R) relgap = (2 * z + self.Delta) / self.Delta_ return self.pDelta * ((1 / relgap)**self.m - (1 / relgap)**self.n) def PMavg(self, Z, R, S): """ Compute the average intermolecular pressure felt across the leaflet by quadratic integration. :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :param S: surface of the stretched leaflet (m^2) :return: averaged intermolecular resultant pressure across the leaflet (Pa) .. warning:: quadratic integration is computationally expensive. 
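# Illustrative sketch: the geometric relations defined above (curvature radius,
# spherical-cap surface, inter-leaflet volume) restated as standalone helpers,
# with illustrative values for the radius `a` and resting gap `Delta` as they
# appear in the formulas. Helper names here are hypothetical, not package API.
import numpy as np

a = 32e-9       # sonophore radius as used in the formulas above (m), illustrative
Delta = 1.4e-9  # inter-leaflet gap (m), illustrative

def curvrad(Z):
    ''' Signed curvature radius of the deflected leaflet (m). '''
    return np.inf if Z == 0.0 else (a**2 + Z**2) / (2 * Z)

def surface(Z):
    ''' Surface area of the stretched leaflet, i.e. spherical cap (m^2). '''
    return np.pi * (a**2 + Z**2)

def volume(Z):
    ''' Volume of the inter-leaflet space: cylinder +/- spherical cap (m^3). '''
    return np.pi * a**2 * Delta * (1 + (Z / (3 * Delta) * (3 + Z**2 / a**2)))

# At rest (Z = 0) these reduce to a flat disk of radius a and gap Delta
assert np.isclose(surface(0.0), np.pi * a**2)
assert np.isclose(volume(0.0), np.pi * a**2 * Delta)
print(curvrad(5e-9), surface(5e-9), volume(5e-9))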
""" # Intermolecular force over an infinitely thin ring of radius r fMring = lambda r, Z, R: 2 * np.pi * r * self.PMlocal(r, Z, R) # Integrate from 0 to a fTotal, _ = integrate.quad(fMring, 0, self.a, args=(Z, R)) return fTotal / S def v_PMavg(self, Z, R, S): ''' Vectorized PMavg function ''' return np.array([self.PMavg(Z[i], R[i], S[i]) for i in range(Z.size)]) def LJfitPMavg(self): """ Determine optimal parameters of a Lennard-Jones expression approximating the average intermolecular pressure. These parameters are obtained by a nonlinear fit of the Lennard-Jones function for a range of deflection values between predetermined Zmin and Zmax. :return: 3-tuple with optimized LJ parameters for PmAvg prediction (Map) and the standard and max errors of the prediction in the fitting range (in Pascals) """ # Determine lower bound of deflection range: when Pm = Pmmax PMmax = LJFIT_PM_MAX # Pa Zminlb = -0.49 * self.Delta Zminub = 0.0 f = lambda Z, Pmmax: self.PMavg(Z, self.curvrad(Z), self.surface(Z)) - PMmax Zmin = brentq(f, Zminlb, Zminub, args=(PMmax), xtol=1e-16) # Create vectors for geometric variables Zmax = 2 * self.a Z = np.arange(Zmin, Zmax, 1e-11) Pmavg = self.v_PMavg(Z, self.v_curvrad(Z), self.surface(Z)) # Compute optimal nonlinear fit of custom LJ function with initial guess x0_guess = self.delta0 C_guess = 0.1 * self.pDelta nrep_guess = self.m nattr_guess = self.n pguess = (x0_guess, C_guess, nrep_guess, nattr_guess) popt, _ = curve_fit(lambda x, x0, C, nrep, nattr: LennardJones(x, self.Delta, x0, C, nrep, nattr), Z, Pmavg, p0=pguess, maxfev=10000) (x0_opt, C_opt, nrep_opt, nattr_opt) = popt Pmavg_fit = LennardJones(Z, self.Delta, x0_opt, C_opt, nrep_opt, nattr_opt) # Compute prediction error residuals = Pmavg - Pmavg_fit ss_res = np.sum(residuals**2) N = residuals.size std_err = np.sqrt(ss_res / N) max_err = max(np.abs(residuals)) logger.debug('LJ approx: x0 = %.2f nm, C = %.2f kPa, m = %.2f, n = %.2f', x0_opt * 1e9, C_opt * 1e-3, nrep_opt, nattr_opt) LJ_approx = {"x0": x0_opt, "C": C_opt, "nrep": nrep_opt, "nattr": nattr_opt} return (LJ_approx, std_err, max_err) def PMavgpred(self, Z): """ Return the predicted intermolecular pressure based on a specific Lennard-Jones function fitted on the deflection physiological range. :param Z: leaflet apex outward deflection value (m) :return: predicted average intermolecular pressure (Pa) """ return LennardJones(Z, self.Delta, self.LJ_approx['x0'], self.LJ_approx['C'], self.LJ_approx['nrep'], self.LJ_approx['nattr']) def Pelec(self, Z, Qm): """ Compute the electric equivalent pressure term. :param Z: leaflet apex outward deflection value (m) :param Qm: membrane charge density (C/m2) :return: electric equivalent pressure (Pa) """ relS = self.S0 / self.surface(Z) abs_perm = self.epsilon0 * self.epsilonR # F/m return -relS * Qm**2 / (2 * abs_perm) # Pa def findDeltaEq(self, Qm): """ Compute the Delta that cancels out the (Pm + Pec) equation at Z = 0 for a given membrane charge density, using the Brent method to refine the pressure root iteratively. 
:param Qm: membrane charge density (C/m2) :return: equilibrium value (m) and associated pressure (Pa) """ f = lambda Delta: (self.pDelta * ((self.Delta_ / Delta)**self.m - (self.Delta_ / Delta)**self.n) + self.Pelec(0.0, Qm)) Delta_lb = 0.1 * self.Delta_ Delta_ub = 2.0 * self.Delta_ Delta_eq = brentq(f, Delta_lb, Delta_ub, xtol=1e-16) logger.debug('∆eq = %.2f nm', Delta_eq * 1e9) return (Delta_eq, f(Delta_eq)) def gasflux(self, Z, P): """ Compute the gas molar flux through the BLS boundary layer for an unsteady system. :param Z: leaflet apex outward deflection value (m) :param P: internal gas pressure in the inter-leaflet space (Pa) :return: gas molar flux (mol/s) """ dC = self.C0 - P / self.kH return 2 * self.surface(Z) * self.Dgl * dC / self.xi def gasmol2Pa(self, ng, V): """ Compute the gas pressure in the inter-leaflet space for an unsteady system, from the value of gas molar content. :param ng: internal molar content (mol) :param V: inner volume of the bilayer sonophore structure (m^3) :return: internal gas pressure (Pa) """ return ng * self.Rg * self.T / V def gasPa2mol(self, P, V): """ Compute the gas molar content in the inter-leaflet space for an unsteady system, from the value of internal gas pressure. :param P: internal gas pressure in the inter-leaflet space (Pa) :param V: inner volume of the bilayer sonophore structure (m^3) :return: internal gas molar content (mol) """ return P * V / (self.Rg * self.T) def PtotQS(self, Z, ng, Qm, Pac, Pm_comp_method): """ Compute the balance pressure of the quasi-steady system, upon application of an external perturbation on a charged membrane: Ptot = Pm + Pg + Pec - P0 - Pac. :param Z: leaflet apex outward deflection value (m) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: total balance pressure (Pa) """ if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) return Pm + self.gasmol2Pa(ng, self.volume(Z)) - self.P0 - Pac + self.Pelec(Z, Qm) def balancedefQS(self, ng, Qm, Pac=0.0, Pm_comp_method=PmCompMethod.predict): """ Compute the leaflet deflection upon application of an external perturbation to a quasi-steady system with a charged membrane. This function uses the Brent method (progressive approximation of function root) to solve the following transcendental equation for Z: Pm + Pg + Pec - P0 - Pac = 0. :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: leaflet deflection (Z) canceling out the balance equation """ lb = -0.49 * self.Delta ub = self.a Plb = self.PtotQS(lb, ng, Qm, Pac, Pm_comp_method) Pub = self.PtotQS(ub, ng, Qm, Pac, Pm_comp_method) assert (Plb > 0 > Pub), '[%d, %d] is not a sign changing interval for PtotQS' % (lb, ub) return brentq(self.PtotQS, lb, ub, args=(ng, Qm, Pac, Pm_comp_method), xtol=1e-16) def TEleaflet(self, Z): """ Compute the circumferential elastic tension felt across the entire leaflet upon stretching. 
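# Illustrative sketch of the root-finding step behind findDeltaEq above: at
# Z = 0 the intermolecular pressure must balance the electrical pressure, so
# the equilibrium gap D solves pDelta*((Delta_/D)**m - (Delta_/D)**n) + Pec = 0.
# Parameter values mirror the class attributes above; Qm is an illustrative
# resting charge density, not a value taken from the package.
import numpy as np
from scipy.optimize import brentq

pDelta, Delta_, m, n = 1.0e5, 1.4e-9, 5.0, 3.3   # from the class attributes
epsilon0, epsilonR = 8.854e-12, 1.0
Qm = -70e-5                                      # resting charge density (C/m2), illustrative

Pec = -Qm**2 / (2 * epsilon0 * epsilonR)         # electrical pressure at Z = 0 (Pa)
f = lambda D: pDelta * ((Delta_ / D)**m - (Delta_ / D)**n) + Pec
Delta_eq = brentq(f, 0.1 * Delta_, 2.0 * Delta_, xtol=1e-16)
print('Delta_eq = {:.2f} nm, residual = {:.2e} Pa'.format(Delta_eq * 1e9, f(Delta_eq)))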
:param Z: leaflet apex outward deflection value (m) :return: circumferential elastic tension (N/m) """ return self.kA * self.arealstrain(Z) def TEtissue(self, Z): """ Compute the circumferential elastic tension felt across the embedding viscoelastic tissue layer upon stretching. :param Z: leaflet apex outward deflection value (m) :return: circumferential elastic tension (N/m) """ return self.kA_tissue * self.arealstrain(Z) def TEtot(self, Z): """ Compute the total circumferential elastic tension (leaflet and embedding tissue) felt upon stretching. :param Z: leaflet apex outward deflection value (m) :return: circumferential elastic tension (N/m) """ return self.TEleaflet(Z) + self.TEtissue(Z) def PEtot(self, Z, R): """ Compute the total elastic tension pressure (leaflet + embedding tissue) felt upon stretching. :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :return: elastic tension pressure (Pa) """ return - self.TEtot(Z) / R def PVleaflet(self, U, R): """ Compute the viscous stress felt across the entire leaflet upon stretching. :param U: leaflet apex outward deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: leaflet viscous stress (Pa) """ return - 12 * U * self.delta0 * self.muS / R**2 def PVfluid(self, U, R): """ Compute the viscous stress felt across the entire fluid upon stretching. :param U: leaflet apex outward deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: fluid viscous stress (Pa) """ return - 4 * U * self.muL / np.abs(R) def accP(self, Pres, R): """ Compute the pressure-driven acceleration of the leaflet in the unsteady system, upon application of an external perturbation. :param Pres: net resultant pressure (Pa) :param R: leaflet curvature radius (m) :return: pressure-driven acceleration (m/s^2) """ return Pres / (self.rhoL * np.abs(R)) def accNL(self, U, R): """ Compute the non-linear term of the leaflet acceleration in the unsteady system, upon application of an external perturbation. :param U: leaflet apex outward deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: nonlinear acceleration (m/s^2) .. note:: A simplified version of nonlinear acceleration (neglecting dR/dH) is used here. """ # return - (3/2 - 2*R/H) * U**2 / R return -(3 * U**2) / (2 * R) def eqMech(self, y, t, Adrive, Fdrive, Qm, phi, Pm_comp_method=PmCompMethod.predict): """ Compute the derivatives of the 3-ODE mechanical system variables, with an imposed constant charge density. 
:param y: vector of HH system variables at time t :param t: specific instant in time (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param Qm: membrane charge density (F/m2) :param phi: acoustic drive phase (rad) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: vector of mechanical system derivatives at time t """ # Split input vector explicitly (U, Z, ng) = y # Correct deflection value is below critical compression if Z < -0.5 * self.Delta: logger.warning('Deflection out of range: Z = %.2f nm', Z * 1e9) Z = -0.49 * self.Delta # Compute curvature radius R = self.curvrad(Z) # Compute total pressure Pg = self.gasmol2Pa(ng, self.volume(Z)) if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) Ptot = (Pm + Pg - self.P0 - self.Pacoustic(t, Adrive, Fdrive, phi) + self.PEtot(Z, R) + self.PVleaflet(U, R) + self.PVfluid(U, R) + self.Pelec(Z, Qm)) # Compute derivatives dUdt = self.accP(Ptot, R) + self.accNL(U, R) dZdt = U dngdt = self.gasflux(Z, Pg) # Return derivatives vector return [dUdt, dZdt, dngdt] def run(self, Fdrive, Adrive, Qm, phi=np.pi, Pm_comp_method=PmCompMethod.predict): """ Compute short solutions of the mechanical system for specific US stimulation parameters and with an imposed membrane charge density. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param phi: acoustic drive phase (rad) :param Qm: imposed membrane charge density (C/m2) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: 3-tuple with the time profile, the solution matrix and a state vector """ # Check validity of stimulation parameters if not all(isinstance(param, float) for param in [Fdrive, Adrive, Qm, phi]): raise InputError('Invalid stimulation parameters (must be float typed)') if Fdrive <= 0: raise InputError('Invalid US driving frequency: {} kHz (must be strictly positive)' .format(Fdrive * 1e-3)) if Adrive < 0: raise InputError('Invalid US pressure amplitude: {} kPa (must be positive or null)' .format(Adrive * 1e-3)) if Qm < CHARGE_RANGE[0] or Qm > CHARGE_RANGE[1]: raise InputError('Invalid applied charge: {} nC/cm2 (must be within [{}, {}] interval' .format(Qm * 1e5, CHARGE_RANGE[0] * 1e5, CHARGE_RANGE[1] * 1e5)) if phi < 0 or phi >= 2 * np.pi: raise InputError('Invalid US pressure phase: {:.2f} rad (must be within [0, 2 PI[ rad' .format(phi)) # Raise warnings as error warnings.filterwarnings('error') # Determine mechanical system time step Tdrive = 1 / Fdrive dt_mech = Tdrive / NPC_FULL t_mech_cycle = np.linspace(0, Tdrive - dt_mech, NPC_FULL) # Initialize system variables t0 = 0.0 Z0 = 0.0 U0 = 0.0 ng0 = self.ng0 # Solve quasi-steady equation to compute first deflection value Pac1 = self.Pacoustic(t0 + dt_mech, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm, Pac1, Pm_comp_method) U1 = (Z1 - Z0) / dt_mech # Construct arrays to hold system variables states = np.array([1, 1]) t = np.array([t0, t0 + dt_mech]) y = np.array([[U0, U1], [Z0, Z1], [ng0, ng0]]) # Integrate mechanical system for a few acoustic cycles until stabilization j = 0 ng_last = None Z_last = None periodic_conv = False while not periodic_conv and j < NCYCLES_MAX: t_mech = t_mech_cycle + t[-1] + dt_mech y_mech = integrate.odeint(self.eqMech, y[:, -1], t_mech, args=(Adrive, Fdrive, Qm, phi, Pm_comp_method)).T # Compare Z and ng signals over the last 2 
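# Illustrative sketch of the cycle-to-cycle convergence test used in the loop
# here: one acoustic period is integrated at a time and compared to the
# previous one via a root-mean-square error, stopping once the RMSE falls
# below a preset tolerance. The toy settling signal and tolerance are
# illustrative, and `rmse` is assumed to behave like the package utility of
# the same name.
import numpy as np

def rmse(x1, x2):
    return np.sqrt(np.mean((x1 - x2)**2))

npc = 1000                                   # samples per cycle, illustrative
tc = np.linspace(0, 1, npc, endpoint=False)  # normalized intra-cycle time
tol, last, j = 1e-3, None, 0
while j < 20:
    cycle = (1 + np.exp(-j)) * np.sin(2 * np.pi * tc)   # toy signal settling over cycles
    if last is not None and rmse(last, cycle) < tol:
        print('periodic convergence after {} cycles'.format(j))
        break
    last = cycle
    j += 1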
acoustic periods if j > 0: Z_rmse = rmse(Z_last, y_mech[1, :]) ng_rmse = rmse(ng_last, y_mech[2, :]) logger.debug('step %u: Z_rmse = %.2e m, ng_rmse = %.2e mol', j, Z_rmse, ng_rmse) if Z_rmse < Z_ERR_MAX and ng_rmse < NG_ERR_MAX: periodic_conv = True # Update last vectors for next comparison Z_last = y_mech[1, :] ng_last = y_mech[2, :] # Concatenate time and solutions to global vectors states = np.concatenate([states, np.ones(NPC_FULL)], axis=0) t = np.concatenate([t, t_mech], axis=0) y = np.concatenate([y, y_mech], axis=1) # Increment loop index j += 1 if j == NCYCLES_MAX: logger.warning('No convergence: stopping after %u cycles', j) else: logger.debug('Periodic convergence after %u cycles', j) states[-1] = 0 # return output variables return (t, y[1:, :], states) diff --git a/PointNICE/plt/pltutils.py b/PointNICE/plt/pltutils.py index a84416e..d00e2da 100644 --- a/PointNICE/plt/pltutils.py +++ b/PointNICE/plt/pltutils.py @@ -1,1237 +1,1235 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-23 14:55:37 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-07-12 17:21:39 +# @Last Modified time: 2018-07-23 14:29:15 ''' Plotting utilities ''' import sys import os import pickle import ntpath import re import logging import tkinter as tk from tkinter import filedialog import numpy as np from scipy.interpolate import interp2d # from scipy.optimize import brentq import matplotlib import matplotlib.pyplot as plt from matplotlib.patches import Rectangle import matplotlib.cm as cm from matplotlib.ticker import FormatStrFormatter import pandas as pd from .. import neurons from ..constants import DT_EFF from ..utils import getNeuronsDict, getLookupDir, rescale, InputError, computeMeshEdges, si_format, itrpLookupsFreq from ..bls import BilayerSonophore from .pltvars import pltvars matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' # Get package logger logger = logging.getLogger('PointNICE') # Define global variables neuron = None bls = None timeunits = {'ASTIM': 't_ms', 'ESTIM': 't_ms', 'MECH': 't_us'} # Regular expression for input files rgxp = re.compile('(ESTIM|ASTIM)_([A-Za-z]*)_(.*).pkl') rgxp_mech = re.compile('(MECH)_(.*).pkl') # nb = '[0-9]*[.]?[0-9]+' # rgxp_ASTIM = re.compile('(ASTIM)_(\w+)_(PW|CW)_({0})nm_({0})kHz_({0})kPa_({0})ms(.*)_(\w+).pkl'.format(nb)) # rgxp_ESTIM = re.compile('(ESTIM)_(\w+)_(PW|CW)_({0})mA_per_m2_({0})ms(.*).pkl'.format(nb)) # rgxp_PW = re.compile('_PRF({0})kHz_DC({0})_(PW|CW)_(\d+)kHz_(\d+)kPa_(\d+)ms_(.*).pkl'.format(nb)) # Figure naming conventions ESTIM_CW_title = '{} neuron: CW E-STIM {:.2f}mA/m2, {:.0f}ms' ESTIM_PW_title = '{} neuron: PW E-STIM {:.2f}mA/m2, {:.0f}ms, {:.2f}Hz PRF, {:.0f}% DC' ASTIM_CW_title = '{} neuron: CW A-STIM {:.0f}kHz, {:.0f}kPa, {:.0f}ms' ASTIM_PW_title = '{} neuron: PW A-STIM {:.0f}kHz, {:.0f}kPa, {:.0f}ms, {:.2f}Hz PRF, {:.2f}% DC' MECH_title = '{:.0f}nm BLS structure: MECH-STIM {:.0f}kHz, {:.0f}kPa' def cm2inch(*tupl): inch = 2.54 if isinstance(tupl[0], tuple): return tuple(i / inch for i in tupl[0]) else: return tuple(i / inch for i in tupl) class InteractiveLegend(object): """ Class defining an interactive matplotlib legend, where lines visibility can be toggled by simply clicking on the corresponding legend label. 
Other graphic objects can also be associated to the toggle of a specific line Adapted from: http://stackoverflow.com/questions/31410043/hiding-lines-after-showing-a-pyplot-figure """ def __init__(self, legend, aliases): self.legend = legend self.fig = legend.axes.figure self.lookup_artist, self.lookup_handle = self._build_lookups(legend) self._setup_connections() self.handles_aliases = aliases self.update() def _setup_connections(self): for artist in self.legend.texts + self.legend.legendHandles: artist.set_picker(10) # 10 points tolerance self.fig.canvas.mpl_connect('pick_event', self.on_pick) def _build_lookups(self, legend): ''' Method of the InteractiveLegend class building the legend lookups. ''' labels = [t.get_text() for t in legend.texts] handles = legend.legendHandles label2handle = dict(zip(labels, handles)) handle2text = dict(zip(handles, legend.texts)) lookup_artist = {} lookup_handle = {} for artist in legend.axes.get_children(): if artist.get_label() in labels: handle = label2handle[artist.get_label()] lookup_handle[artist] = handle lookup_artist[handle] = artist lookup_artist[handle2text[handle]] = artist lookup_handle.update(zip(handles, handles)) lookup_handle.update(zip(legend.texts, handles)) return lookup_artist, lookup_handle def on_pick(self, event): handle = event.artist if handle in self.lookup_artist: artist = self.lookup_artist[handle] artist.set_visible(not artist.get_visible()) self.update() def update(self): for artist in self.lookup_artist.values(): handle = self.lookup_handle[artist] if artist.get_visible(): handle.set_visible(True) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(True) else: handle.set_visible(False) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(False) self.fig.canvas.draw() def show(self): ''' showing the interactive legend ''' plt.show() def getPatchesLoc(t, states): ''' Determine the location of stimulus patches. :param t: simulation time vector (s). :param states: a vector of stimulation state (ON/OFF) at each instant in time. :return: 3-tuple with number of patches, timing of STIM-ON an STIM-OFF instants. ''' # Compute states derivatives and identify bounds indexes of pulses dstates = np.diff(states) ipatch_on = np.insert(np.where(dstates > 0.0)[0] + 1, 0, 0) ipatch_off = np.where(dstates < 0.0)[0] if ipatch_off.size < ipatch_on.size: ioff = t.size - 1 if ipatch_off.size == 0: ipatch_off = np.array([ioff]) else: ipatch_off = np.insert(ipatch_off, ipatch_off.size - 1, ioff) # Get time instants for pulses ON and OFF npatches = ipatch_on.size tpatch_on = t[ipatch_on] tpatch_off = t[ipatch_off] # return 3-tuple with #patches, pulse ON and pulse OFF instants return (npatches, tpatch_on, tpatch_off) def SaveFigDialog(dirname, filename): """ Open a FileSaveDialogBox to set the directory and name of the figure to be saved. The default directory and filename are given, and the default extension is ".pdf" :param dirname: default directory :param filename: default filename :return: full path to the chosen filename """ root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename(defaultextension=".pdf", initialdir=dirname, initialfile=filename) return filename_out def plotComp(varname, filepaths, labels=None, fs=15, lw=2, colors=None, lines=None, patches='one', xticks=None, yticks=None, blacklegend=False, straightlegend=False, showfig=True, inset=None, figsize=(11, 4)): ''' Compare profiles of several specific output variables of NICE simulations. 
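# Illustrative sketch of the pulse-detection logic used by getPatchesLoc above:
# rising and falling edges of a binary stimulation-state vector are located
# from its first-order difference. The state vector is a toy example and does
# not handle the edge cases (stimulus ON at t=0, or still ON at the end) that
# the full function covers.
import numpy as np

t = np.linspace(0, 9e-3, 10)                        # toy time vector (s)
states = np.array([0, 0, 1, 1, 1, 0, 0, 1, 1, 0])   # toy ON/OFF stimulation states
dstates = np.diff(states)
ipatch_on = np.where(dstates > 0)[0] + 1            # indexes of STIM-ON instants
ipatch_off = np.where(dstates < 0)[0]               # indexes taken as STIM-OFF instants
print(t[ipatch_on], t[ipatch_off])                  # 2 patches: ON at 2 and 7 ms, OFF at 4 and 8 ms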
:param varname: name of variable to extract and compare :param filepaths: list of full paths to output data files to be compared :param labels: list of labels to use in the legend :param fs: labels fontsize :param patches: string indicating whether to indicate periods of stimulation with colored rectangular patches ''' # Input check 1: variable name if varname not in pltvars: raise InputError('Unknown plot variable: "{}"'.format(varname)) pltvar = pltvars[varname] # Input check 2: labels if labels is not None: if len(labels) != len(filepaths): raise InputError('Invalid labels ({}): not matching number of compared files ({})' .format(len(labels), len(filepaths))) if not all(isinstance(x, str) for x in labels): raise InputError('Invalid labels: must be string typed') # Input check 3: line styles and colors if colors is None: colors = ['C{}'.format(j) for j in range(len(filepaths))] if lines is None: lines = ['-'] * len(filepaths) # Input check 4: STIM-ON patches greypatch = False if patches == 'none': patches = [False] * len(filepaths) elif patches == 'all': patches = [True] * len(filepaths) elif patches == 'one': patches = [True] + [False] * (len(filepaths) - 1) greypatch = True elif isinstance(patches, list): if len(patches) != len(filepaths): raise InputError('Invalid patches ({}): not matching number of compared files ({})' .format(len(patches), len(filepaths))) if not all(isinstance(p, bool) for p in patches): raise InputError('Invalid patch sequence: all list items must be boolean typed') else: raise InputError('Invalid patches: must be either "none", all", "one", or a boolean list') # Initialize figure and axis fig, ax = plt.subplots(figsize=figsize) ax.set_zorder(0) for item in ['top', 'right']: ax.spines[item].set_visible(False) if 'min' in pltvar and 'max' in pltvar: # optional min and max on y-axis ax.set_ylim(pltvar['min'], pltvar['max']) if pltvar['unit']: # y-label with optional unit ax.set_ylabel('$\\rm {}\ ({})$'.format(pltvar['label'], pltvar['unit']), fontsize=fs) else: ax.set_ylabel('$\\rm {}$'.format(pltvar['label']), fontsize=fs) if xticks is not None: # optional x-ticks ax.set_xticks(xticks) if yticks is not None: # optional y-ticks ax.set_yticks(yticks) else: ax.locator_params(axis='y', nbins=2) if any(ax.get_yticks() < 0): ax.yaxis.set_major_formatter(FormatStrFormatter('%+.0f')) for tick in ax.xaxis.get_major_ticks() + ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Optional inset axis if inset is not None: inset_ax = fig.add_axes(ax.get_position()) inset_ax.set_xlim(inset['xlims'][0], inset['xlims'][1]) inset_ax.set_ylim(inset['ylims'][0], inset['ylims'][1]) inset_ax.set_xticks([]) inset_ax.set_yticks([]) # inset_ax.patch.set_alpha(1.0) inset_ax.set_zorder(1) inset_ax.add_patch(Rectangle((inset['xlims'][0], inset['ylims'][0]), inset['xlims'][1] - inset['xlims'][0], inset['ylims'][1] - inset['ylims'][0], color='w')) # Retrieve neurons dictionary neurons_dict = getNeuronsDict() # Loop through data files aliases = {} for j, filepath in enumerate(filepaths): # Retrieve sim type pkl_filename = ntpath.basename(filepath) mo1 = rgxp.fullmatch(pkl_filename) mo2 = rgxp_mech.fullmatch(pkl_filename) if mo1: mo = mo1 elif mo2: mo = mo2 else: logger.error('Error: "%s" file does not match regexp pattern', pkl_filename) sys.exit(1) sim_type = mo.group(1) if sim_type not in ('MECH', 'ASTIM', 'ESTIM'): raise InputError('Invalid simulation type: {}'.format(sim_type)) if j == 0: sim_type_ref = sim_type t_plt = pltvars[timeunits[sim_type]] elif sim_type != sim_type_ref: raise 
InputError('Invalid comparison: different simulation types') # Load data logger.info('Loading data from "%s"', pkl_filename) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] # Extract variables t = df['t'].values states = df['states'].values nsamples = t.size # Initialize neuron object if ESTIM or ASTIM sim type if sim_type in ['ASTIM', 'ESTIM']: neuron_name = mo.group(2) global neuron neuron = neurons_dict[neuron_name]() Cm0 = neuron.Cm0 Qm0 = Cm0 * neuron.Vm0 * 1e-3 # Extract neuron states if needed if 'alias' in pltvar and 'neuron_states' in pltvar['alias']: neuron_states = [df[sn].values for sn in neuron.states_names] else: Cm0 = meta['Cm0'] Qm0 = meta['Qm0'] # Initialize BLS if needed if sim_type in ['MECH', 'ASTIM'] and 'alias' in pltvar and 'bls' in pltvar['alias']: global bls bls = BilayerSonophore(meta['a'], meta['Fdrive'], Cm0, Qm0) # Determine patches location npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) # Add onset to time vectors if t_plt['onset'] > 0.0: - tonset = np.array([-t_plt['onset'], -DT_EFF]) + tonset = np.array([-t_plt['onset'], -t[0] - t[1]]) t = np.hstack((tonset, t)) - # t = np.insert(t, 0, -t_plt['onset']) - # states = np.insert(states, 0, 0) + states = np.hstack((states, np.zeros(2))) # Set x-axis label ax.set_xlabel('$\\rm {}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) # Extract variable to plot if 'alias' in pltvar: var = eval(pltvar['alias']) elif 'key' in pltvar: var = df[pltvar['key']].values elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = df[varname].values if var.size == t.size - 2: if varname is 'Vm': var = np.hstack((np.array([neuron.Vm0] * 2), var)) else: var = np.hstack((np.array([var[0]] * 2), var)) # var = np.insert(var, 0, var[0]) # Determine legend label if labels is not None: label = labels[j] else: if sim_type == 'ESTIM': if meta['DC'] == 1.0: label = ESTIM_CW_title.format(neuron_name, meta['Astim'], meta['tstim'] * 1e3) else: label = ESTIM_PW_title.format(neuron_name, meta['Astim'], meta['tstim'] * 1e3, meta['PRF'], meta['DC'] * 1e2) elif sim_type == 'ASTIM': if meta['DC'] == 1.0: label = ASTIM_CW_title.format(neuron_name, meta['Fdrive'] * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3) else: label = ASTIM_PW_title.format(neuron_name, meta['Fdrive'] * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3, meta['PRF'], meta['DC'] * 1e2) elif sim_type == 'MECH': label = MECH_title.format(meta['a'] * 1e9, meta['Fdrive'] * 1e-3, meta['Adrive'] * 1e-3) # Plot trace handle = ax.plot(t * t_plt['factor'], var * pltvar['factor'], linewidth=lw, linestyle=lines[j], color=colors[j], label=label) if inset is not None: inset_window = np.logical_and(t > (inset['xlims'][0] / t_plt['factor']), t < (inset['xlims'][1] / t_plt['factor'])) inset_ax.plot(t[inset_window] * t_plt['factor'], var[inset_window] * pltvar['factor'], linewidth=lw, linestyle=lines[j], color=colors[j]) # Add optional STIM-ON patches if patches[j]: (ybottom, ytop) = ax.get_ylim() la = [] color = '#8A8A8A' if greypatch else handle[0].get_color() for i in range(npatches): la.append(ax.axvspan(tpatch_on[i] * t_plt['factor'], tpatch_off[i] * t_plt['factor'], edgecolor='none', facecolor=color, alpha=0.2)) aliases[handle[0]] = la if inset is not None: cond_on = np.logical_and(tpatch_on > (inset['xlims'][0] / t_plt['factor']), tpatch_on < (inset['xlims'][1] / t_plt['factor'])) cond_off = np.logical_and(tpatch_off > (inset['xlims'][0] / t_plt['factor']), tpatch_off < (inset['xlims'][1] 
/ t_plt['factor'])) cond_glob = np.logical_and(tpatch_on < (inset['xlims'][0] / t_plt['factor']), tpatch_off > (inset['xlims'][1] / t_plt['factor'])) cond_onoff = np.logical_or(cond_on, cond_off) cond = np.logical_or(cond_onoff, cond_glob) npatches_inset = np.sum(cond) for i in range(npatches_inset): inset_ax.add_patch(Rectangle((tpatch_on[cond][i] * t_plt['factor'], ybottom), (tpatch_off[cond][i] - tpatch_on[cond][i]) * t_plt['factor'], ytop - ybottom, color=color, alpha=0.1)) fig.tight_layout() # Optional operations on inset: if inset is not None: # Re-position inset axis axpos = ax.get_position() left, right, = rescale(inset['xcoords'], ax.get_xlim()[0], ax.get_xlim()[1], axpos.x0, axpos.x0 + axpos.width) bottom, top, = rescale(inset['ycoords'], ax.get_ylim()[0], ax.get_ylim()[1], axpos.y0, axpos.y0 + axpos.height) inset_ax.set_position([left, bottom, right - left, top - bottom]) for i in inset_ax.spines.values(): i.set_linewidth(2) # Materialize inset target region with contour frame ax.plot(inset['xlims'], [inset['ylims'][0]] * 2, linestyle='-', color='k') ax.plot(inset['xlims'], [inset['ylims'][1]] * 2, linestyle='-', color='k') ax.plot([inset['xlims'][0]] * 2, inset['ylims'], linestyle='-', color='k') ax.plot([inset['xlims'][1]] * 2, inset['ylims'], linestyle='-', color='k') # Link target and inset with dashed lines if possible if inset['xcoords'][1] < inset['xlims'][0]: ax.plot([inset['xcoords'][1], inset['xlims'][0]], [inset['ycoords'][0], inset['ylims'][0]], linestyle='--', color='k') ax.plot([inset['xcoords'][1], inset['xlims'][0]], [inset['ycoords'][1], inset['ylims'][1]], linestyle='--', color='k') elif inset['xcoords'][0] > inset['xlims'][1]: ax.plot([inset['xcoords'][0], inset['xlims'][1]], [inset['ycoords'][0], inset['ylims'][0]], linestyle='--', color='k') ax.plot([inset['xcoords'][0], inset['xlims'][1]], [inset['ycoords'][1], inset['ylims'][1]], linestyle='--', color='k') else: logger.warning('Inset x-coordinates intersect with those of target region') # Create interactive legend leg = ax.legend(loc=1, fontsize=fs, frameon=False) if blacklegend: for l in leg.get_lines(): l.set_color('k') if straightlegend: for l in leg.get_lines(): l.set_linestyle('-') interactive_legend = InteractiveLegend(ax.legend_, aliases) if showfig: plt.show() return fig def plotBatch(directory, filepaths, vars_dict=None, plt_show=True, plt_save=False, ask_before_save=True, fig_ext='png', tag='fig', fs=15, lw=2, title=True, show_patches=True): ''' Plot a figure with profiles of several specific NICE output variables, for several NICE simulations. 
:param positions: subplot indexes of each variable :param filepaths: list of full paths to output data files to be compared :param vars_dict: dict of lists of variables names to extract and plot together :param plt_show: boolean stating whether to show the created figures :param plt_save: boolean stating whether to save the created figures :param ask_before_save: boolean stating whether to show the created figures :param fig_ext: file extension for the saved figures :param tag: suffix added to the end of the figures name :param fs: labels font size :param lw: curves line width :param title: boolean stating whether to display a general title on the figures :param show_patches: boolean indicating whether to indicate periods of stimulation with colored rectangular patches ''' # Check validity of plot variables if vars_dict: yvars = list(sum(list(vars_dict.values()), [])) for key in yvars: if key not in pltvars: raise InputError('Unknown plot variable: "{}"'.format(key)) # Dictionary of neurons neurons_dict = getNeuronsDict() # Loop through data files for filepath in filepaths: # Get code from file name pkl_filename = ntpath.basename(filepath) filecode = pkl_filename[0:-4] # Retrieve sim type mo1 = rgxp.fullmatch(pkl_filename) mo2 = rgxp_mech.fullmatch(pkl_filename) if mo1: mo = mo1 elif mo2: mo = mo2 else: logger.error('Error: "%s" file does not match regexp pattern', pkl_filename) sys.exit(1) sim_type = mo.group(1) if sim_type not in ('MECH', 'ASTIM', 'ESTIM'): raise InputError('Invalid simulation type: {}'.format(sim_type)) # Load data logger.info('Loading data from "%s"', pkl_filename) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] # Extract variables logger.info('Extracting variables') t = df['t'].values states = df['states'].values nsamples = t.size # Initialize channel mechanism if sim_type in ['ASTIM', 'ESTIM']: neuron_name = mo.group(2) global neuron neuron = neurons_dict[neuron_name]() neuron_states = [df[sn].values for sn in neuron.states_names] Cm0 = neuron.Cm0 Qm0 = Cm0 * neuron.Vm0 * 1e-3 t_plt = pltvars['t_ms'] else: Cm0 = meta['Cm0'] Qm0 = meta['Qm0'] t_plt = pltvars['t_us'] # Initialize BLS if sim_type in ['MECH', 'ASTIM']: global bls Fdrive = meta['Fdrive'] a = meta['a'] bls = BilayerSonophore(a, Fdrive, Cm0, Qm0) # Determine patches location npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) # Adding onset to time vector if t_plt['onset'] > 0.0: - tonset = np.array([-t_plt['onset'], -DT_EFF]) + tonset = np.array([-t_plt['onset'], -t[0] - t[1]]) t = np.hstack((tonset, t)) - # t = np.insert(t, 0, -t_plt['onset']) - # states = np.insert(states, 0, 0) + states = np.hstack((states, np.zeros(2))) # Determine variables to plot if not provided if not vars_dict: if sim_type == 'ASTIM': vars_dict = {'Z': ['Z'], 'Q_m': ['Qm']} elif sim_type == 'ESTIM': vars_dict = {'V_m': ['Vm']} elif sim_type == 'MECH': vars_dict = {'P_{AC}': ['Pac'], 'Z': ['Z'], 'n_g': ['ng']} if sim_type in ['ASTIM', 'ESTIM'] and hasattr(neuron, 'pltvars_scheme'): vars_dict.update(neuron.pltvars_scheme) labels = list(vars_dict.keys()) naxes = len(vars_dict) # Plotting if naxes == 1: _, ax = plt.subplots(figsize=(11, 4)) axes = [ax] else: _, axes = plt.subplots(naxes, 1, figsize=(11, min(3 * naxes, 9))) for i in range(naxes): ax = axes[i] for item in ['top', 'right']: ax.spines[item].set_visible(False) ax_pltvars = [pltvars[j] for j in vars_dict[labels[i]]] nvars = len(ax_pltvars) # X-axis if i < naxes - 1: ax.get_xaxis().set_ticklabels([]) else: 
ax.set_xlabel('${}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) for tick in ax.xaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Y-axis if ax_pltvars[0]['unit']: ax.set_ylabel('${}\ ({})$'.format(labels[i], ax_pltvars[0]['unit']), fontsize=fs) else: ax.set_ylabel('${}$'.format(labels[i]), fontsize=fs) if 'min' in ax_pltvars[0] and 'max' in ax_pltvars[0]: ax_min = min([ap['min'] for ap in ax_pltvars]) ax_max = max([ap['max'] for ap in ax_pltvars]) ax.set_ylim(ax_min, ax_max) ax.locator_params(axis='y', nbins=2) for tick in ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Time series icolor = 0 for j in range(nvars): # Extract variable pltvar = ax_pltvars[j] if 'alias' in pltvar: var = eval(pltvar['alias']) elif 'key' in pltvar: var = df[pltvar['key']].values elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = df[vars_dict[labels[i]][j]].values if var.size == t.size - 2: if pltvar['desc'] == 'membrane potential': var = np.hstack((np.array([neuron.Vm0] * 2), var)) else: var = np.hstack((np.array([var[0]] * 2), var)) # var = np.insert(var, 0, var[0]) # Plot variable if 'constant' in pltvar or pltvar['desc'] in ['net current']: ax.plot(t * t_plt['factor'], var * pltvar['factor'], '--', c='black', lw=lw, label='${}$'.format(pltvar['label'])) else: ax.plot(t * t_plt['factor'], var * pltvar['factor'], c='C{}'.format(icolor), lw=lw, label='${}$'.format(pltvar['label'])) icolor += 1 # Patches if show_patches == 1: (ybottom, ytop) = ax.get_ylim() for j in range(npatches): ax.axvspan(tpatch_on[j] * t_plt['factor'], tpatch_off[j] * t_plt['factor'], edgecolor='none', facecolor='#8A8A8A', alpha=0.2) # Legend if nvars > 1: ax.legend(fontsize=fs, loc=7, ncol=nvars // 4 + 1) # Title if title: if sim_type == 'ESTIM': if meta['DC'] == 1.0: fig_title = ESTIM_CW_title.format(neuron.name, meta['Astim'], meta['tstim'] * 1e3) else: fig_title = ESTIM_PW_title.format(neuron.name, meta['Astim'], meta['tstim'] * 1e3, meta['PRF'], meta['DC'] * 1e2) elif sim_type == 'ASTIM': if meta['DC'] == 1.0: fig_title = ASTIM_CW_title.format(neuron.name, Fdrive * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3) else: fig_title = ASTIM_PW_title.format(neuron.name, Fdrive * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3, meta['PRF'], meta['DC'] * 1e2) elif sim_type == 'MECH': fig_title = MECH_title.format(a * 1e9, Fdrive * 1e-3, meta['Adrive'] * 1e-3) axes[0].set_title(fig_title, fontsize=fs) plt.tight_layout() # Save figure if needed (automatic or checked) if plt_save: if ask_before_save: plt_filename = SaveFigDialog(directory, '{}_{}.{}'.format(filecode, tag, fig_ext)) else: plt_filename = '{}/{}_{}.{}'.format(directory, filecode, tag, fig_ext) if plt_filename: plt.savefig(plt_filename) logger.info('Saving figure as "{}"'.format(plt_filename)) plt.close() # Show all plots if needed if plt_show: plt.show() def plotGatingKinetics(neuron, fs=15): ''' Plot the voltage-dependent steady-states and time constants of activation and inactivation gates of the different ionic currents involved in a specific neuron's membrane. 
:param neuron: specific channel mechanism object :param fs: labels and title font size ''' # Input membrane potential vector Vm = np.linspace(-100, 50, 300) xinf_dict = {} taux_dict = {} logger.info('Computing %s neuron gating kinetics', neuron.name) names = neuron.states_names print(names) for xname in names: Vm_state = True # Names of functions of interest xinf_func_str = xname.lower() + 'inf' taux_func_str = 'tau' + xname.lower() alphax_func_str = 'alpha' + xname.lower() betax_func_str = 'beta' + xname.lower() # derx_func_str = 'der' + xname.upper() # 1st choice: use xinf and taux function if hasattr(neuron, xinf_func_str) and hasattr(neuron, taux_func_str): xinf_func = getattr(neuron, xinf_func_str) taux_func = getattr(neuron, taux_func_str) xinf = np.array([xinf_func(v) for v in Vm]) if isinstance(taux_func, float): taux = taux_func * np.ones(len(Vm)) else: taux = np.array([taux_func(v) for v in Vm]) # 2nd choice: use alphax and betax functions elif hasattr(neuron, alphax_func_str) and hasattr(neuron, betax_func_str): alphax_func = getattr(neuron, alphax_func_str) betax_func = getattr(neuron, betax_func_str) alphax = np.array([alphax_func(v) for v in Vm]) if isinstance(betax_func, float): betax = betax_func * np.ones(len(Vm)) else: betax = np.array([betax_func(v) for v in Vm]) taux = 1.0 / (alphax + betax) xinf = taux * alphax # # 3rd choice: use derX choice # elif hasattr(neuron, derx_func_str): # derx_func = getattr(neuron, derx_func_str) # xinf = brentq(lambda x: derx_func(neuron.Vm, x), 0, 1) else: Vm_state = False if not Vm_state: logger.error('no function to compute %s-state gating kinetics', xname) else: xinf_dict[xname] = xinf taux_dict[xname] = taux fig, axes = plt.subplots(2) fig.suptitle('{} neuron: gating dynamics'.format(neuron.name)) ax = axes[0] ax.get_xaxis().set_ticklabels([]) ax.set_ylabel('$X_{\infty}$', fontsize=fs) for xname in names: if xname in xinf_dict: ax.plot(Vm, xinf_dict[xname], lw=2, label='$' + xname + '_{\infty}$') ax.legend(fontsize=fs, loc=7) ax = axes[1] ax.set_xlabel('$V_m\ (mV)$', fontsize=fs) ax.set_ylabel('$\\tau_X\ (ms)$', fontsize=fs) for xname in names: if xname in taux_dict: ax.plot(Vm, taux_dict[xname] * 1e3, lw=2, label='$\\tau_{' + xname + '}$') ax.legend(fontsize=fs, loc=7) plt.show() def plotRateConstants(neuron, fs=15): ''' Plot the voltage-dependent activation and inactivation rate constants for each gate of all ionic currents involved in a specific neuron's membrane. 
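# Illustrative sketch of the two equivalent descriptions of gate kinetics
# handled above: from rate constants (alpha, beta) one gets tau = 1/(alpha+beta)
# and xinf = alpha*tau, and conversely alpha = xinf/tau, beta = 1/tau - alpha.
# The textbook Hodgkin-Huxley sodium activation rates below (ms^-1, V in mV)
# are used purely for illustration; they are not taken from the package's
# neuron definitions.
import numpy as np

Vm = np.linspace(-99.5, 49.5, 150)                        # membrane potential (mV)
alpham = 0.1 * (Vm + 40) / (1 - np.exp(-(Vm + 40) / 10))  # ms^-1
betam = 4 * np.exp(-(Vm + 65) / 18)                       # ms^-1

taum = 1.0 / (alpham + betam)     # activation time constant (ms)
minf = alpham * taum              # steady-state activation (-)

# Round-trip back to rate constants, as in plotRateConstants
alpham_back = minf / taum
betam_back = 1.0 / taum - alpham_back
assert np.allclose(alpham, alpham_back) and np.allclose(betam, betam_back)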
:param neuron: specific channel mechanism object :param fs: labels and title font size ''' # Input membrane potential vector Vm = np.linspace(neuron.Vm0 - 10, 50, 100) alphax_dict = {} betax_dict = {} logger.info('Computing %s neuron gating kinetics', neuron.name) names = neuron.states_names for xname in names: Vm_state = True # Names of functions of interest xinf_func_str = xname.lower() + 'inf' taux_func_str = 'tau' + xname.lower() alphax_func_str = 'alpha' + xname.lower() betax_func_str = 'beta' + xname.lower() # 1st choice: use alphax and betax functions if hasattr(neuron, alphax_func_str) and hasattr(neuron, betax_func_str): alphax_func = getattr(neuron, alphax_func_str) betax_func = getattr(neuron, betax_func_str) alphax = np.array([alphax_func(v) for v in Vm]) betax = np.array([betax_func(v) for v in Vm]) # 2nd choice: use xinf and taux function elif hasattr(neuron, xinf_func_str) and hasattr(neuron, taux_func_str): xinf_func = getattr(neuron, xinf_func_str) taux_func = getattr(neuron, taux_func_str) xinf = np.array([xinf_func(v) for v in Vm]) taux = np.array([taux_func(v) for v in Vm]) alphax = xinf / taux betax = 1.0 / taux - alphax else: Vm_state = False if not Vm_state: logger.error('no function to compute %s-state gating kinetics', xname) else: alphax_dict[xname] = alphax betax_dict[xname] = betax naxes = len(alphax_dict) _, axes = plt.subplots(naxes, figsize=(11, min(3 * naxes, 9))) for i, xname in enumerate(alphax_dict.keys()): ax1 = axes[i] if i == 0: ax1.set_title('{} neuron: rate constants'.format(neuron.name)) if i == naxes - 1: ax1.set_xlabel('$V_m\ (mV)$', fontsize=fs) else: ax1.get_xaxis().set_ticklabels([]) ax1.set_ylabel('$\\alpha_{' + xname + '}\ (ms^{-1})$', fontsize=fs, color='C0') for label in ax1.get_yticklabels(): label.set_color('C0') ax1.plot(Vm, alphax_dict[xname] * 1e-3, lw=2) ax2 = ax1.twinx() ax2.set_ylabel('$\\beta_{' + xname + '}\ (ms^{-1})$', fontsize=fs, color='C1') for label in ax2.get_yticklabels(): label.set_color('C1') ax2.plot(Vm, betax_dict[xname] * 1e-3, lw=2, color='C1') plt.tight_layout() plt.show() def setGrid(n, ncolmax=3): ''' Determine number of rows and columns in figure grid, based on number of variables to plot. ''' if n <= ncolmax: return (1, n) else: return ((n - 1) // ncolmax + 1, ncolmax) def plotEffVars(neuron, Fdrive, a=32e-9, amps=None, charges=None, keys=None, fs=12, ncolmax=2): ''' Plot the profiles of effective variables of a specific neuron for a given frequency. For each variable, one line chart per amplitude is plotted, using charge as the input variable on the abscissa and a linear color code for the amplitude value. 
:param neuron: channel mechanism object :param Fdrive: acoustic drive frequency (Hz) :param a: sonophore diameter (m) :param amps: vector of amplitudes at which variables must be plotted (Pa) :param charges: vector of charges at which variables must be plotted (C/m2) :param keys: list of variables to plot :param fs: figure fontsize :param ncolmax: max number of columns on the figure :return: handle to the created figure ''' # Check lookup file existence lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(neuron.name, a * 1e9) lookup_path = '{}/{}'.format(getLookupDir(), lookup_file) if not os.path.isfile(lookup_path): raise InputError('Missing lookup file: "{}"'.format(lookup_file)) # Load coefficients with open(lookup_path, 'rb') as fh: lookups3D = pickle.load(fh) # Retrieve 1D inputs from lookup dictionary freqs = lookups3D.pop('f') amps_ref = lookups3D.pop('A') charges_ref = lookups3D.pop('Q') # Filter lookups keys if provided if keys is not None: lookups3D = {key: lookups3D[key] for key in keys} # Interpolate 3D lookups at US frequency lookups2D = itrpLookupsFreq(lookups3D, freqs, Fdrive) if 'V' in lookups2D: lookups2D['Vm'] = lookups2D.pop('V') keys[keys.index('V')] = 'Vm' # Define log-amplitude color code if amps is None: amps = amps_ref mymap = cm.get_cmap('Oranges') norm = matplotlib.colors.LogNorm(amps.min(), amps.max()) sm = cm.ScalarMappable(norm=norm, cmap=mymap) sm._A = [] # Plot logger.info('plotting') nrows, ncols = setGrid(len(lookups2D), ncolmax=ncolmax) xvar = pltvars['Qm'] if charges is None: charges = charges_ref Qbounds = np.array([charges.min(), charges.max()]) * xvar['factor'] fig, _ = plt.subplots(figsize=(3 * ncols, 1 * nrows), squeeze=False) for j, key in enumerate(keys): ax = plt.subplot2grid((nrows, ncols), (j // ncols, j % ncols)) for s in ['right', 'top']: ax.spines[s].set_visible(False) yvar = pltvars[key] if j // ncols == nrows - 1: ax.set_xlabel('$\\rm {}\ ({})$'.format(xvar['label'], xvar['unit']), fontsize=fs) ax.set_xticks(Qbounds) else: ax.set_xticks([]) ax.spines['bottom'].set_visible(False) ax.xaxis.set_label_coords(0.5, -0.1) ax.yaxis.set_label_coords(-0.02, 0.5) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) ymin = np.inf ymax = -np.inf y0 = np.squeeze(interp2d(amps_ref, charges_ref, lookups2D[key].T)(0, charges)) # Plot effective variable for each selected amplitude for Adrive in amps: y = np.squeeze(interp2d(amps_ref, charges_ref, lookups2D[key].T)(Adrive, charges)) if 'alpha' in key or 'beta' in key: y[y > y0.max() * 2] = np.nan ax.plot(charges * xvar['factor'], y * yvar['factor'], c=sm.to_rgba(Adrive)) ymin = min(ymin, y.min()) ymax = max(ymax, y.max()) # Plot reference variable ax.plot(charges * xvar['factor'], y0 * yvar['factor'], '--', c='k') ymax = max(ymax, y0.max()) ymin = min(ymin, y0.min()) # Set axis y-limits if 'alpha' in key or 'beta' in key: ymax = y0.max() * 2 ylim = [ymin * yvar['factor'], ymax * yvar['factor']] if key == 'ng': ylim = [np.floor(ylim[0] * 1e2) / 1e2, np.ceil(ylim[1] * 1e2) / 1e2] else: factor = 1 / np.power(10, np.floor(np.log10(ylim[1]))) print(key, ylim[1], factor) ylim = [np.floor(ylim[0] * factor) / factor, np.ceil(ylim[1] * factor) / factor] dy = ylim[1] - ylim[0] ax.set_yticks(ylim) ax.set_ylim(ylim) # ax.set_ylim([ylim[0] - 0.05 * dy, ylim[1] + 0.05 * dy]) # Annotate variable and unit xlim = ax.get_xlim() if np.argmax(y0) < np.argmin(y0): xtext = xlim[0] + 0.6 * (xlim[1] - xlim[0]) else: xtext = xlim[0] + 0.01 * (xlim[1] - xlim[0]) if key in ['Vm', 'ng']: ytext = ylim[0] + 0.85 * 
dy else: ytext = ylim[0] + 0.15 * dy ax.text(xtext, ytext, '$\\rm {}\ ({})$'.format(yvar['label'], yvar['unit']), fontsize=fs) fig.suptitle('{} neuron: original vs. effective variables @ {:.0f} kHz'.format( neuron.name, Fdrive * 1e-3)) # Plot colorbar fig.subplots_adjust(left=0.10, bottom=0.05, top=0.9, right=0.85) cbarax = fig.add_axes([0.87, 0.05, 0.04, 0.85]) fig.colorbar(sm, cax=cbarax) cbarax.set_ylabel('amplitude (Pa)', fontsize=fs) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) return fig def plotActivationMap(DCs, amps, actmap, FRlims, title=None, Ascale='log', FRscale='log', fs=8): ''' Plot a neuron's activation map over the amplitude x duty cycle 2D space. :param DCs: duty cycle vector :param amps: amplitude vector :param actmap: 2D activation matrix :param FRlims: lower and upper bounds of firing rate color-scale :param title: figure title :param Ascale: scale to use for the amplitude dimension ('lin' or 'log') :param FRscale: scale to use for the firing rate coloring ('lin' or 'log') :param fs: fontsize to use for the title and labels :return: 3-tuple with the handle to the generated figure and the mesh x and y coordinates ''' # Check firing rate bounding minFR, maxFR = (actmap[actmap > 0].min(), actmap.max()) logger.info('FR range: %.0f - %.0f Hz', minFR, maxFR) if minFR < FRlims[0]: logger.warning('Minimal firing rate (%.0f Hz) is below defined lower bound (%.0f Hz)', minFR, FRlims[0]) if maxFR > FRlims[1]: logger.warning('Maximal firing rate (%.0f Hz) is above defined upper bound (%.0f Hz)', maxFR, FRlims[1]) # Plot activation map if FRscale == 'lin': norm = matplotlib.colors.Normalize(*FRlims) elif FRscale == 'log': norm = matplotlib.colors.LogNorm(*FRlims) fig, ax = plt.subplots(figsize=cm2inch(8, 5.8)) fig.subplots_adjust(left=0.15, bottom=0.15, right=0.8, top=0.92) if title is not None: ax.set_title(title, fontsize=fs) if Ascale == 'log': ax.set_yscale('log') ax.set_xlabel('Duty cycle (%)', fontsize=fs, labelpad=-0.5) ax.set_ylabel('Amplitude (kPa)', fontsize=fs) ax.set_xlim(np.array([DCs.min(), DCs.max()]) * 1e2) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) xedges = computeMeshEdges(DCs) yedges = computeMeshEdges(amps, scale='log') actmap[actmap == -1] = np.nan actmap[actmap == 0] = 1e-3 cmap = plt.get_cmap('viridis') cmap.set_bad('silver') cmap.set_under('k') ax.pcolormesh(xedges * 1e2, yedges * 1e-3, actmap, cmap=cmap, norm=norm) # Plot firing rate colorbar sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm) sm._A = [] pos1 = ax.get_position() # get the map axis position cbarax = fig.add_axes([pos1.x1 + 0.02, pos1.y0, 0.03, pos1.height]) fig.colorbar(sm, cax=cbarax) cbarax.set_ylabel('Firing rate (Hz)', fontsize=fs) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) return (fig, xedges, yedges) def plotDualMaxMap(DCs, amps, maxmap, factor, actmap, title, lbl='xy', Ascale='log', fs=8): ''' Plot a variable maximum map over the amplitude x duty cycle 2D space, with different color codes for the sub and supra-threshold regions. 
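# Illustrative sketch of the masking trick used in the function below: the
# variable-maximum map is duplicated and each copy is blanked (NaN) outside
# its regime, so that sub- and supra-threshold regions can be drawn with two
# different colormaps on the same pcolormesh grid. The matrices are small toy
# examples.
import numpy as np

actmap = np.array([[-1., 0., 12.],
                   [-1., -1., 35.]])                # toy activation map (-1: no spike)
maxmap = np.arange(6, dtype=float).reshape(2, 3)    # toy variable-maximum map

maxmap_sub, maxmap_supra = maxmap.copy(), maxmap.copy()
maxmap_sub[actmap >= 0] = np.nan                    # keep only sub-threshold entries
maxmap_supra[actmap < 0] = np.nan                   # keep only supra-threshold entries
print(maxmap_sub, maxmap_supra, sep='\n')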
        :param DCs: duty cycle vector
        :param amps: amplitude vector
        :param maxmap: 2D variable maximum matrix
        :param factor: unit factor to use for the colorbars labels
        :param actmap: 2D activation matrix
        :param title: figure title
        :param lbl: indicates whether to label the x and y axes
        :param Ascale: scale to use for the amplitude dimension ('lin' or 'log')
        :param fs: fontsize to use for the title and labels
        :return: a handle to the generated figure
    '''

    # Split variable max map into sub-threshold and supra-threshold max maps
    maxmap_sub, maxmap_supra = maxmap.copy(), maxmap.copy()
    maxmap_sub[actmap >= 0] = np.nan
    maxmap_supra[actmap < 0] = np.nan

    # Plot dual max map
    fig, ax = plt.subplots(figsize=cm2inch(8, 5.8))
    fig.subplots_adjust(left=0.15, bottom=0.15, right=0.8, top=0.92)
    ax.set_title(title, fontsize=fs)
    if Ascale == 'log':
        ax.set_yscale('log')
    if 'x' in lbl:
        ax.set_xlabel('Duty cycle (%)', fontsize=fs)
    else:
        ax.set_xticklabels([])
    if 'y' in lbl:
        ax.set_ylabel('Amplitude (kPa)', fontsize=fs)
    else:
        ax.set_yticklabels([])
    for item in ax.get_xticklabels() + ax.get_yticklabels():
        item.set_fontsize(fs)
    xedges = computeMeshEdges(DCs)
    yedges = computeMeshEdges(amps, scale=Ascale)

    # Plot the 2 corresponding colorbars
    sm_sub = ax.pcolormesh(xedges * 1e2, yedges * 1e-3, maxmap_sub * factor, cmap='Blues')
    sm_supra = ax.pcolormesh(xedges * 1e2, yedges * 1e-3, maxmap_supra * factor, cmap='Reds')
    pos1 = ax.get_position()  # get the map axis position
    height = (pos1.height - 0.05) / 2.0
    cbarax_sub = fig.add_axes([pos1.x1 + 0.02, pos1.y0, 0.03, height])
    cbar_sub = fig.colorbar(sm_sub, cax=cbarax_sub, format='%.1f')
    cbar_sub.set_ticks(np.array([np.nanmin(maxmap_sub), np.nanmax(maxmap_sub)]) * factor)
    cbarax_supra = fig.add_axes([pos1.x1 + 0.02, pos1.y1 - height, 0.03, height])
    cbar_supra = fig.colorbar(sm_supra, cax=cbarax_supra, format='%.1f')
    cbar_supra.set_ticks(np.array([np.nanmin(maxmap_supra), np.nanmax(maxmap_supra)]) * factor)
    for item in cbarax_sub.get_yticklabels() + cbarax_supra.get_yticklabels():
        item.set_fontsize(fs)
    return fig


def plotRawTrace(fpath, key, ybounds):
    ''' Plot the raw signal of a given variable within specified bounds.

        :param fpath: full path to the data file
        :param key: key to the target variable
        :param ybounds: y-axis bounds
        :return: handle to the generated figure
    '''

    # Check file existence
    fname = ntpath.basename(fpath)
    if not os.path.isfile(fpath):
        raise InputError('Error: "{}" file does not exist'.format(fname))

    # Load data
    logger.debug('Loading data from "%s"', fname)
    with open(fpath, 'rb') as fh:
        frame = pickle.load(fh)
        df = frame['data']
    t = df['t'].values
    y = df[key].values * pltvars[key]['factor']
    Δy = y.max() - y.min()
    logger.info('d%s = %.1f %s', key, Δy, pltvars[key]['unit'])

    # Plot trace
    fig, ax = plt.subplots(figsize=cm2inch(12.5, 5.8))
    fig.canvas.set_window_title(fname)
    for s in ['top', 'bottom', 'left', 'right']:
        ax.spines[s].set_visible(False)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_ylim(ybounds)
    ax.plot(t, y, color='k', linewidth=1)
    fig.tight_layout()
    return fig


def plotTraces(fpath, keys, tbounds):
    ''' Plot the raw signal of several variables within specified bounds.
:param fpath: full path to the data file :param keys: keys to the target variables :param tbounds: x-axis bounds :return: handle to the generated figure ''' # Check file existence fname = ntpath.basename(fpath) if not os.path.isfile(fpath): raise InputError('Error: "{}" file does not exist'.format(fname)) # Load data logger.debug('Loading data from "%s"', fname) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] t = df['t'].values * 1e3 # Plot trace fs = 8 fig, ax = plt.subplots(figsize=cm2inch(7, 3)) fig.canvas.set_window_title(fname) plt.subplots_adjust(left=0.2, bottom=0.2, right=0.95, top=0.95) for s in ['top', 'right']: ax.spines[s].set_visible(False) for s in ['bottom', 'left']: ax.spines[s].set_position(('axes', -0.03)) ax.spines[s].set_linewidth(2) ax.yaxis.set_tick_params(width=2) # ax.spines['bottom'].set_linewidth(2) ax.set_xlim(tbounds) ax.set_xticks([]) ymin = np.nan ymax = np.nan dt = tbounds[1] - tbounds[0] ax.set_xlabel('{}s'.format(si_format(dt * 1e-3, space=' ')), fontsize=fs) ax.set_ylabel('mV - $\\rm nC/cm^2$', fontsize=fs, labelpad=-15) colors = {'Vm': 'darkgrey', 'Qm': 'k'} for key in keys: y = df[key].values * pltvars[key]['factor'] ymin = np.nanmin([ymin, y.min()]) ymax = np.nanmax([ymax, y.max()]) # if key == 'Qm': # y0 = y[0] # ax.plot(t, y0 * np.ones(t.size), '--', color='k', linewidth=1) Δy = y.max() - y.min() logger.info('d%s = %.1f %s', key, Δy, pltvars[key]['unit']) ax.plot(t, y, color=colors[key], linewidth=1) ax.yaxis.set_major_formatter(FormatStrFormatter('%.0f')) # ax.set_yticks([ymin, ymax]) ax.set_ylim([-200, 100]) ax.set_yticks([-200, 100]) for item in ax.get_yticklabels(): item.set_fontsize(fs) # fig.tight_layout() return fig diff --git a/PointNICE/solvers/simutils.py b/PointNICE/solvers/simutils.py index 97d77b4..65fef31 100644 --- a/PointNICE/solvers/simutils.py +++ b/PointNICE/solvers/simutils.py @@ -1,2086 +1,2137 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-07-23 13:40:44 +# @Last Modified time: 2018-07-23 14:39:56 """ Utility functions used in simulations """ import os import time import logging import pickle import shutil import tkinter as tk from tkinter import filedialog import numpy as np import pandas as pd from openpyxl import load_workbook import lockfile import multiprocessing as mp from ..bls import BilayerSonophore from .SolverUS import SolverUS from .SolverElec import SolverElec from ..constants import * from ..neurons import * from ..utils import getNeuronsDict, InputError, PmCompMethod, si_format, getCycleAverage, nDindexes # Get package logger logger = logging.getLogger('PointNICE') class Consumer(mp.Process): ''' Generic consumer process, taking tasks from a queue and outputting results in another queue. ''' def __init__(self, task_queue, result_queue): mp.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue print('Starting {}'.format(self.name)) def run(self): while True: nextTask = self.task_queue.get() if nextTask is None: print('Exiting {}'.format(self.name)) self.task_queue.task_done() break print('{}: {}'.format(self.name, nextTask)) answer = nextTask() self.task_queue.task_done() self.result_queue.put(answer) return class LookupWorker(): ''' Worker class that computes "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density.
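# --- Illustrative sketch (not part of the original module) ---
# The batch runners below all use the same producer/consumer pattern built on the Consumer
# class defined earlier: callable worker objects are queued, one consumer per CPU core runs
# them, and one None "poison pill" per consumer terminates the pool.
def _consumer_pool_sketch(workers):
    tasks, results = mp.JoinableQueue(), mp.Queue()
    consumers = [Consumer(tasks, results) for _ in range(mp.cpu_count())]
    for c in consumers:
        c.start()
    for w in workers:                 # e.g. MechWorker / EStimWorker / AStimWorker instances
        tasks.put(w, block=False)
    for _ in consumers:
        tasks.put(None, block=False)  # poison pills
    tasks.join()
    return [results.get() for _ in workers]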
A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. ''' def __init__(self, wid, bls, neuron, Fdrive, Adrive, Qm, phi, nsims): ''' Class constructor. :param wid: worker ID :param bls: BilayerSonophore object :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param phi: acoustic drive phase (rad) :param nsims: total number or simulations ''' self.id = wid self.bls = bls self.neuron = neuron self.Fdrive = Fdrive self.Adrive = Adrive self.Qm = Qm self.phi = phi self.nsims = nsims def __call__(self): ''' Method that computes effective coefficients. ''' try: # Run simulation and retrieve deflection and gas content vectors from last cycle _, [Z, ng], _ = self.bls.run(self.Fdrive, self.Adrive, self.Qm, self.phi) Z_last = Z[-NPC_FULL:] # m # Compute membrane potential vector Vm = self.Qm / self.bls.v_Capct(Z_last) * 1e3 # mV # Compute average cycle value for membrane potential and rate constants Vm_eff = np.mean(Vm) # mV rates_eff = self.neuron.getEffRates(Vm) # Take final cycle value for gas content ng_eff = ng[-1] # mole return (self.id, [Vm_eff, ng_eff, *rates_eff]) except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return -1 def __str__(self): return 'simulation {}/{} (f = {}Hz, A = {}Pa, Q = {:.2f} nC/cm2)'\ .format(self.id + 1, self.nsims, *si_format([self.Fdrive, self.Adrive], space=' '), self.Qm * 1e5) class AStimWorker(): ''' Worker class that runs a single A-STIM simulation a given neuron for specific stimulation parameters, and save the results in a PKL file. ''' def __init__(self, wid, batch_dir, log_filepath, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method, nsims): ''' Class constructor. :param wid: worker ID :param solver: SolverUS object :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param int_method: selected integration method :param nsims: total number or simulations ''' self.id = wid self.batch_dir = batch_dir self.log_filepath = log_filepath self.solver = solver self.neuron = neuron self.Fdrive = Fdrive self.Adrive = Adrive self.tstim = tstim self.toffset = toffset self.PRF = PRF self.DC = DC self.int_method = int_method self.nsims = nsims def __call__(self): ''' Method that runs the simulation. ''' simcode = 'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms_{}{}'\ .format(self.neuron.name, 'CW' if self.DC == 1 else 'PW', self.solver.a * 1e9, self.Fdrive * 1e-3, self.Adrive * 1e-3, self.tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(self.PRF, self.DC * 1e2) if self.DC < 1. 
else '', self.int_method) try: # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run simulation tstart = time.time() (t, y, states) = self.solver.run(self.neuron, self.Fdrive, self.Adrive, self.tstim, self.toffset, self.PRF, self.DC, self.int_method) Z, ng, Qm, Vm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.debug('completed in %ss', si_format(tcomp, 2)) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Vm}) for j in range(len(self.neuron.states_names)): df[self.neuron.states_names[j]] = channels[j] meta = {'neuron': self.neuron.name, 'a': self.solver.a, 'd': self.solver.d, 'Fdrive': self.Fdrive, 'Adrive': self.Adrive, 'phi': np.pi, 'tstim': self.tstim, 'toffset': self.toffset, 'PRF': self.PRF, 'DC': self.DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(self.batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Detect spikes on Qm signal dt = t[1] - t[0] ipeaks, *_ = findPeaks(Qm, SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 'N/A' logger.debug('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': self.neuron.name, 'D': self.solver.a * 1e9, 'E': self.solver.d * 1e6, 'F': self.Fdrive * 1e-3, 'G': self.Adrive * 1e-3, 'H': self.tstim * 1e3, 'I': self.PRF * 1e-3 if self.DC < 1 else 'N/A', 'J': self.DC, 'K': self.int_method, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(self.log_filepath, 'Data', log) == 1: logger.debug('log exported to "%s"', self.log_filepath) else: logger.error('log export to "%s" aborted', self.log_filepath) return output_filepath except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return -1 def __str__(self): worker_str = 'A-STIM {} simulation {}/{}: {} neuron, a = {}m, f = {}Hz, A = {}Pa, t = {}s'\ .format(self.int_method, self.id, self.nsims, self.neuron.name, *si_format([self.solver.a, self.Fdrive], 1, space=' '), si_format(self.Adrive, 2, space=' '), si_format(self.tstim, 1, space=' ')) if self.DC < 1.0: worker_str += ', PRF = {}Hz, DC = {:.2f}%'\ .format(si_format(self.PRF, 2, space=' '), self.DC * 1e2) return worker_str class EStimWorker(): ''' Worker class that runs a single E-STIM simulation a given neuron for specific stimulation parameters, and save the results in a PKL file. ''' def __init__(self, wid, batch_dir, log_filepath, solver, neuron, Astim, tstim, toffset, PRF, DC, nsims): ''' Class constructor. 
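# --- Illustrative sketch (not part of the original module) ---
# Running a single instance of the AStimWorker class above, outside of a batch; the neuron
# key 'RS', the paths and the stimulation values are placeholders chosen for illustration.
def _single_astim_sketch(batch_dir, log_filepath):
    neuron = getNeuronsDict()['RS']()            # hypothetical neuron key
    Fdrive, Adrive = 500e3, 100e3                # Hz, Pa
    solver = SolverUS(32e-9, neuron, Fdrive)     # 32 nm sonophore diameter
    worker = AStimWorker(1, batch_dir, log_filepath, solver, neuron, Fdrive, Adrive,
                         tstim=150e-3, toffset=100e-3, PRF=100., DC=1.0,
                         int_method='effective', nsims=1)
    return worker()                              # runs the simulation and returns the PKL filepath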
:param wid: worker ID :param solver: SolverElec object :param neuron: neuron object :param Astim: stimulus amplitude (mA/m2) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param nsims: total number or simulations ''' self.id = wid self.batch_dir = batch_dir self.log_filepath = log_filepath self.solver = solver self.neuron = neuron self.Astim = Astim self.tstim = tstim self.toffset = toffset self.PRF = PRF self.DC = DC self.nsims = nsims def __call__(self): ''' Method that runs the simulation. ''' simcode = 'ESTIM_{}_{}_{:.1f}mA_per_m2_{:.0f}ms{}'\ .format(self.neuron.name, 'CW' if self.DC == 1 else 'PW', self.Astim, self.tstim * 1e3, '_PRF{:.2f}Hz_DC{:.2f}%'.format(self.PRF, self.DC * 1e2) if self.DC < 1. else '') try: # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run simulation tstart = time.time() (t, y, states) = self.solver.run(self.neuron, self.Astim, self.tstim, self.toffset, self.PRF, self.DC) Vm, *channels = y tcomp = time.time() - tstart logger.debug('completed in %ss', si_format(tcomp, 1)) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'Vm': Vm, 'Qm': Vm * self.neuron.Cm0 * 1e-3}) for j in range(len(self.neuron.states_names)): df[self.neuron.states_names[j]] = channels[j] meta = {'neuron': self.neuron.name, 'Astim': self.Astim, 'tstim': self.tstim, 'toffset': self.toffset, 'PRF': self.PRF, 'DC': self.DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(self.batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Detect spikes on Vm signal dt = t[1] - t[0] ipeaks, *_ = findPeaks(Vm, SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 'N/A' logger.debug('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': self.neuron.name, 'D': self.Astim, 'E': self.tstim * 1e3, 'F': self.PRF * 1e-3 if self.DC < 1 else 'N/A', 'G': self.DC, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(self.log_filepath, 'Data', log) == 1: logger.debug('log exported to "%s"', self.log_filepath) else: logger.error('log export to "%s" aborted', self.log_filepath) return output_filepath except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return -1 def __str__(self): worker_str = 'E-STIM simulation {}/{}: {} neuron, A = {}A/m2, t = {}s'\ .format(self.id, self.nsims, self.neuron.name, si_format(self.Astim * 1e-3, 2, space=' '), si_format(self.tstim, 1, space=' ')) if self.DC < 1.0: worker_str += ', PRF = {}Hz, DC = {:.2f}%'\ .format(si_format(self.PRF, 2, space=' '), self.DC * 1e2) return worker_str +class MechWorker(): + ''' Worker class that runs a single simulation of the mechanical system with specific parameters + and an imposed value of charge density, and save the results in a PKL file. ''' + + def __init__(self, wid, batch_dir, log_filepath, bls, Fdrive, Adrive, Qm, nsims): + ''' Class constructor. 
+ + :param wid: worker ID + :param batch_dir: full path to output directory of batch + :param log_filepath: full path log file of batch + :param bls: BilayerSonophore instance + :param Fdrive: acoustic drive frequency (Hz) + :param Adrive: acoustic drive amplitude (Pa) + :param Qm: applided membrane charge density (C/m2) + :param nsims: total number or simulations + ''' + + self.id = wid + self.batch_dir = batch_dir + self.log_filepath = log_filepath + self.bls = bls + self.Fdrive = Fdrive + self.Adrive = Adrive + self.Qm = Qm + self.nsims = nsims + + def __call__(self): + ''' Method that runs the simulation. ''' + + simcode = 'MECH_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.1f}nCcm2'\ + .format(self.bls.a * 1e9, self.Fdrive * 1e-3, self.Adrive * 1e-3, self.Qm * 1e5) + + try: + + # Get date and time info + date_str = time.strftime("%Y.%m.%d") + daytime_str = time.strftime("%H:%M:%S") + + # Run simulation + tstart = time.time() + (t, y, states) = self.bls.run(self.Fdrive, self.Adrive, self.Qm) + (Z, ng) = y + + U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) + tcomp = time.time() - tstart + logger.debug('completed in %ss', si_format(tcomp, 1)) + + # Store dataframe and metadata + df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng}) + meta = {'a': self.bls.a, 'd': self.bls.d, 'Cm0': self.bls.Cm0, 'Qm0': self.bls.Qm0, + 'Fdrive': self.Fdrive, 'Adrive': self.Adrive, 'phi': np.pi, 'Qm': self.Qm, + 'tcomp': tcomp} + + # Export into to PKL file + output_filepath = '{}/{}.pkl'.format(self.batch_dir, simcode) + with open(output_filepath, 'wb') as fh: + pickle.dump({'meta': meta, 'data': df}, fh) + logger.debug('simulation data exported to "%s"', output_filepath) + + # Compute key output metrics + Zmax = np.amax(Z) + Zmin = np.amin(Z) + Zabs_max = np.amax(np.abs([Zmin, Zmax])) + eAmax = self.bls.arealstrain(Zabs_max) + Tmax = self.bls.TEtot(Zabs_max) + Pmmax = self.bls.PMavgpred(Zmin) + ngmax = np.amax(ng) + dUdtmax = np.amax(np.abs(np.diff(U) / np.diff(t)**2)) + + # Export key metrics to log file + log = { + 'A': date_str, + 'B': daytime_str, + 'C': self.bls.a * 1e9, + 'D': self.bls.d * 1e6, + 'E': self.Fdrive * 1e-3, + 'F': self.Adrive * 1e-3, + 'G': self.Qm * 1e5, + 'H': t.size, + 'I': tcomp, + 'J': self.bls.kA + self.bls.kA_tissue, + 'K': Zmax * 1e9, + 'L': eAmax, + 'M': Tmax * 1e3, + 'N': (ngmax - self.bls.ng0) / self.bls.ng0, + 'O': Pmmax * 1e-3, + 'P': dUdtmax + } + + if xlslog(self.log_filepath, 'Data', log) == 1: + logger.info('log exported to "%s"', self.log_filepath) + else: + logger.error('log export to "%s" aborted', self.log_filepath) + + return output_filepath + + except (Warning, AssertionError) as inst: + logger.warning('Integration error: %s. Continue batch? 
(y/n)', extra={inst}) + user_str = input() + if user_str not in ['y', 'Y']: + return -1 + + def __str__(self): + + return 'Mechanical simulation {}/{}: a = {}m, d = {}m, f = {}Hz, A = {}Pa, Q = {}C/cm2'\ + .format(self.id, self.nsims, + *si_format([self.bls.a, self.bls.d, self.Fdrive], 1, space=' '), + *si_format([self.Adrive, self.Qm * 1e-4], 2, space=' ')) + + + # Naming nomenclature for output files MECH_code = 'MECH_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.1f}nCcm2' ESTIM_CW_code = 'ESTIM_{}_CW_{:.1f}mA_per_m2_{:.0f}ms' ESTIM_PW_code = 'ESTIM_{}_PW_{:.1f}mA_per_m2_{:.0f}ms_PRF{:.2f}Hz_DC{:.2f}%' ASTIM_CW_code = 'ASTIM_{}_CW_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms_{}' ASTIM_PW_code = 'ASTIM_{}_PW_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms_PRF{:.2f}Hz_DC{:.2f}%_{}' # Parameters units ASTIM_params = { 'f': {'index': 0, 'factor': 1e-3, 'unit': 'kHz'}, 'A': {'index': 1, 'factor': 1e-3, 'unit': 'kPa'}, 't': {'index': 2, 'factor': 1e3, 'unit': 'ms'}, 'PRF': {'index': 4, 'factor': 1e-3, 'unit': 'kHz'}, 'DC': {'index': 5, 'factor': 1e2, 'unit': '%'} } ESTIM_params = { 'A': {'index': 0, 'factor': 1e0, 'unit': 'mA/m2'}, 't': {'index': 1, 'factor': 1e3, 'unit': 'ms'}, 'PRF': {'index': 3, 'factor': 1e-3, 'unit': 'kHz'}, 'DC': {'index': 4, 'factor': 1e2, 'unit': '%'} } # Default geometry default_diam = 32e-9 default_embedding = 0.0e-6 def setBatchDir(): ''' Select batch directory for output files.α :return: full path to batch directory ''' root = tk.Tk() root.withdraw() batch_dir = filedialog.askdirectory() if not batch_dir: raise InputError('No output directory chosen') return batch_dir def checkBatchLog(batch_dir, batch_type): ''' Check for appropriate log file in batch directory, and create one if it is absent. :param batch_dir: full path to batch directory :param batch_type: type of simulation batch :return: 2 tuple with full path to log file and boolean stating if log file was created ''' # Check for directory existence if not os.path.isdir(batch_dir): raise InputError('"{}" output directory does not exist'.format(batch_dir)) # Determine log template from batch type if batch_type == 'MECH': logfile = 'log_MECH.xlsx' elif batch_type == 'A-STIM': logfile = 'log_ASTIM.xlsx' elif batch_type == 'E-STIM': logfile = 'log_ESTIM.xlsx' else: raise InputError('Unknown batch type', batch_type) # Get template in package subdirectory this_dir, _ = os.path.split(__file__) parent_dir = os.path.abspath(os.path.join(this_dir, os.pardir)) logsrc = parent_dir + '/templates/' + logfile assert os.path.isfile(logsrc), 'template log file "{}" not found'.format(logsrc) # Copy template in batch directory if no appropriate log file logdst = batch_dir + '/' + logfile is_log = os.path.isfile(logdst) if not is_log: shutil.copy2(logsrc, logdst) return (logdst, not is_log) def createSimQueue(amps, durations, offsets, PRFs, DCs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. 
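# --- Hedged example (placeholder values, not from the original code) ---
# Each row of the returned queue reads (amplitude, duration, offset, PRF, DC): CW protocols
# (DC = 1) are generated once per amplitude/duration pair and mapped to the minimal PRF,
# pulsed protocols once per PRF / duty-cycle combination.
def _sim_queue_sketch():
    queue = createSimQueue([10e3, 50e3],   # amplitudes
                           [100e-3],       # durations (s)
                           [50e-3],        # offsets (s)
                           [100.0],        # PRFs (Hz)
                           [0.5, 1.0])     # duty cycles (-)
    return queue                           # 2 CW rows + 2 pulsed rows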
:param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: 2D-array with (amplitude, duration, offset, PRF, DC) for each stimulation protocol ''' # Convert input to 1D-arrays amps = np.array(amps) durations = np.array(durations) offsets = np.array(offsets) PRFs = np.array(PRFs) DCs = np.array(DCs) # Create index arrays iamps = range(len(amps)) idurs = range(len(durations)) # Create empty output matrix queue = np.empty((1, 5)) # Continuous protocols if 1.0 in DCs: nCW = len(amps) * len(durations) arr1 = np.ones(nCW) iCW_queue = np.array(np.meshgrid(iamps, idurs)).T.reshape(nCW, 2) CW_queue = np.vstack((amps[iCW_queue[:, 0]], durations[iCW_queue[:, 1]], offsets[iCW_queue[:, 1]], PRFs.min() * arr1, arr1)).T queue = np.vstack((queue, CW_queue)) # Pulsed protocols if np.any(DCs != 1.0): pulsed_DCs = DCs[DCs != 1.0] iPRFs = range(len(PRFs)) ipulsed_DCs = range(len(pulsed_DCs)) nPW = len(amps) * len(durations) * len(PRFs) * len(pulsed_DCs) iPW_queue = np.array(np.meshgrid(iamps, idurs, iPRFs, ipulsed_DCs)).T.reshape(nPW, 4) PW_queue = np.vstack((amps[iPW_queue[:, 0]], durations[iPW_queue[:, 1]], offsets[iPW_queue[:, 1]], PRFs[iPW_queue[:, 2]], pulsed_DCs[iPW_queue[:, 3]])).T queue = np.vstack((queue, PW_queue)) # Return return queue[1:, :] def xlslog(filename, sheetname, data): """ Append log data on a new row to specific sheet of excel workbook, using a lockfile to avoid read/write errors between concurrent processes. :param filename: absolute or relative path to the Excel workbook :param sheetname: name of the Excel spreadsheet to which data is appended :param data: data structure to be added to specific columns on a new row :return: boolean indicating success (1) or failure (0) of operation """ try: lock = lockfile.FileLock(filename) lock.acquire() wb = load_workbook(filename) ws = wb[sheetname] keys = data.keys() i = 1 row_data = {} for k in keys: row_data[k] = data[k] i += 1 ws.append(row_data) wb.save(filename) lock.release() return 1 except PermissionError: # If file cannot be accessed for writing because already opened logger.warning('Cannot write to "%s". Close the file and type "Y"', filename) user_str = input() if user_str in ['y', 'Y']: return xlslog(filename, sheetname, data) else: return 0 def detectPeaks(x, mph=None, mpd=1, threshold=0, edge='rising', kpsh=False, valley=False, ax=None): ''' Detect peaks in data based on their amplitude and other features. Adapted from Marco Duarte: http://nbviewer.jupyter.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb :param x: 1D array_like data. :param mph: minimum peak height (default = None). :param mpd: minimum peak distance in indexes (default = 1) :param threshold : minimum peak prominence (default = 0) :param edge : for a flat peak, keep only the rising edge ('rising'), only the falling edge ('falling'), both edges ('both'), or don't detect a flat peak (None). (default = 'rising') :param kpsh: keep peaks with same height even if they are closer than `mpd` (default = False). :param valley: detect valleys (local minima) instead of peaks (default = False). :param show: plot data in matplotlib figure (default = False). :param ax: a matplotlib.axes.Axes instance, optional (default = None). 
:return: 1D array with the indices of the peaks ''' print('min peak height:', mph, ', min peak distance:', mpd, ', min peak prominence:', threshold) # Convert input to numpy array x = np.atleast_1d(x).astype('float64') # Revert signal sign for valley detection if valley: x = -x # Differentiate signal dx = np.diff(x) # Find indices of all peaks with edge criterion ine, ire, ife = np.array([[], [], []], dtype=int) if not edge: ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0] else: if edge.lower() in ['rising', 'both']: ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0] if edge.lower() in ['falling', 'both']: ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0] ind = np.unique(np.hstack((ine, ire, ife))) # Remove first and last values of x if they are detected as peaks if ind.size and ind[0] == 0: ind = ind[1:] if ind.size and ind[-1] == x.size - 1: ind = ind[:-1] print('{} raw peaks'.format(ind.size)) # Remove peaks < minimum peak height if ind.size and mph is not None: ind = ind[x[ind] >= mph] print('{} height-filtered peaks'.format(ind.size)) # Remove peaks - neighbors < threshold if ind.size and threshold > 0: dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0) ind = np.delete(ind, np.where(dx < threshold)[0]) print('{} prominence-filtered peaks'.format(ind.size)) # Detect small peaks closer than minimum peak distance if ind.size and mpd > 1: ind = ind[np.argsort(x[ind])][::-1] # sort ind by peak height idel = np.zeros(ind.size, dtype=bool) for i in range(ind.size): if not idel[i]: # keep peaks with the same height if kpsh is True idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \ & (x[ind[i]] > x[ind] if kpsh else True) idel[i] = 0 # Keep current peak # remove the small peaks and sort back the indices by their occurrence ind = np.sort(ind[~idel]) print('{} distance-filtered peaks'.format(ind.size)) return ind def detectPeaksTime(t, y, mph, mtd, mpp=0): """ Extension of the detectPeaks function to detect peaks in data based on their amplitude and time difference, with a non-uniform time vector. :param t: time vector (not necessarily uniform) :param y: signal :param mph: minimal peak height :param mtd: minimal time difference :mpp: minmal peak prominence :return: array of peak indexes """ # Determine whether time vector is uniform (threshold in time step variation) dt = np.diff(t) if (dt.max() - dt.min()) / dt.min() < 1e-2: isuniform = True else: isuniform = False if isuniform: print('uniform time vector') dt = t[1] - t[0] mpd = int(np.ceil(mtd / dt)) ipeaks = detectPeaks(y, mph, mpd=mpd, threshold=mpp) else: print('non-uniform time vector') # Detect peaks on signal with no restriction on inter-peak distance irawpeaks = detectPeaks(y, mph, mpd=1, threshold=mpp) npeaks = irawpeaks.size if npeaks > 0: # Filter relevant peaks with temporal distance ipeaks = [irawpeaks[0]] for i in range(1, npeaks): i1 = ipeaks[-1] i2 = irawpeaks[i] if t[i2] - t[i1] < mtd: if y[i2] > y[i1]: ipeaks[-1] = i2 else: ipeaks.append(i2) else: ipeaks = [] ipeaks = np.array(ipeaks) return ipeaks def detectSpikes(t, Qm, min_amp, min_dt): ''' Detect spikes on a charge density signal, and return their number, latency and rate. 
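# --- Illustrative sketch for the spike detection utility described above ---
# (synthetic charge signal; threshold values chosen for illustration only)
def _detect_spikes_sketch():
    t = np.linspace(0., 0.1, 10001)             # uniform time vector (s)
    Qm = 30e-5 * np.sin(2 * np.pi * 50. * t)    # 5 artificial charge peaks (C/m2)
    n_spikes, latency, spike_rate = detectSpikes(t, Qm, 20e-5, 5e-3)
    return n_spikes, latency, spike_rate        # expected: 5 spikes, ~5 ms latency, ~50 sp/s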
:param t: time vector (s) :param Qm: charge density vector (C/m2) :param min_amp: minimal charge amplitude to detect spikes (C/m2) :param min_dt: minimal time interval between 2 spikes (s) :return: 3-tuple with number of spikes, latency (s) and spike rate (sp/s) ''' i_spikes = detectPeaksTime(t, Qm, min_amp, min_dt) if len(i_spikes) > 0: latency = t[i_spikes[0]] # s n_spikes = i_spikes.size if n_spikes > 1: first_to_last_spike = t[i_spikes[-1]] - t[i_spikes[0]] # s spike_rate = (n_spikes - 1) / first_to_last_spike # spikes/s else: spike_rate = 'N/A' else: latency = 'N/A' spike_rate = 'N/A' n_spikes = 0 return (n_spikes, latency, spike_rate) def findPeaks(y, mph=None, mpd=None, mpp=None): ''' Detect peaks in a signal based on their height, prominence and/or separating distance. :param y: signal vector :param mph: minimum peak height (in signal units, default = None). :param mpd: minimum inter-peak distance (in indexes, default = None) :param mpp: minimum peak prominence (in signal units, default = None) :return: 4-tuple of arrays with the indexes of peaks occurence, peaks prominence, peaks width at half-prominence and peaks half-prominence bounds (left and right) Adapted from: - Marco Duarte's detect_peaks function (http://nbviewer.jupyter.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb) - MATLAB findpeaks function (https://ch.mathworks.com/help/signal/ref/findpeaks.html) ''' # Define empty output empty = (np.array([]),) * 4 # Find all peaks and valleys dy = np.diff(y) s = np.sign(dy) ipeaks = np.where(np.diff(s) < 0)[0] + 1 ivalleys = np.where(np.diff(s) > 0)[0] + 1 # Return empty output if no peak detected if ipeaks.size == 0: return empty # Ensure each peak is bounded by two valleys, adding signal boundaries as valleys if necessary if ivalleys.size == 0 or ipeaks[0] < ivalleys[0]: ivalleys = np.insert(ivalleys, 0, 0) if ipeaks[-1] > ivalleys[-1]: ivalleys = np.insert(ivalleys, ivalleys.size, y.size - 1) # assert ivalleys.size - ipeaks.size == 1, 'Number of peaks and valleys not matching' if ivalleys.size - ipeaks.size != 1: logger.warning('detection incongruity: %u peaks vs. 
%u valleys detected', ipeaks.size, ivalleys.size) return empty # Remove peaks < minimum peak height if mph is not None: ipeaks = ipeaks[y[ipeaks] >= mph] if ipeaks.size == 0: return empty # Detect small peaks closer than minimum peak distance if mpd is not None: ipeaks = ipeaks[np.argsort(y[ipeaks])][::-1] # sort ipeaks by descending peak height idel = np.zeros(ipeaks.size, dtype=bool) # initialize boolean deletion array (all false) for i in range(ipeaks.size): # for each peak if not idel[i]: # if not marked for deletion closepeaks = (ipeaks >= ipeaks[i] - mpd) & (ipeaks <= ipeaks[i] + mpd) # close peaks idel = idel | closepeaks # mark for deletion along with previously marked peaks # idel = idel | (ipeaks >= ipeaks[i] - mpd) & (ipeaks <= ipeaks[i] + mpd) idel[i] = 0 # keep current peak # remove the small peaks and sort back the indices by their occurrence ipeaks = np.sort(ipeaks[~idel]) # Detect smallest valleys between consecutive relevant peaks ibottomvalleys = [] if ipeaks[0] > ivalleys[0]: itrappedvalleys = ivalleys[ivalleys < ipeaks[0]] ibottomvalleys.append(itrappedvalleys[np.argmin(y[itrappedvalleys])]) for i, j in zip(ipeaks[:-1], ipeaks[1:]): itrappedvalleys = ivalleys[np.logical_and(ivalleys > i, ivalleys < j)] ibottomvalleys.append(itrappedvalleys[np.argmin(y[itrappedvalleys])]) if ipeaks[-1] < ivalleys[-1]: itrappedvalleys = ivalleys[ivalleys > ipeaks[-1]] ibottomvalleys.append(itrappedvalleys[np.argmin(y[itrappedvalleys])]) ipeaks = ipeaks ivalleys = np.array(ibottomvalleys, dtype=int) # Ensure each peak is bounded by two valleys, adding signal boundaries as valleys if necessary if ipeaks[0] < ivalleys[0]: ivalleys = np.insert(ivalleys, 0, 0) if ipeaks[-1] > ivalleys[-1]: ivalleys = np.insert(ivalleys, ivalleys.size, y.size - 1) # Remove peaks < minimum peak prominence if mpp is not None: # Compute peaks prominences as difference between peaks and their closest valley prominences = y[ipeaks] - np.amax((y[ivalleys[:-1]], y[ivalleys[1:]]), axis=0) # initialize peaks and valleys deletion tables idelp = np.zeros(ipeaks.size, dtype=bool) idelv = np.zeros(ivalleys.size, dtype=bool) # for each peak (sorted by ascending prominence order) for ind in np.argsort(prominences): ipeak = ipeaks[ind] # get peak index # get peak bases as first valleys on either side not marked for deletion indleftbase = ind indrightbase = ind + 1 while idelv[indleftbase]: indleftbase -= 1 while idelv[indrightbase]: indrightbase += 1 ileftbase = ivalleys[indleftbase] irightbase = ivalleys[indrightbase] # Compute peak prominence and mark for deletion if < mpp indmaxbase = indleftbase if y[ileftbase] > y[irightbase] else indrightbase if y[ipeak] - y[ivalleys[indmaxbase]] < mpp: idelp[ind] = True # mark peak for deletion idelv[indmaxbase] = True # mark highest surrouding valley for deletion # remove irrelevant peaks and valleys, and sort back the indices by their occurrence ipeaks = np.sort(ipeaks[~idelp]) ivalleys = np.sort(ivalleys[~idelv]) if ipeaks.size == 0: return empty # Compute peaks prominences and reference half-prominence levels prominences = y[ipeaks] - np.amax((y[ivalleys[:-1]], y[ivalleys[1:]]), axis=0) refheights = y[ipeaks] - prominences / 2 # Compute half-prominence bounds ibounds = np.empty((ipeaks.size, 2)) for i in range(ipeaks.size): # compute the index of the left-intercept at half max ileft = ipeaks[i] while ileft >= ivalleys[i] and y[ileft] > refheights[i]: ileft -= 1 if ileft < ivalleys[i]: # intercept exactly on valley ibounds[i, 0] = ivalleys[i] else: # interpolate intercept linearly 
between signal boundary points a = (y[ileft + 1] - y[ileft]) / 1 b = y[ileft] - a * ileft ibounds[i, 0] = (refheights[i] - b) / a # compute the index of the right-intercept at half max iright = ipeaks[i] while iright <= ivalleys[i + 1] and y[iright] > refheights[i]: iright += 1 if iright > ivalleys[i + 1]: # intercept exactly on valley ibounds[i, 1] = ivalleys[i + 1] else: # interpolate intercept linearly between signal boundary points if iright == y.size - 1: # special case: if end of signal is reached, decrement iright iright -= 1 a = (y[iright + 1] - y[iright]) / 1 b = y[iright] - a * iright ibounds[i, 1] = (refheights[i] - b) / a # Compute peaks widths at half-prominence widths = np.diff(ibounds, axis=1) return (ipeaks, prominences, widths, ibounds) -def runMech(batch_dir, log_filepath, bls, Fdrive, Adrive, Qm): - ''' Run a single simulation of the mechanical system with specific parameters and - an imposed value of charge density, and save the results in a PKL file. - - :param batch_dir: full path to output directory of batch - :param log_filepath: full path log file of batch - :param bls: BilayerSonophore instance - :param Fdrive: acoustic drive frequency (Hz) - :param Adrive: acoustic drive amplitude (Pa) - :param Qm: applided membrane charge density (C/m2) - :return: full path to the output file - ''' - - simcode = MECH_code.format(bls.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, Qm * 1e5) - - # Get date and time info - date_str = time.strftime("%Y.%m.%d") - daytime_str = time.strftime("%H:%M:%S") - - # Run simulation - tstart = time.time() - (t, y, states) = bls.run(Fdrive, Adrive, Qm) - (Z, ng) = y - - U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) - tcomp = time.time() - tstart - logger.debug('completed in %ss', si_format(tcomp, 1)) - - # Store dataframe and metadata - df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng}) - meta = {'a': bls.a, 'd': bls.d, 'Cm0': bls.Cm0, 'Qm0': bls.Qm0, 'Fdrive': Fdrive, - 'Adrive': Adrive, 'phi': np.pi, 'Qm': Qm, 'tcomp': tcomp} - - # Export into to PKL file - output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) - with open(output_filepath, 'wb') as fh: - pickle.dump({'meta': meta, 'data': df}, fh) - logger.debug('simulation data exported to "%s"', output_filepath) - - # Compute key output metrics - Zmax = np.amax(Z) - Zmin = np.amin(Z) - Zabs_max = np.amax(np.abs([Zmin, Zmax])) - eAmax = bls.arealstrain(Zabs_max) - Tmax = bls.TEtot(Zabs_max) - Pmmax = bls.PMavgpred(Zmin) - ngmax = np.amax(ng) - dUdtmax = np.amax(np.abs(np.diff(U) / np.diff(t)**2)) - - # Export key metrics to log file - log = { - 'A': date_str, - 'B': daytime_str, - 'C': bls.a * 1e9, - 'D': bls.d * 1e6, - 'E': Fdrive * 1e-3, - 'F': Adrive * 1e-3, - 'G': Qm * 1e5, - 'H': t.size, - 'I': tcomp, - 'J': bls.kA + bls.kA_tissue, - 'K': Zmax * 1e9, - 'L': eAmax, - 'M': Tmax * 1e3, - 'N': (ngmax - bls.ng0) / bls.ng0, - 'O': Pmmax * 1e-3, - 'P': dUdtmax - } - - if xlslog(log_filepath, 'Data', log) == 1: - logger.info('log exported to "%s"', log_filepath) - else: - logger.error('log export to "%s" aborted', log_filepath) - - return output_filepath - - -def runMechBatch(batch_dir, log_filepath, Cm0, Qm0, stim_params, a=default_diam, d=default_embedding): +def runMechBatch(batch_dir, log_filepath, Cm0, Qm0, stim_params, a=default_diam, d=default_embedding, + multiprocess=False): ''' Run batch simulations of the mechanical system with imposed values of charge density, for various sonophore spans and stimulation parameters. 
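# --- Hedged example: a small mechanical batch (paths and sweep values are placeholders) ---
# Cm0 and Qm0 below are generic resting values chosen for illustration, not from the source.
def _mech_batch_sketch(batch_dir, log_filepath):
    stim_params = {
        'freqs': [500e3],        # Hz
        'amps': [50e3, 100e3],   # Pa
        'charges': [0., 50e-5],  # C/m2
    }
    return runMechBatch(batch_dir, log_filepath, Cm0=1e-2, Qm0=0., stim_params=stim_params,
                        a=32e-9, multiprocess=False)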
:param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS in-plane diameter (m) :param d: depth of embedding tissue around plasma membrane (m) + :param multiprocess: boolean statting wether or not to use multiprocessing ''' # Checking validity of stimulation parameters mandatory_params = ['freqs', 'amps', 'charges'] - for mp in mandatory_params: - if mp not in stim_params: - raise InputError('Missing stimulation parameter field: "{}"'.format(mp)) - - # Define logging format - MECH_log = ('Mechanical simulation %u/%u (a = %sm, d = %sm, f = %sHz, A = %sPa, Q = %sC/cm2)') + for mparam in mandatory_params: + if mparam not in stim_params: + raise InputError('Missing stimulation parameter field: "{}"'.format(mparam)) logger.info("Starting mechanical simulation batch") # Unpack stimulation parameters amps = np.array(stim_params['amps']) charges = np.array(stim_params['charges']) # Generate simulations queue nA = len(amps) nQ = len(charges) sim_queue = np.array(np.meshgrid(amps, charges)).T.reshape(nA * nQ, 2) nqueue = sim_queue.shape[0] + # Initiate multiple processing objects if needed + if multiprocess: + + mp.freeze_support() + + # Create tasks and results queues + tasks = mp.JoinableQueue() + results = mp.Queue() + + # Create and start consumer processes + nconsumers = mp.cpu_count() + consumers = [Consumer(tasks, results) for i in range(nconsumers)] + for w in consumers: + w.start() + # Run simulations nsims = len(stim_params['freqs']) * nqueue - simcount = 0 + wid = 0 filepaths = [] for Fdrive in stim_params['freqs']: # Create BilayerSonophore instance (modulus of embedding tissue depends on frequency) bls = BilayerSonophore(a, Fdrive, Cm0, Qm0, d) for i in range(nqueue): - simcount += 1 + wid += 1 Adrive, Qm = sim_queue[i, :] + worker = MechWorker(wid, batch_dir, log_filepath, bls, Fdrive, Adrive, Qm, nsims) + if multiprocess: + tasks.put(worker, block=False) + else: + logger.info('%s', worker) + output_filepath = worker.__call__() + filepaths.append(output_filepath) - # Log - logger.info(MECH_log, simcount, nsims, si_format(a, 1), si_format(d, 1), - si_format(Fdrive, 1), si_format(Adrive, 2), si_format(Qm * 1e-4)) + if multiprocess: + # Stop processes + for i in range(nconsumers): + tasks.put(None, block=False) + tasks.join() - # Run simulation - try: - output_filepath = runMech(batch_dir, log_filepath, bls, Fdrive, Adrive, Qm) - filepaths.append(output_filepath) - except (Warning, AssertionError) as inst: - logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) - user_str = input() - if user_str not in ['y', 'Y']: - return filepaths + # Retrieve workers output + for x in range(nsims): + output_filepath = results.get() + filepaths.append(output_filepath) + + # Close tasks and results queues + tasks.close() + results.close() return filepaths def titrateEStim(solver, neuron, Astim, tstim, toffset, PRF=1.5e3, DC=1.0): """ Use a dichotomic recursive search to determine the threshold value of a specific electric stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. This function is called recursively until an accurate threshold is found. 
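# --- Hedged example (bounds and neuron key are placeholders) ---
# Passing a (lower, upper) tuple for the amplitude makes titrateEStim bisect that interval
# until the excitation threshold is bracketed within the predefined resolution.
def _estim_titration_sketch():
    solver = SolverElec()
    neuron = getNeuronsDict()['RS']()   # hypothetical neuron key
    Athr, t, y, states, latency = titrateEStim(solver, neuron, (0., 50.),
                                               tstim=150e-3, toffset=100e-3, PRF=100., DC=1.0)
    return Athr                         # threshold amplitude (mA/m2), or NaN if no excitation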
:param solver: solver instance :param neuron: neuron object :param Astim: injected current density amplitude (mA/m2) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 5-tuple with the determined amplitude threshold, time profile, solution matrix, state vector and response latency """ # Determine titration type if isinstance(Astim, tuple): t_type = 'A' interval = Astim thr = TITRATION_ESTIM_DA_MAX maxval = TITRATION_ESTIM_A_MAX elif isinstance(tstim, tuple): t_type = 't' interval = tstim thr = TITRATION_DT_THR maxval = TITRATION_T_MAX elif isinstance(DC, tuple): t_type = 'DC' interval = DC thr = TITRATION_DDC_THR maxval = TITRATION_DC_MAX else: logger.error('Invalid titration type') return 0. t_var = ESTIM_params[t_type] # Check amplitude interval and define current value if interval[0] >= interval[1]: raise InputError('Invaid {} interval: {} (must be defined as [lb, ub])' .format(t_type, interval)) value = (interval[0] + interval[1]) / 2 # Define stimulation parameters if t_type == 'A': stim_params = [value, tstim, toffset, PRF, DC] elif t_type == 't': stim_params = [Astim, value, toffset, PRF, DC] elif t_type == 'DC': stim_params = [Astim, tstim, toffset, PRF, value] # Run simulation and detect spikes (t, y, states) = solver.run(neuron, *stim_params) # n_spikes, latency, _ = detectSpikes(t, y[0, :], SPIKE_MIN_VAMP, SPIKE_MIN_DT) dt = t[1] - t[0] ipeaks, *_ = findPeaks(y[0, :], SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) n_spikes = ipeaks.size latency = t[ipeaks[0]] if n_spikes > 0 else None logger.debug('%.2f %s ---> %u spike%s detected', value * t_var['factor'], t_var['unit'], n_spikes, "s" if n_spikes > 1 else "") # If accurate threshold is found, return simulation results if (interval[1] - interval[0]) <= thr and n_spikes == 1: return (value, t, y, states, latency) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: if (maxval - interval[1]) <= thr: # if upper bound too close to max then stop logger.warning('no spikes detected within titration interval') return (np.nan, t, y, states, latency) new_interval = (value, interval[1]) else: new_interval = (interval[0], value) stim_params[t_var['index']] = new_interval return titrateEStim(solver, neuron, *stim_params) def runEStimBatch(batch_dir, log_filepath, neurons, stim_params, multiprocess=False): ''' Run batch E-STIM simulations of the system for various neuron types and stimulation parameters. 
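# --- Hedged example: a minimal E-STIM batch (neuron key, paths and sweeps are placeholders) ---
def _estim_batch_sketch(batch_dir, log_filepath):
    stim_params = {
        'amps': [10.],           # mA/m2
        'durations': [100e-3],   # s
        'offsets': [50e-3],      # s
        'PRFs': [100.],          # Hz
        'DCs': [1.0]             # (-)
    }
    return runEStimBatch(batch_dir, log_filepath, ['RS'], stim_params, multiprocess=False)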
:param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param multiprocess: boolean statting wether or not to use multiprocessing :return: list of full paths to the output files ''' mandatory_params = ['amps', 'durations', 'offsets', 'PRFs', 'DCs'] for mparam in mandatory_params: if mparam not in stim_params: raise InputError('Missing stimulation parameter field: "{}"'.format(mparam)) logger.info("Starting E-STIM simulation batch") # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DCs']) nqueue = sim_queue.shape[0] # Initialize solver solver = SolverElec() # Initiate multiple processing objects if needed if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = mp.cpu_count() consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Run simulations nsims = len(neurons) * nqueue wid = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() # Run simulations in queue for i in range(nqueue): wid += 1 Astim, tstim, toffset, PRF, DC = sim_queue[i, :] worker = EStimWorker(wid, batch_dir, log_filepath, solver, neuron, Astim, tstim, toffset, PRF, DC, nsims) if multiprocess: tasks.put(worker, block=False) else: logger.info('%s', worker) output_filepath = worker.__call__() filepaths.append(output_filepath) if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): output_filepath = results.get() filepaths.append(output_filepath) # Close tasks and results queues tasks.close() results.close() return filepaths def titrateEStimBatch(batch_dir, log_filepath, neurons, stim_params): ''' Run batch electrical titrations of the system for various neuron types and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. 
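# --- Hedged note with a sketch (placeholder values) ---
# The titration batches infer which parameter to titrate from the field that is *missing* in
# stim_params: no 'amps' -> amplitude titration, no 'durations' -> duration titration,
# no 'DC' -> duty-cycle titration.
def _estim_titration_batch_sketch(batch_dir, log_filepath):
    stim_params = {'durations': [100e-3], 'PRFs': [100.], 'DCs': [1.0]}  # no 'amps': titrate amplitude
    return titrateEStimBatch(batch_dir, log_filepath, ['RS'], stim_params)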
:param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :return: list of full paths to the output files ''' # Define logging format ESTIM_titration_log = '%s neuron - E-STIM titration %u/%u (%s)' logger.info("Starting E-STIM titration batch") # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [TITRATION_T_OFFSET], stim_params['PRFs'], stim_params['DCs']) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DCs']) elif 'DC' not in stim_params: t_type = 'DC' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], [None]) nqueue = sim_queue.shape[0] t_var = ESTIM_params[t_type] # Create SolverElec instance solver = SolverElec() # Run titrations nsims = len(neurons) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for i in range(nqueue): simcount += 1 # Extract parameters Astim, tstim, toffset, PRF, DC = sim_queue[i, :] if Astim is None: Astim = (0., 2 * TITRATION_ESTIM_A_MAX) elif tstim is None: tstim = (0., 2 * TITRATION_T_MAX) elif DC is None: DC = (0., 2 * TITRATION_DC_MAX) curr_params = [Astim, tstim, PRF, DC] # Generate log str log_str = '' pnames = list(ESTIM_params.keys()) j = 0 for cp in curr_params: pn = pnames[j] pi = ESTIM_params[pn] if not isinstance(cp, tuple): if log_str: log_str += ', ' log_str += '{} = {:.2f} {}'.format(pn, pi['factor'] * cp, pi['unit']) j += 1 # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log logger.info(ESTIM_titration_log, neuron.name, simcount, nsims, log_str) # Run titration tstart = time.time() try: (output_thr, t, y, states, lat) = titrateEStim(solver, neuron, Astim, tstim, toffset, PRF, DC) Vm, *channels = y tcomp = time.time() - tstart logger.info('completed in %ss, threshold = %.2f %s', si_format(tcomp, 2), output_thr * t_var['factor'], t_var['unit']) # Determine output variable if t_type == 'A': Astim = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming if DC == 1.0: simcode = ESTIM_CW_code.format(neuron.name, Astim, tstim * 1e3) else: simcode = ESTIM_PW_code.format(neuron.name, Astim, tstim * 1e3, PRF, DC * 1e2) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal # n_spikes, lat, sr = detectSpikes(t, Vm, SPIKE_MIN_VAMP, SPIKE_MIN_DT) dt = t[1] - t[0] ipeaks, *_ = findPeaks(Vm, SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 
'N/A' logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': Astim, 'E': tstim * 1e3, 'F': PRF * 1e-3 if DC < 1 else 'N/A', 'G': DC, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return filepaths return filepaths def titrateAStim(solver, neuron, Fdrive, Adrive, tstim, toffset, PRF=1.5e3, DC=1.0, int_method='effective'): """ Use a dichotomic recursive search to determine the threshold value of a specific acoustic stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. This function is called recursively until an accurate threshold is found. :param solver: solver instance :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param int_method: selected integration method :return: 5-tuple with the determined amplitude threshold, time profile, solution matrix, state vector and response latency """ # Determine titration type if isinstance(Adrive, tuple): t_type = 'A' interval = Adrive thr = TITRATION_ASTIM_DA_MAX maxval = TITRATION_ASTIM_A_MAX elif isinstance(tstim, tuple): t_type = 't' interval = tstim thr = TITRATION_DT_THR maxval = TITRATION_T_MAX elif isinstance(DC, tuple): t_type = 'DC' interval = DC thr = TITRATION_DDC_THR maxval = TITRATION_DC_MAX else: logger.error('Invalid titration type') return 0. 
t_var = ASTIM_params[t_type] # Check amplitude interval and define current value if interval[0] >= interval[1]: raise InputError('Invaid {} interval: {} (must be defined as [lb, ub])' .format(t_type, interval)) value = (interval[0] + interval[1]) / 2 # Define stimulation parameters if t_type == 'A': stim_params = [Fdrive, value, tstim, toffset, PRF, DC] elif t_type == 't': stim_params = [Fdrive, Adrive, value, toffset, PRF, DC] elif t_type == 'DC': stim_params = [Fdrive, Adrive, tstim, toffset, PRF, value] # Run simulation and detect spikes (t, y, states) = solver.run(neuron, *stim_params, int_method) # n_spikes, latency, _ = detectSpikes(t, y[2, :], SPIKE_MIN_QAMP, SPIKE_MIN_DT) dt = t[1] - t[0] ipeaks, *_ = findPeaks(y[2, :], SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM) n_spikes = ipeaks.size latency = t[ipeaks[0]] if n_spikes > 0 else None logger.debug('%.2f %s ---> %u spike%s detected', value * t_var['factor'], t_var['unit'], n_spikes, "s" if n_spikes > 1 else "") # If accurate threshold is found, return simulation results if (interval[1] - interval[0]) <= thr and n_spikes == 1: return (value, t, y, states, latency) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: if (maxval - interval[1]) <= thr: # if upper bound too close to max then stop logger.warning('no spikes detected within titration interval') return (np.nan, t, y, states, latency) new_interval = (value, interval[1]) else: new_interval = (interval[0], value) stim_params[t_var['index']] = new_interval return titrateAStim(solver, neuron, *stim_params, int_method) def runAStimBatch(batch_dir, log_filepath, neurons, stim_params, a=default_diam, int_method='effective', multiprocess=False): ''' Run batch simulations of the system for various neuron types, sonophore and stimulation parameters. 
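# --- Hedged example: a minimal A-STIM batch dispatched over multiple processes ---
# (neuron key, paths and sweep values are placeholders chosen for illustration)
def _astim_batch_sketch(batch_dir, log_filepath):
    stim_params = {
        'freqs': [500e3],        # Hz
        'amps': [100e3],         # Pa
        'durations': [150e-3],   # s
        'offsets': [100e-3],     # s
        'PRFs': [100.],          # Hz
        'DCs': [1.0]             # (-)
    }
    return runAStimBatch(batch_dir, log_filepath, ['RS'], stim_params,
                         a=32e-9, int_method='effective', multiprocess=True)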
:param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS structure diameter (m) :param int_method: selected integration method :param multiprocess: boolean statting wether or not to use multiprocessing :return: list of full paths to the output files ''' mandatory_params = ['freqs', 'amps', 'durations', 'offsets', 'PRFs', 'DCs'] for mparam in mandatory_params: if mparam not in stim_params: raise InputError('Missing stimulation parameter field: "{}"'.format(mparam)) - # Define logging format - ASTIM_CW_log = 'A-STIM %s simulation %u/%u: %s neuron, a = %sm, f = %sHz, A = %sPa, t = %ss' - ASTIM_PW_log = ('A-STIM %s simulation %u/%u: %s neuron, a = %sm, f = %sHz, ' - 'A = %sPa, t = %ss, PRF = %sHz, DC = %.2f%%') - logger.info("Starting A-STIM simulation batch") # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DCs']) nqueue = sim_queue.shape[0] # Initiate multiple processing objects if needed if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = mp.cpu_count() consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Run simulations nsims = len(neurons) * len(stim_params['freqs']) * nqueue wid = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for Fdrive in stim_params['freqs']: # Initialize SolverUS solver = SolverUS(a, neuron, Fdrive) # Run simulations in queue for i in range(nqueue): wid += 1 Adrive, tstim, toffset, PRF, DC = sim_queue[i, :] worker = AStimWorker(wid, batch_dir, log_filepath, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method, nsims) if multiprocess: tasks.put(worker, block=False) else: logger.info('%s', worker) output_filepath = worker.__call__() filepaths.append(output_filepath) if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): output_filepath = results.get() filepaths.append(output_filepath) # Close tasks and results queues tasks.close() results.close() return filepaths def titrateAStimBatch(batch_dir, log_filepath, neurons, stim_params, a=default_diam, int_method='effective'): ''' Run batch acoustic titrations of the system for various neuron types, sonophore and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. 
:param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS structure diameter (m) :param int_method: selected integration method :return: list of full paths to the output files ''' # Define logging format ASTIM_titration_log = '%s neuron - A-STIM titration %u/%u (a = %sm, %s)' logger.info("Starting A-STIM titration batch") # Define default parameters int_method = 'effective' # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [TITRATION_T_OFFSET], stim_params['PRFs'], stim_params['DCs']) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DCs']) elif 'DC' not in stim_params: t_type = 'DC' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], [None]) nqueue = sim_queue.shape[0] t_var = ASTIM_params[t_type] # Run titrations nsims = len(neurons) * len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for Fdrive in stim_params['freqs']: # Create SolverUS instance (modulus of embedding tissue depends on frequency) solver = SolverUS(a, neuron, Fdrive) for i in range(nqueue): simcount += 1 # Extract parameters Adrive, tstim, toffset, PRF, DC = sim_queue[i, :] if Adrive is None: Adrive = (0., 2 * TITRATION_ASTIM_A_MAX) elif tstim is None: tstim = (0., 2 * TITRATION_T_MAX) elif DC is None: DC = (0., 2 * TITRATION_DC_MAX) curr_params = [Fdrive, Adrive, tstim, PRF, DC] # Generate log str log_str = '' pnames = list(ASTIM_params.keys()) j = 0 for cp in curr_params: pn = pnames[j] pi = ASTIM_params[pn] if not isinstance(cp, tuple): if log_str: log_str += ', ' log_str += '{} = {:.2f} {}'.format(pn, pi['factor'] * cp, pi['unit']) j += 1 # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log logger.info(ASTIM_titration_log, neuron.name, simcount, nsims, si_format(a, 1), log_str) # Run titration tstart = time.time() try: (output_thr, t, y, states, lat) = titrateAStim(solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC) Z, ng, Qm, Vm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.info('completed in %.2f s, threshold = %.2f %s', tcomp, output_thr * t_var['factor'], t_var['unit']) # Determine output variable if t_type == 'A': Adrive = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming if DC == 1.0: simcode = ASTIM_CW_code.format(neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, int_method) else: simcode = ASTIM_PW_code.format(neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF, DC * 1e2, int_method) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'a': solver.a, 'd': solver.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = 
'{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal # n_spikes, lat, sr = detectSpikes(t, Qm, SPIKE_MIN_QAMP, SPIKE_MIN_DT) dt = t[1] - t[0] ipeaks, *_ = findPeaks(Qm, SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 'N/A' logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': solver.a * 1e9, 'E': solver.d * 1e6, 'F': Fdrive * 1e-3, 'G': Adrive * 1e-3, 'H': tstim * 1e3, 'I': PRF * 1e-3 if DC < 1 else 'N/A', 'J': DC, 'K': int_method, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', inst) user_str = input() if user_str not in ['y', 'Y']: return filepaths return filepaths def computeSpikeMetrics(filenames): ''' Analyze the charge density profile from a list of files and compute for each one of them the following spiking metrics: - latency (ms) - firing rate mean and standard deviation (Hz) - spike amplitude mean and standard deviation (nC/cm2) - spike width mean and standard deviation (ms) :param filenames: list of files to analyze :return: a dataframe with the computed metrics ''' # Initialize metrics dictionaries keys = [ 'latencies (ms)', 'mean firing rates (Hz)', 'std firing rates (Hz)', 'mean spike amplitudes (nC/cm2)', 'std spike amplitudes (nC/cm2)', 'mean spike widths (ms)', 'std spike widths (ms)' ] metrics = {k: [] for k in keys} # Compute spiking metrics for fname in filenames: # Load data from file logger.debug('loading data from file "{}"'.format(fname)) with open(fname, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values Qm = df['Qm'].values dt = t[1] - t[0] # Detect spikes on charge profile mpd = int(np.ceil(SPIKE_MIN_DT / dt)) ispikes, prominences, widths, _ = findPeaks(Qm, SPIKE_MIN_QAMP, mpd, SPIKE_MIN_QPROM) widths *= dt if ispikes.size > 0: # Compute latency latency = t[ispikes[0]] # Select spikes prior to stimulus offset ispikes_prior = ispikes[t[ispikes] < tstim] else: latency = np.nan ispikes_prior = np.array([]) # Compute spikes widths and amplitude if ispikes_prior.size > 0: widths_prior = widths[:ispikes_prior.size] prominences_prior = prominences[:ispikes_prior.size] else: widths_prior = np.array([np.nan]) prominences_prior = np.array([np.nan]) # Compute inter-spike intervals and firing rates if ispikes_prior.size > 1: ISIs_prior = np.diff(t[ispikes_prior]) FRs_prior = 1 / ISIs_prior else: ISIs_prior = np.array([np.nan]) FRs_prior = np.array([np.nan]) # Log spiking metrics logger.debug('%u spikes detected (%u prior to offset)', ispikes.size, ispikes_prior.size) logger.debug('latency: %.2f ms', latency * 1e3) logger.debug('average spike width within stimulus: %.2f +/- %.2f ms', np.nanmean(widths_prior) * 1e3, np.nanstd(widths_prior) * 1e3) logger.debug('average spike amplitude within stimulus: 
%.2f +/- %.2f nC/cm2', np.nanmean(prominences_prior) * 1e5, np.nanstd(prominences_prior) * 1e5) logger.debug('average ISI within stimulus: %.2f +/- %.2f ms', np.nanmean(ISIs_prior) * 1e3, np.nanstd(ISIs_prior) * 1e3) logger.debug('average FR within stimulus: %.2f +/- %.2f Hz', np.nanmean(FRs_prior), np.nanstd(FRs_prior)) # Complete metrics dictionaries metrics['latencies (ms)'].append(latency * 1e3) metrics['mean firing rates (Hz)'].append(np.mean(FRs_prior)) metrics['std firing rates (Hz)'].append(np.std(FRs_prior)) metrics['mean spike amplitudes (nC/cm2)'].append(np.mean(prominences_prior) * 1e5) metrics['std spike amplitudes (nC/cm2)'].append(np.std(prominences_prior) * 1e5) metrics['mean spike widths (ms)'].append(np.mean(widths_prior) * 1e3) metrics['std spike widths (ms)'].append(np.std(widths_prior) * 1e3) # Return dataframe with metrics return pd.DataFrame(metrics, columns=metrics.keys()) def getCycleProfiles(a, f, A, Cm0, Qm0, Qm): ''' Run a mechanical simulation until periodic stabilization, and compute pressure profiles over the last acoustic cycle. :param a: in-plane diameter of the sonophore structure within the membrane (m) :param f: acoustic drive frequency (Hz) :param A: acoustic drive amplitude (Pa) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param Qm: imposed membrane charge density (C/m2) :return: a dataframe with the time, kinematic and pressure profiles over the last cycle. ''' # Create sonophore object bls = BilayerSonophore(a, f, Cm0, Qm0) # Run default simulation and compute relevant profiles logger.info('Running mechanical simulation (a = %sm, f = %sHz, A = %sPa)', si_format(a, 1), si_format(f, 1), si_format(A, 1)) t, y, _ = bls.run(f, A, Qm, Pm_comp_method=PmCompMethod.direct) dt = (t[-1] - t[0]) / (t.size - 1) Z, ng = y[:, -NPC_FULL:] t = t[-NPC_FULL:] t -= t[0] logger.info('Computing pressure cyclic profiles') R = bls.v_curvrad(Z) U = np.diff(Z) / dt U = np.hstack((U, U[-1])) data = { 't': t, 'Z': Z, 'Cm': bls.v_Capct(Z), 'P_M': bls.v_PMavg(Z, R, bls.surface(Z)), 'P_Q': bls.Pelec(Z, Qm), 'P_{VE}': bls.PEtot(Z, R) + bls.PVleaflet(U, R), 'P_V': bls.PVfluid(U, R), 'P_G': bls.gasmol2Pa(ng, bls.volume(Z)), 'P_0': - np.ones(Z.size) * bls.P0 } return pd.DataFrame(data, columns=data.keys()) def runSweepSA(bls, f, A, Qm, params, rel_sweep): ''' Run mechanical simulations while varying multiple model parameters around their default value, and compute the relative changes in cycle-averaged sonophore membrane potential over the last acoustic period upon periodic stabilization. :param bls: BilayerSonophore object :param f: acoustic drive frequency (Hz) :param A: acoustic drive amplitude (Pa) :param Qm: imposed membrane charge density (C/m2) :param params: list of model parameters to explore :param rel_sweep: array of relative parameter changes :return: a dataframe with the cycle-averaged sonophore membrane potentials for the parameter variations, for each parameter. 
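Example (a minimal, hypothetical sketch; the sonophore geometry, drive parameters and swept attribute names below are illustrative only):

>>> bls = BilayerSonophore(32e-9, 500e3, 1e-2, 0.0)
>>> rel_sweep = np.linspace(0.8, 1.2, 5)
>>> df = runSweepSA(bls, 500e3, 100e3, 50e-5, ['kA', 'muS', 'rhoL'], rel_sweep)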
''' nsweep = len(rel_sweep) logger.info('Starting sensitivity analysis (%u parameters, sweep size = %u)', len(params), nsweep) t0 = time.time() # Run default simulation and compute cycle-averaged membrane potential _, y, _ = bls.run(f, A, Qm, Pm_comp_method=PmCompMethod.direct) Z = y[0, -NPC_FULL:] Cm = bls.v_Capct(Z) # F/m2 Vmavg_default = np.mean(Qm / Cm) * 1e3 # mV # Create data dictionary for computed output changes data = {'relative input change': rel_sweep - 1} nsims = len(params) * nsweep for j, p in enumerate(params): default = getattr(bls, p) sweep = rel_sweep * default Vmavg = np.empty(nsweep) logger.info('Computing system\'s sensitivity to %s (default = %.2e)', p, default) for i, val in enumerate(sweep): # Re-initialize BLS object with modified attribute setattr(bls, p, val) bls.reinit() # Run simulation and compute cycle-averaged membrane potential _, y, _ = bls.run(f, A, Qm, Pm_comp_method=PmCompMethod.direct) Z = y[0, -NPC_FULL:] Cm = bls.v_Capct(Z) # F/m2 Vmavg[i] = np.mean(Qm / Cm) * 1e3 # mV logger.info('simulation %u/%u: %s = %.2e (%+.1f %%) --> |Vm| = %.1f mV (%+.3f %%)', j * nsweep + i + 1, nsims, p, val, (val - default) / default * 1e2, Vmavg[i], (Vmavg[i] - Vmavg_default) / Vmavg_default * 1e2) # Fill in data dictionary data[p] = Vmavg # Set parameter back to default setattr(bls, p, default) tcomp = time.time() - t0 logger.info('Sensitivity analysis successfully completed in %.0f s', tcomp) # Return pandas dataframe return pd.DataFrame(data, columns=data.keys()) def getActivationMap(root, neuron, a, f, tstim, toffset, PRF, amps, DCs): ''' Compute the activation map of a neuron at a given frequency and PRF, by computing the spiking metrics of simulation results over a 2D space (amplitude x duty cycle). :param root: directory containing the input data files :param neuron: neuron name :param a: sonophore diameter (m) :param f: acoustic drive frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param amps: vector of acoustic amplitudes (Pa) :param DCs: vector of duty cycles (-) :return: the activation matrix ''' # Initialize activation map actmap = np.empty((amps.size, DCs.size)) # Loop through amplitudes and duty cycles nfiles = DCs.size * amps.size for i, A in enumerate(amps): for j, DC in enumerate(DCs): # Define filename PW_str = '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1 else '' fname = ('ASTIM_{}_{}W_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms{}_effective.pkl' .format(neuron, 'P' if DC < 1 else 'C', a * 1e9, f * 1e-3, A * 1e-3, tstim * 1e3, PW_str)) # Extract charge profile from data file fpath = os.path.join(root, fname) if os.path.isfile(fpath): logger.debug('Loading file {}/{}: "{}"'.format(i * DCs.size + j + 1, nfiles, fname)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values Qm = df['Qm'].values dt = t[1] - t[0] # Detect spikes on charge profile during stimulus mpd = int(np.ceil(SPIKE_MIN_DT / dt)) ispikes, *_ = findPeaks(Qm[t <= tstim], SPIKE_MIN_QAMP, mpd, SPIKE_MIN_QPROM) # Compute firing metrics if ispikes.size == 0: # if no spike, assign -1 actmap[i, j] = -1 elif ispikes.size == 1: # if only 1 spike, assign 0 actmap[i, j] = 0 else: # if more than 1 spike, assign firing rate FRs = 1 / np.diff(t[ispikes]) actmap[i, j] = np.mean(FRs) else: logger.error('"{}" file not found'.format(fname)) actmap[i, j] = np.nan return actmap def getMaxMap(key, root, neuron, a, f, tstim, toffset, 
PRF, amps, DCs, mode='max', cavg=False): ''' Compute the max. value map of a neuron's specific variable at a given frequency and PRF over a 2D space (amplitude x duty cycle). :param key: the variable name to find in the simulations dataframes :param root: directory containing the input data files :param neuron: neuron name :param a: sonophore diameter (m) :param f: acoustic drive frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param amps: vector of acoustic amplitudes (Pa) :param DCs: vector of duty cycles (-) :param mode: string indicating whether to search for maximum, minimum or absolute maximum :param cavg: boolean stating whether to cycle-average the variable (over 1/PRF periods) before taking its extremum :return: the maximum matrix ''' # Initialize max map maxmap = np.empty((amps.size, DCs.size)) # Loop through amplitudes and duty cycles nfiles = DCs.size * amps.size for i, A in enumerate(amps): for j, DC in enumerate(DCs): # Define filename PW_str = '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1 else '' fname = ('ASTIM_{}_{}W_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms{}_effective.pkl' .format(neuron, 'P' if DC < 1 else 'C', a * 1e9, f * 1e-3, A * 1e-3, tstim * 1e3, PW_str)) # Extract charge profile from data file fpath = os.path.join(root, fname) if os.path.isfile(fpath): logger.debug('Loading file {}/{}: "{}"'.format(i * DCs.size + j + 1, nfiles, fname)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] t = df['t'].values if key in df: x = df[key].values else: x = eval(key) if cavg: x = getCycleAverage(t, x, 1 / PRF) if mode == 'min': maxmap[i, j] = x.min() elif mode == 'max': maxmap[i, j] = x.max() elif mode == 'absmax': maxmap[i, j] = np.abs(x).max() else: maxmap[i, j] = np.nan return maxmap def computeAStimLookups(neuron, a, freqs, amps, phi=np.pi, multiprocess=False): ''' Run simulations of the mechanical system for multiple combinations of imposed US frequencies, acoustic amplitudes and charge densities, compute effective coefficients and store them in a dictionary of 3D arrays. 
:param neuron: neuron object :param a: sonophore in-plane diameter (m) :param freqs: array of acoustic drive frequencies (Hz) :param amps: array of acoustic drive amplitudes (Pa) :param phi: acoustic drive phase (rad) :param multiprocess: boolean stating whether or not to use multiprocessing :return: lookups dictionary ''' # Check validity of input parameters if not isinstance(neuron, BaseMech): raise InputError('Invalid neuron type: "{}" (must inherit from BaseMech class)' .format(neuron.name)) if not isinstance(freqs, np.ndarray): if isinstance(freqs, list): if not all(isinstance(x, float) for x in freqs): raise InputError('Invalid frequencies (must all be float typed)') freqs = np.array(freqs) else: raise InputError('Invalid frequencies (must be provided as list or numpy array)') if not isinstance(amps, np.ndarray): if isinstance(amps, list): if not all(isinstance(x, float) for x in amps): raise InputError('Invalid amplitudes (must all be float typed)') amps = np.array(amps) else: raise InputError('Invalid amplitudes (must be provided as list or numpy array)') nf = freqs.size nA = amps.size if nf == 0: raise InputError('Empty frequencies array') if nA == 0: raise InputError('Empty amplitudes array') if freqs.min() <= 0: raise InputError('Invalid US driving frequencies (must all be strictly positive)') if amps.min() < 0: raise InputError('Invalid US pressure amplitudes (must all be positive or null)') logger.info('Starting batch lookup creation for %s neuron', neuron.name) t0 = time.time() # Initialize BLS object bls = BilayerSonophore(a, 0.0, neuron.Cm0, neuron.Cm0 * neuron.Vm0 * 1e-3) # Create neuron-specific charge vector charges = np.arange(neuron.Qbounds[0], neuron.Qbounds[1] + 1e-5, 1e-5) # C/m2 nQ = charges.size dims = (nf, nA, nQ) # Initialize lookup dictionary of 3D array to store effective coefficients coeffs_names = ['V', 'ng', *neuron.coeff_names] ncoeffs = len(coeffs_names) lookup_dict = {cn: np.empty(dims) for cn in coeffs_names} # Initiate multiprocessing objects if needed if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = mp.cpu_count() consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Loop through all (f, A, Q) combinations nsims = np.prod(np.array(dims)) for i in range(nf): for j in range(nA): for k in range(nQ): wid = i * (nA * nQ) + j * nQ + k worker = LookupWorker(wid, bls, neuron, freqs[i], amps[j], charges[k], phi, nsims) if multiprocess: tasks.put(worker, block=False) else: logger.info('%s', worker) _, Qcoeffs = worker.__call__() for icoeff in range(ncoeffs): lookup_dict[coeffs_names[icoeff]][i, j, k] = Qcoeffs[icoeff] if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): wid, Qcoeffs = results.get() i, j, k = nDindexes(dims, wid) for icoeff in range(ncoeffs): lookup_dict[coeffs_names[icoeff]][i, j, k] = Qcoeffs[icoeff] # Close tasks and results queues tasks.close() results.close() # Add input frequency, amplitude and charge arrays to lookup dictionary lookup_dict['f'] = freqs # Hz lookup_dict['A'] = amps # Pa lookup_dict['Q'] = charges # C/m2 logger.info('%s lookups computed in %.0f s', neuron.name, time.time() - t0) return lookup_dict diff --git a/sim/batch_MECH.py b/sim/batch_MECH.py index 9fee6a5..7825cb4 100644 --- a/sim/batch_MECH.py +++ b/sim/batch_MECH.py @@ -1,54 +1,71 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo 
Lemaire # @Date: 2016-11-21 10:46:56 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-06-13 19:21:36 +# @Last Modified time: 2018-07-23 14:39:27 """ Run batch simulations of the NICE mechanical model with imposed charge densities """ import sys import os import logging import numpy as np +from argparse import ArgumentParser from PointNICE.utils import logger, InputError from PointNICE.solvers import setBatchDir, checkBatchLog, runMechBatch from PointNICE.neurons import * from PointNICE.plt import plotBatch -# Set logging level -logger.setLevel(logging.DEBUG) a = 32e-9 # in-plane diameter (m) # Electrical properties of the membrane neuron = CorticalRS() Cm0 = neuron.Cm0 Qm0 = neuron.Vm0 * 1e-5 -# Cm0 = 1e-2 # membrane resting capacitance (F/m2) -# Qm0 = -80e-5 # membrane resting charge density (C/m2) # Stimulation parameters stim_params = { 'freqs': [500e3], # Hz 'amps': [50e3], # Pa - 'charges': np.linspace(-72, 0, 5) * 1e-5 # C/m2 + 'charges': np.linspace(-72, 0, 3) * 1e-5 # C/m2 } -# Select output directory -try: - batch_dir = setBatchDir() - log_filepath, _ = checkBatchLog(batch_dir, 'MECH') +if __name__ == '__main__': - # Run MECH batch - pkl_filepaths = runMechBatch(batch_dir, log_filepath, Cm0, Qm0, stim_params, a) - pkl_dir, _ = os.path.split(pkl_filepaths[0]) + # Define argument parser + ap = ArgumentParser() + ap.add_argument('-m', '--multiprocessing', default=False, action='store_true', + help='Use multiprocessing') + ap.add_argument('-v', '--verbose', default=False, action='store_true', + help='Increase verbosity') + ap.add_argument('-p', '--plot', default=False, action='store_true', + help='Plot results') - # Plot resulting profiles - plotBatch(pkl_dir, pkl_filepaths) + # Parse arguments + args = ap.parse_args() + if args.verbose: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) -except InputError as err: - logger.error(err) - sys.exit(1) + try: + # Select output directory + batch_dir = setBatchDir() + log_filepath, _ = checkBatchLog(batch_dir, 'MECH') + + # Run MECH batch + pkl_filepaths = runMechBatch(batch_dir, log_filepath, Cm0, Qm0, stim_params, a, + multiprocess=args.multiprocessing) + pkl_dir, _ = os.path.split(pkl_filepaths[0]) + + # Plot resulting profiles + if args.plot: + plotBatch(pkl_dir, pkl_filepaths) + + except InputError as err: + logger.error(err) + sys.exit(1) diff --git a/sim/run_ESTIM.py b/sim/run_ESTIM.py index 5c06e75..fc92185 100644 --- a/sim/run_ESTIM.py +++ b/sim/run_ESTIM.py @@ -1,95 +1,93 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-07-23 13:28:04 +# @Last Modified time: 2018-07-23 13:56:40 """ Script to run ESTIM simulations from command line. 
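A hypothetical invocation, assuming the script is launched from the sim/ folder (the values shown are simply the script's own defaults made explicit):

    python run_ESTIM.py -n RS -A 10.0 -t 150.0 --offset 20.0 --PRF 100.0 --DC 100.0 -v -p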
""" import sys import os import logging from argparse import ArgumentParser from PointNICE.utils import logger, getNeuronsDict, InputError from PointNICE.solvers import checkBatchLog, SolverElec, EStimWorker from PointNICE.plt import plotBatch # Default parameters default = { 'neuron': 'RS', 'A': 10.0, # mA/m2 't': 150.0, # ms 'off': 20.0, # ms 'PRF': 100.0, # Hz 'DC': 100.0 # % } def main(): # Define argument parser ap = ArgumentParser() # ASTIM parameters ap.add_argument('-n', '--neuron', type=str, default=default['neuron'], help='Neuron name (string)') ap.add_argument('-A', '--amplitude', type=float, default=default['A'], help='Stimulus amplitude (mA/m2)') ap.add_argument('-t', '--duration', type=float, default=default['t'], help='Stimulus duration (ms)') ap.add_argument('--offset', type=float, default=default['off'], help='Offset duration (ms)') ap.add_argument('--PRF', type=float, default=default['PRF'], help='PRF (Hz)') ap.add_argument('--DC', type=float, default=default['DC'], help='Duty cycle (%%)') ap.add_argument('-o', '--outputdir', type=str, default=os.getcwd(), help='Output directory') # Boolean arguments ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-p', '--plot', default=False, action='store_true', help='Plot results') # Parse arguments args = ap.parse_args() neuron_str = args.neuron Astim = args.amplitude # mA/m2 tstim = args.duration * 1e-3 # s toffset = args.offset * 1e-3 # s PRF = args.PRF # Hz DC = args.DC * 1e-2 output_dir = args.outputdir if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) try: if neuron_str not in getNeuronsDict(): raise InputError('Unknown neuron type: "{}"'.format(neuron_str)) log_filepath, _ = checkBatchLog(output_dir, 'E-STIM') neuron = getNeuronsDict()[neuron_str]() worker = EStimWorker(1, output_dir, log_filepath, SolverElec(), neuron, Astim, tstim, toffset, PRF, DC, 1) logger.info('%s', worker) outfile = worker.__call__() - logger.info('Finished') - logger.info('Finished') if args.plot: plotBatch(output_dir, [outfile]) except InputError as err: logger.error(err) sys.exit(1) if __name__ == '__main__': main() diff --git a/sim/run_MECH.py b/sim/run_MECH.py index 7ea62ad..82a85c0 100644 --- a/sim/run_MECH.py +++ b/sim/run_MECH.py @@ -1,96 +1,94 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-03-15 18:33:59 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-05-03 12:13:26 +# @Last Modified time: 2018-07-23 14:30:27 """ Script to run MECH simulations from command line. 
""" import sys import os import logging from argparse import ArgumentParser -from PointNICE.utils import logger, InputError, si_format +from PointNICE.utils import logger, InputError from PointNICE.bls import BilayerSonophore -from PointNICE.solvers import checkBatchLog, runMech +from PointNICE.solvers import checkBatchLog, MechWorker from PointNICE.plt import plotBatch # Default parameters default = { 'a': 32.0, # nm 'd': 0.0, # um 'f': 500.0, # kHz 'A': 100.0, # kPa 'Cm0': 1.0, # uF/cm2 'Qm0': 0.0, # nC/cm2 'Qm': 0.0, # nC/cm2 } def main(): # Define argument parser ap = ArgumentParser() # ASTIM parameters ap.add_argument('-a', '--diameter', type=float, default=default['a'], help='Sonophore diameter (nm)') ap.add_argument('-d', '--embedding', type=float, default=default['d'], help='Embedding depth (um)') ap.add_argument('-f', '--frequency', type=float, default=default['f'], help='Acoustic drive frequency (kHz)') ap.add_argument('-A', '--amplitude', type=float, default=default['A'], help='Acoustic pressure amplitude (kPa)') ap.add_argument('-Cm0', '--restcapct', type=float, default=default['Cm0'], help='Membrane resting capacitance (uF/cm2)') ap.add_argument('-Qm0', '--restcharge', type=float, default=default['Qm0'], help='Membrane resting charge density (nC/cm2)') ap.add_argument('-Qm', '--charge', type=float, default=default['Qm'], help='Applied charge density (nC/cm2)') ap.add_argument('-o', '--outputdir', type=str, default=os.getcwd(), help='Output directory') # Boolean parameters ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-p', '--plot', default=False, action='store_true', help='Plot results') # Parse arguments args = ap.parse_args() a = args.diameter * 1e-9 # m d = args.embedding * 1e-6 # m Fdrive = args.frequency * 1e3 # Hz Adrive = args.amplitude * 1e3 # Pa Cm0 = args.restcapct * 1e-2 # F/m2 Qm0 = args.restcharge * 1e-5 # C/m2 Qm = args.charge * 1e-5 # C/m2 output_dir = args.outputdir if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) try: log_filepath, _ = checkBatchLog(output_dir, 'MECH') - logger.info('Running MECH simulation: a = %sm, f = %sHz, A = %sPa, Cm0 = %sF/cm2, ' - 'Qm0 = %sC/cm2, Qm0 = %sC/cm2', si_format(a, 1), si_format(Fdrive, 1), - si_format(Adrive, 2), si_format(Cm0 * 1e-4), si_format(Qm0 * 1e-4), - si_format(Qm * 1e-4)) - bls = BilayerSonophore(a, Fdrive, Cm0, Qm0, d) - outfile = runMech(output_dir, log_filepath, bls, Fdrive, Adrive, Qm) + worker = MechWorker(1, output_dir, log_filepath, BilayerSonophore(a, Fdrive, Cm0, Qm0, d), + Fdrive, Adrive, Qm, 1) + logger.info('%s', worker) + outfile = worker.__call__() logger.info('Finished') if args.plot: plotBatch(output_dir, [outfile]) except InputError as err: logger.error(err) sys.exit(1) if __name__ == '__main__': main()