diff --git a/PointNICE/bls.py b/PointNICE/bls.py index 9d6ac72..43fa9e5 100644 --- a/PointNICE/bls.py +++ b/PointNICE/bls.py @@ -1,630 +1,637 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-15 14:01:36 +# @Last Modified time: 2018-03-15 15:08:31 import logging import warnings import numpy as np import scipy.integrate as integrate from scipy.optimize import brentq, curve_fit from .utils import * from .constants import * # Get package logger logger = logging.getLogger('PointNICE') class BilayerSonophore: """ This class contains the geometric and mechanical parameters of the Bilayer Sonophore Model, as well as all the core functions needed to compute the dynamics (kinetics and kinematics) of the bilayer membrane cavitation, and run dynamic BLS simulations. """ # BIOMECHANICAL PARAMETERS T = 309.15 # Temperature (K) Rg = 8.314 # Universal gas constant (Pa.m^3.mol^-1.K^-1) delta0 = 2.0e-9 # Thickness of the leaflet (m) Delta_ = 1.4e-9 # Initial gap between the two leaflets on a non-charged membrane at equil. (m) pDelta = 1.0e5 # Attraction/repulsion pressure coefficient (Pa) m = 5.0 # Exponent in the repulsion term (dimensionless) n = 3.3 # Exponent in the attraction term (dimensionless) rhoL = 1075.0 # Density of the surrounding fluid (kg/m^3) muL = 7.0e-4 # Dynamic viscosity of the surrounding fluid (Pa.s) muS = 0.035 # Dynamic viscosity of the leaflet (Pa.s) kA = 0.24 # Area compression modulus of the leaflet (N/m) alpha = 7.56 # Tissue shear loss modulus frequency coefficient (Pa.s) C0 = 0.62 # Initial gas molar concentration in the surrounding fluid (mol/m^3) kH = 1.613e5 # Henry's constant (Pa.m^3/mol) P0 = 1.0e5 # Static pressure in the surrounding fluid (Pa) Dgl = 3.68e-9 # Diffusion coefficient of gas in the fluid (m^2/s) xi = 0.5e-9 # Boundary layer thickness for gas transport across leaflet (m) c = 1515.0 # Speed of sound in medium (m/s) # BIOPHYSICAL PARAMETERS epsilon0 = 8.854e-12 # Vacuum permittivity (F/m) epsilonR = 1.0 # Relative permittivity of intramembrane cavity (dimensionless) def __init__(self, diameter, Fdrive, Cm0, Qm0, embedding_depth=0.0): """ Constructor of the class. :param diameter: in-plane diameter of the sonophore structure within the membrane (m) :param Fdrive: frequency of acoustic perturbation (Hz) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param embedding_depth: depth of the embedding tissue around the membrane (m) """ logger.debug('%.1f nm BLS initialization at %.2f kHz, %.2f nC/cm2', diameter * 1e9, Fdrive * 1e-3, Qm0 * 1e5) # Extract resting constants and geometry self.Cm0 = Cm0 self.Qm0 = Qm0 self.a = diameter self.d = embedding_depth self.S0 = np.pi * self.a**2 # Derive frequency-dependent tissue elastic modulus G_tissue = self.alpha * Fdrive # G'' (Pa) self.kA_tissue = 2 * G_tissue * self.d # kA of the tissue layer (N/m) # Check existence of lookups for derived parameters lookups = get_BLS_lookups(self.a) Qkey = '{:.2f}'.format(Qm0 * 1e5) # If no lookup, compute parameters and store them in lookup if not lookups or Qkey not in lookups: # Find Delta that cancels out Pm + Pec at Z = 0 (m) if self.Qm0 == 0.0: self.Delta = self.Delta_ else: (D_eq, Pnet_eq) = self.findDeltaEq(self.Qm0) assert Pnet_eq < PNET_EQ_MAX, 'High Pnet at Z = 0 with ∆ = %.2f nm' % (D_eq * 1e9) self.Delta = D_eq # Find optimal Lennard-Jones parameters to approximate PMavg (LJ_approx, std_err, _) = self.LJfitPMavg() assert std_err < PMAVG_STD_ERR_MAX, 'High error in PmAvg nonlinear fit:'\ ' std_err = %.2f Pa' % std_err self.LJ_approx = LJ_approx lookups[Qkey] = {'LJ_approx': LJ_approx, 'Delta_eq': D_eq} logger.debug('Saving BLS derived parameters to lookup file') save_BLS_lookups(self.a, lookups) # If lookup exists, load parameters from it else: logger.debug('Loading BLS derived parameters from lookup file') self.LJ_approx = lookups[Qkey]['LJ_approx'] self.Delta = lookups[Qkey]['Delta_eq'] # Compute initial volume and gas content self.V0 = np.pi * self.Delta * self.a**2 self.ng0 = self.gasPa2mol(self.P0, self.V0) def curvrad(self, Z): """ Return the (signed) instantaneous curvature radius of the leaflet. :param Z: leaflet apex outward deflection value (m) :return: leaflet curvature radius (m) """ if Z == 0.0: return np.inf else: return (self.a**2 + Z**2) / (2 * Z) def surface(self, Z): """ Return the surface area of the stretched leaflet (spherical cap). :param Z: leaflet apex outward deflection value (m) :return: surface of the stretched leaflet (m^2) """ return np.pi * (self.a**2 + Z**2) def volume(self, Z): """ Return the total volume of the inter-leaflet space (cylinder +/- spherical cap). :param Z: leaflet apex outward deflection value (m) :return: inner volume of the bilayer sonophore structure (m^3) """ return np.pi * self.a**2 * self.Delta\ * (1 + (Z / (3 * self.Delta) * (3 + Z**2 / self.a**2))) def arealstrain(self, Z): """ Compute the areal strain of the stretched leaflet. epsilon = (S - S0)/S0 = (Z/a)^2 :param Z: leaflet apex outward deflection value (m) :return: areal strain (dimensionless) """ return (Z / self.a)**2 def Capct(self, Z): """ Compute the membrane capacitance per unit area, under the assumption of parallel-plate capacitor with average inter-layer distance. :param Z: leaflet apex outward deflection value (m) :return: capacitance per unit area (F/m2) """ if Z == 0.0: return self.Cm0 else: return ((self.Cm0 * self.Delta / self.a**2) * (Z + (self.a**2 - Z**2 - Z * self.Delta) / (2 * Z) * np.log((2 * Z + self.Delta) / self.Delta))) def derCapct(self, Z, U): """ Compute the derivative of the membrane capacitance per unit area with respect to time, under the assumption of parallel-plate capacitor. :param Z: leaflet apex outward deflection value (m) :param U: leaflet apex outward deflection velocity (m/s) :return: derivative of capacitance per unit area (F/m2.s) """ dCmdZ = ((self.Cm0 * self.Delta / self.a**2) * ((Z**2 + self.a**2) / (Z * (2 * Z + self.Delta)) - ((Z**2 + self.a**2) * np.log((2 * Z + self.Delta) / self.Delta)) / (2 * Z**2))) return dCmdZ * U def localdef(self, r, Z, R): """ Compute the (signed) local transverse leaflet deviation at a distance r from the center of the dome. :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :return: local transverse leaflet deviation (m) """ if np.abs(Z) == 0.0: return 0.0 else: return np.sign(Z) * (np.sqrt(R**2 - r**2) - np.abs(R) + np.abs(Z)) def Pacoustic(self, t, Adrive, Fdrive, phi=np.pi): """ Compute the acoustic pressure at a specific time, given the amplitude, frequency and phase of the acoustic stimulus. :param t: time of interest :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) """ return Adrive * np.sin(2 * np.pi * Fdrive * t - phi) def PMlocal(self, r, Z, R): """ Compute the local intermolecular pressure. :param r: in-plane distance from center of the sonophore (m) :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :return: local intermolecular pressure (Pa) """ z = self.localdef(r, Z, R) relgap = (2 * z + self.Delta) / self.Delta_ return self.pDelta * ((1 / relgap)**self.m - (1 / relgap)**self.n) def PMavg(self, Z, R, S): """ Compute the average intermolecular pressure felt across the leaflet by quadratic integration. :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :param S: surface of the stretched leaflet (m^2) :return: averaged intermolecular resultant pressure across the leaflet (Pa) .. warning:: quadratic integration is computationally expensive. """ # Intermolecular force over an infinitely thin ring of radius r fMring = lambda r, Z, R: 2 * np.pi * r * self.PMlocal(r, Z, R) # Integrate from 0 to a fTotal, _ = integrate.quad(fMring, 0, self.a, args=(Z, R)) return fTotal / S def LJfitPMavg(self): """ Determine optimal parameters of a Lennard-Jones expression approximating the average intermolecular pressure. These parameters are obtained by a nonlinear fit of the Lennard-Jones function for a range of deflection values between predetermined Zmin and Zmax. :return: 3-tuple with optimized LJ parameters for PmAvg prediction (Map) and the standard and max errors of the prediction in the fitting range (in Pascals) """ # Determine lower bound of deflection range: when Pm = Pmmax PMmax = LJFIT_PM_MAX # Pa Zminlb = -0.49 * self.Delta Zminub = 0.0 f = lambda Z, Pmmax: self.PMavg(Z, self.curvrad(Z), self.surface(Z)) - PMmax Zmin = brentq(f, Zminlb, Zminub, args=(PMmax), xtol=1e-16) # Create vectors for geometric variables Zmax = 2 * self.a Z = np.arange(Zmin, Zmax, 1e-11) Pmavg = np.array([self.PMavg(ZZ, self.curvrad(ZZ), self.surface(ZZ)) for ZZ in Z]) # Compute optimal nonlinear fit of custom LJ function with initial guess x0_guess = 2e-9 C_guess = 1e4 nrep_guess = 5.0 nattr_guess = 3.0 pguess = (x0_guess, C_guess, nrep_guess, nattr_guess) popt, _ = curve_fit(lambda x, x0, C, nrep, nattr: LennardJones(x, self.Delta, x0, C, nrep, nattr), Z, Pmavg, p0=pguess, maxfev=10000) (x0_opt, C_opt, nrep_opt, nattr_opt) = popt Pmavg_fit = LennardJones(Z, self.Delta, x0_opt, C_opt, nrep_opt, nattr_opt) # Compute prediction error residuals = Pmavg - Pmavg_fit ss_res = np.sum(residuals**2) N = residuals.size std_err = np.sqrt(ss_res / N) max_err = max(np.abs(residuals)) logger.debug('LJ approx: x0 = %.2f nm, C = %.2f kPa, m = %.2f, n = %.2f', x0_opt * 1e9, C_opt * 1e-3, nrep_opt, nattr_opt) LJ_approx = {"x0": x0_opt, "C": C_opt, "nrep": nrep_opt, "nattr": nattr_opt} return (LJ_approx, std_err, max_err) def PMavgpred(self, Z): """ Return the predicted intermolecular pressure based on a specific Lennard-Jones function fitted on the deflection physiological range. :param Z: leaflet apex outward deflection value (m) :return: predicted average intermolecular pressure (Pa) """ return LennardJones(Z, self.Delta, self.LJ_approx['x0'], self.LJ_approx['C'], self.LJ_approx['nrep'], self.LJ_approx['nattr']) def Pelec(self, Z, Qm): """ Compute the electric equivalent pressure term. :param Z: leaflet apex outward deflection value (m) :param Qm: membrane charge density (C/m2) :return: electric equivalent pressure (Pa) """ relS = self.S0 / self.surface(Z) abs_perm = self.epsilon0 * self.epsilonR # F/m return -relS * Qm**2 / (2 * abs_perm) # Pa def findDeltaEq(self, Qm): """ Compute the Delta that cancels out the (Pm + Pec) equation at Z = 0 for a given membrane charge density, using the Brent method to refine the pressure root iteratively. :param Qm: membrane charge density (C/m2) :return: equilibrium value (m) and associated pressure (Pa) """ f = lambda Delta: (self.pDelta * ((self.Delta_ / Delta)**self.m - (self.Delta_ / Delta)**self.n) + self.Pelec(0.0, Qm)) Delta_lb = 0.1 * self.Delta_ Delta_ub = 2.0 * self.Delta_ Delta_eq = brentq(f, Delta_lb, Delta_ub, xtol=1e-16) logger.debug('∆eq = %.2f nm', Delta_eq * 1e9) return (Delta_eq, f(Delta_eq)) def gasflux(self, Z, P): """ Compute the gas molar flux through the BLS boundary layer for an unsteady system. :param Z: leaflet apex outward deflection value (m) :param P: internal gas pressure in the inter-leaflet space (Pa) :return: gas molar flux (mol/s) """ dC = self.C0 - P / self.kH return 2 * self.surface(Z) * self.Dgl * dC / self.xi def gasmol2Pa(self, ng, V): """ Compute the gas pressure in the inter-leaflet space for an unsteady system, from the value of gas molar content. :param ng: internal molar content (mol) :param V: inner volume of the bilayer sonophore structure (m^3) :return: internal gas pressure (Pa) """ return ng * self.Rg * self.T / V def gasPa2mol(self, P, V): """ Compute the gas molar content in the inter-leaflet space for an unsteady system, from the value of internal gas pressure. :param P: internal gas pressure in the inter-leaflet space (Pa) :param V: inner volume of the bilayer sonophore structure (m^3) :return: internal gas molar content (mol) """ return P * V / (self.Rg * self.T) def PtotQS(self, Z, ng, Qm, Pac, Pm_comp_method): """ Compute the balance pressure of the quasi-steady system, upon application of an external perturbation on a charged membrane: Ptot = Pm + Pg + Pec - P0 - Pac. :param Z: leaflet apex outward deflection value (m) :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: total balance pressure (Pa) """ if Pm_comp_method is PmCompMethod.direct: Pm = self.PMavg(Z, self.curvrad(Z), self.surface(Z)) elif Pm_comp_method is PmCompMethod.predict: Pm = self.PMavgpred(Z) return Pm + self.gasmol2Pa(ng, self.volume(Z)) - self.P0 - Pac + self.Pelec(Z, Qm) def balancedefQS(self, ng, Qm, Pac=0.0, Pm_comp_method=PmCompMethod.predict): """ Compute the leaflet deflection upon application of an external perturbation to a quasi-steady system with a charged membrane. This function uses the Brent method (progressive approximation of function root) to solve the following transcendental equation for Z: Pm + Pg + Pec - P0 - Pac = 0. :param ng: internal molar content (mol) :param Qm: membrane charge density (C/m2) :param Pac: external acoustic perturbation (Pa) :param Pm_comp_method: type of method used to compute average intermolecular pressure :return: leaflet deflection (Z) canceling out the balance equation """ lb = -0.49 * self.Delta ub = self.a Plb = self.PtotQS(lb, ng, Qm, Pac, Pm_comp_method) Pub = self.PtotQS(ub, ng, Qm, Pac, Pm_comp_method) assert (Plb > 0 > Pub), '[%d, %d] is not a sign changing interval for PtotQS' % (lb, ub) return brentq(self.PtotQS, lb, ub, args=(ng, Qm, Pac, Pm_comp_method), xtol=1e-16) def TEleaflet(self, Z): """ Compute the circumferential elastic tension felt across the entire leaflet upon stretching. :param Z: leaflet apex outward deflection value (m) :return: circumferential elastic tension (N/m) """ return self.kA * self.arealstrain(Z) def TEtissue(self, Z): """ Compute the circumferential elastic tension felt across the embedding viscoelastic tissue layer upon stretching. :param Z: leaflet apex outward deflection value (m) :return: circumferential elastic tension (N/m) """ return self.kA_tissue * self.arealstrain(Z) def TEtot(self, Z): """ Compute the total circumferential elastic tension (leaflet and embedding tissue) felt upon stretching. :param Z: leaflet apex outward deflection value (m) :return: circumferential elastic tension (N/m) """ return self.TEleaflet(Z) + self.TEtissue(Z) def PEtot(self, Z, R): """ Compute the total elastic tension pressure (leaflet + embedding tissue) felt upon stretching. :param Z: leaflet apex outward deflection value (m) :param R: leaflet curvature radius (m) :return: elastic tension pressure (Pa) """ return - self.TEtot(Z) / R def PVleaflet(self, U, R): """ Compute the viscous stress felt across the entire leaflet upon stretching. :param U: leaflet apex outward deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: leaflet viscous stress (Pa) """ return - 12 * U * self.delta0 * self.muS / R**2 def PVfluid(self, U, R): """ Compute the viscous stress felt across the entire fluid upon stretching. :param U: leaflet apex outward deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: fluid viscous stress (Pa) """ return - 4 * U * self.muL / np.abs(R) def accP(self, Pres, R): """ Compute the pressure-driven acceleration of the leaflet in the unsteady system, upon application of an external perturbation. :param Pres: net resultant pressure (Pa) :param R: leaflet curvature radius (m) :return: pressure-driven acceleration (m/s^2) """ return Pres / (self.rhoL * np.abs(R)) def accNL(self, U, R): """ Compute the non-linear term of the leaflet acceleration in the unsteady system, upon application of an external perturbation. :param U: leaflet apex outward deflection velocity (m/s) :param R: leaflet curvature radius (m) :return: nonlinear acceleration (m/s^2) .. note:: A simplified version of nonlinear acceleration (neglecting dR/dH) is used here. """ # return - (3/2 - 2*R/H) * U**2 / R return -(3 * U**2) / (2 * R) def eqMech(self, y, t, Adrive, Fdrive, Qm, phi): """ Compute the derivatives of the 3-ODE mechanical system variables, with an imposed constant charge density. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param Qm: membrane charge density (F/m2) :param phi: acoustic drive phase (rad) :return: vector of mechanical system derivatives at time t """ # Split input vector explicitly (U, Z, ng) = y # Correct deflection value is below critical compression if Z < -0.5 * self.Delta: logger.warning('Deflection out of range: Z = %.2f nm', Z * 1e9) Z = -0.5 * self.Delta # Compute curvature radius R = self.curvrad(Z) # Compute total pressure Pg = self.gasmol2Pa(ng, self.volume(Z)) Ptot = (self.PMavgpred(Z) + Pg - self.P0 - self.Pacoustic(t, Adrive, Fdrive, phi) + self.PEtot(Z, R) + self.PVleaflet(U, R) + self.PVfluid(U, R) + self.Pelec(Z, Qm)) # Compute derivatives dUdt = self.accP(Ptot, R) + self.accNL(U, R) dZdt = U dngdt = self.gasflux(Z, Pg) # Return derivatives vector return [dUdt, dZdt, dngdt] def runMech(self, Fdrive, Adrive, Qm, phi=np.pi): """ Compute short solutions of the mechanical system for specific US stimulation parameters and with an imposed membrane charge density. :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param phi: acoustic drive phase (rad) :param Qm: imposed membrane charge density (C/m2) :return: 3-tuple with the time profile, the solution matrix and a state vector """ # Check validity of stimulation parameters - for param in [Fdrive, Adrive, Qm, phi]: - assert isinstance(param, float), 'stimulation parameters must be float typed' - assert Fdrive > 0, 'Driving frequency must be strictly positive' - assert Adrive >= 0, 'Acoustic pressure amplitude must be positive' - assert CHARGE_RANGE[0] <= Qm <= CHARGE_RANGE[1], ('Applied charge must be ' - 'within physiological range') - assert phi >= 0 and phi < 2 * np.pi, 'US phase must be within [0, 2 PI)' + if not all(isinstance(param, float) for param in [Fdrive, Adrive, Qm, phi]): + raise InputError('Invalid stimulation parameters (must be float typed)') + if Fdrive <= 0: + raise InputError('Invalid US driving frequency: {} kHz (must be strictly positive)' + .format(Fdrive * 1e-3)) + if Adrive < 0: + raise InputError('Invalid US pressure amplitude: {} kPa (must be positive or null)' + .format(Adrive * 1e-3)) + if Qm < CHARGE_RANGE[0] or Qm > CHARGE_RANGE[1]: + raise InputError('Invalid applied charge: {} nC/cm2 (must be within [{}, {}] interval' + .format(Qm * 1e5, CHARGE_RANGE[0] * 1e5, CHARGE_RANGE[1] * 1e5)) + if phi < 0 or phi >= 2 * np.pi: + raise InputError('Invalid US pressure phase: {:.2f} rad (must be within [0, 2 PI[ rad' + .format(phi)) # Raise warnings as error warnings.filterwarnings('error') # Determine mechanical system time step Tdrive = 1 / Fdrive dt_mech = Tdrive / NPC_FULL t_mech_cycle = np.linspace(0, Tdrive - dt_mech, NPC_FULL) # Initialize system variables t0 = 0.0 Z0 = 0.0 U0 = 0.0 ng0 = self.ng0 # Solve quasi-steady equation to compute first deflection value Pac1 = self.Pacoustic(t0 + dt_mech, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm, Pac1, PmCompMethod.predict) U1 = (Z1 - Z0) / dt_mech # Construct arrays to hold system variables states = np.array([1, 1]) t = np.array([t0, t0 + dt_mech]) y = np.array([[U0, U1], [Z0, Z1], [ng0, ng0]]) # Integrate mechanical system for a few acoustic cycles until stabilization j = 0 ng_last = None Z_last = None periodic_conv = False try: while not periodic_conv and j < NCYCLES_MAX: t_mech = t_mech_cycle + t[-1] + dt_mech y_mech = integrate.odeint(self.eqMech, y[:, -1], t_mech, args=(Adrive, Fdrive, Qm, phi)).T # Compare Z and ng signals over the last 2 acoustic periods if j > 0: Z_rmse = rmse(Z_last, y_mech[1, :]) ng_rmse = rmse(ng_last, y_mech[2, :]) logger.debug('step %u: Z_rmse = %.2e m, ng_rmse = %.2e mol', j, Z_rmse, ng_rmse) if Z_rmse < Z_ERR_MAX and ng_rmse < NG_ERR_MAX: periodic_conv = True # Update last vectors for next comparison Z_last = y_mech[1, :] ng_last = y_mech[2, :] # Concatenate time and solutions to global vectors states = np.concatenate([states, np.ones(NPC_FULL)], axis=0) t = np.concatenate([t, t_mech], axis=0) y = np.concatenate([y, y_mech], axis=1) # Increment loop index j += 1 if j == NCYCLES_MAX: logger.warning('No convergence: stopping after %u cycles', j) else: logger.debug('Periodic convergence after %u cycles', j) except (Warning, AssertionError) as inst: logger.error('Mech. system integration error:', extra={inst}) states[-1] = 0 # return output variables return (t, y[1:, :], states) diff --git a/PointNICE/plt/pltutils.py b/PointNICE/plt/pltutils.py index 3db09d6..4b6ee3d 100644 --- a/PointNICE/plt/pltutils.py +++ b/PointNICE/plt/pltutils.py @@ -1,851 +1,860 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-23 14:55:37 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-13 12:27:43 +# @Last Modified time: 2018-03-15 16:52:26 ''' Plotting utilities ''' import sys import os import pickle import ntpath import re import logging import tkinter as tk from tkinter import filedialog import numpy as np from scipy.interpolate import interp2d # from scipy.optimize import brentq import matplotlib.pyplot as plt from matplotlib.patches import Rectangle import matplotlib.cm as cm import pandas as pd from .. import neurons -from ..utils import getNeuronsDict, getLookupDir, rescale +from ..utils import getNeuronsDict, getLookupDir, rescale, InputError from ..bls import BilayerSonophore from .pltvars import pltvars # Get package logger logger = logging.getLogger('PointNICE') # Define global variables neuron = None bls = None # Regular expression for input files rgxp = re.compile('(ESTIM|ASTIM)_([A-Za-z]*)_(.*).pkl') rgxp_mech = re.compile('(MECH)_(.*).pkl') # nb = '[0-9]*[.]?[0-9]+' # rgxp_ASTIM = re.compile('(ASTIM)_(\w+)_(PW|CW)_({0})nm_({0})kHz_({0})kPa_({0})ms(.*)_(\w+).pkl'.format(nb)) # rgxp_ESTIM = re.compile('(ESTIM)_(\w+)_(PW|CW)_({0})mA_per_m2_({0})ms(.*).pkl'.format(nb)) # rgxp_PW = re.compile('_PRF({0})kHz_DC({0})_(PW|CW)_(\d+)kHz_(\d+)kPa_(\d+)ms_(.*).pkl'.format(nb)) # Figure naming conventions ESTIM_CW_title = '{} neuron: CW E-STIM {:.2f}mA/m2, {:.0f}ms' ESTIM_PW_title = '{} neuron: PW E-STIM {:.2f}mA/m2, {:.0f}ms, {:.2f}kHz PRF, {:.0f}% DC' ASTIM_CW_title = '{} neuron: CW A-STIM {:.0f}kHz, {:.0f}kPa, {:.0f}ms' ASTIM_PW_title = '{} neuron: PW A-STIM {:.0f}kHz, {:.0f}kPa, {:.0f}ms, {:.2f}kHz PRF, {:.2f}% DC' MECH_title = '{:.0f}nm BLS structure: MECH-STIM {:.0f}kHz, {:.0f}kPa' class InteractiveLegend(object): """ Class defining an interactive matplotlib legend, where lines visibility can be toggled by simply clicking on the corresponding legend label. Other graphic objects can also be associated to the toggle of a specific line Adapted from: http://stackoverflow.com/questions/31410043/hiding-lines-after-showing-a-pyplot-figure """ def __init__(self, legend, aliases): self.legend = legend self.fig = legend.axes.figure self.lookup_artist, self.lookup_handle = self._build_lookups(legend) self._setup_connections() self.handles_aliases = aliases self.update() def _setup_connections(self): for artist in self.legend.texts + self.legend.legendHandles: artist.set_picker(10) # 10 points tolerance self.fig.canvas.mpl_connect('pick_event', self.on_pick) def _build_lookups(self, legend): ''' Method of the InteractiveLegend class building the legend lookups. ''' labels = [t.get_text() for t in legend.texts] handles = legend.legendHandles label2handle = dict(zip(labels, handles)) handle2text = dict(zip(handles, legend.texts)) lookup_artist = {} lookup_handle = {} for artist in legend.axes.get_children(): if artist.get_label() in labels: handle = label2handle[artist.get_label()] lookup_handle[artist] = handle lookup_artist[handle] = artist lookup_artist[handle2text[handle]] = artist lookup_handle.update(zip(handles, handles)) lookup_handle.update(zip(legend.texts, handles)) return lookup_artist, lookup_handle def on_pick(self, event): handle = event.artist if handle in self.lookup_artist: artist = self.lookup_artist[handle] artist.set_visible(not artist.get_visible()) self.update() def update(self): for artist in self.lookup_artist.values(): handle = self.lookup_handle[artist] if artist.get_visible(): handle.set_visible(True) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(True) else: handle.set_visible(False) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(False) self.fig.canvas.draw() def show(self): ''' showing the interactive legend ''' plt.show() def getPatchesLoc(t, states): ''' Determine the location of stimulus patches. :param t: simulation time vector (s). :param states: a vector of stimulation state (ON/OFF) at each instant in time. :return: 3-tuple with number of patches, timing of STIM-ON an STIM-OFF instants. ''' # Compute states derivatives and identify bounds indexes of pulses dstates = np.diff(states) ipatch_on = np.insert(np.where(dstates > 0.0)[0] + 1, 0, 0) ipatch_off = np.where(dstates < 0.0)[0] # Get time instants for pulses ON and OFF npatches = ipatch_on.size tpatch_on = t[ipatch_on] tpatch_off = t[ipatch_off] # return 3-tuple with #patches, pulse ON and pulse OFF instants return (npatches, tpatch_on, tpatch_off) def SaveFigDialog(dirname, filename): """ Open a FileSaveDialogBox to set the directory and name of the figure to be saved. The default directory and filename are given, and the default extension is ".pdf" :param dirname: default directory :param filename: default filename :return: full path to the chosen filename """ root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename(defaultextension=".pdf", initialdir=dirname, initialfile=filename) return filename_out def plotComp(yvars, filepaths, labels=None, fs=15, show_patches=True): ''' Compare profiles of several specific output variables of NICE simulations. :param yvars: list of variables names to extract and compare :param filepaths: list of full paths to output data files to be compared :param labels: list of labels to use in the legend :param fs: labels fontsize :param show_patches: boolean indicating whether to indicate periods of stimulation with colored rectangular patches ''' # check labels if given if labels: - assert len(labels) == len(filepaths), 'labels do not match number of compared files' - assert all(isinstance(x, str) for x in labels), 'labels must be string typed' + if len(labels) != len(filepaths): + raise InputError('Invalid labels ({}): not matching number of compared files ({})' + .format(len(labels), len(filepaths))) + if not all(isinstance(x, str) for x in labels): + raise InputError('Invalid labels: must be string typed') nvars = len(yvars) # y variables plotting information for key in yvars: - assert key in pltvars, 'unknown plot variable "{}"'.format(key) + if key not in pltvars: + raise InputError('Unknown plot variable: "{}"'.format(key)) y_pltvars = [pltvars[key] for key in yvars] # Dictionary of neurons neurons_dict = getNeuronsDict() # Initialize figure and axes if nvars == 1: _, ax = plt.subplots(figsize=(11, 4)) axes = [ax] else: _, axes = plt.subplots(nvars, 1, figsize=(11, min(3 * nvars, 9))) for i in range(nvars): ax = axes[i] pltvar = y_pltvars[i] if 'min' in pltvar and 'max' in pltvar: ax.set_ylim(pltvar['min'], pltvar['max']) if pltvar['unit']: ax.set_ylabel('${}\ ({})$'.format(pltvar['label'], pltvar['unit']), fontsize=fs) else: ax.set_ylabel('${}$'.format(pltvar['label']), fontsize=fs) if i < nvars - 1: ax.get_xaxis().set_ticklabels([]) else: for tick in ax.xaxis.get_major_ticks(): tick.label.set_fontsize(fs) ax.locator_params(axis='y', nbins=2) for tick in ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Loop through data files j = 0 aliases = {} for filepath in filepaths: pkl_filename = ntpath.basename(filepath) # Retrieve sim type mo1 = rgxp.fullmatch(pkl_filename) mo2 = rgxp_mech.fullmatch(pkl_filename) if mo1: mo = mo1 elif mo2: mo = mo2 else: logger.error('Error: "%s" file does not match regexp pattern', pkl_filename) sys.exit(1) sim_type = mo.group(1) - assert sim_type in ['MECH', 'ASTIM', 'ESTIM'], 'invalid stimulation type' + if sim_type not in ('MECH', 'ASTIM', 'ESTIM'): + raise InputError('Invalid simulation type: {}'.format(sim_type)) if j == 0: sim_type_ref = sim_type - else: - assert sim_type == sim_type_ref, 'Error: comparing different simulation types' + elif sim_type != sim_type_ref: + raise InputError('Invalid comparison: different simulation types') # Load data logger.info('Loading data from "%s"', pkl_filename) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] # Extract variables logger.info('Extracting variables') t = df['t'].values states = df['states'].values nsamples = t.size # Initialize channel mechanism if sim_type in ['ASTIM', 'ESTIM']: neuron_name = mo.group(2) global neuron neuron = neurons_dict[neuron_name]() neuron_states = [df[sn].values for sn in neuron.states_names] Cm0 = neuron.Cm0 Qm0 = Cm0 * neuron.Vm0 * 1e-3 t_plt = pltvars['t_ms'] else: Cm0 = meta['Cm0'] Qm0 = meta['Qm0'] t_plt = pltvars['t_us'] # Initialize BLS if sim_type in ['MECH', 'ASTIM']: global bls Fdrive = meta['Fdrive'] a = meta['a'] bls = BilayerSonophore(a, Fdrive, Cm0, Qm0) # Determine patches location npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) # Adding onset to time and states vectors if t_plt['onset'] > 0.0: t = np.insert(t, 0, -t_plt['onset']) states = np.insert(states, 0, 0) # Extract variables to plot vrs = [] for i in range(nvars): pltvar = y_pltvars[i] if 'alias' in pltvar: var = eval(pltvar['alias']) elif 'key' in pltvar: var = df[pltvar['key']].values elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = df[yvars[i]].values if var.size == t.size - 1: var = np.insert(var, 0, var[0]) vrs.append(var) # Legend label if labels: label = labels[j] else: if sim_type == 'ESTIM': if meta['DC'] == 1.0: label = ESTIM_CW_title.format(neuron.name, meta['Astim'], meta['tstim'] * 1e3) else: label = ESTIM_PW_title.format(neuron.name, meta['Astim'], meta['tstim'] * 1e3, meta['PRF'] * 1e-3, meta['DC'] * 1e2) elif sim_type == 'ASTIM': if meta['DC'] == 1.0: label = ASTIM_CW_title.format(neuron.name, Fdrive * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3) else: label = ASTIM_PW_title.format(neuron.name, Fdrive * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3, meta['PRF'] * 1e-3, meta['DC'] * 1e2) elif sim_type == 'MECH': label = MECH_title.format(a * 1e9, Fdrive * 1e-3, meta['Adrive'] * 1e-3) # Plotting handles = [axes[i].plot(t * t_plt['factor'], vrs[i] * y_pltvars[i]['factor'], linewidth=2, label=label) for i in range(nvars)] if show_patches: k = 0 # stimulation patches for ax in axes: handle = handles[k] (ybottom, ytop) = ax.get_ylim() la = [] for i in range(npatches): la.append(ax.add_patch(Rectangle((tpatch_on[i] * t_plt['factor'], ybottom), (tpatch_off[i] - tpatch_on[i]) * t_plt['factor'], ytop - ybottom, color=handle[0].get_color(), alpha=0.2))) aliases[handle[0]] = la k += 1 j += 1 # set x-axis label axes[-1].set_xlabel('${}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) plt.tight_layout() iLegends = [] for k in range(nvars): axes[k].legend(loc=7, fontsize=fs) iLegends.append(InteractiveLegend(axes[k].legend_, aliases)) plt.show() def plotBatch(directory, filepaths, vars_dict=None, plt_show=True, plt_save=False, ask_before_save=True, fig_ext='png', tag='fig', fs=15, lw=2, title=True, show_patches=True): ''' Plot a figure with profiles of several specific NICE output variables, for several NICE simulations. :param positions: subplot indexes of each variable :param filepaths: list of full paths to output data files to be compared :param vars_dict: dict of lists of variables names to extract and plot together :param plt_show: boolean stating whether to show the created figures :param plt_save: boolean stating whether to save the created figures :param ask_before_save: boolean stating whether to show the created figures :param fig_ext: file extension for the saved figures :param tag: suffix added to the end of the figures name :param fs: labels font size :param lw: curves line width :param title: boolean stating whether to display a general title on the figures :param show_patches: boolean indicating whether to indicate periods of stimulation with colored rectangular patches ''' # Check validity of plot variables if vars_dict: yvars = list(sum(list(vars_dict.values()), [])) for key in yvars: - assert key in pltvars, 'unknown plot variable "{}"'.format(key) + if key not in pltvars: + raise InputError('Unknown plot variable: "{}"'.format(key)) # Dictionary of neurons neurons_dict = getNeuronsDict() # Loop through data files for filepath in filepaths: # Get code from file name pkl_filename = ntpath.basename(filepath) filecode = pkl_filename[0:-4] # Retrieve sim type mo1 = rgxp.fullmatch(pkl_filename) mo2 = rgxp_mech.fullmatch(pkl_filename) if mo1: mo = mo1 elif mo2: mo = mo2 else: logger.error('Error: "%s" file does not match regexp pattern', pkl_filename) sys.exit(1) sim_type = mo.group(1) - assert sim_type in ['MECH', 'ASTIM', 'ESTIM'], 'invalid stimulation type' + if sim_type not in ('MECH', 'ASTIM', 'ESTIM'): + raise InputError('Invalid simulation type: {}'.format(sim_type)) # Load data logger.info('Loading data from "%s"', pkl_filename) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] # Extract variables logger.info('Extracting variables') t = df['t'].values states = df['states'].values nsamples = t.size # Initialize channel mechanism if sim_type in ['ASTIM', 'ESTIM']: neuron_name = mo.group(2) global neuron neuron = neurons_dict[neuron_name]() neuron_states = [df[sn].values for sn in neuron.states_names] Cm0 = neuron.Cm0 Qm0 = Cm0 * neuron.Vm0 * 1e-3 t_plt = pltvars['t_ms'] else: Cm0 = meta['Cm0'] Qm0 = meta['Qm0'] t_plt = pltvars['t_us'] # Initialize BLS if sim_type in ['MECH', 'ASTIM']: global bls Fdrive = meta['Fdrive'] a = meta['a'] bls = BilayerSonophore(a, Fdrive, Cm0, Qm0) # Determine patches location npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) # Adding onset to time and states vectors if t_plt['onset'] > 0.0: t = np.insert(t, 0, -t_plt['onset']) states = np.insert(states, 0, 0) # Determine variables to plot if not provided if not vars_dict: if sim_type == 'ASTIM': vars_dict = {'Z': ['Z'], 'Q_m': ['Qm']} elif sim_type == 'ESTIM': vars_dict = {'V_m': ['Vm']} elif sim_type == 'MECH': vars_dict = {'P_{AC}': ['Pac'], 'Z': ['Z'], 'n_g': ['ng']} if sim_type in ['ASTIM', 'ESTIM'] and hasattr(neuron, 'pltvars_scheme'): vars_dict.update(neuron.pltvars_scheme) labels = list(vars_dict.keys()) naxes = len(vars_dict) # Plotting if naxes == 1: _, ax = plt.subplots(figsize=(11, 4)) axes = [ax] else: _, axes = plt.subplots(naxes, 1, figsize=(11, min(3 * naxes, 9))) for i in range(naxes): ax = axes[i] ax_pltvars = [pltvars[j] for j in vars_dict[labels[i]]] nvars = len(ax_pltvars) # X-axis if i < naxes - 1: ax.get_xaxis().set_ticklabels([]) else: ax.set_xlabel('${}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) for tick in ax.xaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Y-axis if ax_pltvars[0]['unit']: ax.set_ylabel('${}\ ({})$'.format(labels[i], ax_pltvars[0]['unit']), fontsize=fs) else: ax.set_ylabel('${}$'.format(labels[i]), fontsize=fs) if 'min' in ax_pltvars[0] and 'max' in ax_pltvars[0]: ax_min = min([ap['min'] for ap in ax_pltvars]) ax_max = max([ap['max'] for ap in ax_pltvars]) ax.set_ylim(ax_min, ax_max) ax.locator_params(axis='y', nbins=2) for tick in ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Time series icolor = 0 for j in range(nvars): # Extract variable pltvar = ax_pltvars[j] if 'alias' in pltvar: var = eval(pltvar['alias']) elif 'key' in pltvar: var = df[pltvar['key']].values elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = df[vars_dict[labels[i]][j]].values if var.size == t.size - 1: var = np.insert(var, 0, var[0]) # Plot variable if 'constant' in pltvar: ax.plot(t * t_plt['factor'], var * pltvar['factor'], '--', c='black', lw=lw, label='${}$'.format(pltvar['label'])) else: ax.plot(t * t_plt['factor'], var * pltvar['factor'], c='C{}'.format(icolor), lw=lw, label='${}$'.format(pltvar['label'])) icolor += 1 # Patches if show_patches == 1: (ybottom, ytop) = ax.get_ylim() for j in range(npatches): ax.add_patch(Rectangle((tpatch_on[j] * t_plt['factor'], ybottom), (tpatch_off[j] - tpatch_on[j]) * t_plt['factor'], ytop - ybottom, color='#8A8A8A', alpha=0.1)) # Legend if nvars > 1: ax.legend(fontsize=fs, loc=7, ncol=nvars // 4 + 1) # Title if title: if sim_type == 'ESTIM': if meta['DC'] == 1.0: fig_title = ESTIM_CW_title.format(neuron.name, meta['Astim'], meta['tstim'] * 1e3) else: fig_title = ESTIM_PW_title.format(neuron.name, meta['Astim'], meta['tstim'] * 1e3, meta['PRF'] * 1e-3, meta['DC'] * 1e2) elif sim_type == 'ASTIM': if meta['DC'] == 1.0: fig_title = ASTIM_CW_title.format(neuron.name, Fdrive * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3) else: fig_title = ASTIM_PW_title.format(neuron.name, Fdrive * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3, meta['PRF'] * 1e-3, meta['DC'] * 1e2) elif sim_type == 'MECH': fig_title = MECH_title.format(a * 1e9, Fdrive * 1e-3, meta['Adrive'] * 1e-3) axes[0].set_title(fig_title, fontsize=fs) plt.tight_layout() # Save figure if needed (automatic or checked) if plt_save: if ask_before_save: plt_filename = SaveFigDialog(directory, '{}_{}.{}'.format(filecode, tag, fig_ext)) else: plt_filename = '{}/{}_{}.{}'.format(directory, filecode, tag, fig_ext) if plt_filename: plt.savefig(plt_filename) logger.info('Saving figure as "{}"'.format(plt_filename)) plt.close() # Show all plots if needed if plt_show: plt.show() def plotGatingKinetics(neuron, fs=15): ''' Plot the voltage-dependent steady-states and time constants of activation and inactivation gates of the different ionic currents involved in a specific neuron's membrane. :param neuron: specific channel mechanism object :param fs: labels and title font size ''' # Input membrane potential vector Vm = np.linspace(-100, 50, 300) xinf_dict = {} taux_dict = {} logger.info('Computing %s neuron gating kinetics', neuron.name) names = neuron.states_names print(names) for xname in names: Vm_state = True # Names of functions of interest xinf_func_str = xname.lower() + 'inf' taux_func_str = 'tau' + xname.lower() alphax_func_str = 'alpha' + xname.lower() betax_func_str = 'beta' + xname.lower() # derx_func_str = 'der' + xname.upper() # 1st choice: use xinf and taux function if hasattr(neuron, xinf_func_str) and hasattr(neuron, taux_func_str): xinf_func = getattr(neuron, xinf_func_str) taux_func = getattr(neuron, taux_func_str) xinf = np.array([xinf_func(v) for v in Vm]) if isinstance(taux_func, float): taux = taux_func * np.ones(len(Vm)) else: taux = np.array([taux_func(v) for v in Vm]) # 2nd choice: use alphax and betax functions elif hasattr(neuron, alphax_func_str) and hasattr(neuron, betax_func_str): alphax_func = getattr(neuron, alphax_func_str) betax_func = getattr(neuron, betax_func_str) alphax = np.array([alphax_func(v) for v in Vm]) if isinstance(betax_func, float): betax = betax_func * np.ones(len(Vm)) else: betax = np.array([betax_func(v) for v in Vm]) taux = 1.0 / (alphax + betax) xinf = taux * alphax # # 3rd choice: use derX choice # elif hasattr(neuron, derx_func_str): # derx_func = getattr(neuron, derx_func_str) # xinf = brentq(lambda x: derx_func(neuron.Vm, x), 0, 1) else: Vm_state = False if not Vm_state: logger.error('no function to compute %s-state gating kinetics', xname) else: xinf_dict[xname] = xinf taux_dict[xname] = taux fig, axes = plt.subplots(2) fig.suptitle('{} neuron: gating dynamics'.format(neuron.name)) ax = axes[0] ax.get_xaxis().set_ticklabels([]) ax.set_ylabel('$X_{\infty}$', fontsize=fs) for xname in names: if xname in xinf_dict: ax.plot(Vm, xinf_dict[xname], lw=2, label='$' + xname + '_{\infty}$') ax.legend(fontsize=fs, loc=7) ax = axes[1] ax.set_xlabel('$V_m\ (mV)$', fontsize=fs) ax.set_ylabel('$\\tau_X\ (ms)$', fontsize=fs) for xname in names: if xname in taux_dict: ax.plot(Vm, taux_dict[xname] * 1e3, lw=2, label='$\\tau_{' + xname + '}$') ax.legend(fontsize=fs, loc=7) plt.show() def plotRateConstants(neuron, fs=15): ''' Plot the voltage-dependent activation and inactivation rate constants for each gate of all ionic currents involved in a specific neuron's membrane. :param neuron: specific channel mechanism object :param fs: labels and title font size ''' # Input membrane potential vector Vm = np.linspace(-250, 250, 300) alphax_dict = {} betax_dict = {} logger.info('Computing %s neuron gating kinetics', neuron.name) names = neuron.states_names for xname in names: Vm_state = True # Names of functions of interest xinf_func_str = xname.lower() + 'inf' taux_func_str = 'tau' + xname.lower() alphax_func_str = 'alpha' + xname.lower() betax_func_str = 'beta' + xname.lower() # 1st choice: use alphax and betax functions if hasattr(neuron, alphax_func_str) and hasattr(neuron, betax_func_str): alphax_func = getattr(neuron, alphax_func_str) betax_func = getattr(neuron, betax_func_str) alphax = np.array([alphax_func(v) for v in Vm]) betax = np.array([betax_func(v) for v in Vm]) # 2nd choice: use xinf and taux function elif hasattr(neuron, xinf_func_str) and hasattr(neuron, taux_func_str): xinf_func = getattr(neuron, xinf_func_str) taux_func = getattr(neuron, taux_func_str) xinf = np.array([xinf_func(v) for v in Vm]) taux = np.array([taux_func(v) for v in Vm]) alphax = xinf / taux betax = 1.0 / taux - alphax else: Vm_state = False if not Vm_state: logger.error('no function to compute %s-state gating kinetics', xname) else: alphax_dict[xname] = alphax betax_dict[xname] = betax naxes = len(alphax_dict) _, axes = plt.subplots(naxes, figsize=(11, min(3 * naxes, 9))) for i, xname in enumerate(alphax_dict.keys()): ax1 = axes[i] if i == 0: ax1.set_title('{} neuron: rate constants'.format(neuron.name)) if i == naxes - 1: ax1.set_xlabel('$V_m\ (mV)$', fontsize=fs) else: ax1.get_xaxis().set_ticklabels([]) ax1.set_ylabel('$\\alpha_{' + xname + '}\ (ms^{-1})$', fontsize=fs, color='C0') for label in ax1.get_yticklabels(): label.set_color('C0') ax1.plot(Vm, alphax_dict[xname] * 1e-3, lw=2) ax2 = ax1.twinx() ax2.set_ylabel('$\\beta_{' + xname + '}\ (ms^{-1})$', fontsize=fs, color='C1') for label in ax2.get_yticklabels(): label.set_color('C1') ax2.plot(Vm, betax_dict[xname] * 1e-3, lw=2, color='C1') plt.tight_layout() plt.show() def setGrid(n): ''' Determine number of rows and columns in figure grid, based on number of variables to plot. ''' if n <= 3: return (1, n) else: return ((n - 1) // 3 + 1, 3) def plotEffCoeffs(neuron, Fdrive, a=32e-9, fs=12): ''' Plot the profiles of all effective coefficients of a specific neuron for a given frequency. For each coefficient X, one line chart per lookup amplitude is plotted, using charge as the input variable on the abscissa and a linear color code for the amplitude value. :param neuron: channel mechanism object :param Fdrive: acoustic drive frequency (Hz) :param a: sonophore diameter (m) :param fs: figure fontsize ''' # Check lookup file existence lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(neuron.name, a * 1e9) lookup_path = '{}/{}'.format(getLookupDir(), lookup_file) - assert os.path.isfile(lookup_path), ('No lookup file available for {} ' - 'neuron type').format(neuron.name) + if not os.path.isfile(lookup_path): + raise InputError('Missing lookup file: "{}"'.format(lookup_file)) # Load coefficients with open(lookup_path, 'rb') as fh: lookup_dict = pickle.load(fh) # Retrieve 1D inputs from lookup dictionary freqs = lookup_dict['f'] amps = lookup_dict['A'] charges = lookup_dict['Q'] # Check that frequency is within lookup range margin = 1e-9 # adding margin to compensate for eventual round error frange = (freqs.min() - margin, freqs.max() + margin) - assert frange[0] <= Fdrive <= frange[1], \ - 'Fdrive must be within [{:.1f}, {:.1f}] kHz'.format(*[f * 1e-3 for f in frange]) + if Fdrive < frange[0] or Fdrive > frange[1]: + raise InputError(('Invalid frequency: {:.2f} kHz (must be within ' + + '{:.1f} kHz - {:.1f} MHz lookup interval)') + .format(Fdrive * 1e-3, frange[0] * 1e-3, frange[1] * 1e-6)) # Define coefficients list coeffs_list = ['V', 'ng', *neuron.coeff_names] AQ_coeffs = {} # If Fdrive in lookup frequencies, simply project (A, Q) dataset at that frequency if Fdrive in freqs: iFdrive = np.searchsorted(freqs, Fdrive) logger.info('Using lookups directly at %.2f kHz', freqs[iFdrive] * 1e-3) for cn in coeffs_list: AQ_coeffs[cn] = np.squeeze(lookup_dict[cn][iFdrive, :, :]) # Otherwise, project 2 (A, Q) interpolation datasets at Fdrive bounding values # indexes in lookup frequencies onto two 1D charge-based interpolation datasets, and # interpolate between them afterwards else: ilb = np.searchsorted(freqs, Fdrive) - 1 logger.info('Interpolating lookups between %.2f kHz and %.2f kHz', freqs[ilb] * 1e-3, freqs[ilb + 1] * 1e-3) for cn in coeffs_list: AQ_slice = [] for iAdrive in range(len(amps)): fQ_slice = np.squeeze(lookup_dict[cn][:, iAdrive, :]) itrp = interp2d(freqs, charges, fQ_slice.T) Q_vect = itrp(Fdrive, charges) AQ_slice.append(Q_vect) AQ_coeffs[cn] = np.squeeze(np.array([AQ_slice])) # Replace dict key AQ_coeffs['Veff'] = AQ_coeffs.pop('V') # Plotting logger.info('plotting') Amin, Amax = amps.min(), amps.max() ncoeffs = len(coeffs_list) nrows, ncols = setGrid(ncoeffs) xvar = pltvars['Qm'] mymap = cm.get_cmap('viridis') sm_amp = plt.cm.ScalarMappable(cmap=mymap, norm=plt.Normalize(Amin * 1e-3, Amax * 1e-3)) sm_amp._A = [] fig, _ = plt.subplots(figsize=(5 * ncols, 1.8 * nrows), squeeze=False) for j, cn in enumerate(AQ_coeffs.keys()): ax = plt.subplot2grid((nrows, ncols), (j // 3, j % 3)) # ax = axes[j // 3, j % 3] yvar = pltvars[cn] ax.set_xlabel('${}\ ({})$'.format(xvar['label'], xvar['unit']), fontsize=fs) ax.set_ylabel('${}\ ({})$'.format(yvar['label'], yvar['unit']), fontsize=fs) for i, Adrive in enumerate(amps): ax.plot(charges * xvar['factor'], AQ_coeffs[cn][i, :] * yvar['factor'], c=mymap(rescale(Adrive, Amin, Amax))) plt.tight_layout() fig.suptitle('{} neuron: effective coefficients @ {:.0f} kHz'.format( neuron.name, Fdrive * 1e-3)) fig.subplots_adjust(top=0.9, right=0.85) cbar_ax = fig.add_axes([0.87, 0.05, 0.02, 0.9]) fig.add_axes() fig.colorbar(sm_amp, cax=cbar_ax) cbar_ax.set_ylabel('$A_{drive} \ (kPa)$', fontsize=fs) plt.show() diff --git a/PointNICE/solvers/SolverElec.py b/PointNICE/solvers/SolverElec.py index 3ee5d0a..a122a04 100644 --- a/PointNICE/solvers/SolverElec.py +++ b/PointNICE/solvers/SolverElec.py @@ -1,150 +1,164 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-14 19:19:13 +# @Last Modified time: 2018-03-15 15:32:02 import warnings import logging import numpy as np import scipy.integrate as integrate from ..constants import * +from ..neurons import BaseMech +from ..utils import InputError # Get package logger logger = logging.getLogger('PointNICE') class SolverElec: def __init__(self): # Do nothing pass def eqHH(self, y, _, neuron, Iinj): ''' Compute the derivatives of a HH system variables for a specific value of injected current. :param y: vector of HH system variables at time t :param t: time value (s, unused) :param neuron: neuron object :param Iinj: injected current (mA/m2) :return: vector of HH system derivatives at time t ''' Vm, *states = y Iionic = neuron.currNet(Vm, states) # mA/m2 dVmdt = (- Iionic + Iinj) / neuron.Cm0 # mV/s dstates = neuron.derStates(Vm, states) return [dVmdt, *dstates] def run(self, neuron, Astim, tstim, toffset, PRF=None, DC=1.0): ''' Compute solutions of a neuron's HH system for a specific set of electrical stimulation parameters, using a classic integration scheme. :param neuron: neuron object :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 3-tuple with the time profile and solution matrix and a state vector ''' # Check validity of stimulation parameters - for param in [Astim, tstim, toffset, DC]: - assert isinstance(param, float), 'stimulation parameters must be float typed' - assert tstim > 0, 'Stimulus duration must be strictly positive' - assert toffset >= 0, 'Stimulus offset must be positive or null' - assert DC > 0 and DC <= 1, 'Duty cycle must be within [0; 1)' + if not isinstance(neuron, BaseMech): + raise InputError('Invalid neuron type: "{}" (must inherit from BaseMech class)' + .format(neuron.name)) + if not all(isinstance(param, float) for param in [Astim, tstim, toffset, DC]): + raise InputError('Invalid stimulation parameters (must be float typed)') + if tstim <= 0: + raise InputError('Invalid stimulus duration: {} ms (must be strictly positive)' + .format(tstim * 1e3)) + if toffset < 0: + raise InputError('Invalid stimulus offset: {} ms (must be positive or null)' + .format(toffset * 1e3)) + if DC <= 0.0 or DC > 1.0: + raise InputError('Invalid duty cycle: {} (must be within ]0; 1])'.format(DC)) if DC < 1.0: - assert isinstance(PRF, float), 'if provided, the PRF parameter must be float typed' - assert PRF is not None, 'PRF must be provided when using duty cycles smaller than 1' - assert PRF >= 1 / tstim, 'PR interval must be smaller than stimulus duration' + if not isinstance(PRF, float): + raise InputError('Invalid PRF value (must be float typed)') + if PRF is None: + raise InputError('Missing PRF value (must be provided when DC < 1)') + if PRF < 1 / tstim: + raise InputError('Invalid PRF: {} Hz (PR interval exceeds stimulus duration' + .format(PRF)) # Raise warnings as error warnings.filterwarnings('error') # Determine system time step dt = DT_ESTIM # if CW stimulus: divide integration during stimulus into single interval if DC == 1.0: PRF = 1 / tstim # Compute vector sizes npulses = int(np.round(PRF * tstim)) Tpulse_on = DC / PRF Tpulse_off = (1 - DC) / PRF n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) # For high-PRF pulsed protocols: adapt time step if greater than TON or TOFF dt_warning_msg = 'high-PRF protocol: lowering integration time step to %.2e ms to match %s' if Tpulse_on > 0 and n_pulse_on == 0: logger.warning(dt_warning_msg, Tpulse_on * 1e3, 'TON') dt = Tpulse_on n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) if Tpulse_off > 0 and n_pulse_off == 0: logger.warning(dt_warning_msg, Tpulse_off * 1e3, 'TOFF') dt = Tpulse_off n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) # Compute offset size n_off = int(np.round(toffset / dt)) # Set initial conditions y0 = [neuron.Vm0, *neuron.states0] nvar = len(y0) # Initialize global arrays t = np.array([0.]) states = np.array([1]) y = np.array([y0]).T # Initialize pulse time and states vectors t_pulse0 = np.linspace(0, Tpulse_on + Tpulse_off, n_pulse_on + n_pulse_off) states_pulse = np.concatenate((np.ones(n_pulse_on), np.zeros(n_pulse_off))) # Loop through all pulse (ON and OFF) intervals for i in range(npulses): # Construct and initialize arrays t_pulse = t_pulse0 + t[-1] y_pulse = np.empty((nvar, n_pulse_on + n_pulse_off)) # Integrate ON system y_pulse[:, :n_pulse_on] = integrate.odeint(self.eqHH, y[:, -1], t_pulse[:n_pulse_on], args=(neuron, Astim)).T # Integrate OFF system if n_pulse_off > 0: y_pulse[:, n_pulse_on:] = integrate.odeint(self.eqHH, y_pulse[:, -1], t_pulse[n_pulse_on:], args=(neuron, 0.0, 0.0, 0.0)).T # Append pulse arrays to global arrays states = np.concatenate([states, states_pulse[1:]]) t = np.concatenate([t, t_pulse[1:]]) y = np.concatenate([y, y_pulse[:, 1:]], axis=1) # Integrate offset interval if n_off > 0: t_off = np.linspace(0, toffset, n_off) + t[-1] states_off = np.zeros(n_off) y_off = integrate.odeint(self.eqHH, y[:, -1], t_off, args=(neuron, 0.0)).T # Concatenate offset arrays to global arrays states = np.concatenate([states, states_off[1:]]) t = np.concatenate([t, t_off[1:]]) y = np.concatenate([y, y_off[:, 1:]], axis=1) # Return output variables return (t, y, states) diff --git a/PointNICE/solvers/SolverUS.py b/PointNICE/solvers/SolverUS.py index 917e26d..b19876e 100644 --- a/PointNICE/solvers/SolverUS.py +++ b/PointNICE/solvers/SolverUS.py @@ -1,780 +1,833 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-15 11:54:35 +# @Last Modified time: 2018-03-15 16:48:35 import os import warnings import pickle import logging import progressbar as pb import numpy as np import scipy.integrate as integrate from scipy.interpolate import interp2d from ..bls import BilayerSonophore from ..utils import * from ..constants import * from ..neurons import BaseMech # Get package logger logger = logging.getLogger('PointNICE') class SolverUS(BilayerSonophore): """ This class extends the BilayerSonophore class by adding a biophysical Hodgkin-Huxley model on top of the mechanical BLS model. """ def __init__(self, diameter, neuron, Fdrive, embedding_depth=0.0): """ Constructor of the class. :param diameter: in-plane diameter of the sonophore structure within the membrane (m) :param neuron: neuron object :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) """ # Check validity of input parameters - assert isinstance(neuron, BaseMech), ('neuron mechanism must be inherited ' - 'from the BaseMech class') - assert Fdrive >= 0., 'Driving frequency must be positive' + if not isinstance(neuron, BaseMech): + raise InputError('Invalid neuron type: "{}" (must inherit from BaseMech class)' + .format(neuron.name)) + if not isinstance(Fdrive, float): + raise InputError('Invalid US driving frequency (must be float typed)') + if Fdrive < 0: + raise InputError('Invalid US driving frequency: {} kHz (must be positive or null)' + .format(Fdrive * 1e-3)) + # TODO: check parameters dictionary (float type, mandatory members) # Initialize BLS object Cm0 = neuron.Cm0 Vm0 = neuron.Vm0 BilayerSonophore.__init__(self, diameter, Fdrive, Cm0, Cm0 * Vm0 * 1e-3, embedding_depth) logger.debug('US solver initialization with %s neuron', neuron.name) def eqHH(self, y, t, neuron, Cm): """ Compute the derivatives of the n-ODE HH system variables, based on a value of membrane capacitance. :param y: vector of HH system variables at time t :param t: specific instant in time (s) :param neuron: neuron object :param Cm: membrane capacitance (F/m2) :return: vector of HH system derivatives at time t """ # Split input vector explicitly Qm, *states = y # Compute membrane potential Vm = Qm / Cm * 1e3 # mV # Compute derivatives dQm = - neuron.currNet(Vm, states) * 1e-3 # A/m2 dstates = neuron.derStates(Vm, states) # Return derivatives vector return [dQm, *dstates] def eqHH2(self, t, y, neuron, Cm): return self.eqHH(y, t, neuron, Cm) def eqFull(self, y, t, neuron, Adrive, Fdrive, phi): """ Compute the derivatives of the (n+3) ODE full NBLS system variables. :param y: vector of state variables :param t: specific instant in time (s) :param neuron: neuron object :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) :return: vector of derivatives """ # Compute derivatives of mechanical and electrical systems dydt_mech = self.eqMech(y[:3], t, Adrive, Fdrive, y[3], phi) dydt_elec = self.eqHH(y[3:], t, neuron, self.Capct(y[1])) # return concatenated output return dydt_mech + dydt_elec def eqFull2(self, t, y, neuron, Adrive, Fdrive, phi): return self.eqFull(y, t, neuron, Adrive, Fdrive, phi) def eqHHeff(self, t, y, neuron, interp_data): """ Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param t: specific instant in time (s) :param y: vector of HH system variables at time t :param neuron: neuron object :param interp_data: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t """ # Split input vector explicitly Qm, *states = y # Compute charge and channel states variation Vm = np.interp(Qm, interp_data['Q'], interp_data['V']) # mV dQmdt = - neuron.currNet(Vm, states) * 1e-3 dstates = neuron.derStatesEff(Qm, states, interp_data) # Return derivatives vector return [dQmdt, *dstates] def getEffCoeffs(self, neuron, Fdrive, Adrive, Qm, phi=np.pi): """ Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param phi: acoustic drive phase (rad) :return: tuple with the effective potential, gas content and channel rates """ # Run simulation and retrieve deflection and gas content vectors from last cycle (_, y, _) = self.runMech(Fdrive, Adrive, Qm, phi) (Z, ng) = y Z_last = Z[-NPC_FULL:] # m # Compute membrane potential vector Vm = np.array([Qm / self.Capct(ZZ) * 1e3 for ZZ in Z_last]) # mV # Compute average cycle value for membrane potential and rate constants Vm_eff = np.mean(Vm) # mV rates_eff = neuron.getEffRates(Vm) # Take final cycle value for gas content ng_eff = ng[-1] # mole return (Vm_eff, ng_eff, *rates_eff) def createLookup(self, neuron, freqs, amps, phi=np.pi): """ Run simulations of the mechanical system for a multiple combinations of imposed charge densities and acoustic amplitudes, compute effective coefficients and store them as 2D arrays in a lookup file. :param neuron: neuron object :param freqs: array of acoustic drive frequencies (Hz) :param amps: array of acoustic drive amplitudes (Pa) :param phi: acoustic drive phase (rad) """ # Check if lookup file already exists lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(neuron.name, self.a * 1e9) lookup_filepath = '{0}/{1}'.format(getLookupDir(), lookup_file) # assert not os.path.isfile(lookup_filepath), '"{}" file already exists'.format(lookup_file) + if os.path.isfile(lookup_filepath): + logger.warning('"%s" file already exists', lookup_file) - # Check validity of stimulation parameters - assert freqs.min() > 0, 'Driving frequencies must be strictly positive' - assert amps.min() >= 0, 'Acoustic pressure amplitudes must be positive' + # Check validity of input parameters + if not isinstance(neuron, BaseMech): + raise InputError('Invalid neuron type: "{}" (must inherit from BaseMech class)' + .format(neuron.name)) + if not isinstance(freqs, np.ndarray): + if isinstance(freqs, list): + if not all(isinstance(x, float) for x in freqs): + raise InputError('Invalid frequencies (must all be float typed)') + freqs = np.array(freqs) + else: + raise InputError('Invalid frequencies (must be provided as list or numpy array)') + if not isinstance(amps, np.ndarray): + if isinstance(amps, list): + if not all(isinstance(x, float) for x in amps): + raise InputError('Invalid amplitudes (must all be float typed)') + amps = np.array(amps) + else: + raise InputError('Invalid amplitudes (must be provided as list or numpy array)') + + nf = freqs.size + nA = amps.size + if nf == 0: + raise InputError('Empty frequencies array') + if nA == 0: + raise InputError('Empty amplitudes array') + if freqs.min() <= 0: + raise InputError('Invalid US driving frequencies (must all be strictly positive)') + if amps.min() < 0: + raise InputError('Invalid US pressure amplitudes (must all be positive or null)') logger.info('Creating lookup table for %s neuron', neuron.name) # Create neuron-specific charge vector charges = np.arange(neuron.Qbounds[0], neuron.Qbounds[1] + 1e-5, 1e-5) # C/m2 # Initialize lookup dictionary of 3D array to store effective coefficients - nf = freqs.size - nA = amps.size nQ = charges.size coeffs_names = ['V', 'ng', *neuron.coeff_names] ncoeffs = len(coeffs_names) lookup_dict = {cn: np.empty((nf, nA, nQ)) for cn in coeffs_names} # Loop through all (f, A, Q) combinations nsims = nf * nA * nQ isim = 0 log_str = 'short simulation %u/%u (f = %.2f kHz, A = %.2f kPa, Q = %.2f nC/cm2)' for i in range(nf): for j in range(nA): for k in range(nQ): isim += 1 # Run short simulation and store effective coefficients logger.info(log_str, isim, nsims, freqs[i] * 1e-3, amps[j] * 1e-3, charges[k] * 1e5) sim_coeffs = self.getEffCoeffs(neuron, freqs[i], amps[j], charges[k], phi) for icoeff in range(ncoeffs): lookup_dict[coeffs_names[icoeff]][i, j, k] = sim_coeffs[icoeff] # Add input frequency, amplitude and charge arrays to lookup dictionary lookup_dict['f'] = freqs # Hz lookup_dict['A'] = amps # Pa lookup_dict['Q'] = charges # C/m2 # Save dictionary in lookup file logger.info('Saving %s neuron lookup table in file: "%s"', neuron.name, lookup_file) with open(lookup_filepath, 'wb') as fh: pickle.dump(lookup_dict, fh) def __runClassic(self, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): """ Compute solutions of the system for a specific set of US stimulation parameters, using a classic integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the ODE system is solved iteratively until completion. :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the effective solution matrix and a state vector """ # Raise warnings as error warnings.filterwarnings('error') # Determine system time step Tdrive = 1 / Fdrive dt = Tdrive / NPC_FULL # if CW stimulus: divide integration during stimulus into 100 intervals if DC == 1.0: PRF = 100 / tstim # Compute vector sizes npulses = int(np.round(PRF * tstim)) Tpulse_on = DC / PRF Tpulse_off = (1 - DC) / PRF n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) n_off = int(np.round(toffset / dt)) # Solve quasi-steady equation to compute first deflection value Z0 = 0.0 ng0 = self.ng0 Qm0 = self.Qm0 Pac1 = self.Pacoustic(dt, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm0, Pac1) # Initialize global arrays states = np.array([1, 1]) t = np.array([0., dt]) y_membrane = np.array([[0., (Z1 - Z0) / dt], [Z0, Z1], [ng0, ng0], [Qm0, Qm0]]) y_channels = np.tile(neuron.states0, (2, 1)).T y = np.vstack((y_membrane, y_channels)) nvar = y.shape[0] # Initialize pulse time and states vectors t_pulse0 = np.linspace(0, Tpulse_on + Tpulse_off, n_pulse_on + n_pulse_off) states_pulse = np.concatenate((np.ones(n_pulse_on), np.zeros(n_pulse_off))) # Initialize progress bar if logger.getEffectiveLevel() == logging.INFO: widgets = ['Running: ', pb.Percentage(), ' ', pb.Bar(), ' ', pb.ETA()] pbar = pb.ProgressBar(widgets=widgets, max_value=int(npulses * (toffset + tstim) / tstim)) pbar.start() # Loop through all pulse (ON and OFF) intervals for i in range(npulses): # Construct and initialize arrays t_pulse = t_pulse0 + t[-1] y_pulse = np.empty((nvar, n_pulse_on + n_pulse_off)) # Integrate ON system y_pulse[:, :n_pulse_on] = integrate.odeint(self.eqFull, y[:, -1], t_pulse[:n_pulse_on], args=(neuron, Adrive, Fdrive, phi)).T # Integrate OFF system if n_pulse_off > 0: y_pulse_off[:, n_pulse_on:] = integrate.odeint(self.eqFull, y_pulse[:, -1], t_pulse[n_pulse_on:], args=(neuron, 0.0, 0.0, 0.0)).T # Append pulse arrays to global arrays states = np.concatenate([states, states_pulse[1:]]) t = np.concatenate([t, t_pulse[1:]]) y = np.concatenate([y, y_pulse[:, 1:]], axis=1) # Update progress bar if logger.getEffectiveLevel() == logging.INFO: pbar.update(i) # Integrate offset interval if n_off > 0: t_off = np.linspace(0, toffset, n_off) + t[-1] states_off = np.zeros(n_off) y_off = integrate.odeint(self.eqFull, y[:, -1], t_off, args=(neuron, 0.0, 0.0, 0.0)).T # Concatenate offset arrays to global arrays states = np.concatenate([states, states_off[1:]]) t = np.concatenate([t, t_off[1:]]) y = np.concatenate([y, y_off[:, 1:]], axis=1) # Terminate progress bar if logger.getEffectiveLevel() == logging.INFO: pbar.finish() # Downsample arrays in time-domain accordgin to target temporal resolution ds_factor = int(np.round(CLASSIC_TARGET_DT / dt)) if ds_factor > 1: Fs = 1 / (dt * ds_factor) logger.info('Downsampling output arrays by factor %u (Fs = %.2f MHz)', ds_factor, Fs * 1e-6) t = t[::ds_factor] y = y[:, ::ds_factor] states = states[::ds_factor] # Return output variables return (t, y[1:, :], states) def __runEffective(self, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, dt=DT_EFF): """ Compute solutions of the system for a specific set of US stimulation parameters, using charge-predicted "effective" coefficients to solve the HH equations at each step. :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param dt: integration time step (s) :return: 3-tuple with the time profile, the effective solution matrix and a state vector """ # Raise warnings as error warnings.filterwarnings('error') # Check lookup file existence lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(neuron.name, self.a * 1e9) lookup_path = '{}/{}'.format(getLookupDir(), lookup_file) - assert os.path.isfile(lookup_path), ('No lookup file available for {} ' - 'neuron type').format(neuron.name) + if not os.path.isfile(lookup_path): + raise InputError('Missing lookup file: "{}"'.format(lookup_file)) # Load coefficients with open(lookup_path, 'rb') as fh: lookup_dict = pickle.load(fh) # Retrieve 1D inputs from lookup dictionary freqs = lookup_dict['f'] amps = lookup_dict['A'] charges = lookup_dict['Q'] # Check that stimulation parameters are within lookup range margin = 1e-9 # adding margin to compensate for eventual round error frange = (freqs.min() - margin, freqs.max() + margin) Arange = (amps.min() - margin, amps.max() + margin) - assert frange[0] <= Fdrive <= frange[1], \ - 'Fdrive must be within [{:.1f}, {:.1f}] kHz'.format(*[f * 1e-3 for f in frange]) - assert Arange[0] <= Adrive <= Arange[1], \ - 'Adrive must be within [{:.1f}, {:.1f}] kPa'.format(*[A * 1e-3 for A in Arange]) + + if Fdrive < frange[0] or Fdrive > frange[1]: + raise InputError(('Invalid frequency: {:.2f} kHz (must be within ' + + '{:.1f} kHz - {:.1f} MHz lookup interval)') + .format(Fdrive * 1e-3, frange[0] * 1e-3, frange[1] * 1e-6)) + if Adrive < Arange[0] or Adrive > Arange[1]: + raise InputError(('Invalid amplitude: {:.2f} kPa (must be within ' + + '{:.1f} - {:.1f} kPa lookup interval)') + .format(Adrive * 1e-3, Arange[0] * 1e-3, Arange[1] * 1e-3)) # Define interpolation datasets to be projected coeffs_list = ['V', 'ng', *neuron.coeff_names] # If Fdrive in lookup frequencies, simply project (A, Q) interpolation dataset # at Fdrive index onto 1D charge-based interpolation dataset if Fdrive in freqs: iFdrive = np.searchsorted(freqs, Fdrive) logger.debug('Using lookups directly at %.2f kHz', freqs[iFdrive] * 1e-3) coeffs1d = {} for cn in coeffs_list: coeff2d = np.squeeze(lookup_dict[cn][iFdrive, :, :]) itrp = interp2d(amps, charges, coeff2d.T) coeffs1d[cn] = itrp(Adrive, charges) if cn == 'ng': coeffs1d['ng0'] = itrp(0.0, charges) # Otherwise, project 2 (A, Q) interpolation datasets at Fdrive bounding values # indexes in lookup frequencies onto two 1D charge-based interpolation datasets, and # interpolate between them afterwards else: ilb = np.searchsorted(freqs, Fdrive) - 1 logger.debug('Interpolating lookups between %.2f kHz and %.2f kHz', freqs[ilb] * 1e-3, freqs[ilb + 1] * 1e-3) coeffs1d = {} for cn in coeffs_list: coeffs1d_bounds = [] ng0_bounds = [] for iFdrive in [ilb, ilb + 1]: coeff2d = np.squeeze(lookup_dict[cn][iFdrive, :, :]) itrp = interp2d(amps, charges, coeff2d.T) coeffs1d_bounds.append(itrp(Adrive, charges)) if cn == 'ng': ng0_bounds.append(itrp(0.0, charges)) coeffs1d_bounds = np.squeeze(np.array([coeffs1d_bounds])) itrp = interp2d(freqs[ilb:ilb + 2], charges, coeffs1d_bounds.T) coeffs1d[cn] = itrp(Fdrive, charges) if cn == 'ng': ng0_bounds = np.squeeze(np.array([ng0_bounds])) itrp = interp2d(freqs[ilb:ilb + 2], charges, ng0_bounds.T) coeffs1d['ng0'] = itrp(Fdrive, charges) # Squeeze interpolated vectors extra dimensions and add input charges vector coeffs1d = {key: np.squeeze(value) for key, value in coeffs1d.items()} coeffs1d['Q'] = charges # Initialize system solvers solver_on = integrate.ode(self.eqHHeff) solver_on.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) solver_on.set_f_params(neuron, coeffs1d) solver_off = integrate.ode(self.eqHH2) solver_off.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) # if CW stimulus: change PRF to have exactly one integration interval during stimulus if DC == 1.0: PRF = 1 / tstim # Compute vector sizes npulses = int(np.round(PRF * tstim)) Tpulse_on = DC / PRF Tpulse_off = (1 - DC) / PRF n_pulse_on = int(np.round(Tpulse_on / dt)) + 1 n_pulse_off = int(np.round(Tpulse_off / dt)) # For high-PRF pulsed protocols: adapt time step if greater than TON or TOFF dt_warning_msg = 'high-PRF protocol: lowering integration time step to %.2e ms to match %s' if Tpulse_on > 0 and n_pulse_on == 0: logger.warning(dt_warning_msg, Tpulse_on * 1e3, 'TON') dt = Tpulse_on n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) if Tpulse_off > 0 and n_pulse_off == 0: logger.warning(dt_warning_msg, Tpulse_off * 1e3, 'TOFF') dt = Tpulse_off n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) # Compute ofset size n_off = int(np.round(toffset / dt)) # Initialize global arrays states = np.array([1]) t = np.array([0.0]) y = np.atleast_2d(np.insert(neuron.states0, 0, self.Qm0)).T nvar = y.shape[0] Zeff = np.array([0.0]) ngeff = np.array([self.ng0]) # Initializing accurate pulse time vector t_pulse_on = np.linspace(0, Tpulse_on, n_pulse_on) t_pulse_off = np.linspace(dt, Tpulse_off, n_pulse_off) + Tpulse_on t_pulse0 = np.concatenate([t_pulse_on, t_pulse_off]) states_pulse = np.concatenate((np.ones(n_pulse_on), np.zeros(n_pulse_off))) # Loop through all pulse (ON and OFF) intervals for i in range(npulses): # Construct and initialize arrays t_pulse = t_pulse0 + t[-1] y_pulse = np.empty((nvar, n_pulse_on + n_pulse_off)) ngeff_pulse = np.empty(n_pulse_on + n_pulse_off) Zeff_pulse = np.empty(n_pulse_on + n_pulse_off) y_pulse[:, 0] = y[:, -1] ngeff_pulse[0] = ngeff[-1] Zeff_pulse[0] = Zeff[-1] # Initialize iterator k = 0 # Integrate ON system solver_on.set_initial_value(y_pulse[:, k], t_pulse[k]) while solver_on.successful() and k < n_pulse_on - 1: k += 1 solver_on.integrate(t_pulse[k]) y_pulse[:, k] = solver_on.y ngeff_pulse[k] = np.interp(y_pulse[0, k], coeffs1d['Q'], coeffs1d['ng']) # mole Zeff_pulse[k] = self.balancedefQS(ngeff_pulse[k], y_pulse[0, k]) # m # Integrate OFF system if n_pulse_off > 0: solver_off.set_initial_value(y_pulse[:, k], t_pulse[k]) solver_off.set_f_params(neuron, self.Capct(Zeff_pulse[k])) while solver_off.successful() and k < n_pulse_on + n_pulse_off - 1: k += 1 solver_off.integrate(t_pulse[k]) y_pulse[:, k] = solver_off.y ngeff_pulse[k] = np.interp(y_pulse[0, k], coeffs1d['Q'], coeffs1d['ng0']) # mole Zeff_pulse[k] = self.balancedefQS(ngeff_pulse[k], y_pulse[0, k]) # m solver_off.set_f_params(neuron, self.Capct(Zeff_pulse[k])) # Append pulse arrays to global arrays states = np.concatenate([states[:-1], states_pulse]) t = np.concatenate([t, t_pulse[1:]]) y = np.concatenate([y, y_pulse[:, 1:]], axis=1) Zeff = np.concatenate([Zeff, Zeff_pulse[1:]]) ngeff = np.concatenate([ngeff, ngeff_pulse[1:]]) # Integrate offset interval if n_off > 0: t_off = np.linspace(0, toffset, n_off) + t[-1] states_off = np.zeros(n_off) y_off = np.empty((nvar, n_off)) ngeff_off = np.empty(n_off) Zeff_off = np.empty(n_off) y_off[:, 0] = y[:, -1] ngeff_off[0] = ngeff[-1] Zeff_off[0] = Zeff[-1] solver_off.set_initial_value(y_off[:, 0], t_off[0]) solver_off.set_f_params(neuron, self.Capct(Zeff_pulse[-1])) k = 0 while solver_off.successful() and k < n_off - 1: k += 1 solver_off.integrate(t_off[k]) y_off[:, k] = solver_off.y ngeff_off[k] = np.interp(y_off[0, k], coeffs1d['Q'], coeffs1d['ng0']) # mole Zeff_off[k] = self.balancedefQS(ngeff_off[k], y_off[0, k]) # m solver_off.set_f_params(neuron, self.Capct(Zeff_off[k])) # Concatenate offset arrays to global arrays states = np.concatenate([states, states_off[1:]]) t = np.concatenate([t, t_off[1:]]) y = np.concatenate([y, y_off[:, 1:]], axis=1) Zeff = np.concatenate([Zeff, Zeff_off[1:]]) ngeff = np.concatenate([ngeff, ngeff_off[1:]]) # Add Zeff and ngeff to solution matrix y = np.vstack([Zeff, ngeff, y]) # return output variables return (t, y, states) def __runHybrid(self, neuron, Fdrive, Adrive, tstim, toffset, phi=np.pi): """ Compute solutions of the system for a specific set of US stimulation parameters, using a hybrid integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the NBLS ODE system is solved iteratively for "slices" of N-microseconds, in a 2-steps scheme: - First, the full (n+3) ODE system is integrated for a few acoustic cycles until Z and ng reach a stable periodic solution (limit cycle) - Second, the signals of the 3 mechanical variables over the last acoustic period are selected and resampled to a far lower sampling rate - Third, the HH n-ODE system is integrated for the remaining time of the slice, using periodic expansion of the mechanical signals to precompute the values of capacitance. :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the solution matrix and a state vector .. warning:: This method cannot handle pulsed stimuli """ # Raise warnings as error warnings.filterwarnings('error') # Initialize full and HH systems solvers solver_full = integrate.ode(self.eqFull2) solver_full.set_f_params(neuron, Adrive, Fdrive, phi) solver_full.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) solver_hh = integrate.ode(self.eqHH2) solver_hh.set_integrator('dop853', nsteps=SOLVER_NSTEPS, atol=1e-12) # Determine full and HH systems time steps Tdrive = 1 / Fdrive dt_full = Tdrive / NPC_FULL dt_hh = Tdrive / NPC_HH n_full_per_hh = int(NPC_FULL / NPC_HH) t_full_cycle = np.linspace(0, Tdrive - dt_full, NPC_FULL) t_hh_cycle = np.linspace(0, Tdrive - dt_hh, NPC_HH) # Determine number of samples in prediction vectors npc_pred = NPC_FULL - n_full_per_hh + 1 # Solve quasi-steady equation to compute first deflection value Z0 = 0.0 ng0 = self.ng0 Qm0 = self.Qm0 Pac1 = self.Pacoustic(dt_full, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm0, Pac1) # Initialize global arrays states = np.array([1, 1]) t = np.array([0., dt_full]) y_membrane = np.array([[0., (Z1 - Z0) / dt_full], [Z0, Z1], [ng0, ng0], [Qm0, Qm0]]) y_channels = np.tile(neuron.states0, (2, 1)).T y = np.vstack((y_membrane, y_channels)) nvar = y.shape[0] # Initialize progress bar if logger.getEffectiveLevel() == logging.DEBUG: widgets = ['Running: ', pb.Percentage(), ' ', pb.Bar(), ' ', pb.ETA()] pbar = pb.ProgressBar(widgets=widgets, max_value=1000) pbar.start() # For each hybrid integration interval irep = 0 sim_error = False while not sim_error and t[-1] < tstim + toffset: # Integrate full system for a few acoustic cycles until stabilization periodic_conv = False j = 0 ng_last = None Z_last = None while not sim_error and not periodic_conv: if t[-1] > tstim: solver_full.set_f_params(neuron, 0.0, 0.0, 0.0) t_full = t_full_cycle + t[-1] + dt_full y_full = np.empty((nvar, NPC_FULL)) y0_full = y[:, -1] solver_full.set_initial_value(y0_full, t[-1]) k = 0 try: # try to integrate and catch errors/warnings while solver_full.successful() and k <= NPC_FULL - 1: solver_full.integrate(t_full[k]) y_full[:, k] = solver_full.y k += 1 except (Warning, AssertionError) as inst: sim_error = True logger.error('Full system integration error at step %u', k) logger.error(inst) # Compare Z and ng signals over the last 2 acoustic periods if j > 0 and rmse(Z_last, y_full[1, :]) < Z_ERR_MAX \ and rmse(ng_last, y_full[2, :]) < NG_ERR_MAX: periodic_conv = True # Update last vectors for next comparison Z_last = y_full[1, :] ng_last = y_full[2, :] # Concatenate time and solutions to global vectors states = np.concatenate([states, np.ones(NPC_FULL)], axis=0) t = np.concatenate([t, t_full], axis=0) y = np.concatenate([y, y_full], axis=1) # Increment loop index j += 1 # Retrieve last period of the 3 mechanical variables to propagate in HH system t_last = t[-npc_pred:] mech_last = y[0:3, -npc_pred:] # Downsample signals to specified HH system time step (_, mech_pred) = DownSample(t_last, mech_last, NPC_HH) # Integrate HH system until certain dQ or dT is reached Q0 = y[3, -1] dQ = 0.0 t0_interval = t[-1] dt_interval = 0.0 j = 0 if t[-1] < tstim: tlim = tstim else: tlim = tstim + toffset while (not sim_error and t[-1] < tlim and (np.abs(dQ) < DQ_UPDATE or dt_interval < DT_UPDATE)): t_hh = t_hh_cycle + t[-1] + dt_hh y_hh = np.empty((nvar - 3, NPC_HH)) y0_hh = y[3:, -1] solver_hh.set_initial_value(y0_hh, t[-1]) k = 0 try: # try to integrate and catch errors/warnings while solver_hh.successful() and k <= NPC_HH - 1: solver_hh.set_f_params(neuron, self.Capct(mech_pred[1, k])) solver_hh.integrate(t_hh[k]) y_hh[:, k] = solver_hh.y k += 1 except (Warning, AssertionError) as inst: sim_error = True logger.error('HH system integration error at step %u', k) logger.error(inst) # Concatenate time and solutions to global vectors states = np.concatenate([states, np.zeros(NPC_HH)], axis=0) t = np.concatenate([t, t_hh], axis=0) y = np.concatenate([y, np.concatenate([mech_pred, y_hh], axis=0)], axis=1) # Compute charge variation from interval beginning dQ = y[3, -1] - Q0 dt_interval = t[-1] - t0_interval # Increment loop index j += 1 # Update progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.update(int(1000 * (t[-1] / (tstim + toffset)))) irep += 1 # Terminate progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.finish() # Return output return (t, y[1:, :], states) def run(self, neuron, Fdrive, Adrive, tstim, toffset, PRF=None, DC=1.0, sim_type='effective'): """ Run simulation of the system for a specific set of US stimulation parameters. :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param sim_type: selected integration method :return: 3-tuple with the time profile, the solution matrix and a state vector """ - # # Check validity of simulation type - sim_types = ('classic, effective, hybrid') - assert sim_type in sim_types, 'Allowed simulation types are {}'.format(sim_types) + # Check validity of simulation type + if sim_type not in ('classic', 'effective', 'hybrid'): + raise InputError('Invalid integration method: "{}"'.format(sim_type)) # Check validity of stimulation parameters - assert isinstance(neuron, BaseMech), ('neuron mechanism must be inherited ' - 'from the BaseMech class') - for param in [Fdrive, Adrive, tstim, toffset, DC]: - assert isinstance(param, float), 'stimulation parameters must be float typed' - assert Fdrive > 0, 'Driving frequency must be strictly positive' - assert Adrive >= 0, 'Acoustic pressure amplitude must be positive' - assert tstim > 0, 'Stimulus duration must be strictly positive' - assert toffset >= 0, 'Stimulus offset must be positive or null' - assert DC > 0 and DC <= 1, 'Duty cycle must be within [0; 1)' + if not isinstance(neuron, BaseMech): + raise InputError('Invalid neuron type: "{}" (must inherit from BaseMech class)' + .format(neuron.name)) + if not all(isinstance(param, float) for param in [Fdrive, Adrive, tstim, toffset, DC]): + raise InputError('Invalid stimulation parameters (must be float typed)') + if Fdrive <= 0: + raise InputError('Invalid US driving frequency: {} kHz (must be strictly positive)' + .format(Fdrive * 1e-3)) + if Adrive < 0: + raise InputError('Invalid US pressure amplitude: {} kPa (must be positive or null)' + .format(Adrive * 1e-3)) + if tstim <= 0: + raise InputError('Invalid stimulus duration: {} ms (must be strictly positive)' + .format(tstim * 1e3)) + if toffset < 0: + raise InputError('Invalid stimulus offset: {} ms (must be positive or null)' + .format(toffset * 1e3)) + if DC <= 0.0 or DC > 1.0: + raise InputError('Invalid duty cycle: {} (must be within ]0; 1])'.format(DC)) if DC < 1.0: - assert isinstance(PRF, float), 'if provided, the PRF parameter must be float typed' - assert PRF is not None, 'PRF must be provided when using duty cycles smaller than 1' - assert PRF >= 1 / tstim, 'PR interval must be smaller than stimulus duration' - assert PRF < Fdrive, 'PRF must be smaller than driving frequency' + if not isinstance(PRF, float): + raise InputError('Invalid PRF value (must be float typed)') + if PRF is None: + raise InputError('Missing PRF value (must be provided when DC < 1)') + if PRF < 1 / tstim: + raise InputError('Invalid PRF: {} Hz (PR interval exceeds stimulus duration' + .format(PRF)) + if PRF >= Fdrive: + raise InputError('Invalid PRF: {} Hz (must be smaller than driving frequency)' + .format(PRF)) # Call appropriate simulation function if sim_type == 'classic': return self.__runClassic(neuron, Fdrive, Adrive, tstim, toffset, PRF, DC) elif sim_type == 'effective': return self.__runEffective(neuron, Fdrive, Adrive, tstim, toffset, PRF, DC) elif sim_type == 'hybrid': - assert DC == 1.0, 'Hybrid method can only handle continuous wave stimuli' + if DC < 1.0: + raise InputError('Pulsed protocol incompatible with hybrid integration method') return self.__runHybrid(neuron, Fdrive, Adrive, tstim, toffset) - diff --git a/PointNICE/solvers/simutils.py b/PointNICE/solvers/simutils.py index 2326002..e2670af 100644 --- a/PointNICE/solvers/simutils.py +++ b/PointNICE/solvers/simutils.py @@ -1,1166 +1,1183 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-14 18:20:06 +# @Last Modified time: 2018-03-15 17:01:53 """ Utility functions used in simulations """ import os import time import logging import pickle import shutil import tkinter as tk from tkinter import filedialog import numpy as np from openpyxl import load_workbook import lockfile import pandas as pd from ..bls import BilayerSonophore from .SolverUS import SolverUS from .SolverElec import SolverElec from ..constants import * -from ..utils import getNeuronsDict +from ..utils import getNeuronsDict, InputError # Get package logger logger = logging.getLogger('PointNICE') # Naming nomenclature for output files MECH_code = 'MECH_{:.0f}nm_{:.0f}kHz_{:.0f}kPa_{:.1f}nCcm2' ESTIM_CW_code = 'ESTIM_{}_CW_{:.1f}mA_per_m2_{:.0f}ms' ESTIM_PW_code = 'ESTIM_{}_PW_{:.1f}mA_per_m2_{:.0f}ms_PRF{:.2f}kHz_DC{:.2f}' ASTIM_CW_code = 'ASTIM_{}_CW_{:.0f}nm_{:.0f}kHz_{:.0f}kPa_{:.0f}ms_{}' ASTIM_PW_code = 'ASTIM_{}_PW_{:.0f}nm_{:.0f}kHz_{:.0f}kPa_{:.0f}ms_PRF{:.2f}kHz_DC{:.2f}_{}' # Parameters units ASTIM_params = { 'f': {'index': 0, 'factor': 1e-3, 'unit': 'kHz'}, 'A': {'index': 1, 'factor': 1e-3, 'unit': 'kPa'}, 't': {'index': 2, 'factor': 1e3, 'unit': 'ms'}, 'PRF': {'index': 4, 'factor': 1e-3, 'unit': 'kHz'}, 'DC': {'index': 5, 'factor': 1e2, 'unit': '%'} } ESTIM_params = { 'A': {'index': 0, 'factor': 1e0, 'unit': 'mA/m2'}, 't': {'index': 1, 'factor': 1e3, 'unit': 'ms'}, 'PRF': {'index': 3, 'factor': 1e-3, 'unit': 'kHz'}, 'DC': {'index': 4, 'factor': 1e2, 'unit': '%'} } # Default geometry default_diam = 32e-9 default_embedding = 0.0e-6 def setBatchDir(): ''' Select batch directory for output files.α :return: full path to batch directory ''' root = tk.Tk() root.withdraw() batch_dir = filedialog.askdirectory() - assert batch_dir, 'No batch directory chosen' + if not batch_dir: + raise InputError('No output directory chosen') return batch_dir def checkBatchLog(batch_dir, batch_type): ''' Check for appropriate log file in batch directory, and create one if it is absent. :param batch_dir: full path to batch directory :param batch_type: type of simulation batch :return: 2 tuple with full path to log file and boolean stating if log file was created ''' + # Check for directory existence + if not os.path.isdir(batch_dir): + raise InputError('"{}" output directory does not exist'.format(batch_dir)) + # Determine log template from batch type if batch_type == 'MECH': logfile = 'log_MECH.xlsx' elif batch_type == 'A-STIM': logfile = 'log_ASTIM.xlsx' elif batch_type == 'E-STIM': logfile = 'log_ESTIM.xlsx' else: - raise ValueError('Unknown batch type', batch_type) + raise InputError('Unknown batch type', batch_type) # Get template in package subdirectory this_dir, _ = os.path.split(__file__) parent_dir = os.path.abspath(os.path.join(this_dir, os.pardir)) logsrc = parent_dir + '/templates/' + logfile assert os.path.isfile(logsrc), 'template log file "{}" not found'.format(logsrc) # Copy template in batch directory if no appropriate log file logdst = batch_dir + '/' + logfile is_log = os.path.isfile(logdst) if not is_log: shutil.copy2(logsrc, logdst) return (logdst, not is_log) def createSimQueue(amps, durations, offsets, PRFs, DCs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: 2D-array with (amplitude, duration, offset, PRF, DC) for each stimulation protocol ''' # Convert input to 1D-arrays amps = np.array(amps) durations = np.array(durations) offsets = np.array(offsets) PRFs = np.array(PRFs) DCs = np.array(DCs) # Create index arrays iamps = range(len(amps)) idurs = range(len(durations)) # Create empty output matrix queue = np.empty((1, 5)) # Continuous protocols if 1.0 in DCs: nCW = len(amps) * len(durations) arr1 = np.ones(nCW) iCW_queue = np.array(np.meshgrid(iamps, idurs)).T.reshape(nCW, 2) CW_queue = np.vstack((amps[iCW_queue[:, 0]], durations[iCW_queue[:, 1]], offsets[iCW_queue[:, 1]], PRFs.min() * arr1, arr1)).T queue = np.vstack((queue, CW_queue)) # Pulsed protocols if np.any(DCs != 1.0): pulsed_DCs = DCs[DCs != 1.0] iPRFs = range(len(PRFs)) ipulsed_DCs = range(len(pulsed_DCs)) nPW = len(amps) * len(durations) * len(PRFs) * len(pulsed_DCs) iPW_queue = np.array(np.meshgrid(iamps, idurs, iPRFs, ipulsed_DCs)).T.reshape(nPW, 4) PW_queue = np.vstack((amps[iPW_queue[:, 0]], durations[iPW_queue[:, 1]], offsets[iPW_queue[:, 1]], PRFs[iPW_queue[:, 2]], pulsed_DCs[iPW_queue[:, 3]])).T queue = np.vstack((queue, PW_queue)) # Return return queue[1:, :] def xlslog(filename, sheetname, data): """ Append log data on a new row to specific sheet of excel workbook, using a lockfile to avoid read/write errors between concurrent processes. :param filename: absolute or relative path to the Excel workbook :param sheetname: name of the Excel spreadsheet to which data is appended :param data: data structure to be added to specific columns on a new row :return: boolean indicating success (1) or failure (0) of operation """ try: lock = lockfile.FileLock(filename) lock.acquire() wb = load_workbook(filename) ws = wb[sheetname] keys = data.keys() i = 1 row_data = {} for k in keys: row_data[k] = data[k] i += 1 ws.append(row_data) wb.save(filename) lock.release() return 1 except PermissionError: # If file cannot be accessed for writing because already opened logger.warning('Cannot write to "%s". Close the file and type "Y"', filename) user_str = input() if user_str in ['y', 'Y']: return xlslog(filename, sheetname, data) else: return 0 def detectPeaks(x, mph=None, mpd=1, threshold=0, edge='rising', kpsh=False, valley=False, ax=None): """ Detect peaks in data based on their amplitude and inter-peak distance. """ x = np.atleast_1d(x).astype('float64') if x.size < 3: return np.array([], dtype=int) if valley: x = -x # find indices of all peaks dx = x[1:] - x[:-1] # handle NaN's indnan = np.where(np.isnan(x))[0] if indnan.size: x[indnan] = np.inf dx[np.where(np.isnan(dx))[0]] = np.inf ine, ire, ife = np.array([[], [], []], dtype=int) if not edge: ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0] else: if edge.lower() in ['rising', 'both']: ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0] if edge.lower() in ['falling', 'both']: ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0] ind = np.unique(np.hstack((ine, ire, ife))) # handle NaN's if ind.size and indnan.size: # NaN's and values close to NaN's cannot be peaks ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)] # first and last values of x cannot be peaks if ind.size and ind[0] == 0: ind = ind[1:] if ind.size and ind[-1] == x.size - 1: ind = ind[:-1] # remove peaks < minimum peak height if ind.size and mph is not None: ind = ind[x[ind] >= mph] # remove peaks - neighbors < threshold if ind.size and threshold > 0: dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0) ind = np.delete(ind, np.where(dx < threshold)[0]) # detect small peaks closer than minimum peak distance if ind.size and mpd > 1: ind = ind[np.argsort(x[ind])][::-1] # sort ind by peak height idel = np.zeros(ind.size, dtype=bool) for i in range(ind.size): if not idel[i]: # keep peaks with the same height if kpsh is True idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \ & (x[ind[i]] > x[ind] if kpsh else True) idel[i] = 0 # Keep current peak # remove the small peaks and sort back the indices by their occurrence ind = np.sort(ind[~idel]) return ind def detectPeaksTime(t, y, mph, mtd): """ Extension of the detectPeaks function to detect peaks in data based on their amplitude and time difference, with a non-uniform time vector. :param t: time vector (not necessarily uniform) :param y: signal :param mph: minimal peak height :param mtd: minimal time difference :return: array of peak indexes """ # Detect peaks on signal with no restriction on inter-peak distance raw_indexes = detectPeaks(y, mph, mpd=1) if raw_indexes.size > 0: # Filter relevant peaks with temporal distance n_raw = raw_indexes.size filtered_indexes = np.array([raw_indexes[0]]) for i in range(1, n_raw): i1 = filtered_indexes[-1] i2 = raw_indexes[i] if t[i2] - t[i1] < mtd: if y[i2] > y[i1]: filtered_indexes[-1] = i2 else: filtered_indexes = np.append(filtered_indexes, i2) # Return peak indexes return filtered_indexes else: return None def detectSpikes(t, Qm, min_amp, min_dt): ''' Detect spikes on a charge density signal, and return their number, latency and rate. :param t: time vector (s) :param Qm: charge density vector (C/m2) :param min_amp: minimal charge amplitude to detect spikes (C/m2) :param min_dt: minimal time interval between 2 spikes (s) :return: 3-tuple with number of spikes, latency (s) and spike rate (sp/s) ''' i_spikes = detectPeaksTime(t, Qm, min_amp, min_dt) if i_spikes is not None: latency = t[i_spikes[0]] # s n_spikes = i_spikes.size if n_spikes > 1: first_to_last_spike = t[i_spikes[-1]] - t[i_spikes[0]] # s spike_rate = (n_spikes - 1) / first_to_last_spike # spikes/s else: spike_rate = 'N/A' else: latency = 'N/A' spike_rate = 'N/A' n_spikes = 0 return (n_spikes, latency, spike_rate) def runEStim(batch_dir, log_filepath, solver, neuron, Astim, tstim, toffset, PRF, DC): ''' Run a single E-STIM simulation a given neuron for specific stimulation parameters, and save the results in a PKL file. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param solver: SolverElec instance :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: full path to the output file ''' if DC == 1.0: simcode = ESTIM_CW_code.format(neuron.name, Astim, tstim * 1e3) else: simcode = ESTIM_PW_code.format(neuron.name, Astim, tstim * 1e3, PRF * 1e-3, DC) # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run simulation tstart = time.time() (t, y, states) = solver.run(neuron, Astim, tstim, toffset, PRF, DC) Vm, *channels = y tcomp = time.time() - tstart logger.debug('completed in %.2f seconds', tcomp) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Detect spikes on Vm signal n_spikes, lat, sr = detectSpikes(t, Vm, SPIKE_MIN_VAMP, SPIKE_MIN_DT) logger.debug('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': Astim, 'E': tstim * 1e3, 'F': PRF * 1e-3 if DC < 1 else 'N/A', 'G': DC, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.debug('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) return output_filepath def runEStimBatch(batch_dir, log_filepath, neurons, stim_params): ''' Run batch E-STIM simulations of the system for various neuron types and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :return: list of full paths to the output files ''' mandatory_params = ['amps', 'durations', 'offsets', 'PRFs', 'DCs'] for mp in mandatory_params: - assert mp in stim_params, 'stim_params dictionary must contain "{}" field'.format(mp) + if mp not in stim_params: + raise InputError('Missing stimulation parameter field: "{}"'.format(mp)) # Define logging format ESTIM_CW_log = 'E-STIM simulation %u/%u: %s neuron, A = %.1f mA/m2, t = %.1f ms' ESTIM_PW_log = ('E-STIM simulation %u/%u: %s neuron, A = %.1f mA/m2, t = %.1f ms, ' 'PRF = %.2f kHz, DC = %.2f') logger.info("Starting E-STIM simulation batch") # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DCs']) nqueue = sim_queue.shape[0] # Initialize solver solver = SolverElec() # Run simulations nsims = len(neurons) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for i in range(nqueue): simcount += 1 Astim, tstim, toffset, PRF, DC = sim_queue[i, :] if DC == 1.0: logger.info(ESTIM_CW_log, simcount, nsims, neuron.name, Astim, tstim * 1e3) else: logger.info(ESTIM_PW_log, simcount, nsims, neuron.name, Astim, tstim * 1e3, PRF * 1e-3, DC) output_filepath = runEStim(batch_dir, log_filepath, solver, neuron, Astim, tstim, toffset, PRF, DC) filepaths.append(output_filepath) return filepaths def runAStim(batch_dir, log_filepath, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method='effective'): ''' Run a single A-STIM simulation a given neuron for specific stimulation parameters, and save the results in a PKL file. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param solver: SolverUS instance :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param int_method: selected integration method :return: full path to the output file ''' if DC == 1.0: simcode = ASTIM_CW_code.format(neuron.name, solver.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, int_method) else: simcode = ASTIM_PW_code.format(neuron.name, solver.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF * 1e-3, DC, int_method) # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run simulation tstart = time.time() (t, y, states) = solver.run(neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method) Z, ng, Qm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.debug('completed in %.2f seconds', tcomp) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Qm * 1e3 / np.array([solver.Capct(ZZ) for ZZ in Z])}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'a': solver.a, 'd': solver.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Detect spikes on Qm signal n_spikes, lat, sr = detectSpikes(t, Qm, SPIKE_MIN_QAMP, SPIKE_MIN_DT) logger.debug('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': solver.a * 1e9, 'E': solver.d * 1e6, 'F': Fdrive * 1e-3, 'G': Adrive * 1e-3, 'H': tstim * 1e3, 'I': PRF * 1e-3 if DC < 1 else 'N/A', 'J': DC, 'K': int_method, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.debug('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) return output_filepath def runAStimBatch(batch_dir, log_filepath, neurons, stim_params, a=default_diam, int_method='effective'): ''' Run batch simulations of the system for various neuron types, sonophore and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS structure diameter (m) :param int_method: selected integration method :return: list of full paths to the output files ''' mandatory_params = ['freqs', 'amps', 'durations', 'offsets', 'PRFs', 'DCs'] for mp in mandatory_params: - assert mp in stim_params, 'stim_params dictionary must contain "{}" field'.format(mp) + if mp not in stim_params: + raise InputError('Missing stimulation parameter field: "{}"'.format(mp)) # Define logging format ASTIM_CW_log = ('A-STIM %s simulation %u/%u: %s neuron, a = %.1f nm, f = %.2f kHz, ' 'A = %.2f kPa, t = %.2f ms') ASTIM_PW_log = ('A-STIM %s simulation %u/%u: %s neuron, a = %.1f nm, f = %.2f kHz, ' 'A = %.2f kPa, t = %.2f ms, PRF = %.2f kHz, DC = %.3f') logger.info("Starting A-STIM simulation batch") # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DCs']) nqueue = sim_queue.shape[0] # Run simulations nsims = len(neurons) * len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for Fdrive in stim_params['freqs']: # Initialize SolverUS solver = SolverUS(a, neuron, Fdrive) for i in range(nqueue): simcount += 1 Adrive, tstim, toffset, PRF, DC = sim_queue[i, :] # Log and define naming if DC == 1.0: logger.info(ASTIM_CW_log, int_method, simcount, nsims, neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3) else: logger.info(ASTIM_PW_log, int_method, simcount, nsims, neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF * 1e-3, DC) # Run simulation output_filepath = runAStim(batch_dir, log_filepath, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method) filepaths.append(output_filepath) return filepaths def titrateEStim(solver, neuron, Astim, tstim, toffset, PRF=1.5e3, DC=1.0): """ Use a dichotomic recursive search to determine the threshold value of a specific electric stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. This function is called recursively until an accurate threshold is found. :param solver: solver instance :param neuron: neuron object :param Astim: injected current density amplitude (mA/m2) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 5-tuple with the determined amplitude threshold, time profile, solution matrix, state vector and response latency """ # Determine titration type if isinstance(Astim, tuple): t_type = 'A' interval = Astim thr = TITRATION_ESTIM_DA_MAX maxval = TITRATION_ESTIM_A_MAX elif isinstance(tstim, tuple): t_type = 't' interval = tstim thr = TITRATION_DT_THR maxval = TITRATION_T_MAX elif isinstance(DC, tuple): t_type = 'DC' interval = DC thr = TITRATION_DDC_THR maxval = TITRATION_DC_MAX else: logger.error('Invalid titration type') return 0. t_var = ESTIM_params[t_type] # Check amplitude interval and define current value - assert interval[0] < interval[1], '{} interval must be defined as (lb, ub)'.format(t_type) + if interval[0] >= interval[1]: + raise InputError('Invaid {} interval: {} (must be defined as [lb, ub])' + .format(t_type, interval)) value = (interval[0] + interval[1]) / 2 # Define stimulation parameters if t_type == 'A': stim_params = [value, tstim, toffset, PRF, DC] elif t_type == 't': stim_params = [Astim, value, toffset, PRF, DC] elif t_type == 'DC': stim_params = [Astim, tstim, toffset, PRF, value] # Run simulation and detect spikes (t, y, states) = solver.run(neuron, *stim_params) n_spikes, latency, _ = detectSpikes(t, y[0, :], SPIKE_MIN_VAMP, SPIKE_MIN_DT) logger.debug('%.2f %s ---> %u spike%s detected', value * t_var['factor'], t_var['unit'], n_spikes, "s" if n_spikes > 1 else "") # If accurate threshold is found, return simulation results if (interval[1] - interval[0]) <= thr and n_spikes == 1: return (value, t, y, states, latency) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: if (maxval - interval[1]) <= thr: # if upper bound too close to max then stop logger.warning('no spikes detected within titration interval') return (np.nan, t, y, states, latency) new_interval = (value, interval[1]) else: new_interval = (interval[0], value) stim_params[t_var['index']] = new_interval return titrateEStim(solver, neuron, *stim_params) def titrateAStim(solver, neuron, Fdrive, Adrive, tstim, toffset, PRF=1.5e3, DC=1.0, int_method='effective'): """ Use a dichotomic recursive search to determine the threshold value of a specific acoustic stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. This function is called recursively until an accurate threshold is found. :param solver: solver instance :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param int_method: selected integration method :return: 5-tuple with the determined amplitude threshold, time profile, solution matrix, state vector and response latency """ # Determine titration type if isinstance(Adrive, tuple): t_type = 'A' interval = Adrive thr = TITRATION_ASTIM_DA_MAX maxval = TITRATION_ASTIM_A_MAX elif isinstance(tstim, tuple): t_type = 't' interval = tstim thr = TITRATION_DT_THR maxval = TITRATION_T_MAX elif isinstance(DC, tuple): t_type = 'DC' interval = DC thr = TITRATION_DDC_THR maxval = TITRATION_DC_MAX else: logger.error('Invalid titration type') return 0. t_var = ASTIM_params[t_type] # Check amplitude interval and define current value - assert interval[0] < interval[1], '{} interval must be defined as (lb, ub)'.format(t_type) + if interval[0] >= interval[1]: + raise InputError('Invaid {} interval: {} (must be defined as [lb, ub])' + .format(t_type, interval)) value = (interval[0] + interval[1]) / 2 # Define stimulation parameters if t_type == 'A': stim_params = [Fdrive, value, tstim, toffset, PRF, DC] elif t_type == 't': stim_params = [Fdrive, Adrive, value, toffset, PRF, DC] elif t_type == 'DC': stim_params = [Fdrive, Adrive, tstim, toffset, PRF, value] # Run simulation and detect spikes (t, y, states) = solver.run(neuron, *stim_params, int_method) n_spikes, latency, _ = detectSpikes(t, y[2, :], SPIKE_MIN_QAMP, SPIKE_MIN_DT) logger.debug('%.2f %s ---> %u spike%s detected', value * t_var['factor'], t_var['unit'], n_spikes, "s" if n_spikes > 1 else "") # If accurate threshold is found, return simulation results if (interval[1] - interval[0]) <= thr and n_spikes == 1: return (value, t, y, states, latency) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: if (maxval - interval[1]) <= thr: # if upper bound too close to max then stop logger.warning('no spikes detected within titration interval') return (np.nan, t, y, states, latency) new_interval = (value, interval[1]) else: new_interval = (interval[0], value) stim_params[t_var['index']] = new_interval return titrateAStim(solver, neuron, *stim_params, int_method) def titrateAStimBatch(batch_dir, log_filepath, neurons, stim_params, a=default_diam, int_method='effective'): ''' Run batch acoustic titrations of the system for various neuron types, sonophore and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS structure diameter (m) :param int_method: selected integration method :return: list of full paths to the output files ''' # Define logging format ASTIM_titration_log = '%s neuron - A-STIM titration %u/%u (a = %.1f nm, %s)' logger.info("Starting A-STIM titration batch") # Define default parameters int_method = 'effective' # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [TITRATION_T_OFFSET], stim_params['PRFs'], stim_params['DCs']) # sim_queue = np.delete(sim_queue, 1, axis=1) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DCs']) elif 'DC' not in stim_params: t_type = 'DC' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], [None]) nqueue = sim_queue.shape[0] t_var = ASTIM_params[t_type] # Run titrations nsims = len(neurons) * len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for Fdrive in stim_params['freqs']: try: # Create SolverUS instance (modulus of embedding tissue depends on frequency) solver = SolverUS(a, neuron, Fdrive) for i in range(nqueue): simcount += 1 # Extract parameters Adrive, tstim, toffset, PRF, DC = sim_queue[i, :] if Adrive is None: Adrive = (0., 2 * TITRATION_ASTIM_A_MAX) elif tstim is None: tstim = (0., 2 * TITRATION_T_MAX) elif DC is None: DC = (0., 2 * TITRATION_DC_MAX) curr_params = [Fdrive, Adrive, tstim, PRF, DC] # Generate log str log_str = '' pnames = list(ASTIM_params.keys()) j = 0 for cp in curr_params: pn = pnames[j] pi = ASTIM_params[pn] if not isinstance(cp, tuple): if log_str: log_str += ', ' log_str += '{} = {:.2f} {}'.format(pn, pi['factor'] * cp, pi['unit']) j += 1 # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log logger.info(ASTIM_titration_log, neuron.name, simcount, nsims, a * 1e9, log_str) # Run titration tstart = time.time() (output_thr, t, y, states, lat) = titrateAStim(solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC) Z, ng, Qm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.info('completed in %.2f s, threshold = %.2f %s', tcomp, output_thr * t_var['factor'], t_var['unit']) # Determine output variable if t_type == 'A': Adrive = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming if DC == 1.0: simcode = ASTIM_CW_code.format(neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, int_method) else: simcode = ASTIM_PW_code.format(neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF * 1e-3, DC, int_method) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Qm * 1e3 / np.array([solver.Capct(ZZ) for ZZ in Z])}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'a': solver.a, 'd': solver.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal n_spikes, lat, sr = detectSpikes(t, Qm, SPIKE_MIN_QAMP, SPIKE_MIN_DT) logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': solver.a * 1e9, 'E': solver.d * 1e6, 'F': Fdrive * 1e-3, 'G': Adrive * 1e-3, 'H': tstim * 1e3, 'I': PRF * 1e-3 if DC < 1 else 'N/A', 'J': DC, 'K': int_method, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths def titrateEStimBatch(batch_dir, log_filepath, neurons, stim_params): ''' Run batch electrical titrations of the system for various neuron types and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :return: list of full paths to the output files ''' # Define logging format ESTIM_titration_log = '%s neuron - E-STIM titration %u/%u (%s)' logger.info("Starting E-STIM titration batch") # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [TITRATION_T_OFFSET], stim_params['PRFs'], stim_params['DCs']) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DCs']) elif 'DC' not in stim_params: t_type = 'DC' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], [None]) nqueue = sim_queue.shape[0] t_var = ESTIM_params[t_type] # Create SolverElec instance solver = SolverElec() # Run titrations nsims = len(neurons) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() try: for i in range(nqueue): simcount += 1 # Extract parameters Astim, tstim, toffset, PRF, DC = sim_queue[i, :] if Astim is None: Astim = (0., 2 * TITRATION_ESTIM_A_MAX) elif tstim is None: tstim = (0., 2 * TITRATION_T_MAX) elif DC is None: DC = (0., 2 * TITRATION_DC_MAX) curr_params = [Astim, tstim, PRF, DC] # Generate log str log_str = '' pnames = list(ESTIM_params.keys()) j = 0 for cp in curr_params: pn = pnames[j] pi = ESTIM_params[pn] if not isinstance(cp, tuple): if log_str: log_str += ', ' log_str += '{} = {:.2f} {}'.format(pn, pi['factor'] * cp, pi['unit']) j += 1 # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log logger.info(ESTIM_titration_log, neuron.name, simcount, nsims, log_str) # Run titration tstart = time.time() (output_thr, t, y, states, lat) = titrateEStim(solver, neuron, Astim, tstim, toffset, PRF, DC) Vm, *channels = y tcomp = time.time() - tstart logger.info('completed in %.2f s, threshold = %.2f %s', tcomp, output_thr * t_var['factor'], t_var['unit']) # Determine output variable if t_type == 'A': Astim = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming if DC == 1.0: simcode = ESTIM_CW_code.format(neuron.name, Astim, tstim * 1e3) else: simcode = ESTIM_PW_code.format(neuron.name, Astim, tstim * 1e3, PRF * 1e-3, DC) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal n_spikes, lat, sr = detectSpikes(t, Vm, SPIKE_MIN_VAMP, SPIKE_MIN_DT) logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': Astim, 'E': tstim * 1e3, 'F': PRF * 1e-3 if DC < 1 else 'N/A', 'G': DC, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths def runMechBatch(batch_dir, log_filepath, Cm0, Qm0, stim_params, a=default_diam, d=default_embedding): ''' Run batch simulations of the mechanical system with imposed values of charge density, for various sonophore spans and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS in-plane diameter (m) :param d: depth of embedding tissue around plasma membrane (m) ''' + # Checking validity of stimulation parameters + mandatory_params = ['freqs', 'amps', 'charges'] + for mp in mandatory_params: + if mp not in stim_params: + raise InputError('Missing stimulation parameter field: "{}"'.format(mp)) + # Define logging format MECH_log = ('Mechanical simulation %u/%u (a = %.1f nm, d = %.1f um, f = %.2f kHz, ' 'A = %.2f kPa, Q = %.1f nC/cm2)') logger.info("Starting mechanical simulation batch") # Unpack stimulation parameters amps = np.array(stim_params['amps']) charges = np.array(stim_params['charges']) # Generate simulations queue nA = len(amps) nQ = len(charges) sim_queue = np.array(np.meshgrid(amps, charges)).T.reshape(nA * nQ, 2) nqueue = sim_queue.shape[0] # Run simulations nsims = len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for Fdrive in stim_params['freqs']: try: # Create BilayerSonophore instance (modulus of embedding tissue depends on frequency) bls = BilayerSonophore(a, Fdrive, Cm0, Qm0, d) for i in range(nqueue): simcount += 1 Adrive, Qm = sim_queue[i, :] # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log and define naming logger.info(MECH_log, simcount, nsims, a * 1e9, d * 1e6, Fdrive * 1e-3, Adrive * 1e-3, Qm * 1e5) simcode = MECH_code.format(a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, Qm * 1e5) # Run simulation tstart = time.time() (t, y, states) = bls.runMech(Fdrive, Adrive, Qm) (Z, ng) = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.info('completed in %.2f seconds', tcomp) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng}) meta = {'a': a, 'd': d, 'Cm0': Cm0, 'Qm0': Qm0, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'Qm': Qm, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Compute key output metrics Zmax = np.amax(Z) Zmin = np.amin(Z) Zabs_max = np.amax(np.abs([Zmin, Zmax])) eAmax = bls.arealstrain(Zabs_max) Tmax = bls.TEtot(Zabs_max) Pmmax = bls.PMavgpred(Zmin) ngmax = np.amax(ng) dUdtmax = np.amax(np.abs(np.diff(U) / np.diff(t)**2)) # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': a * 1e9, 'D': d * 1e6, 'E': Fdrive * 1e-3, 'F': Adrive * 1e-3, 'G': Qm * 1e5, 'H': t.size, 'I': tcomp, 'J': bls.kA + bls.kA_tissue, 'K': Zmax * 1e9, 'L': eAmax, 'M': Tmax * 1e3, 'N': (ngmax - bls.ng0) / bls.ng0, 'O': Pmmax * 1e-3, 'P': dUdtmax } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths diff --git a/PointNICE/utils.py b/PointNICE/utils.py index 9aea989..dfd6298 100644 --- a/PointNICE/utils.py +++ b/PointNICE/utils.py @@ -1,299 +1,304 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-19 22:30:46 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-12 19:47:10 +# @Last Modified time: 2018-03-15 15:21:07 """ Definition of generic utility functions used in other modules """ from enum import Enum import os import tkinter as tk from tkinter import filedialog import inspect import json import yaml from openpyxl import load_workbook import numpy as np import colorlog from . import neurons def setLogger(): log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%' ) log_handler = colorlog.StreamHandler() log_handler.setFormatter(log_formatter) color_logger = colorlog.getLogger('PointNICE') color_logger.addHandler(log_handler) return color_logger # Get package logger logger = setLogger() +class InputError(Exception): + ''' Exception raised for errors in the input. ''' + pass + + class PmCompMethod(Enum): """ Enum: types of computation method for the intermolecular pressure """ direct = 1 predict = 2 def loadYAML(filepath): """ Load a dictionary of parameters from an external YAML file. :param filepath: full path to the YAML file :return: parameters dictionary """ _, filename = os.path.split(filepath) logger.debug('Loading parameters from "%s"', filename) with open(filepath, 'r') as f: stream = f.read() params = yaml.load(stream) return ParseNestedDict(params) def getLookupDir(): """ Return the location of the directory holding lookups files. :return: absolute path to the directory """ this_dir, _ = os.path.split(__file__) return this_dir + '/lookups' def ParseNestedDict(dict_in): """ Loop through a nested dictionary object and convert all string fields to floats. """ for key, value in dict_in.items(): if isinstance(value, dict): # If value itself is dictionary dict_in[key] = ParseNestedDict(value) elif isinstance(dict_in[key], str): dict_in[key] = float(dict_in[key]) return dict_in def OpenFilesDialog(filetype, dirname=''): """ Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames """ root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames(filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname) if filenames: par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) else: par_dir = None return (filenames, par_dir) def ImportExcelCol(filename, sheetname, colstr, startrow): """ Load a specific column of an excel workbook as a numpy array. :param filename: absolute or relative path to the Excel workbook :param sheetname: name of the Excel spreadsheet to which data is appended :param colstr: string of the column to import :param startrow: index of the first row to consider :return: 1D numpy array with the column data """ wb = load_workbook(filename, read_only=True) ws = wb.get_sheet_by_name(sheetname) range_start_str = colstr + str(startrow) range_stop_str = colstr + str(ws.max_row) tmp = np.array([[i.value for i in j] for j in ws[range_start_str:range_stop_str]]) return tmp[:, 0] def ConstructMatrix(serialized_inputA, serialized_inputB, serialized_output): """ Construct a 2D output matrix from serialized input. :param serialized_inputA: serialized input variable A :param serialized_inputB: serialized input variable B :param serialized_output: serialized output variable :return: 4-tuple with vectors of unique values of A (m) and B (n), output variable 2D matrix (m,n) and number of holes in the matrix """ As = np.unique(serialized_inputA) Bs = np.unique(serialized_inputB) nA = As.size nB = Bs.size output = np.zeros((nA, nB)) output[:] = np.NAN nholes = 0 for i in range(nA): iA = np.where(serialized_inputA == As[i]) for j in range(nB): iB = np.where(serialized_inputB == Bs[j]) iMatch = np.intersect1d(iA, iB) if iMatch.size == 0: nholes += 1 elif iMatch.size > 1: logger.warning('Identical serialized inputs with values (%f, %f)', As[i], Bs[j]) else: iMatch = iMatch[0] output[i, j] = serialized_output[iMatch] return (As, Bs, output, nholes) def rmse(x1, x2): """ Compute the root mean square error between two 1D arrays """ return np.sqrt(((x1 - x2) ** 2).mean()) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def DownSample(t_dense, y, nsparse): """ Decimate periodic signals to a specified number of samples.""" if(y.ndim) > 1: nsignals = y.shape[0] else: nsignals = 1 y = np.array([y]) # determine time step and period of input signal T = t_dense[-1] - t_dense[0] dt_dense = t_dense[1] - t_dense[0] # resample time vector linearly t_ds = np.linspace(t_dense[0], t_dense[-1], nsparse) # create MAV window nmav = int(0.03 * T / dt_dense) if nmav % 2 == 0: nmav += 1 mav = np.ones(nmav) / nmav # determine signals padding npad = int((nmav - 1) / 2) # determine indexes of sampling on convolved signals ids = np.round(np.linspace(0, t_dense.size - 1, nsparse)).astype(int) y_ds = np.empty((nsignals, nsparse)) # loop through signals for i in range(nsignals): # pad, convolve and resample pad_left = y[i, -(npad + 2):-2] pad_right = y[i, 1:npad + 1] y_ext = np.concatenate((pad_left, y[i, :], pad_right), axis=0) y_mav = np.convolve(y_ext, mav, mode='valid') y_ds[i, :] = y_mav[ids] if nsignals == 1: y_ds = y_ds[0, :] return (t_ds, y_ds) def Pressure2Intensity(p, rho=1075.0, c=1515.0): """ Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) """ return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): """ Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) """ return np.sqrt(2 * rho * c * I) def find_nearest(array, value): ''' Find nearest element in 1D array. ''' idx = (np.abs(array - value)).argmin() return (idx, array[idx]) def rescale(x, lb, ub, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def LennardJones(x, beta, alpha, C, m, n): """ Generic expression of a Lennard-Jones function, adapted for the context of symmetric deflection (distance = 2x). :param x: deflection (i.e. half-distance) :param beta: x-shifting factor :param alpha: x-scaling factor :param C: y-scaling factor :param m: exponent of the repulsion term :param n: exponent of the attraction term :return: Lennard-Jones potential at given distance (2x) """ return C * (np.power((alpha / (2 * x + beta)), m) - np.power((alpha / (2 * x + beta)), n)) def getNeuronsDict(): ''' Return dictionary of neurons classes that can be instantiated. ''' neurons_dict = {} for _, obj in inspect.getmembers(neurons): if inspect.isclass(obj) and isinstance(obj.name, str): neurons_dict[obj.name] = obj return neurons_dict def get_BLS_lookups(a): lookup_path = getLookupDir() + '/BLS_lookups_a{:.1f}nm.json'.format(a * 1e9) try: with open(lookup_path) as fh: sample = json.load(fh) return sample except FileNotFoundError: return {} def save_BLS_lookups(a, lookups): """ Save BLS parameter into specific lookup file :return: absolute path to the directory """ lookup_path = getLookupDir() + '/BLS_lookups_a{:.1f}nm.json'.format(a * 1e9) with open(lookup_path, 'w') as fh: json.dump(lookups, fh) diff --git a/plot/plot_batch.py b/plot/plot_batch.py index 7245a4d..af9b10e 100644 --- a/plot/plot_batch.py +++ b/plot/plot_batch.py @@ -1,33 +1,33 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-03-20 12:19:55 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-13 22:42:38 +# @Last Modified time: 2018-03-15 16:56:56 """ Batch plot profiles of several specific output variables of NICE simulations. """ import sys import logging -from PointNICE.utils import logger, OpenFilesDialog +from PointNICE.utils import logger, OpenFilesDialog, InputError from PointNICE.plt import plotBatch # Set logging level logger.setLevel(logging.INFO) # Select data files pkl_filepaths, pkl_dir = OpenFilesDialog('pkl') if not pkl_filepaths: logger.error('No input file') sys.exit(1) # Plot profiles try: # yvars = {'Q_m': ['Qm'], 'i_{Ca}\ kin.': ['s', 'u', 's2u'], 'I': ['iNa', 'iK', 'iT', 'iL']} yvars = {'Q_m': ['Qm']} plotBatch(pkl_dir, pkl_filepaths, title=False, vars_dict=yvars) -except AssertionError as err: +except InputError as err: logger.error(err) sys.exit(1) diff --git a/plot/plot_comp.py b/plot/plot_comp.py index cba53f9..4e78994 100644 --- a/plot/plot_comp.py +++ b/plot/plot_comp.py @@ -1,35 +1,35 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 12:41:26 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-13 22:47:06 +# @Last Modified time: 2018-03-15 16:57:56 """ Compare profiles of several specific output variables of NICE simulations. """ import sys import logging -from PointNICE.utils import logger, OpenFilesDialog +from PointNICE.utils import logger, OpenFilesDialog, InputError from PointNICE.plt import plotComp # Set logging level logger.setLevel(logging.INFO) # Select data files pkl_filepaths, _ = OpenFilesDialog('pkl') if not pkl_filepaths: logger.error('No input file') sys.exit(1) # Comparative plot try: yvars = ['Qm'] labels = ['classic', 'effective'] # labels = ['FS neuron', 'LTS neuron', 'RE neuron', 'RS neuron', 'TC neuron'] plotComp(yvars, pkl_filepaths) -except AssertionError as err: +except InputError as err: logger.error(err) sys.exit(1) diff --git a/sim/ASTIM_batch.py b/sim/ASTIM_batch.py index 3e4066d..4b6661c 100644 --- a/sim/ASTIM_batch.py +++ b/sim/ASTIM_batch.py @@ -1,53 +1,53 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-12 20:17:05 +# @Last Modified time: 2018-03-15 16:55:31 """ Run batch acoustic simulations of specific "point-neuron" models. """ import sys import os import logging import numpy as np -from PointNICE.utils import logger +from PointNICE.utils import logger, InputError from PointNICE.solvers import setBatchDir, checkBatchLog, runAStimBatch from PointNICE.plt import plotBatch # Set logging level logger.setLevel(logging.INFO) # Neurons -neurons = ['LeechP'] +neurons = ['RS'] # Stimulation parameters stim_params = { 'freqs': [1000e3], # Hz 'amps': np.array([10, 20, 40, 80, 150, 300, 600]) * 1e3, # Pa 'durations': np.array([20, 40, 60, 80, 100, 150, 200, 250, 300]) * 1e-3, # s 'PRFs': np.array([0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0]) * 1e3, # Hz 'DCs': np.array([1, 2, 5, 10, 25, 50, 75, 100]) * 1e-2 } stim_params['offsets'] = 350e-3 - stim_params['durations'] # s try: # Select output directory batch_dir = setBatchDir() log_filepath, _ = checkBatchLog(batch_dir, 'A-STIM') # Run A-STIM batch pkl_filepaths = runAStimBatch(batch_dir, log_filepath, neurons, stim_params, - int_method='classic') + int_method='effective') pkl_dir, _ = os.path.split(pkl_filepaths[0]) # Plot resulting profiles # yvars = {'Q_m': ['Qm']} # plotBatch(pkl_dir, pkl_filepaths, yvars) -except AssertionError as err: +except InputError as err: logger.error(err) sys.exit(1) diff --git a/sim/ASTIM_lookups.py b/sim/ASTIM_lookups.py index 81e1353..b36c2b7 100644 --- a/sim/ASTIM_lookups.py +++ b/sim/ASTIM_lookups.py @@ -1,43 +1,43 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-06-02 17:50:10 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-15 11:02:55 +# @Last Modified time: 2018-03-15 16:20:17 """ Create lookup tables for different acoustic frequencies. """ import logging import numpy as np import PointNICE -from PointNICE.utils import logger +from PointNICE.utils import logger, InputError from PointNICE.neurons import * # Set logging level logger.setLevel(logging.INFO) # BLS diameter (m) a = 32e-9 # Channel mechanisms neurons = [CorticalRS()] # Stimulation parameters freqs = np.array([20., 100., 500., 1000., 2000., 3000., 4000.]) * 1e3 # Hz amps = np.logspace(np.log10(0.1), np.log10(600), num=50) * 1e3 # Pa amps = np.insert(amps, 0, 0.0) # adding amplitude 0 logger.info('Starting batch lookup creation') for neuron in neurons: # Create a SolverUS instance (with dummy frequency parameter) solver = PointNICE.SolverUS(a, neuron, 0.0) # Create lookup file try: solver.createLookup(neuron, freqs, amps) logger.info('%s Lookup table successfully created', neuron.name) - except AssertionError as err: + except InputError as err: logger.error(err) diff --git a/sim/ASTIM_run.py b/sim/ASTIM_run.py index 06c488a..7e5a871 100644 --- a/sim/ASTIM_run.py +++ b/sim/ASTIM_run.py @@ -1,107 +1,108 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-07 18:51:31 +# @Last Modified time: 2018-03-15 15:23:11 """ Script to run ASTIM simulations from command line. """ import sys import os import logging from argparse import ArgumentParser -from PointNICE.utils import logger, getNeuronsDict +from PointNICE.utils import logger, getNeuronsDict, InputError from PointNICE.solvers import checkBatchLog, SolverUS, runAStim from PointNICE.plt import plotBatch # Default parameters default = { 'neuron': 'RS', 'a': 32.0, # nm 'f': 500.0, # kHz 'A': 100.0, # kPa 't': 150.0, # ms 'off': 20.0, # ms 'PRF': 100.0, # Hz 'DC': 100.0, # % 'int_method': 'effective' } def main(): # Define argument parser ap = ArgumentParser() # ASTIM parameters ap.add_argument('-n', '--neuron', type=str, default=default['neuron'], help='Neuron name (string)') ap.add_argument('-a', '--diameter', type=float, default=default['a'], help='BLS diameter (nm)') ap.add_argument('-f', '--frequency', type=float, default=default['f'], help='Acoustic drive frequency (kHz)') ap.add_argument('-A', '--amplitude', type=float, default=default['A'], help='Acoustic pressure amplitude (kPa)') ap.add_argument('-t', '--duration', type=float, default=default['t'], help='Stimulus duration (ms)') ap.add_argument('--offset', type=float, default=default['off'], help='Offset duration (ms)') ap.add_argument('--PRF', type=float, default=default['PRF'], help='PRF (Hz)') ap.add_argument('--DC', type=float, default=default['DC'], help='Duty cycle (%%)') ap.add_argument('-o', '--outputdir', type=str, default=os.getcwd(), help='Output directory') ap.add_argument('-m', '--method', type=str, default=default['int_method'], help='Numerical integration method ("classic", "hybrid" or "effective"') # Boolean parameters ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-p', '--plot', default=False, action='store_true', help='Plot results') # Parse arguments args = ap.parse_args() neuron_str = args.neuron a = args.diameter * 1e-9 # m Fdrive = args.frequency * 1e3 # Hz Adrive = args.amplitude * 1e3 # Pa tstim = args.duration * 1e-3 # s toffset = args.offset * 1e-3 # s PRF = args.PRF # Hz DC = args.DC * 1e-2 output_dir = args.outputdir int_method = args.method if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) PW_str = ', PRF = {:.2f} Hz, DC = {:.1f}'.format(PRF, DC * 1e2) try: - assert neuron_str in getNeuronsDict(), '{} neuron type not implemented'.format(neuron_str) + if neuron_str not in getNeuronsDict(): + raise InputError('Unknown neuron type: "{}"'.format(neuron_str)) log_filepath, _ = checkBatchLog(output_dir, 'A-STIM') neuron = getNeuronsDict()[neuron_str]() logger.info('Running A-STIM simulation on %s neuron: a = %.1f nm, f = %.2f kHz, ' 'A = %.2f kPa, t = %.2f ms%s (%s method)', neuron_str, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PW_str if DC < 1.0 else "", int_method) solver = SolverUS(a, neuron, Fdrive) outfile = runAStim(output_dir, log_filepath, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method) logger.info('Finished') if args.plot: plotBatch(output_dir, [outfile]) - except AssertionError as err: + except InputError as err: logger.error(err) sys.exit(1) if __name__ == '__main__': main() diff --git a/sim/ASTIM_titration_batch.py b/sim/ASTIM_titration_batch.py index 72f9ed0..2d89219 100644 --- a/sim/ASTIM_titration_batch.py +++ b/sim/ASTIM_titration_batch.py @@ -1,48 +1,48 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-12 20:17:31 +# @Last Modified time: 2018-03-15 17:03:43 """ Run batch acoustic titrations of specific "point-neuron" models. """ import sys import os import logging import numpy as np -from PointNICE.utils import logger +from PointNICE.utils import logger, InputError from PointNICE.solvers import setBatchDir, checkBatchLog, titrateAStimBatch from PointNICE.plt import plotBatch # Set logging level logger.setLevel(logging.DEBUG) # Channels mechanisms neurons = ['RS'] # Stimulation parameters stim_params = { 'freqs': [5e5], # Hz # 'amps': [100e3], # Pa 'durations': [100e-3], # s 'PRFs': [1e2], # Hz 'DCs': [1.0, 0.05] } try: # Select output directory batch_dir = setBatchDir() log_filepath, _ = checkBatchLog(batch_dir, 'A-STIM') # Run titration batch pkl_filepaths = titrateAStimBatch(batch_dir, log_filepath, neurons, stim_params) pkl_dir, _ = os.path.split(pkl_filepaths[0]) # Plot resulting profiles plotBatch(pkl_dir, pkl_filepaths, {'Q_m': ['Qm']}) -except AssertionError as err: +except InputError as err: logger.error(err) sys.exit(1) diff --git a/sim/ESTIM_batch.py b/sim/ESTIM_batch.py index c774ce3..a5c7895 100644 --- a/sim/ESTIM_batch.py +++ b/sim/ESTIM_batch.py @@ -1,47 +1,47 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-24 11:55:07 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-12 20:17:41 +# @Last Modified time: 2018-03-15 15:36:58 """ Run batch electrical simulations of specific "point-neuron" models. """ import sys import os import logging import numpy as np -from PointNICE.utils import logger +from PointNICE.utils import logger, InputError from PointNICE.solvers import setBatchDir, checkBatchLog, runEStimBatch from PointNICE.plt import plotBatch # Set logging level logger.setLevel(logging.INFO) # Neurons neurons = ['LeechP'] # Stimulation parameters stim_params = { 'amps': [2.0], # mA/m2 'durations': np.array([20, 40, 60, 80, 100, 150, 200, 250, 300]) * 1e-3, # s 'PRFs': np.array([0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0]) * 1e3, # Hz 'DCs': np.array([1, 2, 5, 10, 25, 50, 75, 100]) * 1e-2 } stim_params['offsets'] = 350e-3 - stim_params['durations'] # s try: # Select output directory batch_dir = setBatchDir() log_filepath, _ = checkBatchLog(batch_dir, 'E-STIM') # Run E-STIM batch pkl_filepaths = runEStimBatch(batch_dir, log_filepath, neurons, stim_params) pkl_dir, _ = os.path.split(pkl_filepaths[0]) # Plot resulting profiles # plotBatch(pkl_dir, pkl_filepaths) -except AssertionError as err: +except InputError as err: logger.error(err) sys.exit(1) diff --git a/sim/ESTIM_run.py b/sim/ESTIM_run.py index cb0ff04..96941df 100644 --- a/sim/ESTIM_run.py +++ b/sim/ESTIM_run.py @@ -1,94 +1,95 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-07 18:51:34 +# @Last Modified time: 2018-03-15 15:34:51 """ Script to run ESTIM simulations from command line. """ import sys import os import logging from argparse import ArgumentParser -from PointNICE.utils import logger, getNeuronsDict +from PointNICE.utils import logger, getNeuronsDict, InputError from PointNICE.solvers import checkBatchLog, SolverElec, runEStim from PointNICE.plt import plotBatch # Default parameters default = { 'neuron': 'RS', 'A': 10.0, # mA/m2 't': 150.0, # ms 'off': 20.0, # ms 'PRF': 100.0, # Hz 'DC': 100.0 # % } def main(): # Define argument parser ap = ArgumentParser() # ASTIM parameters ap.add_argument('-n', '--neuron', type=str, default=default['neuron'], help='Neuron name (string)') ap.add_argument('-A', '--amplitude', type=float, default=default['A'], help='Stimulus amplitude (mA/m2)') ap.add_argument('-t', '--duration', type=float, default=default['t'], help='Stimulus duration (ms)') ap.add_argument('--offset', type=float, default=default['off'], help='Offset duration (ms)') ap.add_argument('--PRF', type=float, default=default['PRF'], help='PRF (Hz)') ap.add_argument('--DC', type=float, default=default['DC'], help='Duty cycle (%%)') ap.add_argument('-o', '--outputdir', type=str, default=os.getcwd(), help='Output directory') # Boolean arguments ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-p', '--plot', default=False, action='store_true', help='Plot results') # Parse arguments args = ap.parse_args() neuron_str = args.neuron Astim = args.amplitude # mA/m2 tstim = args.duration * 1e-3 # s toffset = args.offset * 1e-3 # s PRF = args.PRF # Hz DC = args.DC * 1e-2 output_dir = args.outputdir if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) PW_str = ', PRF = {:.2f} Hz, DC = {:.1f}'.format(PRF, DC * 1e2) try: - assert neuron_str in getNeuronsDict(), '{} neuron type not implemented'.format(neuron_str) + if neuron_str not in getNeuronsDict(): + raise InputError('Unknown neuron type: "{}"'.format(neuron_str)) log_filepath, _ = checkBatchLog(output_dir, 'E-STIM') neuron = getNeuronsDict()[neuron_str]() solver = SolverElec() logger.info('Running E-STIM simulation on %s neuron: A = %.2f mA/m2, t = %.2f ms%s', neuron_str, Astim, tstim * 1e3, PW_str if DC < 1.0 else "") outfile = runEStim(output_dir, log_filepath, solver, neuron, Astim, tstim, toffset, PRF, DC) logger.info('Finished') if args.plot: plotBatch(output_dir, [outfile]) - except AssertionError as err: + except InputError as err: logger.error(err) sys.exit(1) if __name__ == '__main__': main() diff --git a/sim/ESTIM_titration_batch.py b/sim/ESTIM_titration_batch.py index 2cbb666..82e9f16 100644 --- a/sim/ESTIM_titration_batch.py +++ b/sim/ESTIM_titration_batch.py @@ -1,46 +1,46 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-25 14:50:39 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-12 20:17:56 +# @Last Modified time: 2018-03-15 17:03:59 """ Run batch electrical titrations of specific "point-neuron" models. """ import sys import os import logging import numpy as np -from PointNICE.utils import logger +from PointNICE.utils import logger, InputError from PointNICE.solvers import setBatchDir, checkBatchLog, titrateEStimBatch from PointNICE.plt import plotBatch # Set logging level logger.setLevel(logging.DEBUG) # Neurons neurons = ['RS'] # Stimulation parameters stim_params = { 'amps': [20.0], # mA/m2 'durations': [0.5], # s 'PRFs': [1e2], # Hz # 'DCs': [1.0] } try: # Select output directory batch_dir = setBatchDir() log_filepath, _ = checkBatchLog(batch_dir, 'E-STIM') # Run titration batch pkl_filepaths = titrateEStimBatch(batch_dir, log_filepath, neurons, stim_params) pkl_dir, _ = os.path.split(pkl_filepaths[0]) # Plot resulting profiles plotBatch(pkl_dir, pkl_filepaths, {'V_m': ['Vm']}) -except AssertionError as err: +except InputError as err: logger.error(err) sys.exit(1) diff --git a/sim/MECH_batch.py b/sim/MECH_batch.py index 25ec68d..bc2702c 100644 --- a/sim/MECH_batch.py +++ b/sim/MECH_batch.py @@ -1,54 +1,54 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-11-21 10:46:56 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-14 19:44:51 +# @Last Modified time: 2018-03-15 17:02:02 """ Run batch simulations of the NICE mechanical model with imposed charge densities """ import sys import os import logging import numpy as np -from PointNICE.utils import logger +from PointNICE.utils import logger, InputError from PointNICE.solvers import setBatchDir, checkBatchLog, runMechBatch from PointNICE.neurons import * from PointNICE.plt import plotBatch # Set logging level logger.setLevel(logging.DEBUG) a = 32e-9 # in-plane diameter (m) # Electrical properties of the membrane neuron = CorticalRS() Cm0 = neuron.Cm0 Qm0 = neuron.Vm0 * 1e-5 # Cm0 = 1e-2 # membrane resting capacitance (F/m2) # Qm0 = -80e-5 # membrane resting charge density (C/m2) # Stimulation parameters stim_params = { 'freqs': [20.0e3], # Hz 'amps': [352.24e3], # Pa - 'charges': np.arange(-80.0, 60.0) * 1e-5 # C/m2 + # 'charges': np.arange(-80.0, 60.0) * 1e-5 # C/m2 } # Select output directory try: batch_dir = setBatchDir() log_filepath, _ = checkBatchLog(batch_dir, 'MECH') # Run MECH batch pkl_filepaths = runMechBatch(batch_dir, log_filepath, Cm0, Qm0, stim_params, a) pkl_dir, _ = os.path.split(pkl_filepaths[0]) # Plot resulting profiles # plotBatch(pkl_dir, pkl_filepaths) -except AssertionError as err: +except InputError as err: logger.error(err) sys.exit(1)