diff --git a/PySONIC/lookups/BLS_lookups.json b/PySONIC/lookups/BLS_lookups.json index 41bd8c1..7275c93 100644 --- a/PySONIC/lookups/BLS_lookups.json +++ b/PySONIC/lookups/BLS_lookups.json @@ -1,299 +1,436 @@ { "32.0": { "-80.00": { "LJ_approx": { "x0": 1.7875580514692446e-09, "C": 14506.791031634148, "nrep": 3.911252335063797, "nattr": 0.9495868868453603 }, "Delta_eq": 1.2344323203763867e-09 }, "-71.40": { "LJ_approx": { "x0": 1.710159362626304e-09, "C": 16757.44053535206, "nrep": 3.9149844779001572, "nattr": 0.9876139143736086 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.588820457014353e-09, "C": 21124.28839722447, "nrep": 3.9219530533405726, "nattr": 1.0531179666960837 }, "Delta_eq": 1.302942739961778e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7142977983903395e-09, "C": 16627.43538695451, "nrep": 3.9147721975981384, "nattr": 0.9855168537576823 }, "Delta_eq": 1.2553492695740507e-09 }, "-89.50": { "LJ_approx": { "x0": 1.8913883171160228e-09, "C": 12016.525797229067, "nrep": 3.9069373029335464, "nattr": 0.9021994595277029 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6390264131559902e-09, "C": 19180.97634755811, "nrep": 3.9188840789597705, "nattr": 1.0250251620607604 }, "Delta_eq": 1.281743450351987e-09 }, "-53.00": { "LJ_approx": { "x0": 1.5830321174402216e-09, "C": 21361.655211483354, "nrep": 3.9223254792281588, "nattr": 1.0564645995714745 }, "Delta_eq": 1.305609024046854e-09 }, "-53.58": { "LJ_approx": { "x0": 1.5863754403940291e-09, "C": 21224.206759769622, "nrep": 3.922109852077156, "nattr": 1.0545313303221624 }, "Delta_eq": 1.3040630712174578e-09 }, "-48.87": { "LJ_approx": { "x0": 1.5603070731170595e-09, "C": 22321.280954434333, "nrep": 3.9238276518118833, "nattr": 1.0698008224359472 }, "Delta_eq": 1.3165739825437056e-09 }, "0.00": { "LJ_approx": { "x0": 1.429523524073023e-09, "C": 28748.036227122713, "nrep": 3.9338919786768276, "nattr": 1.1551044201542804 }, "Delta_eq": 1.4e-09 }, "-70.00": { "LJ_approx": { "x0": 1.698788510560293e-09, "C": 17120.631318195712, "nrep": 3.915575488491436, "nattr": 0.9934238714780391 }, "Delta_eq": 1.2603339470322538e-09 } }, "64.0": { "-80.00": { "LJ_approx": { "x0": 1.783357531675752e-09, "C": 14639.319598806138, "nrep": 3.9113027551187414, "nattr": 0.9404151935643594 }, "Delta_eq": 1.2344323203763867e-09 + }, + "-71.90": { + "LJ_approx": { + "x0": 1.7103254451796522e-09, + "C": 16775.90747591089, + "nrep": 3.9148582072320104, + "nattr": 0.9747613204242506 + }, + "Delta_eq": 1.2553492695740507e-09 + }, + "-71.40": { + "LJ_approx": { + "x0": 1.7061996856525807e-09, + "C": 16906.878806702443, + "nrep": 3.9150725853841957, + "nattr": 0.976778398349503 + }, + "Delta_eq": 1.2566584760815426e-09 + }, + "-54.00": { + "LJ_approx": { + "x0": 1.585264156646392e-09, + "C": 21303.176047683613, + "nrep": 3.92211004079812, + "nattr": 1.0402641595550777 + }, + "Delta_eq": 1.302942739961778e-09 + }, + "-89.50": { + "LJ_approx": { + "x0": 1.886870747500225e-09, + "C": 12129.260307725155, + "nrep": 3.906945602966425, + "nattr": 0.895598309376088 + }, + "Delta_eq": 1.2107230911508513e-09 + }, + "-61.93": { + "LJ_approx": { + "x0": 1.635297932833928e-09, + "C": 19347.359224637305, + "nrep": 3.9190113341505843, + "nattr": 1.0129039638328117 + }, + "Delta_eq": 1.281743450351987e-09 } }, "50.0": { "-71.90": { "LJ_approx": { "x0": 1.7114794411958874e-09, "C": 16732.841575829825, "nrep": 3.914827883392826, "nattr": 0.9781401389550711 }, "Delta_eq": 1.2553492695740507e-09 } }, "10.0": { "0.00": { "LJ_approx": { "x0": 1.4403460578039628e-09, "C": 27932.27792195569, "nrep": 3.9334138654752686, "nattr": 1.19526523864855 }, "Delta_eq": 1.4e-09 } }, "100.0": { "0.00": { "LJ_approx": { "x0": 1.4254455131143225e-09, "C": 29048.417918044444, "nrep": 3.9342659189249254, "nattr": 1.1351227816904121 }, "Delta_eq": 1.4e-09 + }, + "-71.90": { + "LJ_approx": { + "x0": 1.7087681652667724e-09, + "C": 16833.83962398515, + "nrep": 3.914908533680663, + "nattr": 0.9697102045586926 + }, + "Delta_eq": 1.2553492695740507e-09 + }, + "-71.40": { + "LJ_approx": { + "x0": 1.7046480280451768e-09, + "C": 16965.15489682674, + "nrep": 3.9151238997284845, + "nattr": 0.9716928395857687 + }, + "Delta_eq": 1.2566584760815426e-09 + }, + "-54.00": { + "LJ_approx": { + "x0": 1.5838767580748841e-09, + "C": 21372.412565003375, + "nrep": 3.922191259312523, + "nattr": 1.0344167918733638 + }, + "Delta_eq": 1.302942739961778e-09 + }, + "-89.50": { + "LJ_approx": { + "x0": 1.8850831984948754e-09, + "C": 12173.857567383837, + "nrep": 3.906959887367545, + "nattr": 0.8923154523792497 + }, + "Delta_eq": 1.2107230911508513e-09 + }, + "-61.93": { + "LJ_approx": { + "x0": 1.6338405482612552e-09, + "C": 19411.96069791924, + "nrep": 3.9190798895919405, + "nattr": 1.0073171165886772 + }, + "Delta_eq": 1.281743450351987e-09 } }, "500.0": { "0.00": { "LJ_approx": { "x0": 1.4236928207491738e-09, "C": 29174.423851140436, "nrep": 3.934489087598531, "nattr": 1.1244706663694597 }, "Delta_eq": 1.4e-09 } }, "15.0": { "-71.90": { "LJ_approx": { "x0": 1.722207527516432e-09, "C": 16331.124295102756, "nrep": 3.914721476976037, "nattr": 1.0021304453280393 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7180439454408102e-09, "C": 16459.15520614834, "nrep": 3.9149304238974905, "nattr": 1.0043742068193204 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5959361190370466e-09, "C": 20763.823187183425, "nrep": 3.921785737782814, "nattr": 1.0737976563473925 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.9002701433896942e-09, "C": 11795.576706463997, "nrep": 3.9070059150621814, "nattr": 0.9114123498082551 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.646473044478369e-09, "C": 18846.9803104403, "nrep": 3.918766624746869, "nattr": 1.044220870098494 }, "Delta_eq": 1.281743450351987e-09 } }, "70.0": { "-61.93": { "LJ_approx": { "x0": 1.6349587829329923e-09, "C": 19362.426097728276, "nrep": 3.9190260877993097, "nattr": 1.0116609492531905 }, "Delta_eq": 1.281743450351987e-09 }, "-71.90": { "LJ_approx": { "x0": 1.7099628637265945e-09, "C": 16789.420291577666, "nrep": 3.9148688185890483, "nattr": 0.9736446481917082 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7058388214243274e-09, "C": 16920.455606556396, "nrep": 3.9150834388621805, "nattr": 0.9756530742858303 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.584940743805642e-09, "C": 21319.35643207058, "nrep": 3.9221277053335264, "nattr": 1.0389567310289958 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.886457143504779e-09, "C": 12139.584658299475, "nrep": 3.9069481854501382, "nattr": 0.8948872453823127 }, "Delta_eq": 1.2107230911508513e-09 } }, "150.0": { "-71.90": { "LJ_approx": { "x0": 1.707781542586275e-09, "C": 16870.340248462282, "nrep": 3.914948339378328, "nattr": 0.9660711186661354 }, "Delta_eq": 1.2553492695740507e-09 }, "-71.40": { "LJ_approx": { "x0": 1.7036651779798684e-09, "C": 17001.86272239105, "nrep": 3.9151643485118117, "nattr": 0.9680342060844059 }, "Delta_eq": 1.2566584760815426e-09 }, "-54.00": { "LJ_approx": { "x0": 1.5830041112065094e-09, "C": 21415.64649760579, "nrep": 3.9222512220871013, "nattr": 1.0303387653647968 }, "Delta_eq": 1.302942739961778e-09 }, "-89.50": { "LJ_approx": { "x0": 1.883936689519767e-09, "C": 12202.390459945682, "nrep": 3.906974649816042, "nattr": 0.8898102498996743 }, "Delta_eq": 1.2107230911508513e-09 }, "-61.93": { "LJ_approx": { "x0": 1.6329212539031057e-09, "C": 19452.44141782297, "nrep": 3.9191317215599946, "nattr": 1.0033715654432673 }, "Delta_eq": 1.281743450351987e-09 } + }, + "16.0": { + "-71.90": { + "LJ_approx": { + "x0": 1.721337196662225e-09, + "C": 16363.738563915058, + "nrep": 3.9147202446411637, + "nattr": 1.0005179992350384 + }, + "Delta_eq": 1.2553492695740507e-09 + }, + "-71.40": { + "LJ_approx": { + "x0": 1.717176984027311e-09, + "C": 16491.97282711717, + "nrep": 3.9149293522242252, + "nattr": 1.0027563589061772 + }, + "Delta_eq": 1.2566584760815426e-09 + }, + "-54.00": { + "LJ_approx": { + "x0": 1.5951507107307484e-09, + "C": 20803.713978702526, + "nrep": 3.921796023871708, + "nattr": 1.0717457519944718 + }, + "Delta_eq": 1.302942739961778e-09 + }, + "-89.50": { + "LJ_approx": { + "x0": 1.899300769652515e-09, + "C": 11819.650350288994, + "nrep": 3.906993085458305, + "nattr": 0.9105930969008011 + }, + "Delta_eq": 1.2107230911508513e-09 + }, + "-61.93": { + "LJ_approx": { + "x0": 1.6456524054221337e-09, + "C": 18883.851022856303, + "nrep": 3.918771854603072, + "nattr": 1.042336408142498 + }, + "Delta_eq": 1.281743450351987e-09 + } } } \ No newline at end of file diff --git a/PySONIC/neurons/base.py b/PySONIC/neurons/base.py index b02c91c..4e786f0 100644 --- a/PySONIC/neurons/base.py +++ b/PySONIC/neurons/base.py @@ -1,160 +1,165 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-03 11:53:04 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-08-21 15:27:49 +# @Last Modified time: 2018-09-19 15:26:56 ''' Module standard API for all neuron mechanisms. Each mechanism class can use different methods to define the membrane dynamics of a specific neuron type. However, they must contain some mandatory attributes and methods in order to be properly imported in other sonic modules and used in NICE simulations. ''' import abc +import numpy as np class BaseMech(metaclass=abc.ABCMeta): ''' Abstract class defining the common API (i.e. mandatory attributes and methods) of all subclasses implementing the channels mechanisms of specific neurons. The mandatory attributes are: - **name**: a string defining the name of the mechanism. - **Cm0**: a float defining the membrane resting capacitance (in F/m2) - **Vm0**: a float defining the membrane resting potential (in mV) - **states_names**: a list of strings defining the names of the different state probabilities governing the channels behaviour (i.e. the differential HH variables). - **states0**: a 1D array of floats (NOT integers !!!) defining the initial values of the different state probabilities. - **coeff_names**: a list of strings defining the names of the different coefficients to be used in effective simulations. The mandatory methods are: - **currNet**: compute the net ionic current density (in mA/m2) across the membrane, given a specific membrane potential (in mV) and channel states. - **steadyStates**: compute the channels steady-state values for a specific membrane potential value (in mV). - **derStates**: compute the derivatives of channel states, given a specific membrane potential (in mV) and channel states. This method must return a list of derivatives ordered identically as in the states0 attribute. - **getEffRates**: get the effective rate constants of ion channels to be used in effective simulations. This method must return an array of effective rates ordered identically as in the coeff_names attribute. - **derStatesEff**: compute the effective derivatives of channel states, based on 1-dimensional linear interpolators of "effective" coefficients. This method must return a list of derivatives ordered identically as in the states0 attribute. ''' @property @abc.abstractmethod def name(self): return 'Should never reach here' @property @abc.abstractmethod def Cm0(self): return 'Should never reach here' @property @abc.abstractmethod def Vm0(self): return 'Should never reach here' # @property # @abc.abstractmethod # def states_names(self): # return 'Should never reach here' # @property # @abc.abstractmethod # def states0(self): # return 'Should never reach here' # @property # @abc.abstractmethod # def coeff_names(self): # return 'Should never reach here' @abc.abstractmethod def currNet(self, Vm, states): ''' Compute the net ionic current per unit area. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: current per unit area (mA/m2) ''' @abc.abstractmethod def steadyStates(self, Vm): ''' Compute the channels steady-state values for a specific membrane potential value. :param Vm: membrane potential (mV) :return: array of steady-states ''' @abc.abstractmethod def derStates(self, Vm, states): ''' Compute the derivatives of channel states. :param Vm: membrane potential (mV) :states: state probabilities of the ion channels :return: current per unit area (mA/m2) ''' @abc.abstractmethod def getEffRates(self, Vm): ''' Get the effective rate constants of ion channels, averaged along an acoustic cycle, for future use in effective simulations. :param Vm: array of membrane potential values for an acoustic cycle (mV) :return: an array of rate average constants (s-1) ''' @abc.abstractmethod def derStatesEff(self, Qm, states, interp_data): ''' Compute the effective derivatives of channel states, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param Qm: membrane charge density (C/m2) :states: state probabilities of the ion channels :param interp_data: dictionary of 1D vectors of "effective" coefficients over the charge domain, for specific frequency and amplitude values. ''' def getGates(self): ''' Retrieve the names of the neuron's states that match an ion channel gating. ''' gates = [] for x in self.states_names: if 'alpha{}'.format(x.lower()) in self.coeff_names: gates.append(x) return gates def getRates(self, Vm): ''' Compute the ion channels rate constants for a given membrane potential. :param Vm: membrane potential (mV) :return: a dictionary of rate constants and their values at the given potential. ''' rates = {} for x in self.getGates(): x = x.lower() alpha_str, beta_str = ['{}{}'.format(s, x.lower()) for s in ['alpha', 'beta']] inf_str, tau_str = ['{}inf'.format(x.lower()), 'tau{}'.format(x.lower())] if hasattr(self, 'alpha{}'.format(x)): alphax = getattr(self, alpha_str)(Vm) betax = getattr(self, beta_str)(Vm) elif hasattr(self, '{}inf'.format(x)): xinf = getattr(self, inf_str)(Vm) taux = getattr(self, tau_str)(Vm) alphax = xinf / taux betax = 1 / taux - alphax rates[alpha_str] = alphax rates[beta_str] = betax return rates + + def vtrap(self, x, y): + ''' Generic function used to compute rate constants. ''' + return x / (np.exp(x / y) - 1) diff --git a/PySONIC/neurons/cortical.py b/PySONIC/neurons/cortical.py index 4bd0e71..fc09cc0 100644 --- a/PySONIC/neurons/cortical.py +++ b/PySONIC/neurons/cortical.py @@ -1,817 +1,818 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-07-31 15:19:51 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-08-21 16:10:36 +# @Last Modified time: 2018-09-19 15:53:34 ''' Channels mechanisms for thalamic neurons. ''' import logging import numpy as np from .base import BaseMech + # Get package logger logger = logging.getLogger('PySONIC') class Cortical(BaseMech): ''' Class defining the generic membrane channel dynamics of a cortical neuron with 4 different current types: - Inward Sodium current - Outward, delayed-rectifier Potassium current - Outward, slow non.inactivating Potassium current - Non-specific leakage current This generic class cannot be used directly as it does not contain any specific parameters. Reference: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* ''' # Generic biophysical parameters of cortical cells Cm0 = 1e-2 # Cell membrane resting capacitance (F/m2) Vm0 = 0.0 # Dummy value for membrane potential (mV) VNa = 50.0 # Sodium Nernst potential (mV) VK = -90.0 # Potassium Nernst potential (mV) VCa = 120.0 # # Calcium Nernst potential (mV) def __init__(self): ''' Constructor of the class ''' # Names and initial states of the channels state probabilities self.states_names = ['m', 'h', 'n', 'p'] self.states0 = np.array([]) # Names of the different coefficients to be averaged in a lookup table. self.coeff_names = ['alpham', 'betam', 'alphah', 'betah', 'alphan', 'betan', 'alphap', 'betap'] # Charge interval bounds for lookup creation self.Qbounds = (np.round(self.Vm0 - 25.0) * 1e-5, 50.0e-5) def alpham(self, Vm): ''' Compute the alpha rate for the open-probability of Sodium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT - alpha = (-0.32 * (Vdiff - 13) / (np.exp(- (Vdiff - 13) / 4) - 1)) # ms-1 + alpha = 0.32 * self.vtrap(13 - Vdiff, 4) # ms-1 return alpha * 1e3 # s-1 def betam(self, Vm): ''' Compute the beta rate for the open-probability of Sodium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT - beta = (0.28 * (Vdiff - 40) / (np.exp((Vdiff - 40) / 5) - 1)) # ms-1 + beta = 0.28 * self.vtrap(Vdiff - 40, 5) # ms-1 return beta * 1e3 # s-1 def alphah(self, Vm): ''' Compute the alpha rate for the inactivation-probability of Sodium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT alpha = (0.128 * np.exp(-(Vdiff - 17) / 18)) # ms-1 return alpha * 1e3 # s-1 def betah(self, Vm): ''' Compute the beta rate for the inactivation-probability of Sodium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT beta = (4 / (1 + np.exp(-(Vdiff - 40) / 5))) # ms-1 return beta * 1e3 # s-1 def alphan(self, Vm): ''' Compute the alpha rate for the open-probability of delayed-rectifier Potassium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT - alpha = (-0.032 * (Vdiff - 15) / (np.exp(-(Vdiff - 15) / 5) - 1)) # ms-1 + alpha = 0.032 * self.vtrap(15 - Vdiff, 5) # ms-1 return alpha * 1e3 # s-1 def betan(self, Vm): ''' Compute the beta rate for the open-probability of delayed-rectifier Potassium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT beta = (0.5 * np.exp(-(Vdiff - 10) / 40)) # ms-1 return beta * 1e3 # s-1 def pinf(self, Vm): ''' Compute the asymptotic value of the open-probability of slow non-inactivating Potassium channels. :param Vm: membrane potential (mV) :return: asymptotic probability (-) ''' return 1.0 / (1 + np.exp(-(Vm + 35) / 10)) # prob def taup(self, Vm): ''' Compute the decay time constant for adaptation of slow non-inactivating Potassium channels. :param Vm: membrane potential (mV) :return: decayed time constant (s) ''' return self.TauMax / (3.3 * np.exp((Vm + 35) / 20) + np.exp(-(Vm + 35) / 20)) # s def derM(self, Vm, m): ''' Compute the evolution of the open-probability of Sodium channels. :param Vm: membrane potential (mV) :param m: open-probability of Sodium channels (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return self.alpham(Vm) * (1 - m) - self.betam(Vm) * m def derH(self, Vm, h): ''' Compute the evolution of the inactivation-probability of Sodium channels. :param Vm: membrane potential (mV) :param h: inactivation-probability of Sodium channels (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return self.alphah(Vm) * (1 - h) - self.betah(Vm) * h def derN(self, Vm, n): ''' Compute the evolution of the open-probability of delayed-rectifier Potassium channels. :param Vm: membrane potential (mV) :param n: open-probability of delayed-rectifier Potassium channels (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return self.alphan(Vm) * (1 - n) - self.betan(Vm) * n def derP(self, Vm, p): ''' Compute the evolution of the open-probability of slow non-inactivating Potassium channels. :param Vm: membrane potential (mV) :param p: open-probability of slow non-inactivating Potassium channels (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return (self.pinf(Vm) - p) / self.taup(Vm) def currNa(self, m, h, Vm): ''' Compute the inward Sodium current per unit area. :param m: open-probability of Sodium channels :param h: inactivation-probability of Sodium channels :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' GNa = self.GNaMax * m**3 * h return GNa * (Vm - self.VNa) def currK(self, n, Vm): ''' Compute the outward, delayed-rectifier Potassium current per unit area. :param n: open-probability of delayed-rectifier Potassium channels :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' GK = self.GKMax * n**4 return GK * (Vm - self.VK) def currM(self, p, Vm): ''' Compute the outward, slow non-inactivating Potassium current per unit area. :param p: open-probability of the slow non-inactivating Potassium channels :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' GM = self.GMMax * p return GM * (Vm - self.VK) def currL(self, Vm): ''' Compute the non-specific leakage current per unit area. :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' return self.GL * (Vm - self.VL) def currNet(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' m, h, n, p = states return (self.currNa(m, h, Vm) + self.currK(n, Vm) + self.currM(p, Vm) + self.currL(Vm)) # mA/m2 def steadyStates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Solve the equation dx/dt = 0 at Vm for each x-state meq = self.alpham(Vm) / (self.alpham(Vm) + self.betam(Vm)) heq = self.alphah(Vm) / (self.alphah(Vm) + self.betah(Vm)) neq = self.alphan(Vm) / (self.alphan(Vm) + self.betan(Vm)) peq = self.pinf(Vm) return np.array([meq, heq, neq, peq]) def derStates(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' m, h, n, p = states dmdt = self.derM(Vm, m) dhdt = self.derH(Vm, h) dndt = self.derN(Vm, n) dpdt = self.derP(Vm, p) return [dmdt, dhdt, dndt, dpdt] def getEffRates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Compute average cycle value for rate constants am_avg = np.mean(self.alpham(Vm)) bm_avg = np.mean(self.betam(Vm)) ah_avg = np.mean(self.alphah(Vm)) bh_avg = np.mean(self.betah(Vm)) an_avg = np.mean(self.alphan(Vm)) bn_avg = np.mean(self.betan(Vm)) + Tp = self.taup(Vm) pinf = self.pinf(Vm) ap_avg = np.mean(pinf / Tp) bp_avg = np.mean(1 / Tp) - ap_avg # Return array of coefficients return np.array([am_avg, bm_avg, ah_avg, bh_avg, an_avg, bn_avg, ap_avg, bp_avg]) def derStatesEff(self, Qm, states, interp_data): ''' Concrete implementation of the abstract API method. ''' rates = np.array([np.interp(Qm, interp_data['Q'], interp_data[rn]) for rn in self.coeff_names]) m, h, n, p = states dmdt = rates[0] * (1 - m) - rates[1] * m dhdt = rates[2] * (1 - h) - rates[3] * h dndt = rates[4] * (1 - n) - rates[5] * n dpdt = rates[6] * (1 - p) - rates[7] * p return [dmdt, dhdt, dndt, dpdt] class CorticalRS(Cortical): ''' Specific membrane channel dynamics of a cortical regular spiking, excitatory pyramidal neuron. Reference: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* ''' # Name of channel mechanism name = 'RS' # Cell-specific biophysical parameters Vm0 = -71.9 # Cell membrane resting potential (mV) GNaMax = 560.0 # Max. conductance of Sodium current (S/m^2) GKMax = 60.0 # Max. conductance of delayed Potassium current (S/m^2) GMMax = 0.75 # Max. conductance of slow non-inactivating Potassium current (S/m^2) GL = 0.205 # Conductance of non-specific leakage current (S/m^2) VL = -70.3 # Non-specific leakage Nernst potential (mV) VT = -56.2 # Spike threshold adjustment parameter (mV) TauMax = 0.608 # Max. adaptation decay of slow non-inactivating Potassium current (s) # Default plotting scheme pltvars_scheme = { 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_M\ kin.': ['p'], 'I': ['iNa', 'iK', 'iM', 'iL', 'iNet'] } def __init__(self): ''' Constructor of the class. ''' # Instantiate parent class super().__init__() # Define initial channel probabilities (solving dx/dt = 0 at resting potential) self.states0 = self.steadyStates(self.Vm0) class CorticalFS(Cortical): ''' Specific membrane channel dynamics of a cortical fast-spiking, inhibitory neuron. Reference: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* ''' # Name of channel mechanism name = 'FS' # Cell-specific biophysical parameters Vm0 = -71.4 # Cell membrane resting potential (mV) GNaMax = 580.0 # Max. conductance of Sodium current (S/m^2) GKMax = 39.0 # Max. conductance of delayed Potassium current (S/m^2) GMMax = 0.787 # Max. conductance of slow non-inactivating Potassium current (S/m^2) GL = 0.38 # Conductance of non-specific leakage current (S/m^2) VL = -70.4 # Non-specific leakage Nernst potential (mV) VT = -57.9 # Spike threshold adjustment parameter (mV) TauMax = 0.502 # Max. adaptation decay of slow non-inactivating Potassium current (s) # Default plotting scheme pltvars_scheme = { 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_M\ kin.': ['p'], 'I': ['iNa', 'iK', 'iM', 'iL', 'iNet'] } def __init__(self): ''' Constructor of the class. ''' # Instantiate parent class super().__init__() # Define initial channel probabilities (solving dx/dt = 0 at resting potential) self.states0 = self.steadyStates(self.Vm0) class CorticalLTS(Cortical): ''' Specific membrane channel dynamics of a cortical low-threshold spiking, inhibitory neuron with an additional inward Calcium current due to the presence of a T-type channel. References: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* *Huguenard, J.R., and McCormick, D.A. (1992). Simulation of the currents involved in rhythmic oscillations in thalamic relay neurons. J. Neurophysiol. 68, 1373–1383.* ''' # Name of channel mechanism name = 'LTS' # Cell-specific biophysical parameters Vm0 = -54.0 # Cell membrane resting potential (mV) GNaMax = 500.0 # Max. conductance of Sodium current (S/m^2) GKMax = 40.0 # Max. conductance of delayed Potassium current (S/m^2) GMMax = 0.28 # Max. conductance of slow non-inactivating Potassium current (S/m^2) GTMax = 4.0 # Max. conductance of low-threshold Calcium current (S/m^2) GL = 0.19 # Conductance of non-specific leakage current (S/m^2) VL = -50.0 # Non-specific leakage Nernst potential (mV) VT = -50.0 # Spike threshold adjustment parameter (mV) TauMax = 4.0 # Max. adaptation decay of slow non-inactivating Potassium current (s) Vx = -7.0 # Voltage-dependence uniform shift factor at 36°C (mV) # Default plotting scheme pltvars_scheme = { 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_M\ kin.': ['p'], 'i_T\ kin.': ['s', 'u'], 'I': ['iNa', 'iK', 'iM', 'iT', 'iL', 'iNet'] } def __init__(self): ''' Constructor of the class. ''' # Instantiate parent class super().__init__() # Add names of cell-specific Calcium channel probabilities self.states_names += ['s', 'u'] # Define initial channel probabilities (solving dx/dt = 0 at resting potential) self.states0 = self.steadyStates(self.Vm0) # Define the names of the different coefficients to be averaged in a lookup table. self.coeff_names += ['alphas', 'betas', 'alphau', 'betau'] def sinf(self, Vm): ''' Compute the asymptotic value of the open-probability of the S-type, activation gate of Calcium channels. :param Vm: membrane potential (mV) :return: asymptotic probability (-) ''' return 1.0 / (1.0 + np.exp(-(Vm + self.Vx + 57.0) / 6.2)) # prob def taus(self, Vm): ''' Compute the decay time constant for adaptation of S-type, activation gate of Calcium channels. :param Vm: membrane potential (mV) :return: decayed time constant (s) ''' tmp = np.exp(-(Vm + self.Vx + 132.0) / 16.7) + np.exp((Vm + self.Vx + 16.8) / 18.2) return 1.0 / 3.7 * (0.612 + 1.0 / tmp) * 1e-3 # s def uinf(self, Vm): ''' Compute the asymptotic value of the open-probability of the U-type, inactivation gate of Calcium channels. :param Vm: membrane potential (mV) :return: asymptotic probability (-) ''' return 1.0 / (1.0 + np.exp((Vm + self.Vx + 81.0) / 4.0)) # prob def tauu(self, Vm): ''' Compute the decay time constant for adaptation of U-type, inactivation gate of Calcium channels. :param Vm: membrane potential (mV) :return: decayed time constant (s) ''' if Vm + self.Vx < -80.0: return 1.0 / 3.7 * np.exp((Vm + self.Vx + 467.0) / 66.6) * 1e-3 # s else: return 1.0 / 3.7 * (np.exp(-(Vm + self.Vx + 22) / 10.5) + 28.0) * 1e-3 # s def derS(self, Vm, s): ''' Compute the evolution of the open-probability of the S-type, activation gate of Calcium channels. :param Vm: membrane potential (mV) :param s: open-probability of S-type Calcium activation gates (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return (self.sinf(Vm) - s) / self.taus(Vm) def derU(self, Vm, u): ''' Compute the evolution of the open-probability of the U-type, inactivation gate of Calcium channels. :param Vm: membrane potential (mV) :param u: open-probability of U-type Calcium inactivation gates (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return (self.uinf(Vm) - u) / self.tauu(Vm) def currCa(self, s, u, Vm): ''' Compute the inward, low-threshold Calcium current per unit area. :param s: open-probability of the S-type activation gate of Calcium channels :param u: open-probability of the U-type inactivation gate of Calcium channels :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' GT = self.GTMax * s**2 * u return GT * (Vm - self.VCa) def currNet(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' m, h, n, p, s, u = states return (self.currNa(m, h, Vm) + self.currK(n, Vm) + self.currM(p, Vm) + self.currCa(s, u, Vm) + self.currL(Vm)) # mA/m2 def steadyStates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Call parent method to compute Sodium and Potassium channels gates steady-states NaK_eqstates = super().steadyStates(Vm) # Compute Calcium channel gates steady-states seq = self.sinf(Vm) ueq = self.uinf(Vm) Ca_eqstates = np.array([seq, ueq]) # Merge all steady-states and return return np.concatenate((NaK_eqstates, Ca_eqstates)) def derStates(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' # Unpack input states *NaK_states, s, u = states # Call parent method to compute Sodium and Potassium channels states derivatives NaK_derstates = super().derStates(Vm, NaK_states) # Compute Calcium channels states derivatives dsdt = self.derS(Vm, s) dudt = self.derU(Vm, u) # Merge all states derivatives and return return NaK_derstates + [dsdt, dudt] def getEffRates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Call parent method to compute Sodium and Potassium effective rate constants NaK_rates = super().getEffRates(Vm) # Compute Calcium effective rate constants Ts = self.taus(Vm) as_avg = np.mean(self.sinf(Vm) / Ts) bs_avg = np.mean(1 / Ts) - as_avg Tu = np.array([self.tauu(v) for v in Vm]) au_avg = np.mean(self.uinf(Vm) / Tu) bu_avg = np.mean(1 / Tu) - au_avg Ca_rates = np.array([as_avg, bs_avg, au_avg, bu_avg]) # Merge all rates and return return np.concatenate((NaK_rates, Ca_rates)) def derStatesEff(self, Qm, states, interp_data): ''' Concrete implementation of the abstract API method. ''' # Unpack input states *NaK_states, s, u = states # Call parent method to compute Sodium and Potassium channels states derivatives NaK_dstates = super().derStatesEff(Qm, NaK_states, interp_data) # Compute Calcium channels states derivatives Ca_rates = np.array([np.interp(Qm, interp_data['Q'], interp_data[rn]) for rn in self.coeff_names[8:]]) dsdt = Ca_rates[0] * (1 - s) - Ca_rates[1] * s dudt = Ca_rates[2] * (1 - u) - Ca_rates[3] * u # Merge all states derivatives and return return NaK_dstates + [dsdt, dudt] class CorticalIB(Cortical): ''' Specific membrane channel dynamics of a cortical intrinsically bursting neuron with an additional inward Calcium current due to the presence of a L-type channel. References: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* *Reuveni, I., Friedman, A., Amitai, Y., and Gutnick, M.J. (1993). Stepwise repolarization from Ca2+ plateaus in neocortical pyramidal cells: evidence for nonhomogeneous distribution of HVA Ca2+ channels in dendrites. J. Neurosci. 13, 4609–4621.* ''' # Name of channel mechanism name = 'IB' # Cell-specific biophysical parameters Vm0 = -71.4 # Cell membrane resting potential (mV) GNaMax = 500 # Max. conductance of Sodium current (S/m^2) GKMax = 50 # Max. conductance of delayed Potassium current (S/m^2) GMMax = 0.3 # Max. conductance of slow non-inactivating Potassium current (S/m^2) GCaLMax = 1.0 # Max. conductance of L-type Calcium current (S/m^2) GL = 0.1 # Conductance of non-specific leakage current (S/m^2) VL = -70 # Non-specific leakage Nernst potential (mV) VT = -56.2 # Spike threshold adjustment parameter (mV) TauMax = 0.608 # Max. adaptation decay of slow non-inactivating Potassium current (s) # Default plotting scheme pltvars_scheme = { 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_M\ kin.': ['p'], 'i_{CaL}\ kin.': ['q', 'r', 'q2r'], 'I': ['iNa', 'iK', 'iM', 'iCaL', 'iL', 'iNet'] } def __init__(self): ''' Constructor of the class. ''' # Instantiate parent class super().__init__() # Add names of cell-specific Calcium channel probabilities self.states_names += ['q', 'r'] # Define initial channel probabilities (solving dx/dt = 0 at resting potential) self.states0 = self.steadyStates(self.Vm0) # Define the names of the different coefficients to be averaged in a lookup table. self.coeff_names += ['alphaq', 'betaq', 'alphar', 'betar'] def alphaq(self, Vm): ''' Compute the alpha rate for the open-probability of L-type Calcium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' - - alpha = - 0.055 * (Vm + 27) / (np.exp(-(Vm + 27) / 3.8) - 1) # ms-1 + alpha = 0.055 * self.vtrap(-(Vm + 27), 3.8) # ms-1 return alpha * 1e3 # s-1 def betaq(self, Vm): ''' Compute the beta rate for the open-probability of L-type Calcium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' beta = 0.94 * np.exp(-(Vm + 75) / 17) # ms-1 return beta * 1e3 # s-1 def alphar(self, Vm): ''' Compute the alpha rate for the inactivation-probability of L-type Calcium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' alpha = 0.000457 * np.exp(-(Vm + 13) / 50) # ms-1 return alpha * 1e3 # s-1 def betar(self, Vm): ''' Compute the beta rate for the inactivation-probability of L-type Calcium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' beta = 0.0065 / (np.exp(-(Vm + 15) / 28) + 1) # ms-1 return beta * 1e3 # s-1 def derQ(self, Vm, q): ''' Compute the evolution of the open-probability of the Q (activation) gate of L-type Calcium channels. :param Vm: membrane potential (mV) :param q: open-probability of Q gate (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return self.alphaq(Vm) * (1 - q) - self.betaq(Vm) * q def derR(self, Vm, r): ''' Compute the evolution of the open-probability of the R (inactivation) gate of L-type Calcium channels. :param Vm: membrane potential (mV) :param r: open-probability of R gate (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return self.alphar(Vm) * (1 - r) - self.betar(Vm) * r def currCaL(self, q, r, Vm): ''' Compute the inward L-type Calcium current per unit area. :param q: open-probability of Q gate (prob) :param r: open-probability of R gate (prob) :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' GCaL = self.GCaLMax * q**2 * r return GCaL * (Vm - self.VCa) def currNet(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' m, h, n, p, q, r = states return (self.currNa(m, h, Vm) + self.currK(n, Vm) + self.currM(p, Vm) + self.currCaL(q, r, Vm) + self.currL(Vm)) # mA/m2 def steadyStates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Call parent method to compute Sodium and Potassium channels gates steady-states NaK_eqstates = super().steadyStates(Vm) # Compute L-type Calcium channel gates steady-states qeq = self.alphaq(Vm) / (self.alphaq(Vm) + self.betaq(Vm)) req = self.alphar(Vm) / (self.alphar(Vm) + self.betar(Vm)) CaL_eqstates = np.array([qeq, req]) # Merge all steady-states and return return np.concatenate((NaK_eqstates, CaL_eqstates)) def derStates(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' # Unpack input states *NaK_states, q, r = states # Call parent method to compute Sodium and Potassium channels states derivatives NaK_derstates = super().derStates(Vm, NaK_states) # Compute L-type Calcium channels states derivatives dqdt = self.derQ(Vm, q) drdt = self.derR(Vm, r) # Merge all states derivatives and return return NaK_derstates + [dqdt, drdt] def getEffRates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Call parent method to compute Sodium and Potassium effective rate constants NaK_rates = super().getEffRates(Vm) # Compute Calcium effective rate constants aq_avg = np.mean(self.alphaq(Vm)) bq_avg = np.mean(self.betaq(Vm)) ar_avg = np.mean(self.alphar(Vm)) br_avg = np.mean(self.betar(Vm)) CaL_rates = np.array([aq_avg, bq_avg, ar_avg, br_avg]) # Merge all rates and return return np.concatenate((NaK_rates, CaL_rates)) def derStatesEff(self, Qm, states, interp_data): ''' Concrete implementation of the abstract API method. ''' # Unpack input states *NaK_states, q, r = states # Call parent method to compute Sodium and Potassium channels states derivatives NaK_dstates = super().derStatesEff(Qm, NaK_states, interp_data) # Compute Calcium channels states derivatives CaL_rates = np.array([np.interp(Qm, interp_data['Q'], interp_data[rn]) for rn in self.coeff_names[8:]]) dqdt = CaL_rates[0] * (1 - q) - CaL_rates[1] * q drdt = CaL_rates[2] * (1 - r) - CaL_rates[3] * r # Merge all states derivatives and return return NaK_dstates + [dqdt, drdt] diff --git a/PySONIC/neurons/thalamic.py b/PySONIC/neurons/thalamic.py index 3cf7495..4f8fd77 100644 --- a/PySONIC/neurons/thalamic.py +++ b/PySONIC/neurons/thalamic.py @@ -1,792 +1,792 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-07-31 15:20:54 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-08-21 16:10:36 +# @Last Modified time: 2018-09-19 16:10:09 ''' Channels mechanisms for thalamic neurons. ''' import logging import numpy as np from .base import BaseMech # Get package logger logger = logging.getLogger('PySONIC') class Thalamic(BaseMech): ''' Class defining the generic membrane channel dynamics of a thalamic neuron with 4 different current types: - Inward Sodium current - Outward Potassium current - Inward Calcium current - Non-specific leakage current This generic class cannot be used directly as it does not contain any specific parameters. Reference: *Plaksin, M., Kimmel, E., and Shoham, S. (2016). Cell-Type-Selective Effects of Intramembrane Cavitation as a Unifying Theoretical Framework for Ultrasonic Neuromodulation. eNeuro 3.* ''' # Generic biophysical parameters of thalamic cells Cm0 = 1e-2 # Cell membrane resting capacitance (F/m2) Vm0 = 0.0 # Dummy value for membrane potential (mV) VNa = 50.0 # Sodium Nernst potential (mV) VK = -90.0 # Potassium Nernst potential (mV) VCa = 120.0 # Calcium Nernst potential (mV) def __init__(self): ''' Constructor of the class ''' # Names and initial states of the channels state probabilities self.states_names = ['m', 'h', 'n', 's', 'u'] self.states0 = np.array([]) # Names of the different coefficients to be averaged in a lookup table. self.coeff_names = ['alpham', 'betam', 'alphah', 'betah', 'alphan', 'betan', 'alphas', 'betas', 'alphau', 'betau'] # Charge interval bounds for lookup creation self.Qbounds = (np.round(self.Vm0 - 25.0) * 1e-5, 50.0e-5) def alpham(self, Vm): ''' Compute the alpha rate for the open-probability of Sodium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT - alpha = (-0.32 * (Vdiff - 13) / (np.exp(- (Vdiff - 13) / 4) - 1)) # ms-1 + alpha = 0.32 * self.vtrap(13 - Vdiff, 4) # ms-1 return alpha * 1e3 # s-1 def betam(self, Vm): ''' Compute the beta rate for the open-probability of Sodium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT - beta = (0.28 * (Vdiff - 40) / (np.exp((Vdiff - 40) / 5) - 1)) # ms-1 + beta = 0.28 * self.vtrap(Vdiff - 40, 5) # ms-1 return beta * 1e3 # s-1 def alphah(self, Vm): ''' Compute the alpha rate for the inactivation-probability of Sodium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT alpha = (0.128 * np.exp(-(Vdiff - 17) / 18)) # ms-1 return alpha * 1e3 # s-1 def betah(self, Vm): ''' Compute the beta rate for the inactivation-probability of Sodium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT beta = (4 / (1 + np.exp(-(Vdiff - 40) / 5))) # ms-1 return beta * 1e3 # s-1 def alphan(self, Vm): ''' Compute the alpha rate for the open-probability of delayed-rectifier Potassium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT - alpha = (-0.032 * (Vdiff - 15) / (np.exp(-(Vdiff - 15) / 5) - 1)) # ms-1 + alpha = 0.032 * self.vtrap(15 - Vdiff, 5) # ms-1 return alpha * 1e3 # s-1 def betan(self, Vm): ''' Compute the beta rate for the open-probability of delayed-rectifier Potassium channels. :param Vm: membrane potential (mV) :return: rate constant (s-1) ''' Vdiff = Vm - self.VT beta = (0.5 * np.exp(-(Vdiff - 10) / 40)) # ms-1 return beta * 1e3 # s-1 def derM(self, Vm, m): ''' Compute the evolution of the open-probability of Sodium channels. :param Vm: membrane potential (mV) :param m: open-probability of Sodium channels (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return self.alpham(Vm) * (1 - m) - self.betam(Vm) * m def derH(self, Vm, h): ''' Compute the evolution of the inactivation-probability of Sodium channels. :param Vm: membrane potential (mV) :param h: inactivation-probability of Sodium channels (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return self.alphah(Vm) * (1 - h) - self.betah(Vm) * h def derN(self, Vm, n): ''' Compute the evolution of the open-probability of delayed-rectifier Potassium channels. :param Vm: membrane potential (mV) :param n: open-probability of delayed-rectifier Potassium channels (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return self.alphan(Vm) * (1 - n) - self.betan(Vm) * n def derS(self, Vm, s): ''' Compute the evolution of the open-probability of the S-type, activation gate of Calcium channels. :param Vm: membrane potential (mV) :param s: open-probability of S-type Calcium activation gates (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return (self.sinf(Vm) - s) / self.taus(Vm) def derU(self, Vm, u): ''' Compute the evolution of the open-probability of the U-type, inactivation gate of Calcium channels. :param Vm: membrane potential (mV) :param u: open-probability of U-type Calcium inactivation gates (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return (self.uinf(Vm) - u) / self.tauu(Vm) def currNa(self, m, h, Vm): ''' Compute the inward Sodium current per unit area. :param m: open-probability of Sodium channels :param h: inactivation-probability of Sodium channels :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' GNa = self.GNaMax * m**3 * h return GNa * (Vm - self.VNa) def currK(self, n, Vm): ''' Compute the outward delayed-rectifier Potassium current per unit area. :param n: open-probability of delayed-rectifier Potassium channels :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' GK = self.GKMax * n**4 return GK * (Vm - self.VK) def currCa(self, s, u, Vm): ''' Compute the inward Calcium current per unit area. :param s: open-probability of the S-type activation gate of Calcium channels :param u: open-probability of the U-type inactivation gate of Calcium channels :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' GT = self.GTMax * s**2 * u return GT * (Vm - self.VCa) def currL(self, Vm): ''' Compute the non-specific leakage current per unit area. :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' return self.GL * (Vm - self.VL) def currNet(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' m, h, n, s, u = states return (self.currNa(m, h, Vm) + self.currK(n, Vm) + self.currCa(s, u, Vm) + self.currL(Vm)) # mA/m2 def steadyStates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Solve the equation dx/dt = 0 at Vm for each x-state meq = self.alpham(Vm) / (self.alpham(Vm) + self.betam(Vm)) heq = self.alphah(Vm) / (self.alphah(Vm) + self.betah(Vm)) neq = self.alphan(Vm) / (self.alphan(Vm) + self.betan(Vm)) seq = self.sinf(Vm) ueq = self.uinf(Vm) return np.array([meq, heq, neq, seq, ueq]) def derStates(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' m, h, n, s, u = states dmdt = self.derM(Vm, m) dhdt = self.derH(Vm, h) dndt = self.derN(Vm, n) dsdt = self.derS(Vm, s) dudt = self.derU(Vm, u) return [dmdt, dhdt, dndt, dsdt, dudt] def getEffRates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Compute average cycle value for rate constants am_avg = np.mean(self.alpham(Vm)) bm_avg = np.mean(self.betam(Vm)) ah_avg = np.mean(self.alphah(Vm)) bh_avg = np.mean(self.betah(Vm)) an_avg = np.mean(self.alphan(Vm)) bn_avg = np.mean(self.betan(Vm)) Ts = self.taus(Vm) as_avg = np.mean(self.sinf(Vm) / Ts) bs_avg = np.mean(1 / Ts) - as_avg Tu = np.array([self.tauu(v) for v in Vm]) au_avg = np.mean(self.uinf(Vm) / Tu) bu_avg = np.mean(1 / Tu) - au_avg # Return array of coefficients return np.array([am_avg, bm_avg, ah_avg, bh_avg, an_avg, bn_avg, as_avg, bs_avg, au_avg, bu_avg]) def derStatesEff(self, Qm, states, interp_data): ''' Concrete implementation of the abstract API method. ''' rates = np.array([np.interp(Qm, interp_data['Q'], interp_data[rn]) for rn in self.coeff_names]) m, h, n, s, u = states dmdt = rates[0] * (1 - m) - rates[1] * m dhdt = rates[2] * (1 - h) - rates[3] * h dndt = rates[4] * (1 - n) - rates[5] * n dsdt = rates[6] * (1 - s) - rates[7] * s dudt = rates[8] * (1 - u) - rates[9] * u return [dmdt, dhdt, dndt, dsdt, dudt] class ThalamicRE(Thalamic): ''' Specific membrane channel dynamics of a thalamic reticular neuron. References: *Destexhe, A., Contreras, D., Steriade, M., Sejnowski, T.J., and Huguenard, J.R. (1996). In vivo, in vitro, and computational analysis of dendritic calcium currents in thalamic reticular neurons. J. Neurosci. 16, 169–185.* *Huguenard, J.R., and Prince, D.A. (1992). A novel T-type current underlies prolonged Ca(2+)-dependent burst firing in GABAergic neurons of rat thalamic reticular nucleus. J. Neurosci. 12, 3804–3817.* ''' # Name of channel mechanism name = 'RE' # Cell-specific biophysical parameters Vm0 = -89.5 # Cell membrane resting potential (mV) GNaMax = 2000.0 # Max. conductance of Sodium current (S/m^2) GKMax = 200.0 # Max. conductance of Potassium current (S/m^2) GTMax = 30.0 # Max. conductance of low-threshold Calcium current (S/m^2) GL = 0.5 # Conductance of non-specific leakage current (S/m^2) VL = -90.0 # Non-specific leakage Nernst potential (mV) VT = -67.0 # Spike threshold adjustment parameter (mV) # Default plotting scheme pltvars_scheme = { 'i_{Na}\ kin.': ['m', 'h', 'm3h'], 'i_K\ kin.': ['n'], 'i_{TS}\ kin.': ['s', 'u', 's2u'], 'I': ['iNa', 'iK', 'iTs', 'iL', 'iNet'] } def __init__(self): ''' Constructor of the class. ''' # Instantiate parent class super().__init__() # Define initial channel probabilities (solving dx/dt = 0 at resting potential) self.states0 = self.steadyStates(self.Vm0) def sinf(self, Vm): ''' Compute the asymptotic value of the open-probability of the S-type, activation gate of Calcium channels. :param Vm: membrane potential (mV) :return: asymptotic probability (-) ''' return 1.0 / (1.0 + np.exp(-(Vm + 52.0) / 7.4)) # prob def taus(self, Vm): ''' Compute the decay time constant for adaptation of S-type, activation gate of Calcium channels. :param Vm: membrane potential (mV) :return: decayed time constant (s) ''' return (1 + 0.33 / (np.exp((Vm + 27.0) / 10.0) + np.exp(-(Vm + 102.0) / 15.0))) * 1e-3 # s def uinf(self, Vm): ''' Compute the asymptotic value of the open-probability of the U-type, inactivation gate of Calcium channels. :param Vm: membrane potential (mV) :return: asymptotic probability (-) ''' return 1.0 / (1.0 + np.exp((Vm + 80.0) / 5.0)) # prob def tauu(self, Vm): ''' Compute the decay time constant for adaptation of U-type, inactivation gate of Calcium channels. :param Vm: membrane potential (mV) :return: decayed time constant (s) ''' return (28.3 + 0.33 / (np.exp((Vm + 48.0) / 4.0) + np.exp(-(Vm + 407.0) / 50.0))) * 1e-3 # s class ThalamoCortical(Thalamic): ''' Specific membrane channel dynamics of a thalamo-cortical neuron, with a specific hyperpolarization-activated, mixed cationic current and a leakage Potassium current. References: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* *Destexhe, A., Bal, T., McCormick, D.A., and Sejnowski, T.J. (1996). Ionic mechanisms underlying synchronized oscillations and propagating waves in a model of ferret thalamic slices. J. Neurophysiol. 76, 2049–2070.* *McCormick, D.A., and Huguenard, J.R. (1992). A model of the electrophysiological properties of thalamocortical relay neurons. J. Neurophysiol. 68, 1384–1400.* ''' # Name of channel mechanism name = 'TC' # Cell-specific biophysical parameters # Vm0 = -63.4 # Cell membrane resting potential (mV) Vm0 = -61.93 # Cell membrane resting potential (mV) GNaMax = 900.0 # Max. conductance of Sodium current (S/m^2) GKMax = 100.0 # Max. conductance of Potassium current (S/m^2) GTMax = 20.0 # Max. conductance of low-threshold Calcium current (S/m^2) GKL = 0.138 # Conductance of leakage Potassium current (S/m^2) GhMax = 0.175 # Max. conductance of mixed cationic current (S/m^2) GL = 0.1 # Conductance of non-specific leakage current (S/m^2) Vh = -40.0 # Mixed cationic current reversal potential (mV) VL = -70.0 # Non-specific leakage Nernst potential (mV) VT = -52.0 # Spike threshold adjustment parameter (mV) Vx = 0.0 # Voltage-dependence uniform shift factor at 36°C (mV) tau_Ca_removal = 5e-3 # decay time constant for intracellular Ca2+ dissolution (s) CCa_min = 50e-9 # minimal intracellular Calcium concentration (M) deff = 100e-9 # effective depth beneath membrane for intracellular [Ca2+] calculation F_Ca = 1.92988e5 # Faraday constant for bivalent ion (Coulomb / mole) nCa = 4 # number of Calcium binding sites on regulating factor k1 = 2.5e22 # intracellular Ca2+ regulation factor (M-4 s-1) k2 = 0.4 # intracellular Ca2+ regulation factor (s-1) k3 = 100.0 # intracellular Ca2+ regulation factor (s-1) k4 = 1.0 # intracellular Ca2+ regulation factor (s-1) # Default plotting scheme pltvars_scheme = { 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_{T}\ kin.': ['s', 'u'], 'i_{H}\ kin.': ['O', 'OL', 'O + 2OL'], 'I': ['iNa', 'iK', 'iT', 'iH', 'iKL', 'iL', 'iNet'] } def __init__(self): ''' Constructor of the class. ''' # Instantiate parent class super().__init__() # Compute current to concentration conversion constant self.iT_2_CCa = 1e-6 / (self.deff * self.F_Ca) # Define names of the channels state probabilities self.states_names += ['O', 'C', 'P0', 'C_Ca'] # Define the names of the different coefficients to be averaged in a lookup table. self.coeff_names += ['alphao', 'betao'] # Define initial channel probabilities (solving dx/dt = 0 at resting potential) self.states0 = self.steadyStates(self.Vm0) def sinf(self, Vm): ''' Compute the asymptotic value of the open-probability of the S-type, activation gate of Calcium channels. Reference: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* :param Vm: membrane potential (mV) :return: asymptotic probability (-) ''' return 1.0 / (1.0 + np.exp(-(Vm + self.Vx + 57.0) / 6.2)) # prob def taus(self, Vm): ''' Compute the decay time constant for adaptation of S-type, activation gate of Calcium channels. Reference: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* :param Vm: membrane potential (mV) :return: decayed time constant (s) ''' tmp = np.exp(-(Vm + self.Vx + 132.0) / 16.7) + np.exp((Vm + self.Vx + 16.8) / 18.2) return 1.0 / 3.7 * (0.612 + 1.0 / tmp) * 1e-3 # s def uinf(self, Vm): ''' Compute the asymptotic value of the open-probability of the U-type, inactivation gate of Calcium channels. Reference: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* :param Vm: membrane potential (mV) :return: asymptotic probability (-) ''' return 1.0 / (1.0 + np.exp((Vm + self.Vx + 81.0) / 4.0)) # prob def tauu(self, Vm): ''' Compute the decay time constant for adaptation of U-type, inactivation gate of Calcium channels. Reference: *Pospischil, M., Toledo-Rodriguez, M., Monier, C., Piwkowska, Z., Bal, T., Frégnac, Y., Markram, H., and Destexhe, A. (2008). Minimal Hodgkin-Huxley type models for different classes of cortical and thalamic neurons. Biol Cybern 99, 427–441.* :param Vm: membrane potential (mV) :return: decayed time constant (s) ''' if Vm + self.Vx < -80.0: return 1.0 / 3.7 * np.exp((Vm + self.Vx + 467.0) / 66.6) * 1e-3 # s else: return 1 / 3.7 * (np.exp(-(Vm + self.Vx + 22) / 10.5) + 28.0) * 1e-3 # s def derS(self, Vm, s): ''' Compute the evolution of the open-probability of the S-type, activation gate of Calcium channels. :param Vm: membrane potential (mV) :param s: open-probability of S-type Calcium activation gates (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return (self.sinf(Vm) - s) / self.taus(Vm) def derU(self, Vm, u): ''' Compute the evolution of the open-probability of the U-type, inactivation gate of Calcium channels. :param Vm: membrane potential (mV) :param u: open-probability of U-type Calcium inactivation gates (prob) :return: derivative of open-probability w.r.t. time (prob/s) ''' return (self.uinf(Vm) - u) / self.tauu(Vm) def oinf(self, Vm): ''' Voltage-dependent steady-state activation of hyperpolarization-activated cation current channels. Reference: *Huguenard, J.R., and McCormick, D.A. (1992). Simulation of the currents involved in rhythmic oscillations in thalamic relay neurons. J. Neurophysiol. 68, 1373–1383.* :param Vm: membrane potential (mV) :return: steady-state activation (-) ''' return 1.0 / (1.0 + np.exp((Vm + 75.0) / 5.5)) def tauo(self, Vm): ''' Time constant for activation of hyperpolarization-activated cation current channels. Reference: *Huguenard, J.R., and McCormick, D.A. (1992). Simulation of the currents involved in rhythmic oscillations in thalamic relay neurons. J. Neurophysiol. 68, 1373–1383.* :param Vm: membrane potential (mV) :return: time constant (s) ''' return 1 / (np.exp(-14.59 - 0.086 * Vm) + np.exp(-1.87 + 0.0701 * Vm)) * 1e-3 def alphao(self, Vm): ''' Transition rate between closed and open form of hyperpolarization-activated cation current channels. :param Vm: membrane potential (mV) :return: transition rate (s-1) ''' return self.oinf(Vm) / self.tauo(Vm) def betao(self, Vm): ''' Transition rate between open and closed form of hyperpolarization-activated cation current channels. :param Vm: membrane potential (mV) :return: transition rate (s-1) ''' return (1 - self.oinf(Vm)) / self.tauo(Vm) def derC(self, C, O, Vm): ''' Compute the evolution of the proportion of hyperpolarization-activated cation current channels in closed state. Kinetics scheme of Calcium dependent activation derived from: *Destexhe, A., Bal, T., McCormick, D.A., and Sejnowski, T.J. (1996). Ionic mechanisms underlying synchronized oscillations and propagating waves in a model of ferret thalamic slices. J. Neurophysiol. 76, 2049–2070.* :param Vm: membrane potential (mV) :param C: proportion of Ih channels in closed state (-) :param O: proportion of Ih channels in open state (-) :return: derivative of proportion w.r.t. time (s-1) ''' return self.betao(Vm) * O - self.alphao(Vm) * C def derO(self, C, O, P0, Vm): ''' Compute the evolution of the proportion of hyperpolarization-activated cation current channels in open state. Kinetics scheme of Calcium dependent activation derived from: *Destexhe, A., Bal, T., McCormick, D.A., and Sejnowski, T.J. (1996). Ionic mechanisms underlying synchronized oscillations and propagating waves in a model of ferret thalamic slices. J. Neurophysiol. 76, 2049–2070.* :param Vm: membrane potential (mV) :param C: proportion of Ih channels in closed state (-) :param O: proportion of Ih channels in open state (-) :param P0: proportion of Ih channels regulating factor in unbound state (-) :return: derivative of proportion w.r.t. time (s-1) ''' return - self.derC(C, O, Vm) - self.k3 * O * (1 - P0) + self.k4 * (1 - O - C) def derP0(self, P0, C_Ca): ''' Compute the evolution of the proportion of Ih channels regulating factor in unbound state. Kinetics scheme of Calcium dependent activation derived from: *Destexhe, A., Bal, T., McCormick, D.A., and Sejnowski, T.J. (1996). Ionic mechanisms underlying synchronized oscillations and propagating waves in a model of ferret thalamic slices. J. Neurophysiol. 76, 2049–2070.* :param Vm: membrane potential (mV) :param P0: proportion of Ih channels regulating factor in unbound state (-) :param C_Ca: Calcium concentration in effective submembranal space (M) :return: derivative of proportion w.r.t. time (s-1) ''' return self.k2 * (1 - P0) - self.k1 * P0 * C_Ca**self.nCa def derC_Ca(self, C_Ca, ICa): ''' Compute the evolution of the Calcium concentration in submembranal space. Model of Ca2+ buffering and contribution from iCa derived from: *McCormick, D.A., and Huguenard, J.R. (1992). A model of the electrophysiological properties of thalamocortical relay neurons. J. Neurophysiol. 68, 1384–1400.* :param Vm: membrane potential (mV) :param C_Ca: Calcium concentration in submembranal space (M) :param ICa: inward Calcium current filling up the submembranal space with Ca2+ (mA/m2) :return: derivative of Calcium concentration in submembranal space w.r.t. time (s-1) ''' return (self.CCa_min - C_Ca) / self.tau_Ca_removal - self.iT_2_CCa * ICa def currKL(self, Vm): ''' Compute the voltage-dependent leak Potassium current per unit area. :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' return self.GKL * (Vm - self.VK) def currH(self, O, C, Vm): ''' Compute the outward mixed cationic current per unit area. :param O: proportion of the channels in open form :param OL: proportion of the channels in locked-open form :param Vm: membrane potential (mV) :return: current per unit area (mA/m2) ''' OL = 1 - O - C return self.GhMax * (O + 2 * OL) * (Vm - self.Vh) def currNet(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' m, h, n, s, u, O, C, _, _ = states return (self.currNa(m, h, Vm) + self.currK(n, Vm) + self.currCa(s, u, Vm) + self.currKL(Vm) + self.currH(O, C, Vm) + self.currL(Vm)) # mA/m2 def steadyStates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Call parent method to compute Sodium, Potassium and Calcium channels gates steady-states NaKCa_eqstates = super().steadyStates(Vm) # Compute steady-state Calcium current seq = NaKCa_eqstates[3] ueq = NaKCa_eqstates[4] iTeq = self.currCa(seq, ueq, Vm) # Compute steady-state variables for the kinetics system of Ih CCa_eq = self.CCa_min - self.tau_Ca_removal * self.iT_2_CCa * iTeq P0_eq = self.k2 / (self.k2 + self.k1 * CCa_eq**self.nCa) BA = self.betao(Vm) / self.alphao(Vm) O_eq = self.k4 / (self.k3 * (1 - P0_eq) + self.k4 * (1 + BA)) C_eq = BA * O_eq kin_eqstates = np.array([O_eq, C_eq, P0_eq, CCa_eq]) # Merge all steady-states and return return np.concatenate((NaKCa_eqstates, kin_eqstates)) def derStates(self, Vm, states): ''' Concrete implementation of the abstract API method. ''' m, h, n, s, u, O, C, P0, C_Ca = states NaKCa_states = [m, h, n, s, u] NaKCa_derstates = super().derStates(Vm, NaKCa_states) dO_dt = self.derO(C, O, P0, Vm) dC_dt = self.derC(C, O, Vm) dP0_dt = self.derP0(P0, C_Ca) ICa = self.currCa(s, u, Vm) dCCa_dt = self.derC_Ca(C_Ca, ICa) return NaKCa_derstates + [dO_dt, dC_dt, dP0_dt, dCCa_dt] def getEffRates(self, Vm): ''' Concrete implementation of the abstract API method. ''' # Compute effective coefficients for Sodium, Potassium and Calcium conductances NaKCa_effrates = super().getEffRates(Vm) # Compute effective coefficients for Ih conductance ao_avg = np.mean(self.alphao(Vm)) bo_avg = np.mean(self.betao(Vm)) iH_effrates = np.array([ao_avg, bo_avg]) # Return array of coefficients return np.concatenate((NaKCa_effrates, iH_effrates)) def derStatesEff(self, Qm, states, interp_data): ''' Concrete implementation of the abstract API method. ''' rates = np.array([np.interp(Qm, interp_data['Q'], interp_data[rn]) for rn in self.coeff_names]) Vmeff = np.interp(Qm, interp_data['Q'], interp_data['V']) # Unpack states m, h, n, s, u, O, C, P0, C_Ca = states # INa, IK, ICa effective states derivatives dmdt = rates[0] * (1 - m) - rates[1] * m dhdt = rates[2] * (1 - h) - rates[3] * h dndt = rates[4] * (1 - n) - rates[5] * n dsdt = rates[6] * (1 - s) - rates[7] * s dudt = rates[8] * (1 - u) - rates[9] * u # Ih effective states derivatives dC_dt = rates[11] * O - rates[10] * C dO_dt = - dC_dt - self.k3 * O * (1 - P0) + self.k4 * (1 - O - C) dP0_dt = self.derP0(P0, C_Ca) ICa_eff = self.currCa(s, u, Vmeff) dCCa_dt = self.derC_Ca(C_Ca, ICa_eff) # Merge derivatives and return return [dmdt, dhdt, dndt, dsdt, dudt, dO_dt, dC_dt, dP0_dt, dCCa_dt] diff --git a/PySONIC/solvers/simutils.py b/PySONIC/solvers/simutils.py index 4f4d7c7..64e769c 100644 --- a/PySONIC/solvers/simutils.py +++ b/PySONIC/solvers/simutils.py @@ -1,2431 +1,2427 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-09-19 13:38:53 +# @Last Modified time: 2018-09-19 16:01:35 """ Utility functions used in simulations """ import os import time import logging import pickle import shutil import tkinter as tk from tkinter import filedialog import numpy as np import pandas as pd from openpyxl import load_workbook import lockfile import multiprocessing as mp from ..bls import BilayerSonophore from .SolverUS import SolverUS from .SolverElec import SolverElec from ..constants import * from ..neurons import * from ..utils import getNeuronsDict, InputError, PmCompMethod, si_format, getCycleAverage, nDindexes # Get package logger logger = logging.getLogger('PySONIC') class Consumer(mp.Process): ''' Generic consumer process, taking tasks from a queue and outputing results in another queue. ''' def __init__(self, task_queue, result_queue): mp.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue print('Starting {}'.format(self.name)) def run(self): while True: nextTask = self.task_queue.get() if nextTask is None: print('Exiting {}'.format(self.name)) self.task_queue.task_done() break print('{}: {}'.format(self.name, nextTask)) answer = nextTask() self.task_queue.task_done() self.result_queue.put(answer) return class Worker(): ''' Generic worker class. ''' def __init__(self, wid, nsims, ncoeffs=0): ''' Class constructor. :param wid: worker ID :param nsims: total number or simulations ''' self.id = wid self.nsims = nsims self.ncoeffs = ncoeffs def __call__(self): ''' worker method. ''' return (self.id, np.random.rand(self.ncoeffs) * 1e9) def __str__(self): return 'simulation {}/{}'.format(self.id + 1, self.nsims) class LookupWorker(Worker): ''' Worker class that computes "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. ''' def __init__(self, wid, bls, neuron, Fdrive, Adrive, Qm, phi, nsims): ''' Class constructor. :param wid: worker ID :param bls: BilayerSonophore object :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param phi: acoustic drive phase (rad) :param nsims: total number or simulations ''' super().__init__(wid, nsims) self.bls = bls self.neuron = neuron self.Fdrive = Fdrive self.Adrive = Adrive self.Qm = Qm self.phi = phi def __call__(self): ''' Method that computes effective coefficients. ''' try: # Run simulation and retrieve deflection and gas content vectors from last cycle _, [Z, ng], _ = self.bls.run(self.Fdrive, self.Adrive, self.Qm, self.phi) Z_last = Z[-NPC_FULL:] # m # Compute membrane potential vector Vm = self.Qm / self.bls.v_Capct(Z_last) * 1e3 # mV # Compute average cycle value for membrane potential and rate constants Vm_eff = np.mean(Vm) # mV rates_eff = self.neuron.getEffRates(Vm) # Take final cycle value for gas content ng_eff = ng[-1] # mole return (self.id, [Vm_eff, ng_eff, *rates_eff]) except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return -1 def __str__(self): return '{} (a = {}m, f = {}Hz, A = {}Pa, Q = {:.2f} nC/cm2)'.format( super().__str__(), *si_format([self.bls.a, self.Fdrive, self.Adrive], space=' '), self.Qm * 1e5) class AStimWorker(Worker): ''' Worker class that runs a single A-STIM simulation a given neuron for specific stimulation parameters, and save the results in a PKL file. ''' def __init__(self, wid, batch_dir, log_filepath, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method, nsims): ''' Class constructor. :param wid: worker ID :param solver: SolverUS object :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param int_method: selected integration method :param nsims: total number or simulations ''' super().__init__(wid, nsims) self.batch_dir = batch_dir self.log_filepath = log_filepath self.solver = solver self.neuron = neuron self.Fdrive = Fdrive self.Adrive = Adrive self.tstim = tstim self.toffset = toffset self.PRF = PRF self.DC = DC self.int_method = int_method def __call__(self): ''' Method that runs the simulation. ''' simcode = 'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms_{}{}'\ .format(self.neuron.name, 'CW' if self.DC == 1 else 'PW', self.solver.a * 1e9, self.Fdrive * 1e-3, self.Adrive * 1e-3, self.tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(self.PRF, self.DC * 1e2) if self.DC < 1. else '', self.int_method) try: # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run simulation tstart = time.time() (t, y, states) = self.solver.run(self.neuron, self.Fdrive, self.Adrive, self.tstim, self.toffset, self.PRF, self.DC, self.int_method) Z, ng, Qm, Vm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.debug('completed in %ss', si_format(tcomp, 2)) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Vm}) for j in range(len(self.neuron.states_names)): df[self.neuron.states_names[j]] = channels[j] meta = {'neuron': self.neuron.name, 'a': self.solver.a, 'd': self.solver.d, 'Fdrive': self.Fdrive, 'Adrive': self.Adrive, 'phi': np.pi, 'tstim': self.tstim, 'toffset': self.toffset, 'PRF': self.PRF, 'DC': self.DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(self.batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Detect spikes on Qm signal dt = t[1] - t[0] ipeaks, *_ = findPeaks(Qm, SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 'N/A' logger.debug('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': self.neuron.name, 'D': self.solver.a * 1e9, 'E': self.solver.d * 1e6, 'F': self.Fdrive * 1e-3, 'G': self.Adrive * 1e-3, 'H': self.tstim * 1e3, 'I': self.PRF * 1e-3 if self.DC < 1 else 'N/A', 'J': self.DC, 'K': self.int_method, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(self.log_filepath, 'Data', log) == 1: logger.debug('log exported to "%s"', self.log_filepath) else: logger.error('log export to "%s" aborted', self.log_filepath) return output_filepath except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return -1 def __str__(self): worker_str = 'A-STIM {} {}: {} neuron, a = {}m, f = {}Hz, A = {}Pa, t = {}s'\ .format(self.int_method, super().__str__(), self.neuron.name, *si_format([self.solver.a, self.Fdrive], 1, space=' '), si_format(self.Adrive, 2, space=' '), si_format(self.tstim, 1, space=' ')) if self.DC < 1.0: worker_str += ', PRF = {}Hz, DC = {:.2f}%'\ .format(si_format(self.PRF, 2, space=' '), self.DC * 1e2) return worker_str class EStimWorker(Worker): ''' Worker class that runs a single E-STIM simulation a given neuron for specific stimulation parameters, and save the results in a PKL file. ''' def __init__(self, wid, batch_dir, log_filepath, solver, neuron, Astim, tstim, toffset, PRF, DC, nsims): ''' Class constructor. :param wid: worker ID :param solver: SolverElec object :param neuron: neuron object :param Astim: stimulus amplitude (mA/m2) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param nsims: total number or simulations ''' super().__init__(wid, nsims) self.batch_dir = batch_dir self.log_filepath = log_filepath self.solver = solver self.neuron = neuron self.Astim = Astim self.tstim = tstim self.toffset = toffset self.PRF = PRF self.DC = DC def __call__(self): ''' Method that runs the simulation. ''' simcode = 'ESTIM_{}_{}_{:.1f}mA_per_m2_{:.0f}ms{}'\ .format(self.neuron.name, 'CW' if self.DC == 1 else 'PW', self.Astim, self.tstim * 1e3, '_PRF{:.2f}Hz_DC{:.2f}%'.format(self.PRF, self.DC * 1e2) if self.DC < 1. else '') try: # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run simulation tstart = time.time() (t, y, states) = self.solver.run(self.neuron, self.Astim, self.tstim, self.toffset, self.PRF, self.DC) Vm, *channels = y tcomp = time.time() - tstart logger.debug('completed in %ss', si_format(tcomp, 1)) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'Vm': Vm, 'Qm': Vm * self.neuron.Cm0 * 1e-3}) for j in range(len(self.neuron.states_names)): df[self.neuron.states_names[j]] = channels[j] meta = {'neuron': self.neuron.name, 'Astim': self.Astim, 'tstim': self.tstim, 'toffset': self.toffset, 'PRF': self.PRF, 'DC': self.DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(self.batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Detect spikes on Vm signal dt = t[1] - t[0] ipeaks, *_ = findPeaks(Vm, SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 'N/A' logger.debug('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': self.neuron.name, 'D': self.Astim, 'E': self.tstim * 1e3, 'F': self.PRF * 1e-3 if self.DC < 1 else 'N/A', 'G': self.DC, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(self.log_filepath, 'Data', log) == 1: logger.debug('log exported to "%s"', self.log_filepath) else: logger.error('log export to "%s" aborted', self.log_filepath) return output_filepath except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return -1 def __str__(self): worker_str = 'E-STIM {}: {} neuron, A = {}A/m2, t = {}s'\ .format(super().__str__(), self.neuron.name, si_format(self.Astim * 1e-3, 2, space=' '), si_format(self.tstim, 1, space=' ')) if self.DC < 1.0: worker_str += ', PRF = {}Hz, DC = {:.2f}%'\ .format(si_format(self.PRF, 2, space=' '), self.DC * 1e2) return worker_str class MechWorker(Worker): ''' Worker class that runs a single simulation of the mechanical system with specific parameters and an imposed value of charge density, and save the results in a PKL file. ''' def __init__(self, wid, batch_dir, log_filepath, bls, Fdrive, Adrive, Qm, nsims): ''' Class constructor. :param wid: worker ID :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param bls: BilayerSonophore instance :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: applided membrane charge density (C/m2) :param nsims: total number or simulations ''' super().__init__(wid, nsims) self.batch_dir = batch_dir self.log_filepath = log_filepath self.bls = bls self.Fdrive = Fdrive self.Adrive = Adrive self.Qm = Qm def __call__(self): ''' Method that runs the simulation. ''' simcode = 'MECH_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.1f}nCcm2'\ .format(self.bls.a * 1e9, self.Fdrive * 1e-3, self.Adrive * 1e-3, self.Qm * 1e5) try: # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run simulation tstart = time.time() (t, y, states) = self.bls.run(self.Fdrive, self.Adrive, self.Qm) (Z, ng) = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.debug('completed in %ss', si_format(tcomp, 1)) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng}) meta = {'a': self.bls.a, 'd': self.bls.d, 'Cm0': self.bls.Cm0, 'Qm0': self.bls.Qm0, 'Fdrive': self.Fdrive, 'Adrive': self.Adrive, 'phi': np.pi, 'Qm': self.Qm, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(self.batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Compute key output metrics Zmax = np.amax(Z) Zmin = np.amin(Z) Zabs_max = np.amax(np.abs([Zmin, Zmax])) eAmax = self.bls.arealstrain(Zabs_max) Tmax = self.bls.TEtot(Zabs_max) Pmmax = self.bls.PMavgpred(Zmin) ngmax = np.amax(ng) dUdtmax = np.amax(np.abs(np.diff(U) / np.diff(t)**2)) # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': self.bls.a * 1e9, 'D': self.bls.d * 1e6, 'E': self.Fdrive * 1e-3, 'F': self.Adrive * 1e-3, 'G': self.Qm * 1e5, 'H': t.size, 'I': tcomp, 'J': self.bls.kA + self.bls.kA_tissue, 'K': Zmax * 1e9, 'L': eAmax, 'M': Tmax * 1e3, 'N': (ngmax - self.bls.ng0) / self.bls.ng0, 'O': Pmmax * 1e-3, 'P': dUdtmax } if xlslog(self.log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', self.log_filepath) else: logger.error('log export to "%s" aborted', self.log_filepath) return output_filepath except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return -1 def __str__(self): return 'Mechanical {}: a = {}m, d = {}m, f = {}Hz, A = {}Pa, Q = {}C/cm2'\ .format(super().__str__(), *si_format([self.bls.a, self.bls.d, self.Fdrive], 1, space=' '), *si_format([self.Adrive, self.Qm * 1e-4], 2, space=' ')) class EStimTitrator(Worker): ''' Worker class that uses a dichotomic recursive search to determine the threshold value of a specific electric stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. ''' def __init__(self, wid, solver, neuron, Astim, tstim, toffset, PRF, DC, nsims=1): ''' Class constructor. :param wid: worker ID :param solver: SolverElec object :param neuron: neuron object :param Astim: injected current density amplitude (mA/m2) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param nsims: total number or simulations ''' super().__init__(wid, nsims) self.solver = solver self.neuron = neuron self.Astim = Astim self.tstim = tstim self.toffset = toffset self.PRF = PRF self.DC = DC # Determine titration type if Astim is None: self.t_type = 'A' self.unit = 'A/m2' self.factor = 1e-3 self.interval = (0., 2 * TITRATION_ESTIM_A_MAX) self.thr = TITRATION_ESTIM_DA_MAX self.maxval = TITRATION_ESTIM_A_MAX elif tstim is None: self.t_type = 't' self.unit = 's' self.factor = 1 self.interval = (0., 2 * TITRATION_T_MAX) self.thr = TITRATION_DT_THR self.maxval = TITRATION_T_MAX elif DC is None: self.t_type = 'DC' self.unit = '%' self.factor = 1e2 self.interval = (0., 2 * TITRATION_DC_MAX) self.thr = TITRATION_DDC_THR self.maxval = TITRATION_DC_MAX else: print('Error: Invalid titration type') self.t0 = None def __call__(self): ''' Method running the titration, called recursively until an accurate threshold is found. :return: 5-tuple with the determined threshold, time profile, solution matrix, state vector and response latency ''' if self.t0 is None: self.t0 = time.time() # Define current value value = (self.interval[0] + self.interval[1]) / 2 # Define stimulation parameters if self.t_type == 'A': stim_params = [value, self.tstim, self.toffset, self.PRF, self.DC] elif self.t_type == 't': stim_params = [self.Astim, value, self.toffset, self.PRF, self.DC] elif self.t_type == 'DC': stim_params = [self.Astim, self.tstim, self.toffset, self.PRF, value] # Run simulation and detect spikes (t, y, states) = self.solver.run(self.neuron, *stim_params) dt = t[1] - t[0] ipeaks, *_ = findPeaks(y[0, :], SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) n_spikes = ipeaks.size latency = t[ipeaks[0]] if n_spikes > 0 else None print('{}{} ---> {} spike{} detected'.format(si_format(value * self.factor, 2, space=' '), self.unit, n_spikes, "s" if n_spikes > 1 else "")) # If accurate threshold is found, return simulation results if (self.interval[1] - self.interval[0]) <= self.thr and n_spikes == 1: tcomp = time.time() - self.t0 print('completed in {:.2f} s, threshold = {}{}' .format(tcomp, si_format(value * self.factor, 2, space=' '), self.unit)) return (value, t, y, states, latency, tcomp) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: # if value too close to max then stop if (self.maxval - value) <= self.thr: logger.warning('no spikes detected within titration interval') return (np.nan, t, y, states, latency) self.interval = (value, self.interval[1]) else: self.interval = (self.interval[0], value) return self.__call__() def __str__(self): params = [self.Astim, self.tstim, self.PRF, self.DC] punits = {'A': 'A/m2', 't': 's', 'PRF': 'Hz', 'DC': '%'} if self.Astim is not None: params[0] *= 1e-3 if self.DC is not None: params[3] *= 1e2 pnames = list(punits.keys()) ittr = params.index(None) del params[ittr] del pnames[ittr] log_str = ', '.join(['{} = {}{}'.format(pname, si_format(param, 2, space=' '), punits[pname]) for pname, param in zip(pnames, params)]) return '{} neuron - E-STIM titration {}/{} ({})'\ .format(self.neuron.name, self.id, self.nsims, log_str) class AStimTitrator(Worker): ''' Worker class that uses a dichotomic recursive search to determine the threshold value of a specific acoustic stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. ''' def __init__(self, wid, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method='sonic', nsims=1): ''' Class constructor. :param wid: worker ID :param solver: SolverUS object :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param int_method: selected integration method :param nsims: total number or simulations ''' super().__init__(wid, nsims) self.solver = solver self.neuron = neuron self.Fdrive = Fdrive self.Adrive = Adrive self.tstim = tstim self.toffset = toffset self.PRF = PRF self.DC = DC self.int_method = int_method # Determine titration type if Adrive is None: self.t_type = 'A' self.unit = 'Pa' self.factor = 1 self.interval = (0., 2 * TITRATION_ASTIM_A_MAX) self.thr = TITRATION_ASTIM_DA_MAX self.maxval = TITRATION_ASTIM_A_MAX elif tstim is None: self.t_type = 't' self.unit = 's' self.factor = 1 self.interval = (0., 2 * TITRATION_T_MAX) self.thr = TITRATION_DT_THR self.maxval = TITRATION_T_MAX elif DC is None: self.t_type = 'DC' self.unit = '%' self.factor = 1e2 self.interval = (0., 2 * TITRATION_DC_MAX) self.thr = TITRATION_DDC_THR self.maxval = TITRATION_DC_MAX else: print('Error: Invalid titration type') self.t0 = None def __call__(self): ''' Method running the titration, called recursively until an accurate threshold is found. :return: 5-tuple with the determined threshold, time profile, solution matrix, state vector and response latency ''' if self.t0 is None: self.t0 = time.time() # Define current value value = (self.interval[0] + self.interval[1]) / 2 # Define stimulation parameters if self.t_type == 'A': stim_params = [self.Fdrive, value, self.tstim, self.toffset, self.PRF, self.DC] elif self.t_type == 't': stim_params = [self.Fdrive, self.Adrive, value, self.toffset, self.PRF, self.DC] elif self.t_type == 'DC': stim_params = [self.Fdrive, self.Adrive, self.tstim, self.toffset, self.PRF, value] # Run simulation and detect spikes (t, y, states) = self.solver.run(self.neuron, *stim_params, self.int_method) dt = t[1] - t[0] ipeaks, *_ = findPeaks(y[2, :], SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM) n_spikes = ipeaks.size latency = t[ipeaks[0]] if n_spikes > 0 else None print('{}{} ---> {} spike{} detected'.format(si_format(value * self.factor, 2, space=' '), self.unit, n_spikes, "s" if n_spikes > 1 else "")) # If accurate threshold is found, return simulation results if (self.interval[1] - self.interval[0]) <= self.thr and n_spikes == 1: tcomp = time.time() - self.t0 print('completed in {:.2f} s, threshold = {}{}' .format(tcomp, si_format(value * self.factor, 2, space=' '), self.unit)) return (value, t, y, states, latency, tcomp) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: # if value too close to max then stop if (self.maxval - value) <= self.thr: logger.warning('no spikes detected within titration interval') return (np.nan, t, y, states, latency) self.interval = (value, self.interval[1]) else: self.interval = (self.interval[0], value) return self.__call__() def __str__(self): params = [self.Fdrive, self.Adrive, self.tstim, self.PRF, self.DC] punits = {'f': 'Hz', 'A': 'A/m2', 't': 's', 'PRF': 'Hz', 'DC': '%'} if self.Adrive is not None: params[1] *= 1e-3 if self.DC is not None: params[4] *= 1e2 pnames = list(punits.keys()) ittr = params.index(None) del params[ittr] del pnames[ittr] log_str = ', '.join(['{} = {}{}'.format(pname, si_format(param, 2, space=' '), punits[pname]) for pname, param in zip(pnames, params)]) return '{} neuron - A-STIM titration {}/{} ({})'\ .format(self.neuron.name, self.id, self.nsims, log_str) def setBatchDir(): ''' Select batch directory for output files.α :return: full path to batch directory ''' root = tk.Tk() root.withdraw() batch_dir = filedialog.askdirectory() if not batch_dir: raise InputError('No output directory chosen') return batch_dir def checkBatchLog(batch_dir, batch_type): ''' Check for appropriate log file in batch directory, and create one if it is absent. :param batch_dir: full path to batch directory :param batch_type: type of simulation batch :return: 2 tuple with full path to log file and boolean stating if log file was created ''' # Check for directory existence if not os.path.isdir(batch_dir): raise InputError('"{}" output directory does not exist'.format(batch_dir)) # Determine log template from batch type if batch_type == 'MECH': logfile = 'log_MECH.xlsx' elif batch_type == 'A-STIM': logfile = 'log_ASTIM.xlsx' elif batch_type == 'E-STIM': logfile = 'log_ESTIM.xlsx' else: raise InputError('Unknown batch type', batch_type) # Get template in package subdirectory this_dir, _ = os.path.split(__file__) parent_dir = os.path.abspath(os.path.join(this_dir, os.pardir)) logsrc = parent_dir + '/templates/' + logfile assert os.path.isfile(logsrc), 'template log file "{}" not found'.format(logsrc) # Copy template in batch directory if no appropriate log file logdst = batch_dir + '/' + logfile is_log = os.path.isfile(logdst) if not is_log: shutil.copy2(logsrc, logdst) return (logdst, not is_log) def createSimQueue(amps, durations, offsets, PRFs, DCs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: 2D-array with (amplitude, duration, offset, PRF, DC) for each stimulation protocol ''' # Convert input to 1D-arrays amps = np.array(amps) durations = np.array(durations) offsets = np.array(offsets) PRFs = np.array(PRFs) DCs = np.array(DCs) # Create index arrays iamps = range(len(amps)) idurs = range(len(durations)) # Create empty output matrix queue = np.empty((1, 5)) # Continuous protocols if 1.0 in DCs: nCW = len(amps) * len(durations) arr1 = np.ones(nCW) iCW_queue = np.array(np.meshgrid(iamps, idurs)).T.reshape(nCW, 2) CW_queue = np.vstack((amps[iCW_queue[:, 0]], durations[iCW_queue[:, 1]], offsets[iCW_queue[:, 1]], PRFs.min() * arr1, arr1)).T queue = np.vstack((queue, CW_queue)) # Pulsed protocols if np.any(DCs != 1.0): pulsed_DCs = DCs[DCs != 1.0] iPRFs = range(len(PRFs)) ipulsed_DCs = range(len(pulsed_DCs)) nPW = len(amps) * len(durations) * len(PRFs) * len(pulsed_DCs) iPW_queue = np.array(np.meshgrid(iamps, idurs, iPRFs, ipulsed_DCs)).T.reshape(nPW, 4) PW_queue = np.vstack((amps[iPW_queue[:, 0]], durations[iPW_queue[:, 1]], offsets[iPW_queue[:, 1]], PRFs[iPW_queue[:, 2]], pulsed_DCs[iPW_queue[:, 3]])).T queue = np.vstack((queue, PW_queue)) # Return return queue[1:, :] def xlslog(filename, sheetname, data): """ Append log data on a new row to specific sheet of excel workbook, using a lockfile to avoid read/write errors between concurrent processes. :param filename: absolute or relative path to the Excel workbook :param sheetname: name of the Excel spreadsheet to which data is appended :param data: data structure to be added to specific columns on a new row :return: boolean indicating success (1) or failure (0) of operation """ try: lock = lockfile.FileLock(filename) lock.acquire() wb = load_workbook(filename) ws = wb[sheetname] keys = data.keys() i = 1 row_data = {} for k in keys: row_data[k] = data[k] i += 1 ws.append(row_data) wb.save(filename) lock.release() return 1 except PermissionError: # If file cannot be accessed for writing because already opened logger.warning('Cannot write to "%s". Close the file and type "Y"', filename) user_str = input() if user_str in ['y', 'Y']: return xlslog(filename, sheetname, data) else: return 0 def detectPeaks(x, mph=None, mpd=1, threshold=0, edge='rising', kpsh=False, valley=False, ax=None): ''' Detect peaks in data based on their amplitude and other features. Adapted from Marco Duarte: http://nbviewer.jupyter.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb :param x: 1D array_like data. :param mph: minimum peak height (default = None). :param mpd: minimum peak distance in indexes (default = 1) :param threshold : minimum peak prominence (default = 0) :param edge : for a flat peak, keep only the rising edge ('rising'), only the falling edge ('falling'), both edges ('both'), or don't detect a flat peak (None). (default = 'rising') :param kpsh: keep peaks with same height even if they are closer than `mpd` (default = False). :param valley: detect valleys (local minima) instead of peaks (default = False). :param show: plot data in matplotlib figure (default = False). :param ax: a matplotlib.axes.Axes instance, optional (default = None). :return: 1D array with the indices of the peaks ''' print('min peak height:', mph, ', min peak distance:', mpd, ', min peak prominence:', threshold) # Convert input to numpy array x = np.atleast_1d(x).astype('float64') # Revert signal sign for valley detection if valley: x = -x # Differentiate signal dx = np.diff(x) # Find indices of all peaks with edge criterion ine, ire, ife = np.array([[], [], []], dtype=int) if not edge: ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0] else: if edge.lower() in ['rising', 'both']: ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0] if edge.lower() in ['falling', 'both']: ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0] ind = np.unique(np.hstack((ine, ire, ife))) # Remove first and last values of x if they are detected as peaks if ind.size and ind[0] == 0: ind = ind[1:] if ind.size and ind[-1] == x.size - 1: ind = ind[:-1] print('{} raw peaks'.format(ind.size)) # Remove peaks < minimum peak height if ind.size and mph is not None: ind = ind[x[ind] >= mph] print('{} height-filtered peaks'.format(ind.size)) # Remove peaks - neighbors < threshold if ind.size and threshold > 0: dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0) ind = np.delete(ind, np.where(dx < threshold)[0]) print('{} prominence-filtered peaks'.format(ind.size)) # Detect small peaks closer than minimum peak distance if ind.size and mpd > 1: ind = ind[np.argsort(x[ind])][::-1] # sort ind by peak height idel = np.zeros(ind.size, dtype=bool) for i in range(ind.size): if not idel[i]: # keep peaks with the same height if kpsh is True idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \ & (x[ind[i]] > x[ind] if kpsh else True) idel[i] = 0 # Keep current peak # remove the small peaks and sort back the indices by their occurrence ind = np.sort(ind[~idel]) print('{} distance-filtered peaks'.format(ind.size)) return ind def detectPeaksTime(t, y, mph, mtd, mpp=0): """ Extension of the detectPeaks function to detect peaks in data based on their amplitude and time difference, with a non-uniform time vector. :param t: time vector (not necessarily uniform) :param y: signal :param mph: minimal peak height :param mtd: minimal time difference :mpp: minmal peak prominence :return: array of peak indexes """ # Determine whether time vector is uniform (threshold in time step variation) dt = np.diff(t) if (dt.max() - dt.min()) / dt.min() < 1e-2: isuniform = True else: isuniform = False if isuniform: print('uniform time vector') dt = t[1] - t[0] mpd = int(np.ceil(mtd / dt)) ipeaks = detectPeaks(y, mph, mpd=mpd, threshold=mpp) else: print('non-uniform time vector') # Detect peaks on signal with no restriction on inter-peak distance irawpeaks = detectPeaks(y, mph, mpd=1, threshold=mpp) npeaks = irawpeaks.size if npeaks > 0: # Filter relevant peaks with temporal distance ipeaks = [irawpeaks[0]] for i in range(1, npeaks): i1 = ipeaks[-1] i2 = irawpeaks[i] if t[i2] - t[i1] < mtd: if y[i2] > y[i1]: ipeaks[-1] = i2 else: ipeaks.append(i2) else: ipeaks = [] ipeaks = np.array(ipeaks) return ipeaks def detectSpikes(t, Qm, min_amp, min_dt): ''' Detect spikes on a charge density signal, and return their number, latency and rate. :param t: time vector (s) :param Qm: charge density vector (C/m2) :param min_amp: minimal charge amplitude to detect spikes (C/m2) :param min_dt: minimal time interval between 2 spikes (s) :return: 3-tuple with number of spikes, latency (s) and spike rate (sp/s) ''' i_spikes = detectPeaksTime(t, Qm, min_amp, min_dt) if len(i_spikes) > 0: latency = t[i_spikes[0]] # s n_spikes = i_spikes.size if n_spikes > 1: first_to_last_spike = t[i_spikes[-1]] - t[i_spikes[0]] # s spike_rate = (n_spikes - 1) / first_to_last_spike # spikes/s else: spike_rate = 'N/A' else: latency = 'N/A' spike_rate = 'N/A' n_spikes = 0 return (n_spikes, latency, spike_rate) def findPeaks(y, mph=None, mpd=None, mpp=None): ''' Detect peaks in a signal based on their height, prominence and/or separating distance. :param y: signal vector :param mph: minimum peak height (in signal units, default = None). :param mpd: minimum inter-peak distance (in indexes, default = None) :param mpp: minimum peak prominence (in signal units, default = None) :return: 4-tuple of arrays with the indexes of peaks occurence, peaks prominence, peaks width at half-prominence and peaks half-prominence bounds (left and right) Adapted from: - Marco Duarte's detect_peaks function (http://nbviewer.jupyter.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb) - MATLAB findpeaks function (https://ch.mathworks.com/help/signal/ref/findpeaks.html) ''' # Define empty output empty = (np.array([]),) * 4 # Differentiate signal dy = np.diff(y) # Find all peaks and valleys # s = np.sign(dy) # ipeaks = np.where(np.diff(s) < 0.0)[0] + 1 # ivalleys = np.where(np.diff(s) > 0.0)[0] + 1 ipeaks = np.where((np.hstack((dy, 0)) <= 0) & (np.hstack((0, dy)) > 0))[0] ivalleys = np.where((np.hstack((dy, 0)) >= 0) & (np.hstack((0, dy)) < 0))[0] # Return empty output if no peak detected if ipeaks.size == 0: return empty logger.debug('%u peaks found, starting at index %u and ending at index %u', ipeaks.size, ipeaks[0], ipeaks[-1]) logger.debug('%u valleys found, starting at index %u and ending at index %u', ivalleys.size, ivalleys[0], ivalleys[-1]) # Ensure each peak is bounded by two valleys, adding signal boundaries as valleys if necessary if ivalleys.size == 0 or ipeaks[0] < ivalleys[0]: ivalleys = np.insert(ivalleys, 0, -1) if ipeaks[-1] > ivalleys[-1]: ivalleys = np.insert(ivalleys, ivalleys.size, y.size - 1) if ivalleys.size - ipeaks.size != 1: logger.debug('Cleaning up incongruities') i = 0 while i < min(ipeaks.size, ivalleys.size) - 1: if ipeaks[i] < ivalleys[i]: # 2 peaks between consecutive valleys -> remove lowest idel = i - 1 if y[ipeaks[i - 1]] < y[ipeaks[i]] else i logger.debug('Removing abnormal peak at index %u', ipeaks[idel]) ipeaks = np.delete(ipeaks, idel) if ipeaks[i] > ivalleys[i + 1]: idel = i + 1 if y[ivalleys[i]] < y[ivalleys[i + 1]] else i logger.debug('Removing abnormal valley at index %u', ivalleys[idel]) ivalleys = np.delete(ivalleys, idel) else: i += 1 logger.debug('Post-cleanup: %u peaks and %u valleys', ipeaks.size, ivalleys.size) # Remove peaks < minimum peak height if mph is not None: ipeaks = ipeaks[y[ipeaks] >= mph] if ipeaks.size == 0: return empty # Detect small peaks closer than minimum peak distance if mpd is not None: ipeaks = ipeaks[np.argsort(y[ipeaks])][::-1] # sort ipeaks by descending peak height idel = np.zeros(ipeaks.size, dtype=bool) # initialize boolean deletion array (all false) for i in range(ipeaks.size): # for each peak if not idel[i]: # if not marked for deletion closepeaks = (ipeaks >= ipeaks[i] - mpd) & (ipeaks <= ipeaks[i] + mpd) # close peaks idel = idel | closepeaks # mark for deletion along with previously marked peaks # idel = idel | (ipeaks >= ipeaks[i] - mpd) & (ipeaks <= ipeaks[i] + mpd) idel[i] = 0 # keep current peak # remove the small peaks and sort back the indices by their occurrence ipeaks = np.sort(ipeaks[~idel]) # Detect smallest valleys between consecutive relevant peaks ibottomvalleys = [] if ipeaks[0] > ivalleys[0]: itrappedvalleys = ivalleys[ivalleys < ipeaks[0]] ibottomvalleys.append(itrappedvalleys[np.argmin(y[itrappedvalleys])]) for i, j in zip(ipeaks[:-1], ipeaks[1:]): itrappedvalleys = ivalleys[np.logical_and(ivalleys > i, ivalleys < j)] ibottomvalleys.append(itrappedvalleys[np.argmin(y[itrappedvalleys])]) if ipeaks[-1] < ivalleys[-1]: itrappedvalleys = ivalleys[ivalleys > ipeaks[-1]] ibottomvalleys.append(itrappedvalleys[np.argmin(y[itrappedvalleys])]) ipeaks = ipeaks ivalleys = np.array(ibottomvalleys, dtype=int) # Ensure each peak is bounded by two valleys, adding signal boundaries as valleys if necessary if ipeaks[0] < ivalleys[0]: ivalleys = np.insert(ivalleys, 0, 0) if ipeaks[-1] > ivalleys[-1]: ivalleys = np.insert(ivalleys, ivalleys.size, y.size - 1) # Remove peaks < minimum peak prominence if mpp is not None: # Compute peaks prominences as difference between peaks and their closest valley prominences = y[ipeaks] - np.amax((y[ivalleys[:-1]], y[ivalleys[1:]]), axis=0) # initialize peaks and valleys deletion tables idelp = np.zeros(ipeaks.size, dtype=bool) idelv = np.zeros(ivalleys.size, dtype=bool) # for each peak (sorted by ascending prominence order) for ind in np.argsort(prominences): ipeak = ipeaks[ind] # get peak index # get peak bases as first valleys on either side not marked for deletion indleftbase = ind indrightbase = ind + 1 while idelv[indleftbase]: indleftbase -= 1 while idelv[indrightbase]: indrightbase += 1 ileftbase = ivalleys[indleftbase] irightbase = ivalleys[indrightbase] # Compute peak prominence and mark for deletion if < mpp indmaxbase = indleftbase if y[ileftbase] > y[irightbase] else indrightbase if y[ipeak] - y[ivalleys[indmaxbase]] < mpp: idelp[ind] = True # mark peak for deletion idelv[indmaxbase] = True # mark highest surrouding valley for deletion # remove irrelevant peaks and valleys, and sort back the indices by their occurrence ipeaks = np.sort(ipeaks[~idelp]) ivalleys = np.sort(ivalleys[~idelv]) if ipeaks.size == 0: return empty # Compute peaks prominences and reference half-prominence levels prominences = y[ipeaks] - np.amax((y[ivalleys[:-1]], y[ivalleys[1:]]), axis=0) refheights = y[ipeaks] - prominences / 2 # Compute half-prominence bounds ibounds = np.empty((ipeaks.size, 2)) for i in range(ipeaks.size): # compute the index of the left-intercept at half max ileft = ipeaks[i] while ileft >= ivalleys[i] and y[ileft] > refheights[i]: ileft -= 1 if ileft < ivalleys[i]: # intercept exactly on valley ibounds[i, 0] = ivalleys[i] else: # interpolate intercept linearly between signal boundary points a = (y[ileft + 1] - y[ileft]) / 1 b = y[ileft] - a * ileft ibounds[i, 0] = (refheights[i] - b) / a # compute the index of the right-intercept at half max iright = ipeaks[i] while iright <= ivalleys[i + 1] and y[iright] > refheights[i]: iright += 1 if iright > ivalleys[i + 1]: # intercept exactly on valley ibounds[i, 1] = ivalleys[i + 1] else: # interpolate intercept linearly between signal boundary points if iright == y.size - 1: # special case: if end of signal is reached, decrement iright iright -= 1 a = (y[iright + 1] - y[iright]) / 1 b = y[iright] - a * iright ibounds[i, 1] = (refheights[i] - b) / a # Compute peaks widths at half-prominence widths = np.diff(ibounds, axis=1) return (ipeaks - 1, prominences, widths, ibounds) def runMechBatch(batch_dir, log_filepath, Cm0, Qm0, stim_params, a, d=0.0, multiprocess=False): ''' Run batch simulations of the mechanical system with imposed values of charge density, for various sonophore spans and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS in-plane diameter (m) :param d: depth of embedding tissue around plasma membrane (m) :param multiprocess: boolean statting wether or not to use multiprocessing ''' # Checking validity of stimulation parameters mandatory_params = ['freqs', 'amps', 'charges'] for mparam in mandatory_params: if mparam not in stim_params: raise InputError('Missing stimulation parameter field: "{}"'.format(mparam)) logger.info("Starting mechanical simulation batch") # Unpack stimulation parameters amps = np.array(stim_params['amps']) charges = np.array(stim_params['charges']) # Generate simulations queue nA = len(amps) nQ = len(charges) sim_queue = np.array(np.meshgrid(amps, charges)).T.reshape(nA * nQ, 2) nqueue = sim_queue.shape[0] nsims = len(stim_params['freqs']) * nqueue # Initiate multiple processing objects if needed if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = min(mp.cpu_count(), nsims) consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Run simulations wid = 0 filepaths = [] for Fdrive in stim_params['freqs']: # Create BilayerSonophore instance (modulus of embedding tissue depends on frequency) bls = BilayerSonophore(a, Fdrive, Cm0, Qm0, d) for i in range(nqueue): - wid += 1 Adrive, Qm = sim_queue[i, :] worker = MechWorker(wid, batch_dir, log_filepath, bls, Fdrive, Adrive, Qm, nsims) if multiprocess: tasks.put(worker, block=False) else: logger.info('%s', worker) output_filepath = worker.__call__() filepaths.append(output_filepath) + wid += 1 if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): output_filepath = results.get() filepaths.append(output_filepath) # Close tasks and results queues tasks.close() results.close() return filepaths def runEStimBatch(batch_dir, log_filepath, neurons, stim_params, multiprocess=False): ''' Run batch E-STIM simulations of the system for various neuron types and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param multiprocess: boolean statting wether or not to use multiprocessing :return: list of full paths to the output files ''' mandatory_params = ['amps', 'durations', 'offsets', 'PRFs', 'DCs'] for mparam in mandatory_params: if mparam not in stim_params: raise InputError('Missing stimulation parameter field: "{}"'.format(mparam)) logger.info("Starting E-STIM simulation batch") # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DCs']) nqueue = sim_queue.shape[0] nsims = len(neurons) * nqueue # Initialize solver solver = SolverElec() # Initiate multiple processing objects if needed if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = min(mp.cpu_count(), nsims) consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Run simulations wid = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() # Run simulations in queue for i in range(nqueue): - - wid += 1 Astim, tstim, toffset, PRF, DC = sim_queue[i, :] worker = EStimWorker(wid, batch_dir, log_filepath, solver, neuron, Astim, tstim, toffset, PRF, DC, nsims) if multiprocess: tasks.put(worker, block=False) else: logger.info('%s', worker) output_filepath = worker.__call__() filepaths.append(output_filepath) + wid += 1 if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): output_filepath = results.get() filepaths.append(output_filepath) # Close tasks and results queues tasks.close() results.close() return filepaths def titrateEStimBatch(batch_dir, log_filepath, neurons, stim_params, multiprocess=False): ''' Run batch electrical titrations of the system for various neuron types and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param multiprocess: boolean statting wether or not to use multiprocessing :return: list of full paths to the output files ''' logger.info("Starting E-STIM titration batch") # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [TITRATION_T_OFFSET], stim_params['PRFs'], stim_params['DCs']) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DCs']) elif 'DC' not in stim_params: t_type = 'DC' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], [None]) nqueue = sim_queue.shape[0] # Create SolverElec instance solver = SolverElec() # Initiate multiple processing objects if needed nsims = len(neurons) * nqueue if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = min(mp.cpu_count(), nsims) consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Run titrations wid = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for i in range(nqueue): - wid += 1 - # Extract parameters Astim, tstim, toffset, PRF, DC = sim_queue[i, :] # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") try: # Run titration titrator = EStimTitrator(wid, solver, neuron, *sim_queue[i, :], nsims) if multiprocess: tasks.put(titrator, block=False) else: logger.info('%s', titrator) (output_thr, t, y, states, lat, tcomp) = titrator.__call__() Vm, *channels = y # Determine output variable if t_type == 'A': Astim = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming simcode = 'ESTIM_{}_{}_{:.1f}mA_per_m2_{:.0f}ms{}'\ .format(neuron.name, 'CW' if DC == 1. else 'PW', Astim, tstim * 1e3, '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1. else '') # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal dt = t[1] - t[0] ipeaks, *_ = findPeaks(Vm, SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 'N/A' logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': Astim, 'E': tstim * 1e3, 'F': PRF * 1e-3 if DC < 1 else 'N/A', 'G': DC, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return filepaths + wid += 1 if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): (output_thr, t, y, states, lat, tcomp) = results.get() Vm, *channels = y # Determine output variable if t_type == 'A': Astim = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming simcode = 'ESTIM_{}_{}_{:.1f}mA_per_m2_{:.0f}ms{}'\ .format(neuron.name, 'CW' if DC == 1. else 'PW', Astim, tstim * 1e3, '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1. else '') # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal dt = t[1] - t[0] ipeaks, *_ = findPeaks(Vm, SPIKE_MIN_VAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_VPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 'N/A' logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': Astim, 'E': tstim * 1e3, 'F': PRF * 1e-3 if DC < 1 else 'N/A', 'G': DC, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) # Close tasks and results queues tasks.close() results.close() return filepaths def runAStimBatch(batch_dir, log_filepath, neurons, stim_params, a, int_method='sonic', multiprocess=False): ''' Run batch simulations of the system for various neuron types, sonophore and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS structure diameter (m) :param int_method: selected integration method :param multiprocess: boolean statting wether or not to use multiprocessing :return: list of full paths to the output files ''' mandatory_params = ['freqs', 'amps', 'durations', 'offsets', 'PRFs', 'DCs'] for mparam in mandatory_params: if mparam not in stim_params: raise InputError('Missing stimulation parameter field: "{}"'.format(mparam)) logger.info("Starting A-STIM simulation batch") # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DCs']) nqueue = sim_queue.shape[0] nsims = len(neurons) * len(stim_params['freqs']) * nqueue # Initiate multiple processing objects if needed if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = min(mp.cpu_count(), nsims) consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Run simulations wid = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for Fdrive in stim_params['freqs']: # Initialize SolverUS solver = SolverUS(a, neuron, Fdrive) # Run simulations in queue for i in range(nqueue): - wid += 1 Adrive, tstim, toffset, PRF, DC = sim_queue[i, :] worker = AStimWorker(wid, batch_dir, log_filepath, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method, nsims) if multiprocess: tasks.put(worker, block=False) else: logger.info('%s', worker) output_filepath = worker.__call__() filepaths.append(output_filepath) + wid += 1 if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): output_filepath = results.get() filepaths.append(output_filepath) # Close tasks and results queues tasks.close() results.close() return filepaths def titrateAStimBatch(batch_dir, log_filepath, neurons, stim_params, a, int_method='sonic', multiprocess=False): ''' Run batch acoustic titrations of the system for various neuron types, sonophore and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS structure diameter (m) :param int_method: selected integration method :param multiprocess: boolean statting wether or not to use multiprocessing :return: list of full paths to the output files ''' logger.info("Starting A-STIM titration batch") # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [TITRATION_T_OFFSET], stim_params['PRFs'], stim_params['DCs']) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DCs']) elif 'DC' not in stim_params: t_type = 'DC' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], [None]) print(sim_queue) nqueue = sim_queue.shape[0] nsims = len(neurons) * len(stim_params['freqs']) * nqueue - print(nsims) - # Initialize multiprocessing pobjects if needed if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = min(mp.cpu_count(), nsims) consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Run titrations wid = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for Fdrive in stim_params['freqs']: # Create SolverUS instance (modulus of embedding tissue depends on frequency) solver = SolverUS(a, neuron, Fdrive) for i in range(nqueue): - wid += 1 # Extract parameters Adrive, tstim, toffset, PRF, DC = sim_queue[i, :] # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run titration try: titrator = AStimTitrator(wid, solver, neuron, Fdrive, *sim_queue[i, :], int_method, nsims) if multiprocess: tasks.put(titrator, block=False) else: logger.info('%s', titrator) (output_thr, t, y, states, lat, tcomp) = titrator.__call__() Z, ng, Qm, Vm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) # Determine output variable if t_type == 'A': Adrive = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming simcode = 'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms_{}{}'\ .format(neuron.name, 'CW' if DC == 1 else 'PW', solver.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(PRF, DC * 1e2) if DC < 1. else '', int_method) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'a': solver.a, 'd': solver.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal dt = t[1] - t[0] ipeaks, *_ = findPeaks(Qm, SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 'N/A' logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': solver.a * 1e9, 'E': solver.d * 1e6, 'F': Fdrive * 1e-3, 'G': Adrive * 1e-3, 'H': tstim * 1e3, 'I': PRF * 1e-3 if DC < 1 else 'N/A', 'J': DC, 'K': int_method, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except (Warning, AssertionError) as inst: logger.warning('Integration error: %s. Continue batch? (y/n)', extra={inst}) user_str = input() if user_str not in ['y', 'Y']: return filepaths + wid += 1 if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): (output_thr, t, y, states, lat, tcomp) = results.get() Z, ng, Qm, Vm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) # Determine output variable if t_type == 'A': Adrive = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming simcode = 'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms_{}{}'\ .format(neuron.name, 'CW' if DC == 1 else 'PW', solver.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(PRF, DC * 1e2) if DC < 1. else '', int_method) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'a': solver.a, 'd': solver.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal dt = t[1] - t[0] ipeaks, *_ = findPeaks(Qm, SPIKE_MIN_QAMP, int(np.ceil(SPIKE_MIN_DT / dt)), SPIKE_MIN_QPROM) n_spikes = ipeaks.size lat = t[ipeaks[0]] if n_spikes > 0 else 'N/A' sr = np.mean(1 / np.diff(t[ipeaks])) if n_spikes > 1 else 'N/A' logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': solver.a * 1e9, 'E': solver.d * 1e6, 'F': Fdrive * 1e-3, 'G': Adrive * 1e-3, 'H': tstim * 1e3, 'I': PRF * 1e-3 if DC < 1 else 'N/A', 'J': DC, 'K': int_method, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) return filepaths def computeSpikeMetrics(filenames): ''' Analyze the charge density profile from a list of files and compute for each one of them the following spiking metrics: - latency (ms) - firing rate mean and standard deviation (Hz) - spike amplitude mean and standard deviation (nC/cm2) - spike width mean and standard deviation (ms) :param filenames: list of files to analyze :return: a dataframe with the computed metrics ''' # Initialize metrics dictionaries keys = [ 'latencies (ms)', 'mean firing rates (Hz)', 'std firing rates (Hz)', 'mean spike amplitudes (nC/cm2)', 'std spike amplitudes (nC/cm2)', 'mean spike widths (ms)', 'std spike widths (ms)' ] metrics = {k: [] for k in keys} # Compute spiking metrics for fname in filenames: # Load data from file logger.debug('loading data from file "{}"'.format(fname)) with open(fname, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values Qm = df['Qm'].values dt = t[1] - t[0] # Detect spikes on charge profile mpd = int(np.ceil(SPIKE_MIN_DT / dt)) ispikes, prominences, widths, _ = findPeaks(Qm, SPIKE_MIN_QAMP, mpd, SPIKE_MIN_QPROM) widths *= dt if ispikes.size > 0: # Compute latency latency = t[ispikes[0]] # Select prior-offset spikes ispikes_prior = ispikes[t[ispikes] < tstim] else: latency = np.nan ispikes_prior = np.array([]) # Compute spikes widths and amplitude if ispikes_prior.size > 0: widths_prior = widths[:ispikes_prior.size] prominences_prior = prominences[:ispikes_prior.size] else: widths_prior = np.array([np.nan]) prominences_prior = np.array([np.nan]) # Compute inter-spike intervals and firing rates if ispikes_prior.size > 1: ISIs_prior = np.diff(t[ispikes_prior]) FRs_prior = 1 / ISIs_prior else: ISIs_prior = np.array([np.nan]) FRs_prior = np.array([np.nan]) # Log spiking metrics logger.debug('%u spikes detected (%u prior to offset)', ispikes.size, ispikes_prior.size) logger.debug('latency: %.2f ms', latency * 1e3) logger.debug('average spike width within stimulus: %.2f +/- %.2f ms', np.nanmean(widths_prior) * 1e3, np.nanstd(widths_prior) * 1e3) logger.debug('average spike amplitude within stimulus: %.2f +/- %.2f nC/cm2', np.nanmean(prominences_prior) * 1e5, np.nanstd(prominences_prior) * 1e5) logger.debug('average ISI within stimulus: %.2f +/- %.2f ms', np.nanmean(ISIs_prior) * 1e3, np.nanstd(ISIs_prior) * 1e3) logger.debug('average FR within stimulus: %.2f +/- %.2f Hz', np.nanmean(FRs_prior), np.nanstd(FRs_prior)) # Complete metrics dictionaries metrics['latencies (ms)'].append(latency * 1e3) metrics['mean firing rates (Hz)'].append(np.mean(FRs_prior)) metrics['std firing rates (Hz)'].append(np.std(FRs_prior)) metrics['mean spike amplitudes (nC/cm2)'].append(np.mean(prominences_prior) * 1e5) metrics['std spike amplitudes (nC/cm2)'].append(np.std(prominences_prior) * 1e5) metrics['mean spike widths (ms)'].append(np.mean(widths_prior) * 1e3) metrics['std spike widths (ms)'].append(np.std(widths_prior) * 1e3) # Return dataframe with metrics return pd.DataFrame(metrics, columns=metrics.keys()) def getCycleProfiles(a, f, A, Cm0, Qm0, Qm): ''' Run a mechanical simulation until periodic stabilization, and compute pressure profiles over the last acoustic cycle. :param a: in-plane diameter of the sonophore structure within the membrane (m) :param f: acoustic drive frequency (Hz) :param A: acoustic drive amplitude (Pa) :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param Qm: imposed membrane charge density (C/m2) :return: a dataframe with the time, kinematic and pressure profiles over the last cycle. ''' # Create sonophore object bls = BilayerSonophore(a, f, Cm0, Qm0) # Run default simulation and compute relevant profiles logger.info('Running mechanical simulation (a = %sm, f = %sHz, A = %sPa)', si_format(a, 1), si_format(f, 1), si_format(A, 1)) t, y, _ = bls.run(f, A, Qm, Pm_comp_method=PmCompMethod.direct) dt = (t[-1] - t[0]) / (t.size - 1) Z, ng = y[:, -NPC_FULL:] t = t[-NPC_FULL:] t -= t[0] logger.info('Computing pressure cyclic profiles') R = bls.v_curvrad(Z) U = np.diff(Z) / dt U = np.hstack((U, U[-1])) data = { 't': t, 'Z': Z, 'Cm': bls.v_Capct(Z), 'P_M': bls.v_PMavg(Z, R, bls.surface(Z)), 'P_Q': bls.Pelec(Z, Qm), 'P_{VE}': bls.PEtot(Z, R) + bls.PVleaflet(U, R), 'P_V': bls.PVfluid(U, R), 'P_G': bls.gasmol2Pa(ng, bls.volume(Z)), 'P_0': - np.ones(Z.size) * bls.P0 } return pd.DataFrame(data, columns=data.keys()) def runSweepSA(bls, f, A, Qm, params, rel_sweep): ''' Run mechanical simulations while varying multiple model parameters around their default value, and compute the relative changes in cycle-averaged sonophore membrane potential over the last acoustic period upon periodic stabilization. :param bls: BilayerSonophore object :param f: acoustic drive frequency (Hz) :param A: acoustic drive amplitude (Pa) :param Qm: imposed membrane charge density (C/m2) :param params: list of model parameters to explore :param rel_sweep: array of relative parameter changes :return: a dataframe with the cycle-averaged sonophore membrane potentials for the parameter variations, for each parameter. ''' nsweep = len(rel_sweep) logger.info('Starting sensitivity analysis (%u parameters, sweep size = %u)', len(params), nsweep) t0 = time.time() # Run default simulation and compute cycle-averaged membrane potential _, y, _ = bls.run(f, A, Qm, Pm_comp_method=PmCompMethod.direct) Z = y[0, -NPC_FULL:] Cm = bls.v_Capct(Z) # F/m2 Vmavg_default = np.mean(Qm / Cm) * 1e3 # mV # Create data dictionary for computed output changes data = {'relative input change': rel_sweep - 1} nsims = len(params) * nsweep for j, p in enumerate(params): default = getattr(bls, p) sweep = rel_sweep * default Vmavg = np.empty(nsweep) logger.info('Computing system\'s sentitivty to %s (default = %.2e)', p, default) for i, val in enumerate(sweep): # Re-initialize BLS object with modififed attribute setattr(bls, p, val) bls.reinit() # Run simulation and compute cycle-averaged membrane potential _, y, _ = bls.run(f, A, Qm, Pm_comp_method=PmCompMethod.direct) Z = y[0, -NPC_FULL:] Cm = bls.v_Capct(Z) # F/m2 Vmavg[i] = np.mean(Qm / Cm) * 1e3 # mV logger.info('simulation %u/%u: %s = %.2e (%+.1f %%) --> |Vm| = %.1f mV (%+.3f %%)', j * nsweep + i + 1, nsims, p, val, (val - default) / default * 1e2, Vmavg[i], (Vmavg[i] - Vmavg_default) / Vmavg_default * 1e2) # Fill in data dictionary data[p] = Vmavg # Set parameter back to default setattr(bls, p, default) tcomp = time.time() - t0 logger.info('Sensitivity analysis susccessfully completed in %.0f s', tcomp) # return pandas dataframe return pd.DataFrame(data, columns=data.keys()) def getActivationMap(root, neuron, a, f, tstim, toffset, PRF, amps, DCs): ''' Compute the activation map of a neuron at a given frequency and PRF, by computing the spiking metrics of simulation results over a 2D space (amplitude x duty cycle). :param root: directory containing the input data files :param neuron: neuron name :param a: sonophore diameter :param f: acoustic drive frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param amps: vector of acoustic amplitudes (Pa) :param DCs: vector of duty cycles (-) :return the activation matrix ''' # Initialize activation map actmap = np.empty((amps.size, DCs.size)) # Loop through amplitudes and duty cycles nfiles = DCs.size * amps.size for i, A in enumerate(amps): for j, DC in enumerate(DCs): # Define filename PW_str = '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1 else '' fname = ('ASTIM_{}_{}W_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms{}_effective.pkl' .format(neuron, 'P' if DC < 1 else 'C', a * 1e9, f * 1e-3, A * 1e-3, tstim * 1e3, PW_str)) # Extract charge profile from data file fpath = os.path.join(root, fname) if os.path.isfile(fpath): logger.debug('Loading file {}/{}: "{}"'.format(i * amps.size + j + 1, nfiles, fname)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values Qm = df['Qm'].values dt = t[1] - t[0] # Detect spikes on charge profile during stimulus mpd = int(np.ceil(SPIKE_MIN_DT / dt)) ispikes, *_ = findPeaks(Qm[t <= tstim], SPIKE_MIN_QAMP, mpd, SPIKE_MIN_QPROM) # Compute firing metrics if ispikes.size == 0: # if no spike, assign -1 actmap[i, j] = -1 elif ispikes.size == 1: # if only 1 spike, assign 0 actmap[i, j] = 0 else: # if more than 1 spike, assign firing rate FRs = 1 / np.diff(t[ispikes]) actmap[i, j] = np.mean(FRs) else: logger.error('"{}" file not found'.format(fname)) actmap[i, j] = np.nan return actmap def getMaxMap(key, root, neuron, a, f, tstim, toffset, PRF, amps, DCs, mode='max', cavg=False): ''' Compute the max. value map of a neuron's specific variable at a given frequency and PRF over a 2D space (amplitude x duty cycle). :param key: the variable name to find in the simulations dataframes :param root: directory containing the input data files :param neuron: neuron name :param a: sonophore diameter :param f: acoustic drive frequency (Hz) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param amps: vector of acoustic amplitudes (Pa) :param DCs: vector of duty cycles (-) :param mode: string indicating whether to search for maximum, minimum or absolute maximum :return the maximum matrix ''' # Initialize max map maxmap = np.empty((amps.size, DCs.size)) # Loop through amplitudes and duty cycles nfiles = DCs.size * amps.size for i, A in enumerate(amps): for j, DC in enumerate(DCs): # Define filename PW_str = '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1 else '' fname = ('ASTIM_{}_{}W_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms{}_effective.pkl' .format(neuron, 'P' if DC < 1 else 'C', a * 1e9, f * 1e-3, A * 1e-3, tstim * 1e3, PW_str)) # Extract charge profile from data file fpath = os.path.join(root, fname) if os.path.isfile(fpath): logger.debug('Loading file {}/{}: "{}"'.format(i * amps.size + j + 1, nfiles, fname)) with open(fpath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] t = df['t'].values if key in df: x = df[key].values else: x = eval(key) if cavg: x = getCycleAverage(t, x, 1 / PRF) if mode == 'min': maxmap[i, j] = x.min() elif mode == 'max': maxmap[i, j] = x.max() elif mode == 'absmax': maxmap[i, j] = np.abs(x).max() else: maxmap[i, j] = np.nan return maxmap def computeAStimLookups(neuron, aref, fref, Aref, phi=np.pi, multiprocess=False): ''' Run simulations of the mechanical system for a multiple combinations of imposed US frequencies, acoustic amplitudes and charge densities, compute effective coefficients and store them in a dictionary of 3D arrays. :param neuron: neuron object :param aref: array of sonophore diameters (m) :param fref: array of acoustic drive frequencies (Hz) :param Aref: array of acoustic drive amplitudes (Pa) :param phi: acoustic drive phase (rad) :param multiprocess: boolean statting wether or not to use multiprocessing :return: lookups dictionary ''' # Check validity of input parameters if not isinstance(neuron, BaseMech): raise InputError('Invalid neuron type: "{}" (must inherit from BaseMech class)' .format(neuron.name)) for key, values in {'diameters': aref, 'frequencies': fref, 'amplitudes': Aref}.items(): if not (isinstance(values, list) or isinstance(values, np.ndarray)): raise InputError('Invalid {} (must be provided as list or numpy array)'.format(key)) if not all(isinstance(x, float) for x in values): raise InputError('Invalid {} (must all be float typed)'.format(key)) if len(values) == 0: raise InputError('Empty {} array'.format(key)) if key in ('diameters', 'frequencies') and min(values) <= 0: raise InputError('Invalid {} (must all be strictly positive)'.format(key)) if key is 'amplitudes' and min(values) < 0: raise InputError('Invalid {} (must all be positive or null)'.format(key)) logger.info('Starting batch lookup creation for %s neuron', neuron.name) t0 = time.time() # Get input dimensions na, nf, nA = len(aref), len(fref), len(Aref) # Create neuron-specific charge vector Qref = np.arange(neuron.Qbounds[0], neuron.Qbounds[1] + 1e-5, 1e-5) # C/m2 nQ = Qref.size dims = (na, nf, nA, nQ) # Initialize lookup dictionary of 3D array to store effective coefficients coeffs_names = ['V', 'ng', *neuron.coeff_names] ncoeffs = len(coeffs_names) lookup_dict = {cn: np.empty(dims) for cn in coeffs_names} # Initiate multipleprocessing objects if needed if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = mp.cpu_count() consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Loop through all (a, f, A, Q) combinations and launch workers nsims = np.prod(np.array(dims)) wid = 0 for ia, a in enumerate(aref): # Initialize BLS object bls = BilayerSonophore(a, 0.0, neuron.Cm0, neuron.Cm0 * neuron.Vm0 * 1e-3) for iF, f in enumerate(fref): for iA, A in enumerate(Aref): for iQ, Q in enumerate(Qref): worker = LookupWorker(wid, bls, neuron, f, A, Q, phi, nsims) if multiprocess: tasks.put(worker, block=False) else: logger.info('%s', worker) _, Qcoeffs = worker.__call__() for icoeff in range(ncoeffs): lookup_dict[coeffs_names[icoeff]][ia, iF, iA, iQ] = Qcoeffs[icoeff] wid += 1 if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): wid, Qcoeffs = results.get() ia, iF, iA, iQ = nDindexes(dims, wid) for icoeff in range(ncoeffs): lookup_dict[coeffs_names[icoeff]][ia, iF, iA, iQ] = Qcoeffs[icoeff] # Close tasks and results queues tasks.close() results.close() # Add input vectors to lookup dictionary lookup_dict['a'] = aref # nm lookup_dict['f'] = fref # Hz lookup_dict['A'] = Aref # Pa lookup_dict['Q'] = Qref # C/m2 logger.info('%s lookups computed in %.0f s', neuron.name, time.time() - t0) return lookup_dict def computeTestLookups(neuron, aref, fref, Aref, phi=np.pi, multiprocess=False): ''' Run simulations of the mechanical system for a multiple combinations of imposed US frequencies, acoustic amplitudes and charge densities, compute effective coefficients and store them in a dictionary of 3D arrays. :param neuron: neuron object :param aref: array of sonophore diameters (m) :param fref: array of acoustic drive frequencies (Hz) :param Aref: array of acoustic drive amplitudes (Pa) :param phi: acoustic drive phase (rad) :param multiprocess: boolean statting wether or not to use multiprocessing :return: lookups dictionary ''' logger.info('Starting dummy lookup creation for %s neuron', neuron.name) t0 = time.time() # Get input dimensions na, nf, nA = len(aref), len(fref), len(Aref) # Create neuron-specific charge vector Qref = np.arange(neuron.Qbounds[0], neuron.Qbounds[1] + 1e-5, 1e-5) # C/m2 nQ = Qref.size dims = (na, nf, nA, nQ) # Initialize lookup dictionary of 3D array to store effective coefficients coeffs_names = ['V', 'ng', *neuron.coeff_names] ncoeffs = len(coeffs_names) lookup_dict = {cn: np.empty(dims) for cn in coeffs_names} # Initiate multipleprocessing objects if needed if multiprocess: mp.freeze_support() # Create tasks and results queues tasks = mp.JoinableQueue() results = mp.Queue() # Create and start consumer processes nconsumers = mp.cpu_count() consumers = [Consumer(tasks, results) for i in range(nconsumers)] for w in consumers: w.start() # Loop through all (a, f, A, Q) combinations and launch workers nsims = np.prod(np.array(dims)) wid = 0 for ia, a in enumerate(aref): for iF, f in enumerate(fref): for iA, A in enumerate(Aref): for iQ, Q in enumerate(Qref): worker = Worker(wid, nsims, ncoeffs) if multiprocess: tasks.put(worker, block=False) else: logger.info('%s', worker) _, Qcoeffs = worker.__call__() for icoeff in range(ncoeffs): lookup_dict[coeffs_names[icoeff]][ia, iF, iA, iQ] = Qcoeffs[icoeff] wid += 1 if multiprocess: # Stop processes for i in range(nconsumers): tasks.put(None, block=False) tasks.join() # Retrieve workers output for x in range(nsims): wid, Qcoeffs = results.get() ia, iF, iA, iQ = nDindexes(dims, wid) for icoeff in range(ncoeffs): lookup_dict[coeffs_names[icoeff]][ia, iF, iA, iQ] = Qcoeffs[icoeff] # Close tasks and results queues tasks.close() results.close() # Add input vectors to lookup dictionary lookup_dict['a'] = aref # nm lookup_dict['f'] = fref # Hz lookup_dict['A'] = Aref # Pa lookup_dict['Q'] = Qref # C/m2 logger.info('%s lookups computed in %.0f s', neuron.name, time.time() - t0) return lookup_dict diff --git a/PySONIC/utils.py b/PySONIC/utils.py index d259add..8abcec0 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,553 +1,563 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-19 22:30:46 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-09-15 17:11:56 +# @Last Modified time: 2018-09-19 15:43:48 """ Definition of generic utility functions used in other modules """ from enum import Enum import operator import os import pickle import tkinter as tk from tkinter import filedialog import inspect import json import yaml from openpyxl import load_workbook import numpy as np import colorlog from scipy.interpolate import interp1d from . import neurons def setLogger(): log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%' ) log_handler = colorlog.StreamHandler() log_handler.setFormatter(log_formatter) color_logger = colorlog.getLogger('PySONIC') color_logger.addHandler(log_handler) return color_logger # Get package logger logger = setLogger() class InputError(Exception): ''' Exception raised for errors in the input. ''' pass class PmCompMethod(Enum): """ Enum: types of computation method for the intermolecular pressure """ direct = 1 predict = 2 def loadYAML(filepath): """ Load a dictionary of parameters from an external YAML file. :param filepath: full path to the YAML file :return: parameters dictionary """ _, filename = os.path.split(filepath) logger.debug('Loading parameters from "%s"', filename) with open(filepath, 'r') as f: stream = f.read() params = yaml.load(stream) return ParseNestedDict(params) def getLookupDir(): """ Return the location of the directory holding lookups files. :return: absolute path to the directory """ this_dir, _ = os.path.split(__file__) return os.path.join(this_dir, 'lookups') def ParseNestedDict(dict_in): """ Loop through a nested dictionary object and convert all string fields to floats. """ for key, value in dict_in.items(): if isinstance(value, dict): # If value itself is dictionary dict_in[key] = ParseNestedDict(value) elif isinstance(dict_in[key], str): dict_in[key] = float(dict_in[key]) return dict_in def OpenFilesDialog(filetype, dirname=''): """ Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames """ root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames(filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname) if filenames: par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) else: par_dir = None return (filenames, par_dir) +def selectDirDialog(): + """ Open a dialog box to select a directory. + + :return: full path to selected directory + """ + root = tk.Tk() + root.withdraw() + return filedialog.askdirectory() + + def ImportExcelCol(filename, sheetname, colstr, startrow): """ Load a specific column of an excel workbook as a numpy array. :param filename: absolute or relative path to the Excel workbook :param sheetname: name of the Excel spreadsheet to which data is appended :param colstr: string of the column to import :param startrow: index of the first row to consider :return: 1D numpy array with the column data """ wb = load_workbook(filename, read_only=True) ws = wb[sheetname] range_start_str = colstr + str(startrow) range_stop_str = colstr + str(ws.max_row) tmp = np.array([[i.value for i in j] for j in ws[range_start_str:range_stop_str]]) return tmp[:, 0] def ConstructMatrix(serialized_inputA, serialized_inputB, serialized_output): """ Construct a 2D output matrix from serialized input. :param serialized_inputA: serialized input variable A :param serialized_inputB: serialized input variable B :param serialized_output: serialized output variable :return: 4-tuple with vectors of unique values of A (m) and B (n), output variable 2D matrix (m,n) and number of holes in the matrix """ As = np.unique(serialized_inputA) Bs = np.unique(serialized_inputB) nA = As.size nB = Bs.size output = np.zeros((nA, nB)) output[:] = np.NAN nholes = 0 for i in range(nA): iA = np.where(serialized_inputA == As[i]) for j in range(nB): iB = np.where(serialized_inputB == Bs[j]) iMatch = np.intersect1d(iA, iB) if iMatch.size == 0: nholes += 1 elif iMatch.size > 1: logger.warning('Identical serialized inputs with values (%f, %f)', As[i], Bs[j]) else: iMatch = iMatch[0] output[i, j] = serialized_output[iMatch] return (As, Bs, output, nholes) def rmse(x1, x2): """ Compute the root mean square error between two 1D arrays """ return np.sqrt(((x1 - x2) ** 2).mean()) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def DownSample(t_dense, y, nsparse): """ Decimate periodic signals to a specified number of samples.""" if(y.ndim) > 1: nsignals = y.shape[0] else: nsignals = 1 y = np.array([y]) # determine time step and period of input signal T = t_dense[-1] - t_dense[0] dt_dense = t_dense[1] - t_dense[0] # resample time vector linearly t_ds = np.linspace(t_dense[0], t_dense[-1], nsparse) # create MAV window nmav = int(0.03 * T / dt_dense) if nmav % 2 == 0: nmav += 1 mav = np.ones(nmav) / nmav # determine signals padding npad = int((nmav - 1) / 2) # determine indexes of sampling on convolved signals ids = np.round(np.linspace(0, t_dense.size - 1, nsparse)).astype(int) y_ds = np.empty((nsignals, nsparse)) # loop through signals for i in range(nsignals): # pad, convolve and resample pad_left = y[i, -(npad + 2):-2] pad_right = y[i, 1:npad + 1] y_ext = np.concatenate((pad_left, y[i, :], pad_right), axis=0) y_mav = np.convolve(y_ext, mav, mode='valid') y_ds[i, :] = y_mav[ids] if nsignals == 1: y_ds = y_ds[0, :] return (t_ds, y_ds) def Pressure2Intensity(p, rho=1075.0, c=1515.0): """ Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) """ return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): """ Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) """ return np.sqrt(2 * rho * c * I) def find_nearest(array, value): ''' Find nearest element in 1D array. ''' idx = (np.abs(array - value)).argmin() return (idx, array[idx]) def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def LennardJones(x, beta, alpha, C, m, n): """ Generic expression of a Lennard-Jones function, adapted for the context of symmetric deflection (distance = 2x). :param x: deflection (i.e. half-distance) :param beta: x-shifting factor :param alpha: x-scaling factor :param C: y-scaling factor :param m: exponent of the repulsion term :param n: exponent of the attraction term :return: Lennard-Jones potential at given distance (2x) """ return C * (np.power((alpha / (2 * x + beta)), m) - np.power((alpha / (2 * x + beta)), n)) def getNeuronsDict(): ''' Return dictionary of neurons classes that can be instantiated. ''' neurons_dict = {} for _, obj in inspect.getmembers(neurons): if inspect.isclass(obj) and isinstance(obj.name, str): neurons_dict[obj.name] = obj return neurons_dict def get_BLS_lookups(): lookup_path = getLookupDir() + '/BLS_lookups.json' try: with open(lookup_path) as fh: sample = json.load(fh) return sample except FileNotFoundError: return {} def save_BLS_lookups(lookups): """ Save BLS parameter into specific lookup file :return: absolute path to the directory """ lookup_path = getLookupDir() + '/BLS_lookups.json' with open(lookup_path, 'w') as fh: json.dump(lookups, fh, indent=2) def extractCompTimes(filenames): ''' Extract computation times from a list of simulation files. ''' tcomps = np.empty(len(filenames)) for i, fn in enumerate(filenames): logger.info('Loading data from "%s"', fn) with open(fn, 'rb') as fh: frame = pickle.load(fh) meta = frame['meta'] tcomps[i] = meta['tcomp'] return tcomps def computeMeshEdges(x, scale='lin'): ''' Compute the appropriate edges of a mesh that quads a linear or logarihtmic distribution. :param x: the input vector :param scale: the type of distribution ('lin' for linear, 'log' for logarihtmic) :return: the edges vector ''' if scale is 'log': x = np.log10(x) dx = x[1] - x[0] if scale is 'lin': y = np.linspace(x[0] - dx / 2, x[-1] + dx / 2, x.size + 1) elif scale is 'log': y = np.logspace(x[0] - dx / 2, x[-1] + dx / 2, x.size + 1) return y si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } def si_format(x, precision=0, space=''): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): if x == 0: factor = 1e0 prefix = '' else: sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) vals = [tmp[1] for tmp in sorted_si_prefixes] # vals = list(si_prefixes.values()) ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 factor = vals[ix] prefix = sorted_si_prefixes[ix][0] # prefix = list(si_prefixes.keys())[ix] return '{{:.{}f}}{}{}'.format(precision, space, prefix).format(x / factor) elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: print(type(x)) def getCycleAverage(t, y, T): ''' Compute the cycle-averaged profile of a signal given a specific periodicity. :param t: time vector (s) :param y: signal vector :param T: period (s) :return: cycle-averaged signal vector ''' nsamples = y.size ncycles = int((t[-1] - t[0]) // T) npercycle = int(nsamples // ncycles) return np.mean(np.reshape(y[:ncycles * npercycle], (ncycles, npercycle)), axis=1) def itrpLookupsFreq(lookups3D, freqs, Fdrive): """ Interpolate a dictionary of 3D lookups at a given frequency. :param lookups3D: dictionary of 3D lookups :param freqs: array of lookup frequencies (Hz) :param Fdrive: acoustic drive frequency (Hz) :return: a dictionary of 2D lookups interpolated a the given frequency """ # Converting frequencies to integers for a better equality check int_freqs = list(map(int, freqs)) # If Fdrive in lookup frequencies, simply take (A, Q) slice at Fdrive index if int(Fdrive) in int_freqs: iFdrive = np.searchsorted(int_freqs, int(Fdrive)) # logger.debug('Using lookups directly at %.2f kHz', freqs[iFdrive] * 1e-3) lookups2D = {key: np.squeeze(lookups3D[key][iFdrive, :, :]) for key in lookups3D.keys()} # Otherwise, interpolate linearly between 2 (A, Q) slices at Fdrive bounding values indexes else: ilb = np.searchsorted(freqs, Fdrive) - 1 iub = ilb + 1 # logger.debug('Interpolating lookups between %.2f kHz and %.2f kHz', # freqs[ilb] * 1e-3, freqs[iub] * 1e-3) lookups2D_lb = {key: np.squeeze(lookups3D[key][ilb, :, :]) for key in lookups3D.keys()} lookups2D_ub = {key: np.squeeze(lookups3D[key][iub, :, :]) for key in lookups3D.keys()} Fratio = (Fdrive - freqs[ilb]) / (freqs[iub] - freqs[ilb]) lookups2D = {key: lookups2D_lb[key] + (lookups2D_ub[key] - lookups2D_lb[key]) * Fratio for key in lookups3D.keys()} return lookups2D def getLookups2D(mechname, a, Fdrive): ''' Retrieve appropriate 2D lookup tables and reference vectors for a given membrane mechanism, sonophore diameter and US frequency. :param mechname: name of membrane density mechanism :param a: sonophore diameter (m) :param Fdrive: US frequency (Hz) :return: 3-tuple with 1D numpy arrays of reference acoustic amplitudes and charge densities, and a dictionary of 2D lookup numpy arrays ''' # Check lookup file existence lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(mechname, a * 1e9) # lookup_file = '{}_coeffs.pkl'.format(mechname) lookup_path = '{}/{}'.format(getLookupDir(), lookup_file) if not os.path.isfile(lookup_path): raise InputError('Missing lookup file: "{}"'.format(lookup_file)) # Load lookups dictionary with open(lookup_path, 'rb') as fh: # lookups4D = pickle.load(fh) lookups3D = pickle.load(fh) # Retrieve 1D inputs from lookups dictionary # aref = lookups4D.pop('a') # Fref = lookups4D.pop('f') # Aref = lookups4D.pop('A') # Qref = lookups4D.pop('Q') Fref = lookups3D.pop('f') Aref = lookups3D.pop('A') Qref = lookups3D.pop('Q') # Check that sonophore diameter is within lookup range # arange = (aref.min() - 1e-12, aref.max() + 1e-12) # if a < arange[0] or a > arange[1]: # raise InputError('Invalid sonophore diameter: {}m (must be within {}m - {}m lookup interval)' # .format(*si_format([a, *arange], precision=2, space=' '))) # Check that US frequency is within lookup range Frange = (Fref.min() - 1e-9, Fref.max() + 1e-9) if Fdrive < Frange[0] or Fdrive > Frange[1]: raise InputError('Invalid frequency: {}Hz (must be within {}Hz - {}Hz lookup interval)' .format(*si_format([Fdrive, *Frange], precision=2, space=' '))) # Interpolate 4D lookups at sonophore diameter and then at US frequency # lookups3D = {key: interp1d(aref, y4D, axis=0)(a) for key, y4D in lookups4D.items()} lookups2D = {key: interp1d(Fref, y3D, axis=0)(Fdrive) for key, y3D in lookups3D.items()} return Aref, Qref, lookups2D def nDindexes(dims, index): ''' Find index positions in a n-dimensional array. :param dims: dimensions of the n-dimensional array (tuple or list) :param index: index position in the flattened n-dimensional array :return: list of indexes along each array dimension ''' dims = list(dims) # Find size of each array dimension dims.reverse() dimsizes = [1] r = 1 for x in dims[:-1]: r *= x dimsizes.append(r) dims.reverse() dimsizes.reverse() # Find indexes indexes = [] remainder = index for dimsize in dimsizes[:-1]: idim, remainder = divmod(remainder, dimsize) indexes.append(idim) indexes.append(remainder) return indexes def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' ret_string = '{0:.{1:d}e}'.format(number, precision) a, b = ret_string.split("e") a = float(a) b = int(b) return '{}10^{{{}}}'.format('{} * '.format(a) if a != 1. else '', b) def checkNumBounds(values, bounds): ''' Check if a set of numbers is within predefined bounds. ''' # checking parameters against reference bounds for x, bound in zip(values, bounds): if x < bound[0] or x > bound[1]: raise ValueError('Input value {} out of [{}, {}] range'.format(x, bound[0], bound[1])) pass def getDefaultIndexes(params, defaults): ''' Return the indexes of default values found in lists of parameters. :param params: dictionary of parameter arrays :param defaults: dictionary of default values :return: dictionary of resolved default indexes ''' idefs = {} for key, default in defaults.items(): if default not in params[key]: raise Exception('default {} ({}) not found in parameter values'.format(key, default)) idefs[key] = np.where(params[key] == default)[0][0] return idefs diff --git a/sim/lookups_ASTIM.py b/sim/lookups_ASTIM.py index fe438b8..1286b23 100644 --- a/sim/lookups_ASTIM.py +++ b/sim/lookups_ASTIM.py @@ -1,102 +1,102 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-06-02 17:50:10 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-09-15 17:07:56 +# @Last Modified time: 2018-09-19 15:54:52 """ Create lookup table for specific neuron. """ import os import sys import pickle import logging import numpy as np from argparse import ArgumentParser from PySONIC.solvers import computeAStimLookups, computeTestLookups from PySONIC.utils import logger, InputError, getNeuronsDict, getLookupDir # Default parameters default = { 'neuron': 'RS', - 'diams': np.array([15.0, 32.0, 70.0, 150.0]), # nm + 'diams': np.array([16.0, 32.0, 64.0]), # nm 'freqs': np.array([20., 100., 500., 1e3, 2e3, 3e3, 4e3]), # kHz 'amps': np.insert(np.logspace(np.log10(0.1), np.log10(600), num=50), 0, 0.0), # kPa } def main(): # Define argument parser ap = ArgumentParser() # Stimulation parameters ap.add_argument('-n', '--neuron', type=str, default=default['neuron'], help='Neuron name (string)') ap.add_argument('-a', '--diameters', nargs='+', type=float, default=None, help='Sonophore diameters (nm)') ap.add_argument('-f', '--frequencies', nargs='+', type=float, default=None, help='Acoustic drive frequencies (kHz)') ap.add_argument('-A', '--amplitudes', nargs='+', type=float, default=None, help='Acoustic pressure amplitudes (kPa)') # Boolean parameters ap.add_argument('-m', '--multiprocessing', default=False, action='store_true', help='Use multiprocessing') ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-t', '--test', default=False, action='store_true', help='Run test batch') # Parse arguments args = ap.parse_args() neuron_str = args.neuron diams = (default['diams'] if args.diameters is None else np.array(args.diameters)) * 1e-9 # m freqs = (default['freqs'] if args.frequencies is None else np.array(args.frequencies)) * 1e3 # Hz amps = (default['amps'] if args.amplitudes is None else np.array(args.amplitudes)) * 1e3 # Pa if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) # Check neuron name validity if neuron_str not in getNeuronsDict(): raise InputError('Unknown neuron type: "{}"'.format(neuron_str)) neuron = getNeuronsDict()[neuron_str]() # Check if lookup file already exists lookup_file = '{}_lookups.pkl'.format(neuron.name) lookup_filepath = '{0}/{1}'.format(getLookupDir(), lookup_file) if os.path.isfile(lookup_filepath): logger.warning('"%s" file already exists and will be overwritten. ' + 'Continue? (y/n)', lookup_file) user_str = input() if user_str not in ['y', 'Y']: logger.info('%s Lookup creation canceled', neuron.name) sys.exit(0) try: if args.test: # Compute test lookups lookup_dict = computeTestLookups(neuron, diams, freqs, amps, multiprocess=args.multiprocessing) else: # Compute real lookups lookup_dict = computeAStimLookups(neuron, diams, freqs, amps, multiprocess=args.multiprocessing) # Save dictionary in lookup file logger.info('Saving %s neuron lookup table in file: "%s"', neuron.name, lookup_file) with open(lookup_filepath, 'wb') as fh: pickle.dump(lookup_dict, fh) except InputError as err: logger.error(err) sys.exit(1) if __name__ == '__main__': main() diff --git a/sim/run_ASTIM.py b/sim/run_ASTIM.py index 05c5e36..3a93bf3 100644 --- a/sim/run_ASTIM.py +++ b/sim/run_ASTIM.py @@ -1,105 +1,104 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-08-25 02:14:36 +# @Last Modified time: 2018-09-19 16:02:28 """ Script to run ASTIM simulations from command line. """ import sys -import os import logging from argparse import ArgumentParser -from PySONIC.utils import logger, getNeuronsDict, InputError +from PySONIC.utils import logger, getNeuronsDict, InputError, selectDirDialog from PySONIC.solvers import checkBatchLog, SolverUS, AStimWorker from PySONIC.plt import plotBatch # Default parameters default = { 'neuron': 'RS', 'a': 32.0, # nm 'f': 500.0, # kHz 'A': 100.0, # kPa 't': 150.0, # ms 'off': 100.0, # ms 'PRF': 100.0, # Hz 'DC': 100.0, # % 'int_method': 'sonic' } def main(): # Define argument parser ap = ArgumentParser() # ASTIM parameters ap.add_argument('-n', '--neuron', type=str, default=default['neuron'], help='Neuron name (string)') ap.add_argument('-a', '--diameter', type=float, default=default['a'], help='Sonophore diameter (nm)') ap.add_argument('-f', '--frequency', type=float, default=default['f'], help='Acoustic drive frequency (kHz)') ap.add_argument('-A', '--amplitude', type=float, default=default['A'], help='Acoustic pressure amplitude (kPa)') ap.add_argument('-t', '--duration', type=float, default=default['t'], help='Stimulus duration (ms)') ap.add_argument('--offset', type=float, default=default['off'], help='Offset duration (ms)') ap.add_argument('--PRF', type=float, default=default['PRF'], help='PRF (Hz)') ap.add_argument('--DC', type=float, default=default['DC'], help='Duty cycle (%%)') - ap.add_argument('-o', '--outputdir', type=str, default=os.getcwd(), + ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') ap.add_argument('-m', '--method', type=str, default=default['int_method'], help='Numerical integration method ("classic", "hybrid" or "sonic"') # Boolean parameters ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-p', '--plot', default=False, action='store_true', help='Plot results') # Parse arguments args = ap.parse_args() neuron_str = args.neuron a = args.diameter * 1e-9 # m Fdrive = args.frequency * 1e3 # Hz Adrive = args.amplitude * 1e3 # Pa tstim = args.duration * 1e-3 # s toffset = args.offset * 1e-3 # s PRF = args.PRF # Hz DC = args.DC * 1e-2 - output_dir = args.outputdir + output_dir = selectDirDialog() if args.outputdir is None else args.outputdir int_method = args.method if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) try: if neuron_str not in getNeuronsDict(): raise InputError('Unknown neuron type: "{}"'.format(neuron_str)) log_filepath, _ = checkBatchLog(output_dir, 'A-STIM') neuron = getNeuronsDict()[neuron_str]() - worker = AStimWorker(1, output_dir, log_filepath, SolverUS(a, neuron, Fdrive), neuron, + worker = AStimWorker(0, output_dir, log_filepath, SolverUS(a, neuron, Fdrive), neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method, 1) logger.info('%s', worker) outfile = worker.__call__() logger.info('Finished') if args.plot: plotBatch(output_dir, [outfile]) except InputError as err: logger.error(err) sys.exit(1) if __name__ == '__main__': main() diff --git a/sim/run_ESTIM.py b/sim/run_ESTIM.py index f3d7c06..b024fe4 100644 --- a/sim/run_ESTIM.py +++ b/sim/run_ESTIM.py @@ -1,93 +1,92 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-08-21 16:07:37 +# @Last Modified time: 2018-09-19 16:02:38 """ Script to run ESTIM simulations from command line. """ import sys -import os import logging from argparse import ArgumentParser -from PySONIC.utils import logger, getNeuronsDict, InputError +from PySONIC.utils import logger, getNeuronsDict, InputError, selectDirDialog from PySONIC.solvers import checkBatchLog, SolverElec, EStimWorker from PySONIC.plt import plotBatch # Default parameters default = { 'neuron': 'RS', 'A': 10.0, # mA/m2 't': 150.0, # ms 'off': 20.0, # ms 'PRF': 100.0, # Hz 'DC': 100.0 # % } def main(): # Define argument parser ap = ArgumentParser() # ASTIM parameters ap.add_argument('-n', '--neuron', type=str, default=default['neuron'], help='Neuron name (string)') ap.add_argument('-A', '--amplitude', type=float, default=default['A'], help='Stimulus amplitude (mA/m2)') ap.add_argument('-t', '--duration', type=float, default=default['t'], help='Stimulus duration (ms)') ap.add_argument('--offset', type=float, default=default['off'], help='Offset duration (ms)') ap.add_argument('--PRF', type=float, default=default['PRF'], help='PRF (Hz)') ap.add_argument('--DC', type=float, default=default['DC'], help='Duty cycle (%%)') - ap.add_argument('-o', '--outputdir', type=str, default=os.getcwd(), + ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') # Boolean arguments ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-p', '--plot', default=False, action='store_true', help='Plot results') # Parse arguments args = ap.parse_args() neuron_str = args.neuron Astim = args.amplitude # mA/m2 tstim = args.duration * 1e-3 # s toffset = args.offset * 1e-3 # s PRF = args.PRF # Hz DC = args.DC * 1e-2 - output_dir = args.outputdir + output_dir = selectDirDialog() if args.outputdir is None else args.outputdir if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) try: if neuron_str not in getNeuronsDict(): raise InputError('Unknown neuron type: "{}"'.format(neuron_str)) log_filepath, _ = checkBatchLog(output_dir, 'E-STIM') neuron = getNeuronsDict()[neuron_str]() - worker = EStimWorker(1, output_dir, log_filepath, SolverElec(), neuron, Astim, tstim, toffset, + worker = EStimWorker(0, output_dir, log_filepath, SolverElec(), neuron, Astim, tstim, toffset, PRF, DC, 1) logger.info('%s', worker) outfile = worker.__call__() logger.info('Finished') if args.plot: plotBatch(output_dir, [outfile]) except InputError as err: logger.error(err) sys.exit(1) if __name__ == '__main__': main() diff --git a/sim/run_MECH.py b/sim/run_MECH.py index e39afa2..8afaf2e 100644 --- a/sim/run_MECH.py +++ b/sim/run_MECH.py @@ -1,94 +1,93 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-03-15 18:33:59 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-08-21 16:07:37 +# @Last Modified time: 2018-09-19 16:02:47 """ Script to run MECH simulations from command line. """ import sys -import os import logging from argparse import ArgumentParser -from PySONIC.utils import logger, InputError +from PySONIC.utils import logger, InputError, selectDirDialog from PySONIC.bls import BilayerSonophore from PySONIC.solvers import checkBatchLog, MechWorker from PySONIC.plt import plotBatch # Default parameters default = { 'a': 32.0, # nm 'd': 0.0, # um 'f': 500.0, # kHz 'A': 100.0, # kPa 'Cm0': 1.0, # uF/cm2 'Qm0': 0.0, # nC/cm2 'Qm': 0.0, # nC/cm2 } def main(): # Define argument parser ap = ArgumentParser() # ASTIM parameters ap.add_argument('-a', '--diameter', type=float, default=default['a'], help='Sonophore diameter (nm)') ap.add_argument('-d', '--embedding', type=float, default=default['d'], help='Embedding depth (um)') ap.add_argument('-f', '--frequency', type=float, default=default['f'], help='Acoustic drive frequency (kHz)') ap.add_argument('-A', '--amplitude', type=float, default=default['A'], help='Acoustic pressure amplitude (kPa)') ap.add_argument('-Cm0', '--restcapct', type=float, default=default['Cm0'], help='Membrane resting capacitance (uF/cm2)') ap.add_argument('-Qm0', '--restcharge', type=float, default=default['Qm0'], help='Membrane resting charge density (nC/cm2)') ap.add_argument('-Qm', '--charge', type=float, default=default['Qm'], help='Applied charge density (nC/cm2)') - ap.add_argument('-o', '--outputdir', type=str, default=os.getcwd(), + ap.add_argument('-o', '--outputdir', type=str, default=None, help='Output directory') # Boolean parameters ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-p', '--plot', default=False, action='store_true', help='Plot results') # Parse arguments args = ap.parse_args() a = args.diameter * 1e-9 # m d = args.embedding * 1e-6 # m Fdrive = args.frequency * 1e3 # Hz Adrive = args.amplitude * 1e3 # Pa Cm0 = args.restcapct * 1e-2 # F/m2 Qm0 = args.restcharge * 1e-5 # C/m2 Qm = args.charge * 1e-5 # C/m2 - output_dir = args.outputdir + output_dir = selectDirDialog() if args.outputdir is None else args.outputdir if args.verbose: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) try: log_filepath, _ = checkBatchLog(output_dir, 'MECH') - worker = MechWorker(1, output_dir, log_filepath, BilayerSonophore(a, Fdrive, Cm0, Qm0, d), + worker = MechWorker(0, output_dir, log_filepath, BilayerSonophore(a, Fdrive, Cm0, Qm0, d), Fdrive, Adrive, Qm, 1) logger.info('%s', worker) outfile = worker.__call__() logger.info('Finished') if args.plot: plotBatch(output_dir, [outfile]) except InputError as err: logger.error(err) sys.exit(1) if __name__ == '__main__': main()