diff --git a/PointNICE/bls.py b/PointNICE/bls.py index 1387fef..9ea2d68 100644 --- a/PointNICE/bls.py +++ b/PointNICE/bls.py @@ -1,615 +1,617 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-09-06 16:26:53 +# @Last Modified time: 2017-09-10 17:13:17 import logging import warnings import numpy as np import scipy.integrate as integrate from scipy.optimize import brentq, curve_fit from .utils import * from .constants import * # Get package logger logger = logging.getLogger('PointNICE') class BilayerSonophore: """ This class contains the geometric and mechanical parameters of the Bilayer Sonophore Model, as well as all the core functions needed to compute the dynamics (kinetics and kinematics) of the bilayer membrane cavitation, and run dynamic BLS simulations. """ def __init__(self, geom, params, Fdrive, Cm0, Qm0): """ Constructor of the class. :param geom: BLS geometric constants dictionary :param params: BLS biomechanical and biophysical parameters dictionary :param Fdrive: frequency of acoustic perturbation (Hz) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) """ logger.debug('%.1f nm BLS initialization at %.2f kHz, %.2f nC/cm2', geom['a'] * 1e9, Fdrive * 1e-3, Qm0 * 1e5) # Assign biomechanical and biophysical parameters as direct class attributes for key, value in params["biomech"].items(): setattr(self, key, value) for key, value in params["biophys"].items(): setattr(self, key, value) # Extract resting constants and geometry self.Cm0 = Cm0 self.Qm0 = Qm0 self.a = geom['a'] self.d = geom['d'] self.S0 = np.pi * self.a**2 # Derive frequency-dependent tissue elastic modulus G_tissue = self.alpha * Fdrive # G'' (Pa) self.kA_tissue = 2 * G_tissue * self.d # kA of the tissue layer (N/m) # Check existence of lookups for derived parameters lookups = get_BLS_lookups(geom['a']) Qkey = '{:.2f}'.format(Qm0 * 1e5) # If no lookup, compute parameters and store them in lookup if not lookups or Qkey not in lookups: # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: self.Delta = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Find optimal Lennard-Jones parameters to approximate PMavg (LJ_approx, std_err, _) = self.LJfitPMavg() assert std_err < PMAVG_STD_ERR_MAX, 'High error in PmAvg nonlinear fit:'\ ' std_err = %.2f Pa' % std_err self.LJ_approx = LJ_approx lookups[Qkey] = {'LJ_approx': LJ_approx, 'Delta_eq': D_eq} logger.debug('Saving BLS derived parameters to lookup file') save_BLS_lookups(geom['a'], lookups) # If lookup exists, load parameters from it else: logger.debug('Loading BLS derived parameters from lookup file') self.LJ_approx = lookups[Qkey]['LJ_approx'] self.Delta = lookups[Qkey]['Delta_eq'] # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def curvrad(self, Z): """ Return the (signed) instantaneous curvature radius of the leaflet. :param Z: leaflet apex outward deflection value (m) :return: leaflet curvature radius (m) """ if Z == 0.0: return np.inf else: return (self.a**2 + Z**2) / (2 * Z) def surface(self, Z): """ Return the surface area of the stretched leaflet (spherical cap). :param Z: leaflet apex outward deflection value (m) :return: surface of the stretched leaflet (m^2) """ return np.pi * (self.a**2 + Z**2) def volume(self, Z): """ Return the total volume of the inter-leaflet space (cylinder +/- spherical cap). :param Z: leaflet apex outward deflection value (m) :return: inner volume of the bilayer sonophore structure (m^3) """ return np.pi * self.a**2 * self.Delta\ * (1 + (Z / (3 * self.Delta) * (3 + Z**2 / self.a**2))) def arealstrain(self, Z): """ Compute the areal strain of the stretched leaflet. epsilon = (S - S0)/S0 = (Z/a)^2 :param Z: leaflet apex outward deflection value (m) :return: areal strain (dimensionless) """ return (Z / self.a)**2 def Capct(self, Z): """ Compute the membrane capacitance per unit area, under the assumption of parallel-plate capacitor with average inter-layer distance. :param Z: leaflet apex outward deflection value (m) :return: capacitance per unit area (F/m2) """ if Z == 0.0: return self.Cm0 else: return ((self.Cm0 * self.Delta / self.a**2) * (Z + (self.a**2 - Z**2 - Z * self.Delta) / (2 * Z) * np.log((2 * Z + self.Delta) / self.Delta))) def derCapct(self, Z, U): """ Compute the derivative of the membrane capacitance per unit area with respect to time, under the assumption of parallel-plate capacitor. :param Z: leaflet apex outward deflection value (m) :param U: leaflet apex outward deflection velocity (m/s) :return: derivative of capacitance per unit area (F/m2.s) """ dCmdZ = ((self.Cm0 * self.Delta / self.a**2) * ((Z**2 + self.a**2) / (Z * (2 * Z + self.Delta)) - ((Z**2 + self.a**2) * np.log((2 * Z + self.Delta) / self.Delta)) / (2 * Z**2))) return dCmdZ * U def localdef(self, r, Z, R): """ Compute the (signed) local transverse leaflet deviation at a distance r from the center of the dome. :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :return: local transverse leaflet deviation (m) """ if np.abs(Z) == 0.0: return 0.0 else: return np.sign(Z) * (np.sqrt(R**2 - r**2) - np.abs(R) + np.abs(Z)) def Pacoustic(self, t, Adrive, Fdrive, phi=np.pi): """ Compute the acoustic pressure at a specific time, given the amplitude, frequency and phase of the acoustic stimulus. :param t: time of interest :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) """ return Adrive * np.sin(2 * np.pi * Fdrive * t - phi) def PMlocal(self, r, Z, R): """ Compute the local intermolecular pressure. :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :return: local intermolecular pressure (Pa) """ z = self.localdef(r, Z, R) relgap = (2 * z + self.Delta) / self.Delta_ return self.pDelta * ((1 / relgap)**self.m - (1 / relgap)**self.n) def PMavg(self, Z, R, S): """ Compute the average intermolecular pressure felt across the leaflet by quadratic integration. :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :param S: surface of the stretched leaflet (m^2) :return: averaged intermolecular resultant pressure across the leaflet (Pa) .. warning:: quadratic integration is computationally expensive. """ # Intermolecular force over an infinitely thin ring of radius r fMring = lambda r, Z, R: 2 * np.pi * r * self.PMlocal(r, Z, R) # Integrate from 0 to a fTotal, _ = integrate.quad(fMring, 0, self.a, args=(Z, R)) return fTotal / S def LJfitPMavg(self): """ Determine optimal parameters of a Lennard-Jones expression approximating the average intermolecular pressure. These parameters are obtained by a nonlinear fit of the Lennard-Jones function for a range of deflection values between predetermined Zmin and Zmax. :return: 3-tuple with optimized LJ parameters for PmAvg prediction (Map) and the standard and max errors of the prediction in the fitting range (in Pascals) """ # Determine lower bound of deflection range: when Pm = Pmmax PMmax = LJFIT_PM_MAX # Pa Zminlb = -0.49 * self.Delta Zminub = 0.0 f = lambda Z, Pmmax: self.PMavg(Z, self.curvrad(Z), self.surface(Z)) - PMmax Zmin = brentq(f, Zminlb, Zminub, args=(PMmax), xtol=1e-16) # Create vectors for geometric variables Zmax = 2 * self.a Z = np.arange(Zmin, Zmax, 1e-11) Pmavg = np.array([self.PMavg(ZZ, self.curvrad(ZZ), self.surface(ZZ)) for ZZ in Z]) # Compute optimal nonlinear fit of custom LJ function with initial guess x0_guess = 2e-9 C_guess = 1e4 nrep_guess = 5.0 nattr_guess = 3.0 pguess = (x0_guess, C_guess, nrep_guess, nattr_guess) popt, _ = curve_fit(lambda x, x0, C, nrep, nattr: LennardJones(x, self.Delta, x0, C, nrep, nattr), Z, Pmavg, p0=pguess, maxfev=10000) (x0_opt, C_opt, nrep_opt, nattr_opt) = popt Pmavg_fit = LennardJones(Z, self.Delta, x0_opt, C_opt, nrep_opt, nattr_opt) # Compute prediction error residuals = Pmavg - Pmavg_fit ss_res = np.sum(residuals**2) N = residuals.size std_err = np.sqrt(ss_res / N) max_err = max(np.abs(residuals)) logger.debug('LJ approx: x0 = %.2f nm, C = %.2f kPa, m = %.2f, n = %.2f', x0_opt * 1e9, C_opt * 1e-3, nrep_opt, nattr_opt) LJ_approx = {"x0": x0_opt, "C": C_opt, "nrep": nrep_opt, "nattr": nattr_opt} return (LJ_approx, std_err, max_err) def PMavgpred(self, Z): """ Return the predicted intermolecular pressure based on a specific Lennard-Jones function fitted on the deflection physiological range. :param Z: leaflet apex outward deflection value (m) :return: predicted average intermolecular pressure (Pa) """ return LennardJones(Z, self.Delta, self.LJ_approx['x0'], self.LJ_approx['C'], self.LJ_approx['nrep'], self.LJ_approx['nattr']) def Pelec(self, Z, Qm): """ Compute the electric equivalent pressure term. :param Z: leaflet apex outward deflection value (m) :param Qm: membrane charge density (C/m2) :return: electric equivalent pressure (Pa) """ relS = self.S0 / self.surface(Z) abs_perm = self.epsilon0 * self.epsilonR # F/m return -relS * Qm**2 / (2 * abs_perm) # Pa def findDeltaEq(self, Qm): """ Compute the Delta that cancels out the (Pm + Pec) equation at Z = 0 for a given membrane charge density, using the Brent method to refine the pressure root iteratively. :param Qm: membrane charge density (C/m2) :return: equilibrium value (m) and associated pressure (Pa) """ f = lambda Delta: (self.pDelta * ((self.Delta_ / Delta)**self.m - (self.Delta_ / Delta)**self.n) + self.Pelec(0.0, Qm)) Delta_lb = 0.1 * self.Delta_ Delta_ub = 2.0 * self.Delta_ Delta_eq = brentq(f, Delta_lb, Delta_ub, xtol=1e-16) logger.debug('∆eq = %.2f nm', Delta_eq * 1e9) return (Delta_eq, f(Delta_eq)) def gasflux(self, Z, P): """ Compute the gas molar flux through the BLS boundary layer for an unsteady system. :param Z: leaflet apex outward deflection value (m) :param P: internal gas pressure in the inter-leaflet space (Pa) :return: gas molar flux (mol/s) """ dC = self.C0 - P / self.kH return 2 * self.surface(Z) * self.Dgl * dC / self.xi def gasmol2Pa(self, ng, V): """ Compute the gas pressure in the inter-leaflet space for an unsteady system, from the value of gas molar content. :param ng: internal molar content (mol) :param V: inner volume of the bilayer sonophore structure (m^3) :return: internal gas pressure (Pa) """ return ng * self.Rg * self.T / V def gasPa2mol(self, P, V): """ Compute the gas molar content in the inter-leaflet space for an unsteady system, from the value of internal gas pressure. :param P: internal gas pressure in the inter-leaflet space (Pa) :param V: inner volume of the bilayer sonophore structure (m^3) :return: internal gas molar content (mol) """ return P * V / (self.Rg * self.T) def PtotQS(self, Z, ng, Qm, Pac, Pm_comp_method): """ Compute the balance pressure of the quasi-steady system, upon application of an external perturbation on a charged membrane: Ptot = Pm + Pg + Pec - P0 - Pac. :param Z: leaflet apex outward deflection value (m) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: total balance pressure (Pa) """ if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) return Pm + self.gasmol2Pa(ng, self.volume(Z)) - self.P0 - Pac + self.Pelec(Z, Qm) def balancedefQS(self, ng, Qm, Pac=0.0, Pm_comp_method=PmCompMethod.predict): """ Compute the leaflet deflection upon application of an external perturbation to a quasi-steady system with a charged membrane. This function uses the Brent method (progressive approximation of function root) to solve the following transcendental equation for Z: Pm + Pg + Pec - P0 - Pac = 0. :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: leaflet deflection (Z) canceling out the balance equation """ lb = -0.49 * self.Delta ub = self.a Plb = self.PtotQS(lb, ng, Qm, Pac, Pm_comp_method) Pub = self.PtotQS(ub, ng, Qm, Pac, Pm_comp_method) assert (Plb > 0 > Pub), '[%d, %d] is not a sign changing interval for PtotQS' % (lb, ub) return brentq(self.PtotQS, lb, ub, args=(ng, Qm, Pac, Pm_comp_method), xtol=1e-16) def TEleaflet(self, Z): """ Compute the circumferential elastic tension felt across the entire leaflet upon stretching. :param Z: leaflet apex outward deflection value (m) :return: circumferential elastic tension (N/m) """ return self.kA * self.arealstrain(Z) def TEtissue(self, Z): """ Compute the circumferential elastic tension felt across the embedding viscoelastic tissue layer upon stretching. :param Z: leaflet apex outward deflection value (m) :return: circumferential elastic tension (N/m) """ return self.kA_tissue * self.arealstrain(Z) def TEtot(self, Z): """ Compute the total circumferential elastic tension (leaflet and embedding tissue) felt upon stretching. :param Z: leaflet apex outward deflection value (m) :return: circumferential elastic tension (N/m) """ return self.TEleaflet(Z) + self.TEtissue(Z) def PEtot(self, Z, R): """ Compute the total elastic tension pressure (leaflet + embedding tissue) felt upon stretching. :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :return: elastic tension pressure (Pa) """ return - self.TEtot(Z) / R def PVleaflet(self, U, R): """ Compute the viscous stress felt across the entire leaflet upon stretching. :param U: leaflet apex outward deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: leaflet viscous stress (Pa) """ return - 12 * U * self.delta0 * self.muS / R**2 def PVfluid(self, U, R): """ Compute the viscous stress felt across the entire fluid upon stretching. :param U: leaflet apex outward deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: fluid viscous stress (Pa) """ return - 4 * U * self.muL / np.abs(R) def accP(self, Pres, R): """ Compute the pressure-driven acceleration of the leaflet in the unsteady system, upon application of an external perturbation. :param Pres: net resultant pressure (Pa) :param R: leaflet curvature radius (m) :return: pressure-driven acceleration (m/s^2) """ return Pres / (self.rhoL * np.abs(R)) def accNL(self, U, R): """ Compute the non-linear term of the leaflet acceleration in the unsteady system, upon application of an external perturbation. :param U: leaflet apex outward deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: nonlinear acceleration (m/s^2) .. note:: A simplified version of nonlinear acceleration (neglecting dR/dH) is used here. """ # return - (3/2 - 2*R/H) * U**2 / R return -(3 * U**2) / (2 * R) def eqMech(self, t, y, Adrive, Fdrive, Qm, phi): """ Compute the derivatives of the 3-ODE mechanical system variables, with an imposed constant charge density. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param Qm: membrane charge density (F/m2) :param phi: acoustic drive phase (rad) :return: vector of mechanical system derivatives at time t """ # Split input vector explicitly (U, Z, ng) = y + # Check soundness of deflection value + assert Z > -0.5 * self.Delta, 'Deflection out of range' + # Compute curvature radius R = self.curvrad(Z) # Compute total pressure Pg = self.gasmol2Pa(ng, self.volume(Z)) Ptot = (self.PMavgpred(Z) + Pg - self.P0 - self.Pacoustic(t, Adrive, Fdrive, phi) + self.PEtot(Z, R) + self.PVleaflet(U, R) + self.PVfluid(U, R) + self.Pelec(Z, Qm)) # Compute derivatives dUdt = self.accP(Ptot, R) + self.accNL(U, R) dZdt = U dngdt = self.gasflux(Z, Pg) # Return derivatives vector return [dUdt, dZdt, dngdt] def runMech(self, Fdrive, Adrive, Qm, phi=np.pi): """ Compute short solutions of the mechanical system for specific US stimulation parameters and with an imposed membrane charge density. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param phi: acoustic drive phase (rad) :param Qm: imposed membrane charge density (C/m2) :return: 3-tuple with the time profile, the solution matrix and a state vector """ # Check validity of stimulation parameters for param in [Fdrive, Adrive, Qm, phi]: assert isinstance(param, float), 'stimulation parameters must be float typed' assert Fdrive > 0, 'Driving frequency must be strictly positive' assert Adrive >= 0, 'Acoustic pressure amplitude must be positive' assert INPUT_CHARGE_RANGE[0] <= Qm <= INPUT_CHARGE_RANGE[1], ('Applied charge must be ' 'within physiological range') assert phi >= 0 and phi < 2 * np.pi, 'US phase must be within [0, 2 PI)' # Raise warnings as error warnings.filterwarnings('error') # Initialize mechanical system solvers solver = integrate.ode(self.eqMech) solver.set_f_params(Adrive, Fdrive, Qm, phi) solver.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) # Determine mechanical system time step Tdrive = 1 / Fdrive dt_mech = Tdrive / NPC_FULL t_mech_cycle = np.linspace(0, Tdrive - dt_mech, NPC_FULL) # Initialize system variables t0 = 0.0 Z0 = 0.0 U0 = 0.0 ng0 = self.ng0 # Solve quasi-steady equation to compute first deflection value Pac1 = self.Pacoustic(t0 + dt_mech, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm, Pac1, PmCompMethod.predict) U1 = (Z1 - Z0) / dt_mech # Construct arrays to hold system variables states = np.array([1, 1]) t = np.array([t0, t0 + dt_mech]) y = np.array([[U0, U1], [Z0, Z1], [ng0, ng0]]) # Integrate mechanical system for a few acoustic cycles until stabilization sim_error = False periodic_conv = False j = 0 ng_last = None Z_last = None while not sim_error and not periodic_conv: t_mech = t_mech_cycle + t[-1] + dt_mech y_mech = np.empty((3, NPC_FULL)) y0_mech = y[:, -1] solver.set_initial_value(y0_mech, t[-1]) k = 0 try: # try to integrate and catch errors/warnings while solver.successful() and k <= NPC_FULL - 1: solver.integrate(t_mech[k]) y_mech[:, k] = solver.y - assert (y_mech[1, k] > -0.5 * self.Delta), 'Deflection out of range' k += 1 except (Warning, AssertionError) as inst: sim_error = True logger.error('Mech. system integration error at step %u', k, extra={inst}) # Compare Z and ng signals over the last 2 acoustic periods if j > 0: Z_rmse = rmse(Z_last, y_mech[1, :]) ng_rmse = rmse(ng_last, y_mech[2, :]) logger.debug('step %u: Z_rmse = %.2e m, ng_rmse = %.2e mol', j, Z_rmse, ng_rmse) if Z_rmse < Z_ERR_MAX and ng_rmse < NG_ERR_MAX: periodic_conv = True # Update last vectors for next comparison Z_last = y_mech[1, :] ng_last = y_mech[2, :] # Concatenate time and solutions to global vectors states = np.concatenate([states, np.ones(NPC_FULL)], axis=0) t = np.concatenate([t, t_mech], axis=0) y = np.concatenate([y, y_mech], axis=1) # Increment loop index j += 1 logger.debug('Periodic convergence after %u cycles', j) states[-1] = 0 # return output variables return (t, y[1:, :], states) diff --git a/PointNICE/plt/pltutils.py b/PointNICE/plt/pltutils.py index df20e0f..91ffb6d 100644 --- a/PointNICE/plt/pltutils.py +++ b/PointNICE/plt/pltutils.py @@ -1,835 +1,836 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-23 14:55:37 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-09-06 15:59:49 +# @Last Modified time: 2017-09-10 17:01:41 ''' Plotting utilities ''' +import sys import os import pickle import ntpath import re import logging import tkinter as tk from tkinter import filedialog import numpy as np from scipy.interpolate import interp2d # from scipy.optimize import brentq import matplotlib.pyplot as plt from matplotlib.patches import Rectangle import matplotlib.cm as cm from .. import channels from ..utils import getNeuronsDict, getLookupDir, rescale from ..bls import BilayerSonophore from .pltvars import pltvars # Get package logger logger = logging.getLogger('PointNICE') # Define global variables neuron = None bls = None # Regular expression for input files rgxp = re.compile('(ESTIM|ASTIM)_([A-Za-z]*)_(.*).pkl') rgxp_mech = re.compile('(MECH)_(.*).pkl') # nb = '[0-9]*[.]?[0-9]+' # rgxp_ASTIM = re.compile('(ASTIM)_(\w+)_(PW|CW)_({0})nm_({0})kHz_({0})kPa_({0})ms(.*)_(\w+).pkl'.format(nb)) # rgxp_ESTIM = re.compile('(ESTIM)_(\w+)_(PW|CW)_({0})mA_per_m2_({0})ms(.*).pkl'.format(nb)) # rgxp_PW = re.compile('_PRF({0})kHz_DF({0})_(PW|CW)_(\d+)kHz_(\d+)kPa_(\d+)ms_(.*).pkl'.format(nb)) # Figure naming conventions ESTIM_CW_title = '{} neuron: CW E-STIM {:.2f}mA/m2, {:.0f}ms' ESTIM_PW_title = '{} neuron: PW E-STIM {:.2f}mA/m2, {:.0f}ms, {:.2f}kHz PRF, {:.0f}% DC' ASTIM_CW_title = '{} neuron: CW A-STIM {:.0f}kHz, {:.0f}kPa, {:.0f}ms' ASTIM_PW_title = '{} neuron: PW A-STIM {:.0f}kHz, {:.0f}kPa, {:.0f}ms, {:.2f}kHz PRF, {:.2f}% DC' MECH_title = '{:.0f}nm BLS structure: MECH-STIM {:.0f}kHz, {:.0f}kPa' class InteractiveLegend(object): """ Class defining an interactive matplotlib legend, where lines visibility can be toggled by simply clicking on the corresponding legend label. Other graphic objects can also be associated to the toggle of a specific line Adapted from: http://stackoverflow.com/questions/31410043/hiding-lines-after-showing-a-pyplot-figure """ def __init__(self, legend, aliases): self.legend = legend self.fig = legend.axes.figure self.lookup_artist, self.lookup_handle = self._build_lookups(legend) self._setup_connections() self.handles_aliases = aliases self.update() def _setup_connections(self): for artist in self.legend.texts + self.legend.legendHandles: artist.set_picker(10) # 10 points tolerance self.fig.canvas.mpl_connect('pick_event', self.on_pick) def _build_lookups(self, legend): ''' Method of the InteractiveLegend class building the legend lookups. ''' labels = [t.get_text() for t in legend.texts] handles = legend.legendHandles label2handle = dict(zip(labels, handles)) handle2text = dict(zip(handles, legend.texts)) lookup_artist = {} lookup_handle = {} for artist in legend.axes.get_children(): if artist.get_label() in labels: handle = label2handle[artist.get_label()] lookup_handle[artist] = handle lookup_artist[handle] = artist lookup_artist[handle2text[handle]] = artist lookup_handle.update(zip(handles, handles)) lookup_handle.update(zip(legend.texts, handles)) return lookup_artist, lookup_handle def on_pick(self, event): handle = event.artist if handle in self.lookup_artist: artist = self.lookup_artist[handle] artist.set_visible(not artist.get_visible()) self.update() def update(self): for artist in self.lookup_artist.values(): handle = self.lookup_handle[artist] if artist.get_visible(): handle.set_visible(True) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(True) else: handle.set_visible(False) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(False) self.fig.canvas.draw() def show(self): ''' showing the interactive legend ''' plt.show() def getPatchesLoc(t, states): ''' Determine the location of stimulus patches. :param t: simulation time vector (s). :param states: a vector of stimulation state (ON/OFF) at each instant in time. :return: 3-tuple with number of patches, timing of STIM-ON an STIM-OFF instants. ''' # Compute states derivatives and identify bounds indexes of pulses dstates = np.diff(states) ipatch_on = np.insert(np.where(dstates > 0.0)[0] + 1, 0, 0) ipatch_off = np.where(dstates < 0.0)[0] # Get time instants for pulses ON and OFF npatches = ipatch_on.size tpatch_on = t[ipatch_on] tpatch_off = t[ipatch_off] # return 3-tuple with #patches, pulse ON and pulse OFF instants return (npatches, tpatch_on, tpatch_off) def SaveFigDialog(dirname, filename): """ Open a FileSaveDialogBox to set the directory and name of the figure to be saved. The default directory and filename are given, and the default extension is ".pdf" :param dirname: default directory :param filename: default filename :return: full path to the chosen filename """ root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename(defaultextension=".pdf", initialdir=dirname, initialfile=filename) return filename_out def plotComp(yvars, filepaths, labels=None, fs=15, show_patches=True): ''' Compare profiles of several specific output variables of NICE simulations. :param yvars: list of variables names to extract and compare :param filepaths: list of full paths to output data files to be compared :param labels: list of labels to use in the legend :param fs: labels fontsize :param show_patches: boolean indicating whether to indicate periods of stimulation with colored rectangular patches ''' # check labels if given if labels: assert len(labels) == len(filepaths), 'labels do not match number of compared files' assert all(isinstance(x, str) for x in labels), 'labels must be string typed' nvars = len(yvars) # y variables plotting information y_pltvars = [pltvars[key] for key in yvars] # Dictionary of neurons neurons = getNeuronsDict() # Initialize figure and axes if nvars == 1: _, ax = plt.subplots(figsize=(11, 4)) axes = [ax] else: _, axes = plt.subplots(nvars, 1, figsize=(11, min(3 * nvars, 9))) for i in range(nvars): ax = axes[i] pltvar = y_pltvars[i] if 'min' in pltvar and 'max' in pltvar: ax.set_ylim(pltvar['min'], pltvar['max']) if pltvar['unit']: ax.set_ylabel('${}\ ({})$'.format(pltvar['label'], pltvar['unit']), fontsize=fs) else: ax.set_ylabel('${}$'.format(pltvar['label']), fontsize=fs) if i < nvars - 1: ax.get_xaxis().set_ticklabels([]) else: for tick in ax.xaxis.get_major_ticks(): tick.label.set_fontsize(fs) ax.locator_params(axis='y', nbins=2) for tick in ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Loop through data files j = 0 aliases = {} for filepath in filepaths: pkl_filename = ntpath.basename(filepath) # Retrieve sim type mo1 = rgxp.fullmatch(pkl_filename) mo2 = rgxp_mech.fullmatch(pkl_filename) if mo1: mo = mo1 elif mo2: mo = mo2 else: - print('Error: PKL file does not match regular expression pattern') - quit() + logger.error('Error: "%s" file does not match regexp pattern', pkl_filename) + sys.exit(1) sim_type = mo.group(1) assert sim_type in ['MECH', 'ASTIM', 'ESTIM'], 'invalid stimulation type' if j == 0: sim_type_ref = sim_type else: assert sim_type == sim_type_ref, 'Error: comparing different simulation types' # Load data - print('Loading data from "' + pkl_filename + '"') + logger.info('Loading data from "%s"', pkl_filename) with open(filepath, 'rb') as pkl_file: data = pickle.load(pkl_file) # Extract variables - print('Extracting variables') + logger.info('Extracting variables') t = data['t'] states = data['states'] nsamples = t.size # Initialize channel mechanism if sim_type in ['ASTIM', 'ESTIM']: neuron_name = mo.group(2) global neuron neuron = neurons[neuron_name]() neuron_states = [data[sn] for sn in neuron.states_names] Cm0 = neuron.Cm0 Qm0 = Cm0 * neuron.Vm0 * 1e-3 t_plt = pltvars['t_ms'] else: Cm0 = data['Cm0'] Qm0 = data['Qm0'] t_plt = pltvars['t_us'] # Initialize BLS if sim_type in ['MECH', 'ASTIM']: global bls params = data['params'] Fdrive = data['Fdrive'] a = data['a'] d = data['d'] geom = {"a": a, "d": d} bls = BilayerSonophore(geom, params, Fdrive, Cm0, Qm0) # Determine patches location npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) # Adding onset to time and states vectors if t_plt['onset'] > 0.0: t = np.insert(t, 0, -t_plt['onset']) states = np.insert(states, 0, 0) # Extract variables to plot vrs = [] for i in range(nvars): pltvar = y_pltvars[i] if 'alias' in pltvar: var = eval(pltvar['alias']) elif 'key' in pltvar: var = data[pltvar['key']] elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = data[yvars[i]] if var.size == t.size - 1: var = np.insert(var, 0, var[0]) vrs.append(var) # Legend label if labels: label = labels[j] else: if sim_type == 'ESTIM': if data['DF'] == 1.0: label = ESTIM_CW_title.format(neuron.name, data['Astim'], data['tstim'] * 1e3) else: label = ESTIM_PW_title.format(neuron.name, data['Astim'], data['tstim'] * 1e3, data['PRF'] * 1e-3, data['DF'] * 1e2) elif sim_type == 'ASTIM': if data['DF'] == 1.0: label = ASTIM_CW_title.format(neuron.name, Fdrive * 1e-3, data['Adrive'] * 1e-3, data['tstim'] * 1e3) else: label = ASTIM_PW_title.format(neuron.name, Fdrive * 1e-3, data['Adrive'] * 1e-3, data['tstim'] * 1e3, data['PRF'] * 1e-3, data['DF'] * 1e2) elif sim_type == 'MECH': label = MECH_title.format(a * 1e9, Fdrive * 1e-3, data['Adrive'] * 1e-3) # Plotting handles = [axes[i].plot(t * t_plt['factor'], vrs[i] * y_pltvars[i]['factor'], linewidth=2, label=label) for i in range(nvars)] if show_patches: k = 0 # stimulation patches for ax in axes: handle = handles[k] (ybottom, ytop) = ax.get_ylim() la = [] for i in range(npatches): la.append(ax.add_patch(Rectangle((tpatch_on[i] * t_plt['factor'], ybottom), (tpatch_off[i] - tpatch_on[i]) * t_plt['factor'], ytop - ybottom, color=handle[0].get_color(), alpha=0.2))) aliases[handle[0]] = la k += 1 j += 1 # set x-axis label axes[-1].set_xlabel('${}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) plt.tight_layout() iLegends = [] for k in range(nvars): axes[k].legend(loc='upper left', fontsize=fs) iLegends.append(InteractiveLegend(axes[k].legend_, aliases)) plt.show() def plotBatch(directory, filepaths, vars_dict=None, plt_show=True, plt_save=False, ask_before_save=True, fig_ext='png', tag='fig', fs=15, lw=2, title=True, show_patches=True): ''' Plot a figure with profiles of several specific NICE output variables, for several NICE simulations. :param positions: subplot indexes of each variable :param filepaths: list of full paths to output data files to be compared :param vars_dict: dict of lists of variables names to extract and plot together :param plt_show: boolean stating whether to show the created figures :param plt_save: boolean stating whether to save the created figures :param ask_before_save: boolean stating whether to show the created figures :param fig_ext: file extension for the saved figures :param tag: suffix added to the end of the figures name :param fs: labels font size :param lw: curves line width :param title: boolean stating whether to display a general title on the figures :param show_patches: boolean indicating whether to indicate periods of stimulation with colored rectangular patches ''' # Dictionary of neurons neurons = getNeuronsDict() # Loop through data files for filepath in filepaths: # Get code from file name pkl_filename = ntpath.basename(filepath) filecode = pkl_filename[0:-4] # Retrieve sim type mo1 = rgxp.fullmatch(pkl_filename) mo2 = rgxp_mech.fullmatch(pkl_filename) if mo1: mo = mo1 elif mo2: mo = mo2 else: logger.error('Error: "%s" file does not match regexp pattern', pkl_filename) - quit() + sys.exit(1) sim_type = mo.group(1) assert sim_type in ['MECH', 'ASTIM', 'ESTIM'], 'invalid stimulation type' # Load data - print('Loading data from "' + pkl_filename + '"') + logger.info('Loading data from "%s"', pkl_filename) with open(filepath, 'rb') as pkl_file: data = pickle.load(pkl_file) # Extract variables - print('Extracting variables') + logger.info('Extracting variables') t = data['t'] states = data['states'] nsamples = t.size # Initialize channel mechanism if sim_type in ['ASTIM', 'ESTIM']: neuron_name = mo.group(2) global neuron neuron = neurons[neuron_name]() neuron_states = [data[sn] for sn in neuron.states_names] Cm0 = neuron.Cm0 Qm0 = Cm0 * neuron.Vm0 * 1e-3 t_plt = pltvars['t_ms'] else: Cm0 = data['Cm0'] Qm0 = data['Qm0'] t_plt = pltvars['t_us'] # Initialize BLS if sim_type in ['MECH', 'ASTIM']: global bls params = data['params'] Fdrive = data['Fdrive'] a = data['a'] d = data['d'] geom = {"a": a, "d": d} bls = BilayerSonophore(geom, params, Fdrive, Cm0, Qm0) # Determine patches location npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) # Adding onset to time and states vectors if t_plt['onset'] > 0.0: t = np.insert(t, 0, -t_plt['onset']) states = np.insert(states, 0, 0) # Determine variables to plot if not provided if not vars_dict: if sim_type == 'ASTIM': vars_dict = {'Z': ['Z'], 'Q_m': ['Qm']} elif sim_type == 'ESTIM': vars_dict = {'V_m': ['Vm']} elif sim_type == 'MECH': vars_dict = {'P_{AC}': ['Pac'], 'Z': ['Z'], 'n_g': ['ng']} if sim_type in ['ASTIM', 'ESTIM'] and hasattr(neuron, 'pltvars_scheme'): vars_dict.update(neuron.pltvars_scheme) labels = list(vars_dict.keys()) naxes = len(vars_dict) # Plotting if naxes == 1: _, ax = plt.subplots(figsize=(11, 4)) axes = [ax] else: _, axes = plt.subplots(naxes, 1, figsize=(11, min(3 * naxes, 9))) for i in range(naxes): ax = axes[i] ax_pltvars = [pltvars[j] for j in vars_dict[labels[i]]] nvars = len(ax_pltvars) # X-axis if i < naxes - 1: ax.get_xaxis().set_ticklabels([]) else: ax.set_xlabel('${}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) for tick in ax.xaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Y-axis if ax_pltvars[0]['unit']: ax.set_ylabel('${}\ ({})$'.format(labels[i], ax_pltvars[0]['unit']), fontsize=fs) else: ax.set_ylabel('${}$'.format(labels[i]), fontsize=fs) if 'min' in ax_pltvars[0] and 'max' in ax_pltvars[0]: ax_min = min([ap['min'] for ap in ax_pltvars]) ax_max = max([ap['max'] for ap in ax_pltvars]) ax.set_ylim(ax_min, ax_max) ax.locator_params(axis='y', nbins=2) for tick in ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Time series icolor = 0 for j in range(nvars): # Extract variable pltvar = ax_pltvars[j] if 'alias' in pltvar: var = eval(pltvar['alias']) elif 'key' in pltvar: var = data[pltvar['key']] elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = data[vars_dict[labels[i]][j]] if var.size == t.size - 1: var = np.insert(var, 0, var[0]) # Plot variable if 'constant' in pltvar: ax.plot(t * t_plt['factor'], var * pltvar['factor'], '--', c='black', lw=lw, label='${}$'.format(pltvar['label'])) else: ax.plot(t * t_plt['factor'], var * pltvar['factor'], c='C{}'.format(icolor), lw=lw, label='${}$'.format(pltvar['label'])) icolor += 1 # Patches if show_patches == 1: (ybottom, ytop) = ax.get_ylim() for j in range(npatches): ax.add_patch(Rectangle((tpatch_on[j] * t_plt['factor'], ybottom), (tpatch_off[j] - tpatch_on[j]) * t_plt['factor'], ytop - ybottom, color='#8A8A8A', alpha=0.1)) # Legend if nvars > 1: ax.legend(fontsize=fs, loc=7, ncol=nvars // 4 + 1) # Title if title: if sim_type == 'ESTIM': if data['DF'] == 1.0: fig_title = ESTIM_CW_title.format(neuron.name, data['Astim'], data['tstim'] * 1e3) else: fig_title = ESTIM_PW_title.format(neuron.name, data['Astim'], data['tstim'] * 1e3, data['PRF'] * 1e-3, data['DF'] * 1e2) elif sim_type == 'ASTIM': if data['DF'] == 1.0: fig_title = ASTIM_CW_title.format(neuron.name, Fdrive * 1e-3, data['Adrive'] * 1e-3, data['tstim'] * 1e3) else: fig_title = ASTIM_PW_title.format(neuron.name, Fdrive * 1e-3, data['Adrive'] * 1e-3, data['tstim'] * 1e3, data['PRF'] * 1e-3, data['DF'] * 1e2) elif sim_type == 'MECH': fig_title = MECH_title.format(a * 1e9, Fdrive * 1e-3, data['Adrive'] * 1e-3) axes[0].set_title(fig_title, fontsize=fs) plt.tight_layout() # Save figure if needed (automatic or checked) if plt_save: if ask_before_save: plt_filename = SaveFigDialog(directory, '{}_{}.{}'.format(filecode, tag, fig_ext)) else: plt_filename = '{}/{}_{}.{}'.format(directory, filecode, tag, fig_ext) if plt_filename: plt.savefig(plt_filename) - print('Saving figure as "{}"'.format(plt_filename)) + logger.info('Saving figure as "{}"'.format(plt_filename)) plt.close() # Show all plots if needed if plt_show: plt.show() def plotGatingKinetics(neuron, fs=15): ''' Plot the voltage-dependent steady-states and time constants of activation and inactivation gates of the different ionic currents involved in a specific neuron's membrane. :param neuron: specific channel mechanism object :param fs: labels and title font size ''' # Input membrane potential vector Vm = np.linspace(-100, 50, 300) xinf_dict = {} taux_dict = {} - print('Computing {} neuron gating kinetics'.format(neuron.name)) + logger.info('Computing %s neuron gating kinetics', neuron.name) names = neuron.states_names for xname in names: Vm_state = True # Names of functions of interest xinf_func_str = xname.lower() + 'inf' taux_func_str = 'tau' + xname.lower() alphax_func_str = 'alpha' + xname.lower() betax_func_str = 'beta' + xname.lower() # derx_func_str = 'der' + xname.upper() # 1st choice: use xinf and taux function if hasattr(neuron, xinf_func_str) and hasattr(neuron, taux_func_str): xinf_func = getattr(neuron, xinf_func_str) taux_func = getattr(neuron, taux_func_str) xinf = np.array([xinf_func(v) for v in Vm]) taux = np.array([taux_func(v) for v in Vm]) # 2nd choice: use alphax and betax functions elif hasattr(neuron, alphax_func_str) and hasattr(neuron, betax_func_str): alphax_func = getattr(neuron, alphax_func_str) betax_func = getattr(neuron, betax_func_str) alphax = np.array([alphax_func(v) for v in Vm]) betax = np.array([betax_func(v) for v in Vm]) taux = 1.0 / (alphax + betax) xinf = taux * alphax # # 3rd choice: use derX choice # elif hasattr(neuron, derx_func_str): # derx_func = getattr(neuron, derx_func_str) # xinf = brentq(lambda x: derx_func(neuron.Vm, x), 0, 1) else: Vm_state = False if not Vm_state: - print('no function to compute {}-state gating kinetics'.format(xname)) + logger.error('no function to compute %s-state gating kinetics', xname) else: xinf_dict[xname] = xinf taux_dict[xname] = taux fig, axes = plt.subplots(2) fig.suptitle('{} neuron: gating dynamics'.format(neuron.name)) ax = axes[0] ax.get_xaxis().set_ticklabels([]) ax.set_ylabel('$X_{\infty}\ (mV)$', fontsize=fs) for xname in names: if xname in xinf_dict: ax.plot(Vm, xinf_dict[xname], lw=2, label='$' + xname + '_{\infty}$') ax.legend(fontsize=fs, loc=7) ax = axes[1] ax.set_xlabel('$V_m\ (mV)$', fontsize=fs) ax.set_ylabel('$\\tau_X\ (ms)$', fontsize=fs) for xname in names: if xname in taux_dict: ax.plot(Vm, taux_dict[xname] * 1e3, lw=2, label='$\\tau_{' + xname + '}$') ax.legend(fontsize=fs, loc=7) plt.show() def plotRateConstants(neuron, fs=15): ''' Plot the voltage-dependent activation and inactivation rate constants for each gate of all ionic currents involved in a specific neuron's membrane. :param neuron: specific channel mechanism object :param fs: labels and title font size ''' # Input membrane potential vector Vm = np.linspace(-250, 250, 300) alphax_dict = {} betax_dict = {} - print('Computing {} neuron gating kinetics'.format(neuron.name)) + logger.info('Computing %s neuron gating kinetics', neuron.name) names = neuron.states_names for xname in names: Vm_state = True # Names of functions of interest xinf_func_str = xname.lower() + 'inf' taux_func_str = 'tau' + xname.lower() alphax_func_str = 'alpha' + xname.lower() betax_func_str = 'beta' + xname.lower() # 1st choice: use alphax and betax functions if hasattr(neuron, alphax_func_str) and hasattr(neuron, betax_func_str): alphax_func = getattr(neuron, alphax_func_str) betax_func = getattr(neuron, betax_func_str) alphax = np.array([alphax_func(v) for v in Vm]) betax = np.array([betax_func(v) for v in Vm]) # 2nd choice: use xinf and taux function elif hasattr(neuron, xinf_func_str) and hasattr(neuron, taux_func_str): xinf_func = getattr(neuron, xinf_func_str) taux_func = getattr(neuron, taux_func_str) xinf = np.array([xinf_func(v) for v in Vm]) taux = np.array([taux_func(v) for v in Vm]) alphax = xinf / taux betax = 1.0 / taux - alphax else: Vm_state = False if not Vm_state: - print('no function to compute {}-state gating kinetics'.format(xname)) + logger.error('no function to compute %s-state gating kinetics', xname) else: alphax_dict[xname] = alphax betax_dict[xname] = betax naxes = len(alphax_dict) _, axes = plt.subplots(naxes, figsize=(11, min(3 * naxes, 9))) for i, xname in enumerate(alphax_dict.keys()): ax1 = axes[i] if i == 0: ax1.set_title('{} neuron: rate constants'.format(neuron.name)) if i == naxes - 1: ax1.set_xlabel('$V_m\ (mV)$', fontsize=fs) else: ax1.get_xaxis().set_ticklabels([]) ax1.set_ylabel('$\\alpha_{' + xname + '}\ (ms^{-1})$', fontsize=fs, color='C0') for label in ax1.get_yticklabels(): label.set_color('C0') ax1.plot(Vm, alphax_dict[xname] * 1e-3, lw=2) ax2 = ax1.twinx() ax2.set_ylabel('$\\beta_{' + xname + '}\ (ms^{-1})$', fontsize=fs, color='C1') for label in ax2.get_yticklabels(): label.set_color('C1') ax2.plot(Vm, betax_dict[xname] * 1e-3, lw=2, color='C1') plt.tight_layout() plt.show() def setGrid(n): ''' Determine number of rows and columns in figure grid, based on number of variables to plot. ''' if n <= 3: return (1, n) else: return ((n - 1) // 3 + 1, 3) def plotEffCoeffs(neuron, Fdrive, a=32e-9, fs=12): ''' Plot the profiles of all effective coefficients of a specific neuron for a given frequency. For each coefficient X, one line chart per lookup amplitude is plotted, using charge as the input variable on the abscissa and a linear color code for the amplitude value. :param neuron: channel mechanism object :param Fdrive: acoustic drive frequency (Hz) :param a: sonophore diameter (m) :param fs: figure fontsize ''' # Check lookup file existence lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(neuron.name, a * 1e9) lookup_path = '{}/{}'.format(getLookupDir(), lookup_file) assert os.path.isfile(lookup_path), ('No lookup file available for {} ' 'neuron type').format(neuron.name) # Load coefficients with open(lookup_path, 'rb') as fh: lookup_dict = pickle.load(fh) # Retrieve 1D inputs from lookup dictionary freqs = lookup_dict['f'] amps = lookup_dict['A'] charges = lookup_dict['Q'] # Check that frequency is within lookup range margin = 1e-9 # adding margin to compensate for eventual round error frange = (freqs.min() - margin, freqs.max() + margin) assert frange[0] <= Fdrive <= frange[1], \ 'Fdrive must be within [{:.1f}, {:.1f}] kHz'.format(*[f * 1e-3 for f in frange]) # Define coefficients list coeffs_list = ['V', 'ng', *neuron.coeff_names] AQ_coeffs = {} # If Fdrive in lookup frequencies, simply project (A, Q) dataset at that frequency if Fdrive in freqs: iFdrive = np.searchsorted(freqs, Fdrive) - print('Using lookups directly at {:.2f} kHz'.format(freqs[iFdrive] * 1e-3)) + logger.info('Using lookups directly at %.2f kHz', freqs[iFdrive] * 1e-3) for cn in coeffs_list: AQ_coeffs[cn] = np.squeeze(lookup_dict[cn][iFdrive, :, :]) # Otherwise, project 2 (A, Q) interpolation datasets at Fdrive bounding values # indexes in lookup frequencies onto two 1D charge-based interpolation datasets, and # interpolate between them afterwards else: ilb = np.searchsorted(freqs, Fdrive) - 1 - print('Interpolating lookups between {:.2f} kHz and {:.2f} kHz'.format( - freqs[ilb] * 1e-3, freqs[ilb + 1] * 1e-3)) + logger.info('Interpolating lookups between %.2f kHz and %.2f kHz', + freqs[ilb] * 1e-3, freqs[ilb + 1] * 1e-3) for cn in coeffs_list: AQ_slice = [] for iAdrive in range(len(amps)): fQ_slice = np.squeeze(lookup_dict[cn][:, iAdrive, :]) itrp = interp2d(freqs, charges, fQ_slice.T) Q_vect = itrp(Fdrive, charges) AQ_slice.append(Q_vect) AQ_coeffs[cn] = np.squeeze(np.array([AQ_slice])) # Replace dict key AQ_coeffs['Veff'] = AQ_coeffs.pop('V') # Plotting - print('plotting') + logger.info('plotting') Amin, Amax = amps.min(), amps.max() ncoeffs = len(coeffs_list) nrows, ncols = setGrid(ncoeffs) xvar = pltvars['Qm'] mymap = cm.get_cmap('viridis') sm_amp = plt.cm.ScalarMappable(cmap=mymap, norm=plt.Normalize(Amin * 1e-3, Amax * 1e-3)) sm_amp._A = [] fig, _ = plt.subplots(figsize=(5 * ncols, 1.8 * nrows), squeeze=False) for j, cn in enumerate(AQ_coeffs.keys()): ax = plt.subplot2grid((nrows, ncols), (j // 3, j % 3)) # ax = axes[j // 3, j % 3] yvar = pltvars[cn] ax.set_xlabel('${}\ ({})$'.format(xvar['label'], xvar['unit']), fontsize=fs) ax.set_ylabel('${}\ ({})$'.format(yvar['label'], yvar['unit']), fontsize=fs) for i, Adrive in enumerate(amps): ax.plot(charges * xvar['factor'], AQ_coeffs[cn][i, :] * yvar['factor'], c=mymap(rescale(Adrive, Amin, Amax))) plt.tight_layout() fig.suptitle('{} neuron: effective coefficients @ {:.0f} kHz'.format( neuron.name, Fdrive * 1e-3)) fig.subplots_adjust(top=0.9, right=0.85) cbar_ax = fig.add_axes([0.87, 0.05, 0.02, 0.9]) fig.add_axes() fig.colorbar(sm_amp, cax=cbar_ax) cbar_ax.set_ylabel('$A_{drive} \ (kPa)$', fontsize=fs) plt.show() diff --git a/PointNICE/solvers/SolverUS.py b/PointNICE/solvers/SolverUS.py index 08d7cf5..307c3b4 100644 --- a/PointNICE/solvers/SolverUS.py +++ b/PointNICE/solvers/SolverUS.py @@ -1,775 +1,772 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-09-06 17:01:45 +# @Last Modified time: 2017-09-10 17:14:21 import os import warnings import pickle import logging import progressbar as pb import numpy as np import scipy.integrate as integrate from scipy.interpolate import interp2d from ..bls import BilayerSonophore from ..utils import * from ..constants import * from ..channels import BaseMech # Get package logger logger = logging.getLogger('PointNICE') class SolverUS(BilayerSonophore): """ This class extends the BilayerSonophore class by adding a biophysical Hodgkin-Huxley model on top of the mechanical BLS model. """ def __init__(self, geom, params, ch_mech, Fdrive): """ Constructor of the class. :param geom: BLS geometric constants dictionary :param params: BLS biomechanical and biophysical parameters dictionary :param ch_mech: channels mechanism object :param Fdrive: frequency of acoustic perturbation (Hz) """ # Check validity of input parameters assert isinstance(ch_mech, BaseMech), ('channel mechanism must be inherited ' 'from the BaseMech class') assert Fdrive >= 0., 'Driving frequency must be positive' # TODO: check parameters dictionary (float type, mandatory members) # Initialize BLS object Cm0 = ch_mech.Cm0 Vm0 = ch_mech.Vm0 BilayerSonophore.__init__(self, geom, params, Fdrive, Cm0, Cm0 * Vm0 * 1e-3) logger.debug('US solver initialization with %s channel mechanism', ch_mech.name) def eqHH(self, t, y, ch_mech, Cm): """ Compute the derivatives of the n-ODE HH system variables, based on a value of membrane capacitance. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param ch_mech: channels mechanism object :param Cm: membrane capacitance (F/m2) :return: vector of HH system derivatives at time t """ # Split input vector explicitly Qm, *states = y # Compute membrane potential Vm = Qm / Cm * 1e3 # mV # Compute derivatives dQm = - ch_mech.currNet(Vm, states) * 1e-3 # A/m2 dstates = ch_mech.derStates(Vm, states) # Return derivatives vector return [dQm, *dstates] def eqFull(self, t, y, ch_mech, Adrive, Fdrive, phi): """ Compute the derivatives of the (n+3) ODE full NBLS system variables. :param t: specific instant in time (s) :param y: vector of state variables :param ch_mech: channels mechanism object :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) :return: vector of derivatives """ # Compute derivatives of mechanical and electrical systems dydt_mech = self.eqMech(t, y[:3], Adrive, Fdrive, y[3], phi) dydt_elec = self.eqHH(t, y[3:], ch_mech, self.Capct(y[1])) # return concatenated output return dydt_mech + dydt_elec def eqHHeff(self, t, y, ch_mech, interp_data): """ Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param ch_mech: channels mechanism object :param channels: Channel object to compute a specific electrical membrane dynamics :param interp_data: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t """ # Split input vector explicitly Qm, *states = y # Compute charge and channel states variation Vm = np.interp(Qm, interp_data['Q'], interp_data['V']) # mV dQmdt = - ch_mech.currNet(Vm, states) * 1e-3 dstates = ch_mech.derStatesEff(Qm, states, interp_data) # Return derivatives vector return [dQmdt, *dstates] def getEffCoeffs(self, ch_mech, Fdrive, Adrive, Qm, phi=np.pi): """ Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param ch_mech: channels mechanism object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param phi: acoustic drive phase (rad) :return: tuple with the effective potential, gas content and channel rates """ # Run simulation and retrieve deflection and gas content vectors from last cycle (_, y, _) = self.runMech(Fdrive, Adrive, Qm, phi) (Z, ng) = y Z_last = Z[-NPC_FULL:] # m # Compute membrane potential vector Vm = np.array([Qm / self.Capct(ZZ) * 1e3 for ZZ in Z_last]) # mV # Compute average cycle value for membrane potential and rate constants Vm_eff = np.mean(Vm) # mV rates_eff = ch_mech.getEffRates(Vm) # Take final cycle value for gas content ng_eff = ng[-1] # mole return (Vm_eff, ng_eff, *rates_eff) def createLookup(self, ch_mech, freqs, amps, charges, phi=np.pi): """ Run simulations of the mechanical system for a multiple combinations of imposed charge densities and acoustic amplitudes, compute effective coefficients and store them as 2D arrays in a lookup file. :param ch_mech: channels mechanism object :param freqs: array of acoustic drive frequencies (Hz) :param amps: array of acoustic drive amplitudes (Pa) :param charges: array of charge densities (C/m2) :param phi: acoustic drive phase (rad) """ # Check validity of stimulation parameters assert freqs.min() > 0, 'Driving frequencies must be strictly positive' assert amps.min() >= 0, 'Acoustic pressure amplitudes must be positive' logger.info('Creating lookup table for %s neuron', ch_mech.name) # Initialize lookup dictionary of 3D array to store effective coefficients nf = freqs.size nA = amps.size nQ = charges.size coeffs_names = ['V', 'ng', *ch_mech.coeff_names] ncoeffs = len(coeffs_names) lookup_dict = {cn: np.empty((nf, nA, nQ)) for cn in coeffs_names} # Loop through all (f, A, Q) combinations nsims = nf * nA * nQ isim = 0 log_str = 'short simulation %u/%u (f = %.2f kHz, A = %.2f kPa, Q = %.2f nC/cm2)' for i in range(nf): for j in range(nA): for k in range(nQ): isim += 1 # Run short simulation and store effective coefficients logger.info(log_str, isim, nsims, freqs[i] * 1e-3, amps[j] * 1e-3, charges[k] * 1e5) sim_coeffs = self.getEffCoeffs(ch_mech, freqs[i], amps[j], charges[k], phi) for icoeff in range(ncoeffs): lookup_dict[coeffs_names[icoeff]][i, j, k] = sim_coeffs[icoeff] # Add input frequency, amplitude and charge arrays to lookup dictionary lookup_dict['f'] = freqs # Hz lookup_dict['A'] = amps # Pa lookup_dict['Q'] = charges # C/m2 # Save dictionary in lookup file lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(ch_mech.name, self.a * 1e9) logger.info('Saving %s neuron lookup table in file: "%s"', ch_mech.name, lookup_file) lookup_filepath = '{0}/{1}'.format(getLookupDir(), lookup_file) with open(lookup_filepath, 'wb') as fh: pickle.dump(lookup_dict, fh) def __runClassic(self, ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DF, phi=np.pi): """ Compute solutions of the system for a specific set of US stimulation parameters, using a classic integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the ODE system is solved iteratively until completion. :param ch_mech: channels mechanism object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DF: pulse duty factor (-) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the effective solution matrix and a state vector """ # Raise warnings as error warnings.filterwarnings('error') # Initialize system solver solver_full = integrate.ode(self.eqFull) solver_full.set_integrator('lsoda', nsteps=SOLVER_NSTEPS, ixpr=True) # Determine system time step Tdrive = 1 / Fdrive dt = Tdrive / NPC_FULL # if CW stimulus: divide integration during stimulus into 100 intervals if DF == 1.0: PRF = 100 / tstim # Compute vector sizes npulses = int(np.round(PRF * tstim)) Tpulse_on = DF / PRF Tpulse_off = (1 - DF) / PRF n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) n_off = int(np.round(toffset / dt)) # Solve quasi-steady equation to compute first deflection value Z0 = 0.0 ng0 = self.ng0 Qm0 = self.Qm0 Pac1 = self.Pacoustic(dt, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm0, Pac1) # Initialize global arrays states = np.array([1, 1]) t = np.array([0., dt]) y_membrane = np.array([[0., (Z1 - Z0) / dt], [Z0, Z1], [ng0, ng0], [Qm0, Qm0]]) y_channels = np.tile(ch_mech.states0, (2, 1)).T y = np.vstack((y_membrane, y_channels)) nvar = y.shape[0] # Initialize pulse time and states vectors t_pulse0 = np.linspace(0, Tpulse_on + Tpulse_off, n_pulse_on + n_pulse_off) states_pulse = np.concatenate((np.ones(n_pulse_on), np.zeros(n_pulse_off))) # Initialize progress bar if logger.getEffectiveLevel() == logging.DEBUG: widgets = ['Running: ', pb.Percentage(), ' ', pb.Bar(), ' ', pb.ETA()] pbar = pb.ProgressBar(widgets=widgets, max_value=int(npulses * (toffset + tstim) / tstim)) pbar.start() # Loop through all pulse (ON and OFF) intervals for i in range(npulses): # Construct and initialize arrays t_pulse = t_pulse0 + t[-1] y_pulse = np.empty((nvar, n_pulse_on + n_pulse_off)) y_pulse[:, 0] = y[:, -1] # Initialize iterator k = 0 # Integrate ON system solver_full.set_f_params(ch_mech, Adrive, Fdrive, phi) solver_full.set_initial_value(y_pulse[:, k], t_pulse[k]) while solver_full.successful() and k < n_pulse_on - 1: k += 1 solver_full.integrate(t_pulse[k]) y_pulse[:, k] = solver_full.y # Integrate OFF system solver_full.set_f_params(ch_mech, 0.0, 0.0, 0.0) solver_full.set_initial_value(y_pulse[:, k], t_pulse[k]) while solver_full.successful() and k < n_pulse_on + n_pulse_off - 1: k += 1 solver_full.integrate(t_pulse[k]) y_pulse[:, k] = solver_full.y # Append pulse arrays to global arrays states = np.concatenate([states, states_pulse[1:]]) t = np.concatenate([t, t_pulse[1:]]) y = np.concatenate([y, y_pulse[:, 1:]], axis=1) # Update progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.update(i) # Integrate offset interval if n_off > 0: t_off = np.linspace(0, toffset, n_off) + t[-1] states_off = np.zeros(n_off) y_off = np.empty((nvar, n_off)) y_off[:, 0] = y[:, -1] solver_full.set_initial_value(y_off[:, 0], t_off[0]) solver_full.set_f_params(ch_mech, 0.0, 0.0, 0.0) k = 0 while solver_full.successful() and k < n_off - 1: k += 1 solver_full.integrate(t_off[k]) y_off[:, k] = solver_full.y # Concatenate offset arrays to global arrays states = np.concatenate([states, states_off[1:]]) t = np.concatenate([t, t_off[1:]]) y = np.concatenate([y, y_off[:, 1:]], axis=1) # Terminate progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.finish() # Downsample arrays in time-domain to reduce overall size t = t[::CLASSIC_DS_FACTOR] y = y[:, ::CLASSIC_DS_FACTOR] states = states[::CLASSIC_DS_FACTOR] # Return output variables return (t, y[1:, :], states) def __runEffective(self, ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DF, dt=DT_EFF): """ Compute solutions of the system for a specific set of US stimulation parameters, using charge-predicted "effective" coefficients to solve the HH equations at each step. :param ch_mech: channels mechanism object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DF: pulse duty factor (-) :param dt: integration time step (s) :return: 3-tuple with the time profile, the effective solution matrix and a state vector """ # Raise warnings as error warnings.filterwarnings('error') # Check lookup file existence lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(ch_mech.name, self.a * 1e9) lookup_path = '{}/{}'.format(getLookupDir(), lookup_file) assert os.path.isfile(lookup_path), ('No lookup file available for {} ' 'neuron type').format(ch_mech.name) # Load coefficients with open(lookup_path, 'rb') as fh: lookup_dict = pickle.load(fh) # Retrieve 1D inputs from lookup dictionary freqs = lookup_dict['f'] amps = lookup_dict['A'] charges = lookup_dict['Q'] # Check that stimulation parameters are within lookup range margin = 1e-9 # adding margin to compensate for eventual round error frange = (freqs.min() - margin, freqs.max() + margin) Arange = (amps.min() - margin, amps.max() + margin) assert frange[0] <= Fdrive <= frange[1], \ 'Fdrive must be within [{:.1f}, {:.1f}] kHz'.format(*[f * 1e-3 for f in frange]) assert Arange[0] <= Adrive <= Arange[1], \ 'Adrive must be within [{:.1f}, {:.1f}] kPa'.format(*[A * 1e-3 for A in Arange]) # Define interpolation datasets to be projected coeffs_list = ['V', 'ng', *ch_mech.coeff_names] # If Fdrive in lookup frequencies, simply project (A, Q) interpolation dataset # at Fdrive index onto 1D charge-based interpolation dataset if Fdrive in freqs: iFdrive = np.searchsorted(freqs, Fdrive) logger.debug('Using lookups directly at %.2f kHz', freqs[iFdrive] * 1e-3) coeffs1d = {} for cn in coeffs_list: coeff2d = np.squeeze(lookup_dict[cn][iFdrive, :, :]) itrp = interp2d(amps, charges, coeff2d.T) coeffs1d[cn] = itrp(Adrive, charges) if cn == 'ng': coeffs1d['ng0'] = itrp(0.0, charges) # Otherwise, project 2 (A, Q) interpolation datasets at Fdrive bounding values # indexes in lookup frequencies onto two 1D charge-based interpolation datasets, and # interpolate between them afterwards else: ilb = np.searchsorted(freqs, Fdrive) - 1 logger.debug('Interpolating lookups between %.2f kHz and %.2f kHz', freqs[ilb] * 1e-3, freqs[ilb + 1] * 1e-3) coeffs1d = {} for cn in coeffs_list: coeffs1d_bounds = [] ng0_bounds = [] for iFdrive in [ilb, ilb + 1]: coeff2d = np.squeeze(lookup_dict[cn][iFdrive, :, :]) itrp = interp2d(amps, charges, coeff2d.T) coeffs1d_bounds.append(itrp(Adrive, charges)) if cn == 'ng': ng0_bounds.append(itrp(0.0, charges)) coeffs1d_bounds = np.squeeze(np.array([coeffs1d_bounds])) itrp = interp2d(freqs[ilb:ilb + 2], charges, coeffs1d_bounds.T) coeffs1d[cn] = itrp(Fdrive, charges) if cn == 'ng': ng0_bounds = np.squeeze(np.array([ng0_bounds])) itrp = interp2d(freqs[ilb:ilb + 2], charges, ng0_bounds.T) coeffs1d['ng0'] = itrp(Fdrive, charges) # Squeeze interpolated vectors extra dimensions and add input charges vector coeffs1d = {key: np.squeeze(value) for key, value in coeffs1d.items()} coeffs1d['Q'] = charges # Initialize system solvers solver_on = integrate.ode(self.eqHHeff) solver_on.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) solver_on.set_f_params(ch_mech, coeffs1d) solver_off = integrate.ode(self.eqHH) solver_off.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) # if CW stimulus: change PRF to have exactly one integration interval during stimulus if DF == 1.0: PRF = 1 / tstim # Compute vector sizes npulses = int(np.round(PRF * tstim)) Tpulse_on = DF / PRF Tpulse_off = (1 - DF) / PRF n_pulse_on = int(np.round(Tpulse_on / dt)) + 1 n_pulse_off = int(np.round(Tpulse_off / dt)) n_off = int(np.round(toffset / dt)) # Initialize global arrays states = np.array([1]) t = np.array([0.0]) y = np.atleast_2d(np.insert(ch_mech.states0, 0, self.Qm0)).T nvar = y.shape[0] Zeff = np.array([0.0]) ngeff = np.array([self.ng0]) # Initializing accurate pulse time vector t_pulse_on = np.linspace(0, Tpulse_on, n_pulse_on) t_pulse_off = np.linspace(dt, Tpulse_off, n_pulse_off) + Tpulse_on t_pulse0 = np.concatenate([t_pulse_on, t_pulse_off]) states_pulse = np.concatenate((np.ones(n_pulse_on), np.zeros(n_pulse_off))) # Loop through all pulse (ON and OFF) intervals for i in range(npulses): # Construct and initialize arrays t_pulse = t_pulse0 + t[-1] y_pulse = np.empty((nvar, n_pulse_on + n_pulse_off)) ngeff_pulse = np.empty(n_pulse_on + n_pulse_off) Zeff_pulse = np.empty(n_pulse_on + n_pulse_off) y_pulse[:, 0] = y[:, -1] ngeff_pulse[0] = ngeff[-1] Zeff_pulse[0] = Zeff[-1] # Initialize iterator k = 0 # Integrate ON system solver_on.set_initial_value(y_pulse[:, k], t_pulse[k]) while solver_on.successful() and k < n_pulse_on - 1: k += 1 solver_on.integrate(t_pulse[k]) y_pulse[:, k] = solver_on.y ngeff_pulse[k] = np.interp(y_pulse[0, k], coeffs1d['Q'], coeffs1d['ng']) # mole Zeff_pulse[k] = self.balancedefQS(ngeff_pulse[k], y_pulse[0, k]) # m # Integrate OFF system solver_off.set_initial_value(y_pulse[:, k], t_pulse[k]) solver_off.set_f_params(ch_mech, self.Capct(Zeff_pulse[k])) while solver_off.successful() and k < n_pulse_on + n_pulse_off - 1: k += 1 solver_off.integrate(t_pulse[k]) y_pulse[:, k] = solver_off.y ngeff_pulse[k] = np.interp(y_pulse[0, k], coeffs1d['Q'], coeffs1d['ng0']) # mole Zeff_pulse[k] = self.balancedefQS(ngeff_pulse[k], y_pulse[0, k]) # m solver_off.set_f_params(ch_mech, self.Capct(Zeff_pulse[k])) # Append pulse arrays to global arrays states = np.concatenate([states[:-1], states_pulse]) t = np.concatenate([t, t_pulse[1:]]) y = np.concatenate([y, y_pulse[:, 1:]], axis=1) Zeff = np.concatenate([Zeff, Zeff_pulse[1:]]) ngeff = np.concatenate([ngeff, ngeff_pulse[1:]]) # Integrate offset interval if n_off > 0: t_off = np.linspace(0, toffset, n_off) + t[-1] states_off = np.zeros(n_off) y_off = np.empty((nvar, n_off)) ngeff_off = np.empty(n_off) Zeff_off = np.empty(n_off) y_off[:, 0] = y[:, -1] ngeff_off[0] = ngeff[-1] Zeff_off[0] = Zeff[-1] solver_off.set_initial_value(y_off[:, 0], t_off[0]) solver_off.set_f_params(ch_mech, self.Capct(Zeff_pulse[k])) k = 0 while solver_off.successful() and k < n_off - 1: k += 1 solver_off.integrate(t_off[k]) y_off[:, k] = solver_off.y ngeff_off[k] = np.interp(y_off[0, k], coeffs1d['Q'], coeffs1d['ng0']) # mole Zeff_off[k] = self.balancedefQS(ngeff_off[k], y_off[0, k]) # m solver_off.set_f_params(ch_mech, self.Capct(Zeff_off[k])) # Concatenate offset arrays to global arrays states = np.concatenate([states, states_off[1:]]) t = np.concatenate([t, t_off[1:]]) y = np.concatenate([y, y_off[:, 1:]], axis=1) Zeff = np.concatenate([Zeff, Zeff_off[1:]]) ngeff = np.concatenate([ngeff, ngeff_off[1:]]) # Add Zeff and ngeff to solution matrix y = np.vstack([Zeff, ngeff, y]) # return output variables return (t, y, states) def __runHybrid(self, ch_mech, Fdrive, Adrive, tstim, toffset, phi=np.pi): """ Compute solutions of the system for a specific set of US stimulation parameters, using a hybrid integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the NBLS ODE system is solved iteratively for "slices" of N-microseconds, in a 2-steps scheme: - First, the full (n+3) ODE system is integrated for a few acoustic cycles until Z and ng reach a stable periodic solution (limit cycle) - Second, the signals of the 3 mechanical variables over the last acoustic period are selected and resampled to a far lower sampling rate - Third, the HH n-ODE system is integrated for the remaining time of the slice, using periodic expansion of the mechanical signals to precompute the values of capacitance. :param ch_mech: channels mechanism object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the solution matrix and a state vector .. warning:: This method cannot handle pulsed stimuli """ # Raise warnings as error warnings.filterwarnings('error') # Initialize full and HH systems solvers solver_full = integrate.ode(self.eqFull) solver_full.set_f_params(ch_mech, Adrive, Fdrive, phi) solver_full.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) solver_hh = integrate.ode(self.eqHH) solver_hh.set_integrator('dop853', nsteps=SOLVER_NSTEPS, atol=1e-12) # Determine full and HH systems time steps Tdrive = 1 / Fdrive dt_full = Tdrive / NPC_FULL dt_hh = Tdrive / NPC_HH n_full_per_hh = int(NPC_FULL / NPC_HH) t_full_cycle = np.linspace(0, Tdrive - dt_full, NPC_FULL) t_hh_cycle = np.linspace(0, Tdrive - dt_hh, NPC_HH) # Determine number of samples in prediction vectors npc_pred = NPC_FULL - n_full_per_hh + 1 # Solve quasi-steady equation to compute first deflection value Z0 = 0.0 ng0 = self.ng0 Qm0 = self.Qm0 Pac1 = self.Pacoustic(dt_full, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm0, Pac1) # Initialize global arrays states = np.array([1, 1]) t = np.array([0., dt_full]) y_membrane = np.array([[0., (Z1 - Z0) / dt_full], [Z0, Z1], [ng0, ng0], [Qm0, Qm0]]) y_channels = np.tile(ch_mech.states0, (2, 1)).T y = np.vstack((y_membrane, y_channels)) nvar = y.shape[0] # Initialize progress bar if logger.getEffectiveLevel() == logging.DEBUG: widgets = ['Running: ', pb.Percentage(), ' ', pb.Bar(), ' ', pb.ETA()] pbar = pb.ProgressBar(widgets=widgets, max_value=1000) pbar.start() # For each hybrid integration interval irep = 0 sim_error = False while not sim_error and t[-1] < tstim + toffset: # Integrate full system for a few acoustic cycles until stabilization periodic_conv = False j = 0 ng_last = None Z_last = None while not sim_error and not periodic_conv: if t[-1] > tstim: solver_full.set_f_params(ch_mech, 0.0, 0.0, 0.0) t_full = t_full_cycle + t[-1] + dt_full y_full = np.empty((nvar, NPC_FULL)) y0_full = y[:, -1] solver_full.set_initial_value(y0_full, t[-1]) k = 0 try: # try to integrate and catch errors/warnings while solver_full.successful() and k <= NPC_FULL - 1: solver_full.integrate(t_full[k]) y_full[:, k] = solver_full.y - assert (y_full[1, k] > -0.5 * self.Delta), 'Deflection out of range' k += 1 except (Warning, AssertionError) as inst: sim_error = True logger.error('Full system integration error at step %u', k) - print(inst) + logger.error(inst) # Compare Z and ng signals over the last 2 acoustic periods if j > 0 and rmse(Z_last, y_full[1, :]) < Z_ERR_MAX \ and rmse(ng_last, y_full[2, :]) < NG_ERR_MAX: periodic_conv = True # Update last vectors for next comparison Z_last = y_full[1, :] ng_last = y_full[2, :] # Concatenate time and solutions to global vectors states = np.concatenate([states, np.ones(NPC_FULL)], axis=0) t = np.concatenate([t, t_full], axis=0) y = np.concatenate([y, y_full], axis=1) # Increment loop index j += 1 # Retrieve last period of the 3 mechanical variables to propagate in HH system t_last = t[-npc_pred:] mech_last = y[0:3, -npc_pred:] - # print('convergence after {} cycles'.format(j)) - # Downsample signals to specified HH system time step (_, mech_pred) = DownSample(t_last, mech_last, NPC_HH) # Integrate HH system until certain dQ or dT is reached Q0 = y[3, -1] dQ = 0.0 t0_interval = t[-1] dt_interval = 0.0 j = 0 if t[-1] < tstim: tlim = tstim else: tlim = tstim + toffset while (not sim_error and t[-1] < tlim and (np.abs(dQ) < DQ_UPDATE or dt_interval < DT_UPDATE)): t_hh = t_hh_cycle + t[-1] + dt_hh y_hh = np.empty((nvar - 3, NPC_HH)) y0_hh = y[3:, -1] solver_hh.set_initial_value(y0_hh, t[-1]) k = 0 try: # try to integrate and catch errors/warnings while solver_hh.successful() and k <= NPC_HH - 1: solver_hh.set_f_params(ch_mech, self.Capct(mech_pred[1, k])) solver_hh.integrate(t_hh[k]) y_hh[:, k] = solver_hh.y k += 1 except (Warning, AssertionError) as inst: sim_error = True logger.error('HH system integration error at step %u', k) - print(inst) + logger.error(inst) # Concatenate time and solutions to global vectors states = np.concatenate([states, np.zeros(NPC_HH)], axis=0) t = np.concatenate([t, t_hh], axis=0) y = np.concatenate([y, np.concatenate([mech_pred, y_hh], axis=0)], axis=1) # Compute charge variation from interval beginning dQ = y[3, -1] - Q0 dt_interval = t[-1] - t0_interval # Increment loop index j += 1 # Update progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.update(int(1000 * (t[-1] / (tstim + toffset)))) irep += 1 # Terminate progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.finish() # Return output return (t, y[1:, :], states) def run(self, ch_mech, Fdrive, Adrive, tstim, toffset, PRF=None, DF=1.0, sim_type='effective'): """ Run simulation of the system for a specific set of US stimulation parameters. :param ch_mech: channels mechanism object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DF: pulse duty factor (-) :param sim_type: selected integration method :return: 3-tuple with the time profile, the solution matrix and a state vector """ # # Check validity of simulation type sim_types = ('classic, effective, hybrid') assert sim_type in sim_types, 'Allowed simulation types are {}'.format(sim_types) # Check validity of stimulation parameters assert isinstance(ch_mech, BaseMech), ('channel mechanism must be inherited ' 'from the BaseMech class') for param in [Fdrive, Adrive, tstim, toffset, DF]: assert isinstance(param, float), 'stimulation parameters must be float typed' assert Fdrive > 0, 'Driving frequency must be strictly positive' assert Adrive >= 0, 'Acoustic pressure amplitude must be positive' assert tstim > 0, 'Stimulus duration must be strictly positive' assert toffset >= 0, 'Stimulus offset must be positive or null' assert DF > 0 and DF <= 1, 'Duty cycle must be within [0; 1)' if DF < 1.0: assert isinstance(PRF, float), 'if provided, the PRF parameter must be float typed' assert PRF is not None, 'PRF must be provided when using duty cycles smaller than 1' assert PRF >= 1 / tstim, 'PR interval must be smaller than stimulus duration' assert PRF < Fdrive, 'PRF must be smaller than driving frequency' # Call appropriate simulation function if sim_type == 'classic': return self.__runClassic(ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DF) elif sim_type == 'effective': return self.__runEffective(ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DF) elif sim_type == 'hybrid': assert DF == 1.0, 'Hybrid method can only handle continuous wave stimuli' return self.__runHybrid(ch_mech, Fdrive, Adrive, tstim, toffset) diff --git a/plot/plot_batch.py b/plot/plot_batch.py index 8a0a858..33cec24 100644 --- a/plot/plot_batch.py +++ b/plot/plot_batch.py @@ -1,22 +1,30 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-03-20 12:19:55 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-09-01 13:45:53 +# @Last Modified time: 2017-09-10 17:04:50 """ Batch plot profiles of several specific output variables of NICE simulations. """ +import sys +import logging + from PointNICE.utils import OpenFilesDialog from PointNICE.plt import plotBatch +# Set logging options +logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:') +logger = logging.getLogger('PointNICE') +logger.setLevel(logging.INFO) + # Select data files pkl_filepaths, pkl_dir = OpenFilesDialog('pkl') if not pkl_filepaths: - print('error: no input file') - quit() + logger.error('No input file') + sys.exit(1) # Plot profiles yvars = {'Q_m': ['Qm'], 'i_{Ca}\ kin.': ['s', 'u', 's2u'], 'I': ['iNa', 'iK', 'iT', 'iL']} plotBatch(pkl_dir, pkl_filepaths, yvars) diff --git a/plot/plot_comp.py b/plot/plot_comp.py index d2e31e3..6a26c20 100644 --- a/plot/plot_comp.py +++ b/plot/plot_comp.py @@ -1,24 +1,32 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 12:41:26 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-09-01 16:58:41 +# @Last Modified time: 2017-09-10 17:04:55 """ Compare profiles of several specific output variables of NICE simulations. """ +import sys +import logging + from PointNICE.utils import OpenFilesDialog from PointNICE.plt import plotComp +# Set logging options +logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:') +logger = logging.getLogger('PointNICE') +logger.setLevel(logging.INFO) + # Select data files pkl_filepaths, _ = OpenFilesDialog('pkl') if not pkl_filepaths: - print('error: no input file') - quit() + logger.error('No input file') + sys.exit(1) # Comparative plot yvars = ['Qm'] plotComp(yvars, pkl_filepaths[::-1], labels=['normal last pulse', 'delayed -1ms', 'delayed +1ms']) diff --git a/setup.py b/setup.py index cdc43a7..324b078 100644 --- a/setup.py +++ b/setup.py @@ -1,45 +1,46 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-06-13 09:40:02 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-08-28 16:15:50 +# @Last Modified time: 2017-09-06 18:05:55 from setuptools import setup def readme(): with open('README.rst', encoding="utf8") as f: return f.read() setup(name='PointNICE', version='1.0', description='A Python framework to predict the electrical response of various neuron types\ to ultrasonic stimulation, according to the Neuronal Intramembrane Cavitation\ Excitation (NICE) model. The framework couples an optimized implementation of\ the Bilayer Sonophore (BLS) model with Hodgkin-Huxley "point-neuron" models.', long_description=readme(), url='???', classifiers=[ 'Development Status :: 4 - Beta', 'Intended Audience :: Science/Research', 'Programming Language :: Python :: 3', 'Topic :: Scientific/Engineering :: Physics' ], keywords=('ultrasound ultrasonic neuromodulation neurostimulation excitation\ biophysical model intramembrane cavitation NICE'), author='Théo Lemaire', author_email='theo.lemaire@epfl.ch', license='MIT', packages=['PointNICE'], + scripts=['sim/ESTIM_run.py', 'sim/ASTIM_run.py'], install_requires=[ 'numpy>=1.10', 'scipy>=0.17', 'matplotlib>=2', 'openpyxl>=2.4', 'pyyaml>=3.11', 'pycallgraph>=1.0.1' ], zip_safe=False)