diff --git a/PointNICE/solvers/SolverElec.py b/PointNICE/solvers/SolverElec.py index 8d6dd85..1af1c41 100644 --- a/PointNICE/solvers/SolverElec.py +++ b/PointNICE/solvers/SolverElec.py @@ -1,173 +1,173 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-12 20:16:06 +# @Last Modified time: 2018-03-13 14:30:51 import warnings import logging import numpy as np import scipy.integrate as integrate from ..constants import * # Get package logger logger = logging.getLogger('PointNICE') class SolverElec: def __init__(self): # Do nothing pass - def eqHH(self, _, y, ch_mech, Iinj): + def eqHH(self, _, y, neuron, Iinj): ''' Compute the derivatives of a HH system variables for a specific value of injected current. :param t: time value (s, unused) :param y: vector of HH system variables at time t - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Iinj: injected current (mA/m2) :return: vector of HH system derivatives at time t ''' Vm, *states = y - Iionic = ch_mech.currNet(Vm, states) # mA/m2 - dVmdt = (- Iionic + Iinj) / ch_mech.Cm0 # mV/s - dstates = ch_mech.derStates(Vm, states) + Iionic = neuron.currNet(Vm, states) # mA/m2 + dVmdt = (- Iionic + Iinj) / neuron.Cm0 # mV/s + dstates = neuron.derStates(Vm, states) return [dVmdt, *dstates] - def run(self, ch_mech, Astim, tstim, toffset, PRF=None, DC=1.0): + def run(self, neuron, Astim, tstim, toffset, PRF=None, DC=1.0): ''' Compute solutions of a neuron's HH system for a specific set of electrical stimulation parameters, using a classic integration scheme. - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 3-tuple with the time profile and solution matrix and a state vector ''' # Check validity of stimulation parameters for param in [Astim, tstim, toffset, DC]: assert isinstance(param, float), 'stimulation parameters must be float typed' assert tstim > 0, 'Stimulus duration must be strictly positive' assert toffset >= 0, 'Stimulus offset must be positive or null' assert DC > 0 and DC <= 1, 'Duty cycle must be within [0; 1)' if DC < 1.0: assert isinstance(PRF, float), 'if provided, the PRF parameter must be float typed' assert PRF is not None, 'PRF must be provided when using duty cycles smaller than 1' assert PRF >= 1 / tstim, 'PR interval must be smaller than stimulus duration' # Raise warnings as error warnings.filterwarnings('error') # Initialize system solver solver = integrate.ode(self.eqHH) solver.set_integrator('lsoda', nsteps=1000) # Determine system time step dt = DT_ESTIM # if CW stimulus: divide integration during stimulus into single interval if DC == 1.0: PRF = 1 / tstim # Compute vector sizes npulses = int(np.round(PRF * tstim)) Tpulse_on = DC / PRF Tpulse_off = (1 - DC) / PRF n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) # For high-PRF pulsed protocols: adapt time step if greater than TON or TOFF dt_warning_msg = 'high-PRF protocol: lowering integration time step to %.2e ms to match %s' if Tpulse_on > 0 and n_pulse_on == 0: logger.warning(dt_warning_msg, Tpulse_on * 1e3, 'TON') dt = Tpulse_on n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) if Tpulse_off > 0 and n_pulse_off == 0: logger.warning(dt_warning_msg, Tpulse_off * 1e3, 'TOFF') dt = Tpulse_off n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) # Compute offset size n_off = int(np.round(toffset / dt)) # Set initial conditions - y0 = [ch_mech.Vm0, *ch_mech.states0] + y0 = [neuron.Vm0, *neuron.states0] nvar = len(y0) # Initialize global arrays t = np.array([0.]) states = np.array([1]) y = np.array([y0]).T # Initialize pulse time and states vectors t_pulse0 = np.linspace(0, Tpulse_on + Tpulse_off, n_pulse_on + n_pulse_off) states_pulse = np.concatenate((np.ones(n_pulse_on), np.zeros(n_pulse_off))) # Loop through all pulse (ON and OFF) intervals for i in range(npulses): # Construct and initialize arrays t_pulse = t_pulse0 + t[-1] y_pulse = np.empty((nvar, n_pulse_on + n_pulse_off)) y_pulse[:, 0] = y[:, -1] # Initialize iterator k = 0 # Integrate ON system - solver.set_f_params(ch_mech, Astim) + solver.set_f_params(neuron, Astim) solver.set_initial_value(y_pulse[:, k], t_pulse[k]) while solver.successful() and k < n_pulse_on - 1: k += 1 solver.integrate(t_pulse[k]) y_pulse[:, k] = solver.y # Integrate OFF system - solver.set_f_params(ch_mech, 0.0) + solver.set_f_params(neuron, 0.0) solver.set_initial_value(y_pulse[:, k], t_pulse[k]) while solver.successful() and k < n_pulse_on + n_pulse_off - 1: k += 1 solver.integrate(t_pulse[k]) y_pulse[:, k] = solver.y # Append pulse arrays to global arrays states = np.concatenate([states, states_pulse[1:]]) t = np.concatenate([t, t_pulse[1:]]) y = np.concatenate([y, y_pulse[:, 1:]], axis=1) # Integrate offset interval if n_off > 0: t_off = np.linspace(0, toffset, n_off) + t[-1] states_off = np.zeros(n_off) y_off = np.empty((nvar, n_off)) y_off[:, 0] = y[:, -1] solver.set_initial_value(y_off[:, 0], t_off[0]) - solver.set_f_params(ch_mech, 0.0) + solver.set_f_params(neuron, 0.0) k = 0 while solver.successful() and k < n_off - 1: k += 1 solver.integrate(t_off[k]) y_off[:, k] = solver.y # Concatenate offset arrays to global arrays states = np.concatenate([states, states_off[1:]]) t = np.concatenate([t, t_off[1:]]) y = np.concatenate([y, y_off[:, 1:]], axis=1) # Return output variables return (t, y, states) diff --git a/PointNICE/solvers/SolverUS.py b/PointNICE/solvers/SolverUS.py index 6846979..38d7ec3 100644 --- a/PointNICE/solvers/SolverUS.py +++ b/PointNICE/solvers/SolverUS.py @@ -1,787 +1,786 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-29 16:16:19 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-12 20:16:52 +# @Last Modified time: 2018-03-13 14:29:47 import os import warnings import pickle import logging import progressbar as pb import numpy as np import scipy.integrate as integrate from scipy.interpolate import interp2d from ..bls import BilayerSonophore from ..utils import * from ..constants import * from ..neurons import BaseMech # Get package logger logger = logging.getLogger('PointNICE') class SolverUS(BilayerSonophore): """ This class extends the BilayerSonophore class by adding a biophysical Hodgkin-Huxley model on top of the mechanical BLS model. """ - def __init__(self, diameter, ch_mech, Fdrive, embedding_depth=0.0): + def __init__(self, diameter, neuron, Fdrive, embedding_depth=0.0): """ Constructor of the class. :param diameter: in-plane diameter of the sonophore structure within the membrane (m) - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Fdrive: frequency of acoustic perturbation (Hz) :param embedding_depth: depth of the embedding tissue around the membrane (m) """ # Check validity of input parameters - assert isinstance(ch_mech, BaseMech), ('channel mechanism must be inherited ' - 'from the BaseMech class') + assert isinstance(neuron, BaseMech), ('neuron mechanism must be inherited ' + 'from the BaseMech class') assert Fdrive >= 0., 'Driving frequency must be positive' # TODO: check parameters dictionary (float type, mandatory members) # Initialize BLS object - Cm0 = ch_mech.Cm0 - Vm0 = ch_mech.Vm0 + Cm0 = neuron.Cm0 + Vm0 = neuron.Vm0 BilayerSonophore.__init__(self, diameter, Fdrive, Cm0, Cm0 * Vm0 * 1e-3, embedding_depth) - logger.debug('US solver initialization with %s channel mechanism', ch_mech.name) + logger.debug('US solver initialization with %s neuron', neuron.name) - def eqHH(self, t, y, ch_mech, Cm): + def eqHH(self, t, y, neuron, Cm): """ Compute the derivatives of the n-ODE HH system variables, based on a value of membrane capacitance. :param t: specific instant in time (s) :param y: vector of HH system variables at time t - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Cm: membrane capacitance (F/m2) :return: vector of HH system derivatives at time t """ # Split input vector explicitly Qm, *states = y # Compute membrane potential Vm = Qm / Cm * 1e3 # mV # Compute derivatives - dQm = - ch_mech.currNet(Vm, states) * 1e-3 # A/m2 - dstates = ch_mech.derStates(Vm, states) + dQm = - neuron.currNet(Vm, states) * 1e-3 # A/m2 + dstates = neuron.derStates(Vm, states) # Return derivatives vector return [dQm, *dstates] - def eqFull(self, t, y, ch_mech, Adrive, Fdrive, phi): + def eqFull(self, t, y, neuron, Adrive, Fdrive, phi): """ Compute the derivatives of the (n+3) ODE full NBLS system variables. :param t: specific instant in time (s) :param y: vector of state variables - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Adrive: acoustic drive amplitude (Pa) :param Fdrive: acoustic drive frequency (Hz) :param phi: acoustic drive phase (rad) :return: vector of derivatives """ # Compute derivatives of mechanical and electrical systems dydt_mech = self.eqMech(t, y[:3], Adrive, Fdrive, y[3], phi) - dydt_elec = self.eqHH(t, y[3:], ch_mech, self.Capct(y[1])) + dydt_elec = self.eqHH(t, y[3:], neuron, self.Capct(y[1])) # return concatenated output return dydt_mech + dydt_elec - def eqHHeff(self, t, y, ch_mech, interp_data): + def eqHHeff(self, t, y, neuron, interp_data): """ Compute the derivatives of the n-ODE effective HH system variables, based on 1-dimensional linear interpolation of "effective" coefficients that summarize the system's behaviour over an acoustic cycle. :param t: specific instant in time (s) :param y: vector of HH system variables at time t - :param ch_mech: channels mechanism object - :param channels: Channel object to compute a specific electrical membrane dynamics + :param neuron: neuron object :param interp_data: dictionary of 1D data points of "effective" coefficients over the charge domain, for specific frequency and amplitude values. :return: vector of effective system derivatives at time t """ # Split input vector explicitly Qm, *states = y # Compute charge and channel states variation Vm = np.interp(Qm, interp_data['Q'], interp_data['V']) # mV - dQmdt = - ch_mech.currNet(Vm, states) * 1e-3 - dstates = ch_mech.derStatesEff(Qm, states, interp_data) + dQmdt = - neuron.currNet(Vm, states) * 1e-3 + dstates = neuron.derStatesEff(Qm, states, interp_data) # Return derivatives vector return [dQmdt, *dstates] - def getEffCoeffs(self, ch_mech, Fdrive, Adrive, Qm, phi=np.pi): + def getEffCoeffs(self, neuron, Fdrive, Adrive, Qm, phi=np.pi): """ Compute "effective" coefficients of the HH system for a specific combination of stimulus frequency, stimulus amplitude and charge density. A short mechanical simulation is run while imposing the specific charge density, until periodic stabilization. The HH coefficients are then averaged over the last acoustic cycle to yield "effective" coefficients. - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param Qm: imposed charge density (C/m2) :param phi: acoustic drive phase (rad) :return: tuple with the effective potential, gas content and channel rates """ # Run simulation and retrieve deflection and gas content vectors from last cycle (_, y, _) = self.runMech(Fdrive, Adrive, Qm, phi) (Z, ng) = y Z_last = Z[-NPC_FULL:] # m # Compute membrane potential vector Vm = np.array([Qm / self.Capct(ZZ) * 1e3 for ZZ in Z_last]) # mV # Compute average cycle value for membrane potential and rate constants Vm_eff = np.mean(Vm) # mV - rates_eff = ch_mech.getEffRates(Vm) + rates_eff = neuron.getEffRates(Vm) # Take final cycle value for gas content ng_eff = ng[-1] # mole return (Vm_eff, ng_eff, *rates_eff) - def createLookup(self, ch_mech, freqs, amps, charges, phi=np.pi): + def createLookup(self, neuron, freqs, amps, charges, phi=np.pi): """ Run simulations of the mechanical system for a multiple combinations of imposed charge densities and acoustic amplitudes, compute effective coefficients and store them as 2D arrays in a lookup file. - :param ch_mech: channels mechanism object + :param neuron: neuron object :param freqs: array of acoustic drive frequencies (Hz) :param amps: array of acoustic drive amplitudes (Pa) :param charges: array of charge densities (C/m2) :param phi: acoustic drive phase (rad) """ # Check validity of stimulation parameters assert freqs.min() > 0, 'Driving frequencies must be strictly positive' assert amps.min() >= 0, 'Acoustic pressure amplitudes must be positive' - logger.info('Creating lookup table for %s neuron', ch_mech.name) + logger.info('Creating lookup table for %s neuron', neuron.name) # Initialize lookup dictionary of 3D array to store effective coefficients nf = freqs.size nA = amps.size nQ = charges.size - coeffs_names = ['V', 'ng', *ch_mech.coeff_names] + coeffs_names = ['V', 'ng', *neuron.coeff_names] ncoeffs = len(coeffs_names) lookup_dict = {cn: np.empty((nf, nA, nQ)) for cn in coeffs_names} # Loop through all (f, A, Q) combinations nsims = nf * nA * nQ isim = 0 log_str = 'short simulation %u/%u (f = %.2f kHz, A = %.2f kPa, Q = %.2f nC/cm2)' for i in range(nf): for j in range(nA): for k in range(nQ): isim += 1 # Run short simulation and store effective coefficients logger.info(log_str, isim, nsims, freqs[i] * 1e-3, amps[j] * 1e-3, charges[k] * 1e5) - sim_coeffs = self.getEffCoeffs(ch_mech, freqs[i], amps[j], charges[k], phi) + sim_coeffs = self.getEffCoeffs(neuron, freqs[i], amps[j], charges[k], phi) for icoeff in range(ncoeffs): lookup_dict[coeffs_names[icoeff]][i, j, k] = sim_coeffs[icoeff] # Add input frequency, amplitude and charge arrays to lookup dictionary lookup_dict['f'] = freqs # Hz lookup_dict['A'] = amps # Pa lookup_dict['Q'] = charges # C/m2 # Save dictionary in lookup file - lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(ch_mech.name, self.a * 1e9) - logger.info('Saving %s neuron lookup table in file: "%s"', ch_mech.name, lookup_file) + lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(neuron.name, self.a * 1e9) + logger.info('Saving %s neuron lookup table in file: "%s"', neuron.name, lookup_file) lookup_filepath = '{0}/{1}'.format(getLookupDir(), lookup_file) with open(lookup_filepath, 'wb') as fh: pickle.dump(lookup_dict, fh) - def __runClassic(self, ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): + def __runClassic(self, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, phi=np.pi): """ Compute solutions of the system for a specific set of US stimulation parameters, using a classic integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the ODE system is solved iteratively until completion. - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the effective solution matrix and a state vector """ # Raise warnings as error warnings.filterwarnings('error') # Initialize system solver solver_full = integrate.ode(self.eqFull) solver_full.set_integrator('lsoda', nsteps=SOLVER_NSTEPS, ixpr=True) # Determine system time step Tdrive = 1 / Fdrive dt = Tdrive / NPC_FULL # if CW stimulus: divide integration during stimulus into 100 intervals if DC == 1.0: PRF = 100 / tstim # Compute vector sizes npulses = int(np.round(PRF * tstim)) Tpulse_on = DC / PRF Tpulse_off = (1 - DC) / PRF n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) n_off = int(np.round(toffset / dt)) # Solve quasi-steady equation to compute first deflection value Z0 = 0.0 ng0 = self.ng0 Qm0 = self.Qm0 Pac1 = self.Pacoustic(dt, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm0, Pac1) # Initialize global arrays states = np.array([1, 1]) t = np.array([0., dt]) y_membrane = np.array([[0., (Z1 - Z0) / dt], [Z0, Z1], [ng0, ng0], [Qm0, Qm0]]) - y_channels = np.tile(ch_mech.states0, (2, 1)).T + y_channels = np.tile(neuron.states0, (2, 1)).T y = np.vstack((y_membrane, y_channels)) nvar = y.shape[0] # Initialize pulse time and states vectors t_pulse0 = np.linspace(0, Tpulse_on + Tpulse_off, n_pulse_on + n_pulse_off) states_pulse = np.concatenate((np.ones(n_pulse_on), np.zeros(n_pulse_off))) # Initialize progress bar if logger.getEffectiveLevel() == logging.DEBUG: widgets = ['Running: ', pb.Percentage(), ' ', pb.Bar(), ' ', pb.ETA()] pbar = pb.ProgressBar(widgets=widgets, max_value=int(npulses * (toffset + tstim) / tstim)) pbar.start() # Loop through all pulse (ON and OFF) intervals for i in range(npulses): # Construct and initialize arrays t_pulse = t_pulse0 + t[-1] y_pulse = np.empty((nvar, n_pulse_on + n_pulse_off)) y_pulse[:, 0] = y[:, -1] # Initialize iterator k = 0 # Integrate ON system - solver_full.set_f_params(ch_mech, Adrive, Fdrive, phi) + solver_full.set_f_params(neuron, Adrive, Fdrive, phi) solver_full.set_initial_value(y_pulse[:, k], t_pulse[k]) while solver_full.successful() and k < n_pulse_on - 1: k += 1 solver_full.integrate(t_pulse[k]) y_pulse[:, k] = solver_full.y # Integrate OFF system - solver_full.set_f_params(ch_mech, 0.0, 0.0, 0.0) + solver_full.set_f_params(neuron, 0.0, 0.0, 0.0) solver_full.set_initial_value(y_pulse[:, k], t_pulse[k]) while solver_full.successful() and k < n_pulse_on + n_pulse_off - 1: k += 1 solver_full.integrate(t_pulse[k]) y_pulse[:, k] = solver_full.y # Append pulse arrays to global arrays states = np.concatenate([states, states_pulse[1:]]) t = np.concatenate([t, t_pulse[1:]]) y = np.concatenate([y, y_pulse[:, 1:]], axis=1) # Update progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.update(i) # Integrate offset interval if n_off > 0: t_off = np.linspace(0, toffset, n_off) + t[-1] states_off = np.zeros(n_off) y_off = np.empty((nvar, n_off)) y_off[:, 0] = y[:, -1] solver_full.set_initial_value(y_off[:, 0], t_off[0]) - solver_full.set_f_params(ch_mech, 0.0, 0.0, 0.0) + solver_full.set_f_params(neuron, 0.0, 0.0, 0.0) k = 0 while solver_full.successful() and k < n_off - 1: k += 1 solver_full.integrate(t_off[k]) y_off[:, k] = solver_full.y # Concatenate offset arrays to global arrays states = np.concatenate([states, states_off[1:]]) t = np.concatenate([t, t_off[1:]]) y = np.concatenate([y, y_off[:, 1:]], axis=1) # Terminate progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.finish() # Downsample arrays in time-domain to reduce overall size t = t[::CLASSIC_DS_FACTOR] y = y[:, ::CLASSIC_DS_FACTOR] states = states[::CLASSIC_DS_FACTOR] # Return output variables return (t, y[1:, :], states) - def __runEffective(self, ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DC, dt=DT_EFF): + def __runEffective(self, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, dt=DT_EFF): """ Compute solutions of the system for a specific set of US stimulation parameters, using charge-predicted "effective" coefficients to solve the HH equations at each step. - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param dt: integration time step (s) :return: 3-tuple with the time profile, the effective solution matrix and a state vector """ # Raise warnings as error warnings.filterwarnings('error') # Check lookup file existence - lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(ch_mech.name, self.a * 1e9) + lookup_file = '{}_lookups_a{:.1f}nm.pkl'.format(neuron.name, self.a * 1e9) lookup_path = '{}/{}'.format(getLookupDir(), lookup_file) assert os.path.isfile(lookup_path), ('No lookup file available for {} ' - 'neuron type').format(ch_mech.name) + 'neuron type').format(neuron.name) # Load coefficients with open(lookup_path, 'rb') as fh: lookup_dict = pickle.load(fh) # Retrieve 1D inputs from lookup dictionary freqs = lookup_dict['f'] amps = lookup_dict['A'] charges = lookup_dict['Q'] # Check that stimulation parameters are within lookup range margin = 1e-9 # adding margin to compensate for eventual round error frange = (freqs.min() - margin, freqs.max() + margin) Arange = (amps.min() - margin, amps.max() + margin) assert frange[0] <= Fdrive <= frange[1], \ 'Fdrive must be within [{:.1f}, {:.1f}] kHz'.format(*[f * 1e-3 for f in frange]) assert Arange[0] <= Adrive <= Arange[1], \ 'Adrive must be within [{:.1f}, {:.1f}] kPa'.format(*[A * 1e-3 for A in Arange]) # Define interpolation datasets to be projected - coeffs_list = ['V', 'ng', *ch_mech.coeff_names] + coeffs_list = ['V', 'ng', *neuron.coeff_names] # If Fdrive in lookup frequencies, simply project (A, Q) interpolation dataset # at Fdrive index onto 1D charge-based interpolation dataset if Fdrive in freqs: iFdrive = np.searchsorted(freqs, Fdrive) logger.debug('Using lookups directly at %.2f kHz', freqs[iFdrive] * 1e-3) coeffs1d = {} for cn in coeffs_list: coeff2d = np.squeeze(lookup_dict[cn][iFdrive, :, :]) itrp = interp2d(amps, charges, coeff2d.T) coeffs1d[cn] = itrp(Adrive, charges) if cn == 'ng': coeffs1d['ng0'] = itrp(0.0, charges) # Otherwise, project 2 (A, Q) interpolation datasets at Fdrive bounding values # indexes in lookup frequencies onto two 1D charge-based interpolation datasets, and # interpolate between them afterwards else: ilb = np.searchsorted(freqs, Fdrive) - 1 logger.debug('Interpolating lookups between %.2f kHz and %.2f kHz', freqs[ilb] * 1e-3, freqs[ilb + 1] * 1e-3) coeffs1d = {} for cn in coeffs_list: coeffs1d_bounds = [] ng0_bounds = [] for iFdrive in [ilb, ilb + 1]: coeff2d = np.squeeze(lookup_dict[cn][iFdrive, :, :]) itrp = interp2d(amps, charges, coeff2d.T) coeffs1d_bounds.append(itrp(Adrive, charges)) if cn == 'ng': ng0_bounds.append(itrp(0.0, charges)) coeffs1d_bounds = np.squeeze(np.array([coeffs1d_bounds])) itrp = interp2d(freqs[ilb:ilb + 2], charges, coeffs1d_bounds.T) coeffs1d[cn] = itrp(Fdrive, charges) if cn == 'ng': ng0_bounds = np.squeeze(np.array([ng0_bounds])) itrp = interp2d(freqs[ilb:ilb + 2], charges, ng0_bounds.T) coeffs1d['ng0'] = itrp(Fdrive, charges) # Squeeze interpolated vectors extra dimensions and add input charges vector coeffs1d = {key: np.squeeze(value) for key, value in coeffs1d.items()} coeffs1d['Q'] = charges # Initialize system solvers solver_on = integrate.ode(self.eqHHeff) solver_on.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) - solver_on.set_f_params(ch_mech, coeffs1d) + solver_on.set_f_params(neuron, coeffs1d) solver_off = integrate.ode(self.eqHH) solver_off.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) # if CW stimulus: change PRF to have exactly one integration interval during stimulus if DC == 1.0: PRF = 1 / tstim # Compute vector sizes npulses = int(np.round(PRF * tstim)) Tpulse_on = DC / PRF Tpulse_off = (1 - DC) / PRF n_pulse_on = int(np.round(Tpulse_on / dt)) + 1 n_pulse_off = int(np.round(Tpulse_off / dt)) # For high-PRF pulsed protocols: adapt time step if greater than TON or TOFF dt_warning_msg = 'high-PRF protocol: lowering integration time step to %.2e ms to match %s' if Tpulse_on > 0 and n_pulse_on == 0: logger.warning(dt_warning_msg, Tpulse_on * 1e3, 'TON') dt = Tpulse_on n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) if Tpulse_off > 0 and n_pulse_off == 0: logger.warning(dt_warning_msg, Tpulse_off * 1e3, 'TOFF') dt = Tpulse_off n_pulse_on = int(np.round(Tpulse_on / dt)) n_pulse_off = int(np.round(Tpulse_off / dt)) # Compute ofset size n_off = int(np.round(toffset / dt)) # Initialize global arrays states = np.array([1]) t = np.array([0.0]) - y = np.atleast_2d(np.insert(ch_mech.states0, 0, self.Qm0)).T + y = np.atleast_2d(np.insert(neuron.states0, 0, self.Qm0)).T nvar = y.shape[0] Zeff = np.array([0.0]) ngeff = np.array([self.ng0]) # Initializing accurate pulse time vector t_pulse_on = np.linspace(0, Tpulse_on, n_pulse_on) t_pulse_off = np.linspace(dt, Tpulse_off, n_pulse_off) + Tpulse_on t_pulse0 = np.concatenate([t_pulse_on, t_pulse_off]) states_pulse = np.concatenate((np.ones(n_pulse_on), np.zeros(n_pulse_off))) # Loop through all pulse (ON and OFF) intervals for i in range(npulses): # Construct and initialize arrays t_pulse = t_pulse0 + t[-1] y_pulse = np.empty((nvar, n_pulse_on + n_pulse_off)) ngeff_pulse = np.empty(n_pulse_on + n_pulse_off) Zeff_pulse = np.empty(n_pulse_on + n_pulse_off) y_pulse[:, 0] = y[:, -1] ngeff_pulse[0] = ngeff[-1] Zeff_pulse[0] = Zeff[-1] # Initialize iterator k = 0 # Integrate ON system solver_on.set_initial_value(y_pulse[:, k], t_pulse[k]) while solver_on.successful() and k < n_pulse_on - 1: k += 1 solver_on.integrate(t_pulse[k]) y_pulse[:, k] = solver_on.y ngeff_pulse[k] = np.interp(y_pulse[0, k], coeffs1d['Q'], coeffs1d['ng']) # mole Zeff_pulse[k] = self.balancedefQS(ngeff_pulse[k], y_pulse[0, k]) # m # Integrate OFF system solver_off.set_initial_value(y_pulse[:, k], t_pulse[k]) - solver_off.set_f_params(ch_mech, self.Capct(Zeff_pulse[k])) + solver_off.set_f_params(neuron, self.Capct(Zeff_pulse[k])) while solver_off.successful() and k < n_pulse_on + n_pulse_off - 1: k += 1 solver_off.integrate(t_pulse[k]) y_pulse[:, k] = solver_off.y ngeff_pulse[k] = np.interp(y_pulse[0, k], coeffs1d['Q'], coeffs1d['ng0']) # mole Zeff_pulse[k] = self.balancedefQS(ngeff_pulse[k], y_pulse[0, k]) # m - solver_off.set_f_params(ch_mech, self.Capct(Zeff_pulse[k])) + solver_off.set_f_params(neuron, self.Capct(Zeff_pulse[k])) # Append pulse arrays to global arrays states = np.concatenate([states[:-1], states_pulse]) t = np.concatenate([t, t_pulse[1:]]) y = np.concatenate([y, y_pulse[:, 1:]], axis=1) Zeff = np.concatenate([Zeff, Zeff_pulse[1:]]) ngeff = np.concatenate([ngeff, ngeff_pulse[1:]]) # Integrate offset interval if n_off > 0: t_off = np.linspace(0, toffset, n_off) + t[-1] states_off = np.zeros(n_off) y_off = np.empty((nvar, n_off)) ngeff_off = np.empty(n_off) Zeff_off = np.empty(n_off) y_off[:, 0] = y[:, -1] ngeff_off[0] = ngeff[-1] Zeff_off[0] = Zeff[-1] solver_off.set_initial_value(y_off[:, 0], t_off[0]) - solver_off.set_f_params(ch_mech, self.Capct(Zeff_pulse[k])) + solver_off.set_f_params(neuron, self.Capct(Zeff_pulse[k])) k = 0 while solver_off.successful() and k < n_off - 1: k += 1 solver_off.integrate(t_off[k]) y_off[:, k] = solver_off.y ngeff_off[k] = np.interp(y_off[0, k], coeffs1d['Q'], coeffs1d['ng0']) # mole Zeff_off[k] = self.balancedefQS(ngeff_off[k], y_off[0, k]) # m - solver_off.set_f_params(ch_mech, self.Capct(Zeff_off[k])) + solver_off.set_f_params(neuron, self.Capct(Zeff_off[k])) # Concatenate offset arrays to global arrays states = np.concatenate([states, states_off[1:]]) t = np.concatenate([t, t_off[1:]]) y = np.concatenate([y, y_off[:, 1:]], axis=1) Zeff = np.concatenate([Zeff, Zeff_off[1:]]) ngeff = np.concatenate([ngeff, ngeff_off[1:]]) # Add Zeff and ngeff to solution matrix y = np.vstack([Zeff, ngeff, y]) # return output variables return (t, y, states) - def __runHybrid(self, ch_mech, Fdrive, Adrive, tstim, toffset, phi=np.pi): + def __runHybrid(self, neuron, Fdrive, Adrive, tstim, toffset, phi=np.pi): """ Compute solutions of the system for a specific set of US stimulation parameters, using a hybrid integration scheme. The first iteration uses the quasi-steady simplification to compute the initiation of motion from a flat leaflet configuration. Afterwards, the NBLS ODE system is solved iteratively for "slices" of N-microseconds, in a 2-steps scheme: - First, the full (n+3) ODE system is integrated for a few acoustic cycles until Z and ng reach a stable periodic solution (limit cycle) - Second, the signals of the 3 mechanical variables over the last acoustic period are selected and resampled to a far lower sampling rate - Third, the HH n-ODE system is integrated for the remaining time of the slice, using periodic expansion of the mechanical signals to precompute the values of capacitance. - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param phi: acoustic drive phase (rad) :return: 3-tuple with the time profile, the solution matrix and a state vector .. warning:: This method cannot handle pulsed stimuli """ # Raise warnings as error warnings.filterwarnings('error') # Initialize full and HH systems solvers solver_full = integrate.ode(self.eqFull) - solver_full.set_f_params(ch_mech, Adrive, Fdrive, phi) + solver_full.set_f_params(neuron, Adrive, Fdrive, phi) solver_full.set_integrator('lsoda', nsteps=SOLVER_NSTEPS) solver_hh = integrate.ode(self.eqHH) solver_hh.set_integrator('dop853', nsteps=SOLVER_NSTEPS, atol=1e-12) # Determine full and HH systems time steps Tdrive = 1 / Fdrive dt_full = Tdrive / NPC_FULL dt_hh = Tdrive / NPC_HH n_full_per_hh = int(NPC_FULL / NPC_HH) t_full_cycle = np.linspace(0, Tdrive - dt_full, NPC_FULL) t_hh_cycle = np.linspace(0, Tdrive - dt_hh, NPC_HH) # Determine number of samples in prediction vectors npc_pred = NPC_FULL - n_full_per_hh + 1 # Solve quasi-steady equation to compute first deflection value Z0 = 0.0 ng0 = self.ng0 Qm0 = self.Qm0 Pac1 = self.Pacoustic(dt_full, Adrive, Fdrive, phi) Z1 = self.balancedefQS(ng0, Qm0, Pac1) # Initialize global arrays states = np.array([1, 1]) t = np.array([0., dt_full]) y_membrane = np.array([[0., (Z1 - Z0) / dt_full], [Z0, Z1], [ng0, ng0], [Qm0, Qm0]]) - y_channels = np.tile(ch_mech.states0, (2, 1)).T + y_channels = np.tile(neuron.states0, (2, 1)).T y = np.vstack((y_membrane, y_channels)) nvar = y.shape[0] # Initialize progress bar if logger.getEffectiveLevel() == logging.DEBUG: widgets = ['Running: ', pb.Percentage(), ' ', pb.Bar(), ' ', pb.ETA()] pbar = pb.ProgressBar(widgets=widgets, max_value=1000) pbar.start() # For each hybrid integration interval irep = 0 sim_error = False while not sim_error and t[-1] < tstim + toffset: # Integrate full system for a few acoustic cycles until stabilization periodic_conv = False j = 0 ng_last = None Z_last = None while not sim_error and not periodic_conv: if t[-1] > tstim: - solver_full.set_f_params(ch_mech, 0.0, 0.0, 0.0) + solver_full.set_f_params(neuron, 0.0, 0.0, 0.0) t_full = t_full_cycle + t[-1] + dt_full y_full = np.empty((nvar, NPC_FULL)) y0_full = y[:, -1] solver_full.set_initial_value(y0_full, t[-1]) k = 0 try: # try to integrate and catch errors/warnings while solver_full.successful() and k <= NPC_FULL - 1: solver_full.integrate(t_full[k]) y_full[:, k] = solver_full.y k += 1 except (Warning, AssertionError) as inst: sim_error = True logger.error('Full system integration error at step %u', k) logger.error(inst) # Compare Z and ng signals over the last 2 acoustic periods if j > 0 and rmse(Z_last, y_full[1, :]) < Z_ERR_MAX \ and rmse(ng_last, y_full[2, :]) < NG_ERR_MAX: periodic_conv = True # Update last vectors for next comparison Z_last = y_full[1, :] ng_last = y_full[2, :] # Concatenate time and solutions to global vectors states = np.concatenate([states, np.ones(NPC_FULL)], axis=0) t = np.concatenate([t, t_full], axis=0) y = np.concatenate([y, y_full], axis=1) # Increment loop index j += 1 # Retrieve last period of the 3 mechanical variables to propagate in HH system t_last = t[-npc_pred:] mech_last = y[0:3, -npc_pred:] # Downsample signals to specified HH system time step (_, mech_pred) = DownSample(t_last, mech_last, NPC_HH) # Integrate HH system until certain dQ or dT is reached Q0 = y[3, -1] dQ = 0.0 t0_interval = t[-1] dt_interval = 0.0 j = 0 if t[-1] < tstim: tlim = tstim else: tlim = tstim + toffset while (not sim_error and t[-1] < tlim and (np.abs(dQ) < DQ_UPDATE or dt_interval < DT_UPDATE)): t_hh = t_hh_cycle + t[-1] + dt_hh y_hh = np.empty((nvar - 3, NPC_HH)) y0_hh = y[3:, -1] solver_hh.set_initial_value(y0_hh, t[-1]) k = 0 try: # try to integrate and catch errors/warnings while solver_hh.successful() and k <= NPC_HH - 1: - solver_hh.set_f_params(ch_mech, self.Capct(mech_pred[1, k])) + solver_hh.set_f_params(neuron, self.Capct(mech_pred[1, k])) solver_hh.integrate(t_hh[k]) y_hh[:, k] = solver_hh.y k += 1 except (Warning, AssertionError) as inst: sim_error = True logger.error('HH system integration error at step %u', k) logger.error(inst) # Concatenate time and solutions to global vectors states = np.concatenate([states, np.zeros(NPC_HH)], axis=0) t = np.concatenate([t, t_hh], axis=0) y = np.concatenate([y, np.concatenate([mech_pred, y_hh], axis=0)], axis=1) # Compute charge variation from interval beginning dQ = y[3, -1] - Q0 dt_interval = t[-1] - t0_interval # Increment loop index j += 1 # Update progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.update(int(1000 * (t[-1] / (tstim + toffset)))) irep += 1 # Terminate progress bar if logger.getEffectiveLevel() == logging.DEBUG: pbar.finish() # Return output return (t, y[1:, :], states) - def run(self, ch_mech, Fdrive, Adrive, tstim, toffset, PRF=None, DC=1.0, + def run(self, neuron, Fdrive, Adrive, tstim, toffset, PRF=None, DC=1.0, sim_type='effective'): """ Run simulation of the system for a specific set of US stimulation parameters. - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param sim_type: selected integration method :return: 3-tuple with the time profile, the solution matrix and a state vector """ # # Check validity of simulation type sim_types = ('classic, effective, hybrid') assert sim_type in sim_types, 'Allowed simulation types are {}'.format(sim_types) # Check validity of stimulation parameters - assert isinstance(ch_mech, BaseMech), ('channel mechanism must be inherited ' - 'from the BaseMech class') + assert isinstance(neuron, BaseMech), ('neuron mechanism must be inherited ' + 'from the BaseMech class') for param in [Fdrive, Adrive, tstim, toffset, DC]: assert isinstance(param, float), 'stimulation parameters must be float typed' assert Fdrive > 0, 'Driving frequency must be strictly positive' assert Adrive >= 0, 'Acoustic pressure amplitude must be positive' assert tstim > 0, 'Stimulus duration must be strictly positive' assert toffset >= 0, 'Stimulus offset must be positive or null' assert DC > 0 and DC <= 1, 'Duty cycle must be within [0; 1)' if DC < 1.0: assert isinstance(PRF, float), 'if provided, the PRF parameter must be float typed' assert PRF is not None, 'PRF must be provided when using duty cycles smaller than 1' assert PRF >= 1 / tstim, 'PR interval must be smaller than stimulus duration' assert PRF < Fdrive, 'PRF must be smaller than driving frequency' # Call appropriate simulation function if sim_type == 'classic': - return self.__runClassic(ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DC) + return self.__runClassic(neuron, Fdrive, Adrive, tstim, toffset, PRF, DC) elif sim_type == 'effective': - return self.__runEffective(ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DC) + return self.__runEffective(neuron, Fdrive, Adrive, tstim, toffset, PRF, DC) elif sim_type == 'hybrid': assert DC == 1.0, 'Hybrid method can only handle continuous wave stimuli' - return self.__runHybrid(ch_mech, Fdrive, Adrive, tstim, toffset) + return self.__runHybrid(neuron, Fdrive, Adrive, tstim, toffset) diff --git a/PointNICE/solvers/simutils.py b/PointNICE/solvers/simutils.py index 6969970..da01eec 100644 --- a/PointNICE/solvers/simutils.py +++ b/PointNICE/solvers/simutils.py @@ -1,1165 +1,1165 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-13 12:48:28 +# @Last Modified time: 2018-03-13 14:32:07 """ Utility functions used in simulations """ import os import time import logging import pickle import shutil import tkinter as tk from tkinter import filedialog import numpy as np from openpyxl import load_workbook import lockfile import pandas as pd from ..bls import BilayerSonophore from .SolverUS import SolverUS from .SolverElec import SolverElec from ..constants import * from ..utils import getNeuronsDict # Get package logger logger = logging.getLogger('PointNICE') # Naming nomenclature for output files MECH_code = 'MECH_{:.0f}nm_{:.0f}kHz_{:.0f}kPa_{:.1f}nCcm2' ESTIM_CW_code = 'ESTIM_{}_CW_{:.1f}mA_per_m2_{:.0f}ms' ESTIM_PW_code = 'ESTIM_{}_PW_{:.1f}mA_per_m2_{:.0f}ms_PRF{:.2f}kHz_DC{:.2f}' ASTIM_CW_code = 'ASTIM_{}_CW_{:.0f}nm_{:.0f}kHz_{:.0f}kPa_{:.0f}ms_{}' ASTIM_PW_code = 'ASTIM_{}_PW_{:.0f}nm_{:.0f}kHz_{:.0f}kPa_{:.0f}ms_PRF{:.2f}kHz_DC{:.2f}_{}' # Parameters units ASTIM_params = { 'f': {'index': 0, 'factor': 1e-3, 'unit': 'kHz'}, 'A': {'index': 1, 'factor': 1e-3, 'unit': 'kPa'}, 't': {'index': 2, 'factor': 1e3, 'unit': 'ms'}, 'PRF': {'index': 4, 'factor': 1e-3, 'unit': 'kHz'}, 'DC': {'index': 5, 'factor': 1e2, 'unit': '%'} } ESTIM_params = { 'A': {'index': 0, 'factor': 1e0, 'unit': 'mA/m2'}, 't': {'index': 1, 'factor': 1e3, 'unit': 'ms'}, 'PRF': {'index': 3, 'factor': 1e-3, 'unit': 'kHz'}, 'DC': {'index': 4, 'factor': 1e2, 'unit': '%'} } # Default geometry default_diam = 32e-9 default_embedding = 0.0e-6 def setBatchDir(): ''' Select batch directory for output files.α :return: full path to batch directory ''' root = tk.Tk() root.withdraw() batch_dir = filedialog.askdirectory() assert batch_dir, 'No batch directory chosen' return batch_dir def checkBatchLog(batch_dir, batch_type): ''' Check for appropriate log file in batch directory, and create one if it is absent. :param batch_dir: full path to batch directory :param batch_type: type of simulation batch :return: 2 tuple with full path to log file and boolean stating if log file was created ''' # Determine log template from batch type if batch_type == 'MECH': logfile = 'log_MECH.xlsx' elif batch_type == 'A-STIM': logfile = 'log_ASTIM.xlsx' elif batch_type == 'E-STIM': logfile = 'log_ESTIM.xlsx' else: raise ValueError('Unknown batch type', batch_type) # Get template in package subdirectory this_dir, _ = os.path.split(__file__) parent_dir = os.path.abspath(os.path.join(this_dir, os.pardir)) logsrc = parent_dir + '/templates/' + logfile assert os.path.isfile(logsrc), 'template log file "{}" not found'.format(logsrc) # Copy template in batch directory if no appropriate log file logdst = batch_dir + '/' + logfile is_log = os.path.isfile(logdst) if not is_log: shutil.copy2(logsrc, logdst) return (logdst, not is_log) def createSimQueue(amps, durations, offsets, PRFs, DCs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DCs: list (or 1D-array) of duty cycle values :return: 2D-array with (amplitude, duration, offset, PRF, DC) for each stimulation protocol ''' # Convert input to 1D-arrays amps = np.array(amps) durations = np.array(durations) offsets = np.array(offsets) PRFs = np.array(PRFs) DCs = np.array(DCs) # Create index arrays iamps = range(len(amps)) idurs = range(len(durations)) # Create empty output matrix queue = np.empty((1, 5)) # Continuous protocols if 1.0 in DCs: nCW = len(amps) * len(durations) arr1 = np.ones(nCW) iCW_queue = np.array(np.meshgrid(iamps, idurs)).T.reshape(nCW, 2) CW_queue = np.vstack((amps[iCW_queue[:, 0]], durations[iCW_queue[:, 1]], offsets[iCW_queue[:, 1]], PRFs.min() * arr1, arr1)).T queue = np.vstack((queue, CW_queue)) # Pulsed protocols if np.any(DCs != 1.0): pulsed_DCs = DCs[DCs != 1.0] iPRFs = range(len(PRFs)) ipulsed_DCs = range(len(pulsed_DCs)) nPW = len(amps) * len(durations) * len(PRFs) * len(pulsed_DCs) iPW_queue = np.array(np.meshgrid(iamps, idurs, iPRFs, ipulsed_DCs)).T.reshape(nPW, 4) PW_queue = np.vstack((amps[iPW_queue[:, 0]], durations[iPW_queue[:, 1]], offsets[iPW_queue[:, 1]], PRFs[iPW_queue[:, 2]], pulsed_DCs[iPW_queue[:, 3]])).T queue = np.vstack((queue, PW_queue)) # Return return queue[1:, :] def xlslog(filename, sheetname, data): """ Append log data on a new row to specific sheet of excel workbook, using a lockfile to avoid read/write errors between concurrent processes. :param filename: absolute or relative path to the Excel workbook :param sheetname: name of the Excel spreadsheet to which data is appended :param data: data structure to be added to specific columns on a new row :return: boolean indicating success (1) or failure (0) of operation """ try: lock = lockfile.FileLock(filename) lock.acquire() wb = load_workbook(filename) ws = wb[sheetname] keys = data.keys() i = 1 row_data = {} for k in keys: row_data[k] = data[k] i += 1 ws.append(row_data) wb.save(filename) lock.release() return 1 except PermissionError: # If file cannot be accessed for writing because already opened logger.warning('Cannot write to "%s". Close the file and type "Y"', filename) user_str = input() if user_str in ['y', 'Y']: return xlslog(filename, sheetname, data) else: return 0 def detectPeaks(x, mph=None, mpd=1, threshold=0, edge='rising', kpsh=False, valley=False, ax=None): """ Detect peaks in data based on their amplitude and inter-peak distance. """ x = np.atleast_1d(x).astype('float64') if x.size < 3: return np.array([], dtype=int) if valley: x = -x # find indices of all peaks dx = x[1:] - x[:-1] # handle NaN's indnan = np.where(np.isnan(x))[0] if indnan.size: x[indnan] = np.inf dx[np.where(np.isnan(dx))[0]] = np.inf ine, ire, ife = np.array([[], [], []], dtype=int) if not edge: ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0] else: if edge.lower() in ['rising', 'both']: ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0] if edge.lower() in ['falling', 'both']: ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0] ind = np.unique(np.hstack((ine, ire, ife))) # handle NaN's if ind.size and indnan.size: # NaN's and values close to NaN's cannot be peaks ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)] # first and last values of x cannot be peaks if ind.size and ind[0] == 0: ind = ind[1:] if ind.size and ind[-1] == x.size - 1: ind = ind[:-1] # remove peaks < minimum peak height if ind.size and mph is not None: ind = ind[x[ind] >= mph] # remove peaks - neighbors < threshold if ind.size and threshold > 0: dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0) ind = np.delete(ind, np.where(dx < threshold)[0]) # detect small peaks closer than minimum peak distance if ind.size and mpd > 1: ind = ind[np.argsort(x[ind])][::-1] # sort ind by peak height idel = np.zeros(ind.size, dtype=bool) for i in range(ind.size): if not idel[i]: # keep peaks with the same height if kpsh is True idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \ & (x[ind[i]] > x[ind] if kpsh else True) idel[i] = 0 # Keep current peak # remove the small peaks and sort back the indices by their occurrence ind = np.sort(ind[~idel]) return ind def detectPeaksTime(t, y, mph, mtd): """ Extension of the detectPeaks function to detect peaks in data based on their amplitude and time difference, with a non-uniform time vector. :param t: time vector (not necessarily uniform) :param y: signal :param mph: minimal peak height :param mtd: minimal time difference :return: array of peak indexes """ # Detect peaks on signal with no restriction on inter-peak distance raw_indexes = detectPeaks(y, mph, mpd=1) if raw_indexes.size > 0: # Filter relevant peaks with temporal distance n_raw = raw_indexes.size filtered_indexes = np.array([raw_indexes[0]]) for i in range(1, n_raw): i1 = filtered_indexes[-1] i2 = raw_indexes[i] if t[i2] - t[i1] < mtd: if y[i2] > y[i1]: filtered_indexes[-1] = i2 else: filtered_indexes = np.append(filtered_indexes, i2) # Return peak indexes return filtered_indexes else: return None def detectSpikes(t, Qm, min_amp, min_dt): ''' Detect spikes on a charge density signal, and return their number, latency and rate. :param t: time vector (s) :param Qm: charge density vector (C/m2) :param min_amp: minimal charge amplitude to detect spikes (C/m2) :param min_dt: minimal time interval between 2 spikes (s) :return: 3-tuple with number of spikes, latency (s) and spike rate (sp/s) ''' i_spikes = detectPeaksTime(t, Qm, min_amp, min_dt) if i_spikes is not None: latency = t[i_spikes[0]] # s n_spikes = i_spikes.size if n_spikes > 1: first_to_last_spike = t[i_spikes[-1]] - t[i_spikes[0]] # s spike_rate = (n_spikes - 1) / first_to_last_spike # spikes/s else: spike_rate = 'N/A' else: latency = 'N/A' spike_rate = 'N/A' n_spikes = 0 return (n_spikes, latency, spike_rate) def runEStim(batch_dir, log_filepath, solver, neuron, Astim, tstim, toffset, PRF, DC): ''' Run a single E-STIM simulation a given neuron for specific stimulation parameters, and save the results in a PKL file. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param solver: SolverElec instance :param Astim: pulse amplitude (mA/m2) :param tstim: pulse duration (s) :param toffset: offset duration (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: full path to the output file ''' if DC == 1.0: simcode = ESTIM_CW_code.format(neuron.name, Astim, tstim * 1e3) else: simcode = ESTIM_PW_code.format(neuron.name, Astim, tstim * 1e3, PRF * 1e-3, DC) # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run simulation tstart = time.time() (t, y, states) = solver.run(neuron, Astim, tstim, toffset, PRF, DC) Vm, *channels = y tcomp = time.time() - tstart logger.debug('completed in %.2f seconds', tcomp) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Detect spikes on Vm signal n_spikes, lat, sr = detectSpikes(t, Vm, SPIKE_MIN_VAMP, SPIKE_MIN_DT) logger.debug('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': Astim, 'E': tstim * 1e3, 'F': PRF * 1e-3 if DC < 1 else 'N/A', 'G': DC, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.debug('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) return output_filepath def runEStimBatch(batch_dir, log_filepath, neurons, stim_params): ''' Run batch E-STIM simulations of the system for various neuron types and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :return: list of full paths to the output files ''' mandatory_params = ['amps', 'durations', 'offsets', 'PRFs', 'DCs'] for mp in mandatory_params: assert mp in stim_params, 'stim_params dictionary must contain "{}" field'.format(mp) # Define logging format ESTIM_CW_log = 'E-STIM simulation %u/%u: %s neuron, A = %.1f mA/m2, t = %.1f ms' ESTIM_PW_log = ('E-STIM simulation %u/%u: %s neuron, A = %.1f mA/m2, t = %.1f ms, ' 'PRF = %.2f kHz, DC = %.2f') logger.info("Starting E-STIM simulation batch") # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DCs']) nqueue = sim_queue.shape[0] # Initialize solver solver = SolverElec() # Run simulations nsims = len(neurons) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for i in range(nqueue): simcount += 1 Astim, tstim, toffset, PRF, DC = sim_queue[i, :] if DC == 1.0: logger.info(ESTIM_CW_log, simcount, nsims, neuron.name, Astim, tstim * 1e3) else: logger.info(ESTIM_PW_log, simcount, nsims, neuron.name, Astim, tstim * 1e3, PRF * 1e-3, DC) output_filepath = runEStim(batch_dir, log_filepath, solver, neuron, Astim, tstim, toffset, PRF, DC) filepaths.append(output_filepath) return filepaths def runAStim(batch_dir, log_filepath, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method='effective'): ''' Run a single A-STIM simulation a given neuron for specific stimulation parameters, and save the results in a PKL file. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param solver: SolverUS instance :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param int_method: selected integration method :return: full path to the output file ''' if DC == 1.0: simcode = ASTIM_CW_code.format(neuron.name, solver.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, int_method) else: simcode = ASTIM_PW_code.format(neuron.name, solver.a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF * 1e-3, DC, int_method) # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Run simulation tstart = time.time() (t, y, states) = solver.run(neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method) Z, ng, Qm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.debug('completed in %.2f seconds', tcomp) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Qm * 1e3 / np.array([solver.Capct(ZZ) for ZZ in Z])}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'a': solver.a, 'd': solver.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Detect spikes on Qm signal n_spikes, lat, sr = detectSpikes(t, Qm, SPIKE_MIN_QAMP, SPIKE_MIN_DT) logger.debug('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': solver.a * 1e9, 'E': solver.d * 1e6, 'F': Fdrive * 1e-3, 'G': Adrive * 1e-3, 'H': tstim * 1e3, 'I': PRF * 1e-3 if DC < 1 else 'N/A', 'J': DC, 'K': int_method, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.debug('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) return output_filepath def runAStimBatch(batch_dir, log_filepath, neurons, stim_params, a=default_diam, int_method='effective'): ''' Run batch simulations of the system for various neuron types, sonophore and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS structure diameter (m) :param int_method: selected integration method :return: list of full paths to the output files ''' mandatory_params = ['freqs', 'amps', 'durations', 'offsets', 'PRFs', 'DCs'] for mp in mandatory_params: assert mp in stim_params, 'stim_params dictionary must contain "{}" field'.format(mp) # Define logging format ASTIM_CW_log = ('A-STIM %s simulation %u/%u: %s neuron, a = %.1f nm, f = %.2f kHz, ' 'A = %.2f kPa, t = %.2f ms') ASTIM_PW_log = ('A-STIM %s simulation %u/%u: %s neuron, a = %.1f nm, f = %.2f kHz, ' 'A = %.2f kPa, t = %.2f ms, PRF = %.2f kHz, DC = %.3f') logger.info("Starting A-STIM simulation batch") # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DCs']) nqueue = sim_queue.shape[0] # Run simulations nsims = len(neurons) * len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for Fdrive in stim_params['freqs']: # Initialize SolverUS solver = SolverUS(a, neuron, Fdrive) for i in range(nqueue): simcount += 1 Adrive, tstim, toffset, PRF, DC = sim_queue[i, :] # Log and define naming if DC == 1.0: logger.info(ASTIM_CW_log, int_method, simcount, nsims, neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3) else: logger.info(ASTIM_PW_log, int_method, simcount, nsims, neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF * 1e-3, DC) # Run simulation output_filepath = runAStim(batch_dir, log_filepath, solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC, int_method) filepaths.append(output_filepath) return filepaths -def titrateEStim(solver, ch_mech, Astim, tstim, toffset, PRF=1.5e3, DC=1.0): +def titrateEStim(solver, neuron, Astim, tstim, toffset, PRF=1.5e3, DC=1.0): """ Use a dichotomic recursive search to determine the threshold value of a specific electric stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. This function is called recursively until an accurate threshold is found. :param solver: solver instance - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Astim: injected current density amplitude (mA/m2) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :return: 5-tuple with the determined amplitude threshold, time profile, solution matrix, state vector and response latency """ # Determine titration type if isinstance(Astim, tuple): t_type = 'A' interval = Astim thr = TITRATION_ESTIM_DA_MAX maxval = TITRATION_ESTIM_A_MAX elif isinstance(tstim, tuple): t_type = 't' interval = tstim thr = TITRATION_DT_THR maxval = TITRATION_T_MAX elif isinstance(DC, tuple): t_type = 'DC' interval = DC thr = TITRATION_DDC_THR maxval = TITRATION_DC_MAX else: logger.error('Invalid titration type') return 0. t_var = ESTIM_params[t_type] # Check amplitude interval and define current value assert interval[0] < interval[1], '{} interval must be defined as (lb, ub)'.format(t_type) value = (interval[0] + interval[1]) / 2 # Define stimulation parameters if t_type == 'A': stim_params = [value, tstim, toffset, PRF, DC] elif t_type == 't': stim_params = [Astim, value, toffset, PRF, DC] elif t_type == 'DC': stim_params = [Astim, tstim, toffset, PRF, value] # Run simulation and detect spikes - (t, y, states) = solver.run(ch_mech, *stim_params) + (t, y, states) = solver.run(neuron, *stim_params) n_spikes, latency, _ = detectSpikes(t, y[0, :], SPIKE_MIN_VAMP, SPIKE_MIN_DT) logger.debug('%.2f %s ---> %u spike%s detected', value * t_var['factor'], t_var['unit'], n_spikes, "s" if n_spikes > 1 else "") # If accurate threshold is found, return simulation results if (interval[1] - interval[0]) <= thr and n_spikes == 1: return (value, t, y, states, latency) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: if (maxval - interval[1]) <= thr: # if upper bound too close to max then stop logger.warning('no spikes detected within titration interval') return (np.nan, t, y, states, latency) new_interval = (value, interval[1]) else: new_interval = (interval[0], value) stim_params[t_var['index']] = new_interval - return titrateEStim(solver, ch_mech, *stim_params) + return titrateEStim(solver, neuron, *stim_params) -def titrateAStim(solver, ch_mech, Fdrive, Adrive, tstim, toffset, PRF=1.5e3, DC=1.0, +def titrateAStim(solver, neuron, Fdrive, Adrive, tstim, toffset, PRF=1.5e3, DC=1.0, int_method='effective'): """ Use a dichotomic recursive search to determine the threshold value of a specific acoustic stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. This function is called recursively until an accurate threshold is found. :param solver: solver instance - :param ch_mech: channels mechanism object + :param neuron: neuron object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DC: pulse duty cycle (-) :param int_method: selected integration method :return: 5-tuple with the determined amplitude threshold, time profile, solution matrix, state vector and response latency """ # Determine titration type if isinstance(Adrive, tuple): t_type = 'A' interval = Adrive thr = TITRATION_ASTIM_DA_MAX maxval = TITRATION_ASTIM_A_MAX elif isinstance(tstim, tuple): t_type = 't' interval = tstim thr = TITRATION_DT_THR maxval = TITRATION_T_MAX elif isinstance(DC, tuple): t_type = 'DC' interval = DC thr = TITRATION_DDC_THR maxval = TITRATION_DC_MAX else: logger.error('Invalid titration type') return 0. t_var = ASTIM_params[t_type] # Check amplitude interval and define current value assert interval[0] < interval[1], '{} interval must be defined as (lb, ub)'.format(t_type) value = (interval[0] + interval[1]) / 2 # Define stimulation parameters if t_type == 'A': stim_params = [Fdrive, value, tstim, toffset, PRF, DC] elif t_type == 't': stim_params = [Fdrive, Adrive, value, toffset, PRF, DC] elif t_type == 'DC': stim_params = [Fdrive, Adrive, tstim, toffset, PRF, value] # Run simulation and detect spikes - (t, y, states) = solver.run(ch_mech, *stim_params, int_method) + (t, y, states) = solver.run(neuron, *stim_params, int_method) n_spikes, latency, _ = detectSpikes(t, y[2, :], SPIKE_MIN_QAMP, SPIKE_MIN_DT) logger.debug('%.2f %s ---> %u spike%s detected', value * t_var['factor'], t_var['unit'], n_spikes, "s" if n_spikes > 1 else "") # If accurate threshold is found, return simulation results if (interval[1] - interval[0]) <= thr and n_spikes == 1: return (value, t, y, states, latency) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: if (maxval - interval[1]) <= thr: # if upper bound too close to max then stop logger.warning('no spikes detected within titration interval') return (np.nan, t, y, states, latency) new_interval = (value, interval[1]) else: new_interval = (interval[0], value) stim_params[t_var['index']] = new_interval - return titrateAStim(solver, ch_mech, *stim_params, int_method) + return titrateAStim(solver, neuron, *stim_params, int_method) def titrateAStimBatch(batch_dir, log_filepath, neurons, stim_params, a=default_diam, int_method='effective'): ''' Run batch acoustic titrations of the system for various neuron types, sonophore and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS structure diameter (m) :param int_method: selected integration method :return: list of full paths to the output files ''' # Define logging format ASTIM_titration_log = '%s neuron - A-STIM titration %u/%u (a = %.1f nm, %s)' logger.info("Starting A-STIM titration batch") # Define default parameters int_method = 'effective' # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [TITRATION_T_OFFSET], stim_params['PRFs'], stim_params['DCs']) # sim_queue = np.delete(sim_queue, 1, axis=1) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DCs']) elif 'DC' not in stim_params: t_type = 'DC' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], [None]) nqueue = sim_queue.shape[0] t_var = ASTIM_params[t_type] # Run titrations nsims = len(neurons) * len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() for Fdrive in stim_params['freqs']: try: # Create SolverUS instance (modulus of embedding tissue depends on frequency) solver = SolverUS(a, neuron, Fdrive) for i in range(nqueue): simcount += 1 # Extract parameters Adrive, tstim, toffset, PRF, DC = sim_queue[i, :] if Adrive is None: Adrive = (0., 2 * TITRATION_ASTIM_A_MAX) elif tstim is None: tstim = (0., 2 * TITRATION_T_MAX) elif DC is None: DC = (0., 2 * TITRATION_DC_MAX) curr_params = [Fdrive, Adrive, tstim, PRF, DC] # Generate log str log_str = '' pnames = list(ASTIM_params.keys()) j = 0 for cp in curr_params: pn = pnames[j] pi = ASTIM_params[pn] if not isinstance(cp, tuple): if log_str: log_str += ', ' log_str += '{} = {:.2f} {}'.format(pn, pi['factor'] * cp, pi['unit']) j += 1 # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log logger.info(ASTIM_titration_log, neuron.name, simcount, nsims, a * 1e9, log_str) # Run titration tstart = time.time() (output_thr, t, y, states, lat) = titrateAStim(solver, neuron, Fdrive, Adrive, tstim, toffset, PRF, DC) Z, ng, Qm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.info('completed in %.2f s, threshold = %.2f %s', tcomp, output_thr * t_var['factor'], t_var['unit']) # Determine output variable if t_type == 'A': Adrive = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming if DC == 1.0: simcode = ASTIM_CW_code.format(neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, int_method) else: simcode = ASTIM_PW_code.format(neuron.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF * 1e-3, DC, int_method) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Qm * 1e3 / np.array([solver.Capct(ZZ) for ZZ in Z])}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'a': solver.a, 'd': solver.d, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal n_spikes, lat, sr = detectSpikes(t, Qm, SPIKE_MIN_QAMP, SPIKE_MIN_DT) logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': solver.a * 1e9, 'E': solver.d * 1e6, 'F': Fdrive * 1e-3, 'G': Adrive * 1e-3, 'H': tstim * 1e3, 'I': PRF * 1e-3 if DC < 1 else 'N/A', 'J': DC, 'K': int_method, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths def titrateEStimBatch(batch_dir, log_filepath, neurons, stim_params): ''' Run batch electrical titrations of the system for various neuron types and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: list of neurons names :param stim_params: dictionary containing sweeps for all stimulation parameters :return: list of full paths to the output files ''' # Define logging format ESTIM_titration_log = '%s neuron - E-STIM titration %u/%u (%s)' logger.info("Starting E-STIM titration batch") # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [TITRATION_T_OFFSET], stim_params['PRFs'], stim_params['DCs']) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DCs']) elif 'DC' not in stim_params: t_type = 'DC' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [TITRATION_T_OFFSET] * len(stim_params['durations']), stim_params['PRFs'], [None]) nqueue = sim_queue.shape[0] t_var = ESTIM_params[t_type] # Create SolverElec instance solver = SolverElec() # Run titrations nsims = len(neurons) * nqueue simcount = 0 filepaths = [] for nname in neurons: neuron = getNeuronsDict()[nname]() try: for i in range(nqueue): simcount += 1 # Extract parameters Astim, tstim, toffset, PRF, DC = sim_queue[i, :] if Astim is None: Astim = (0., 2 * TITRATION_ESTIM_A_MAX) elif tstim is None: tstim = (0., 2 * TITRATION_T_MAX) elif DC is None: DC = (0., 2 * TITRATION_DC_MAX) curr_params = [Astim, tstim, PRF, DC] # Generate log str log_str = '' pnames = list(ESTIM_params.keys()) j = 0 for cp in curr_params: pn = pnames[j] pi = ESTIM_params[pn] if not isinstance(cp, tuple): if log_str: log_str += ', ' log_str += '{} = {:.2f} {}'.format(pn, pi['factor'] * cp, pi['unit']) j += 1 # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log logger.info(ESTIM_titration_log, neuron.name, simcount, nsims, log_str) # Run titration tstart = time.time() (output_thr, t, y, states, lat) = titrateEStim(solver, neuron, Astim, tstim, toffset, PRF, DC) Vm, *channels = y tcomp = time.time() - tstart logger.info('completed in %.2f s, threshold = %.2f %s', tcomp, output_thr * t_var['factor'], t_var['unit']) # Determine output variable if t_type == 'A': Astim = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DC': DC = output_thr # Define output naming if DC == 1.0: simcode = ESTIM_CW_code.format(neuron.name, Astim, tstim * 1e3) else: simcode = ESTIM_PW_code.format(neuron.name, Astim, tstim * 1e3, PRF * 1e-3, DC) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'Vm': Vm}) for j in range(len(neuron.states_names)): df[neuron.states_names[j]] = channels[j] meta = {'neuron': neuron.name, 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DC': DC, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal n_spikes, lat, sr = detectSpikes(t, Vm, SPIKE_MIN_VAMP, SPIKE_MIN_DT) logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': neuron.name, 'D': Astim, 'E': tstim * 1e3, 'F': PRF * 1e-3 if DC < 1 else 'N/A', 'G': DC, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths def runMechBatch(batch_dir, log_filepath, Cm0, Qm0, stim_params, a=default_diam, d=default_embedding): ''' Run batch simulations of the mechanical system with imposed values of charge density, for various sonophore spans and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param stim_params: dictionary containing sweeps for all stimulation parameters :param a: BLS in-plane diameter (m) :param d: depth of embedding tissue around plasma membrane (m) ''' # Define logging format MECH_log = ('Mechanical simulation %u/%u (a = %.1f nm, d = %.1f um, f = %.2f kHz, ' 'A = %.2f kPa, Q = %.1f nC/cm2)') logger.info("Starting mechanical simulation batch") # Unpack stimulation parameters amps = np.array(stim_params['amps']) charges = np.array(stim_params['charges']) # Generate simulations queue nA = len(amps) nQ = len(charges) sim_queue = np.array(np.meshgrid(amps, charges)).T.reshape(nA * nQ, 2) nqueue = sim_queue.shape[0] # Run simulations nsims = len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for Fdrive in stim_params['freqs']: try: # Create BilayerSonophore instance (modulus of embedding tissue depends on frequency) bls = BilayerSonophore(a, Fdrive, Cm0, Qm0, d) for i in range(nqueue): simcount += 1 Adrive, Qm = sim_queue[i, :] # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log and define naming logger.info(MECH_log, simcount, nsims, a * 1e9, d * 1e6, Fdrive * 1e-3, Adrive * 1e-3, Qm * 1e5) simcode = MECH_code.format(a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, Qm * 1e5) # Run simulation tstart = time.time() (t, y, states) = bls.runMech(Fdrive, Adrive, Qm) (Z, ng) = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.info('completed in %.2f seconds', tcomp) # Store dataframe and metadata df = pd.DataFrame({'t': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng}) meta = {'a': solver.a, 'd': solver.d, 'Cm0': Cm0, 'Qm0': Qm0, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'Qm': Qm, 'tcomp': tcomp} # Export into to PKL file output_filepath = '{}/{}.pkl'.format(batch_dir, simcode) with open(output_filepath, 'wb') as fh: pickle.dump({'meta': meta, 'data': df}, fh) logger.debug('simulation data exported to "%s"', output_filepath) # Compute key output metrics Zmax = np.amax(Z) Zmin = np.amin(Z) Zabs_max = np.amax(np.abs([Zmin, Zmax])) eAmax = bls.arealstrain(Zabs_max) Tmax = bls.TEtot(Zabs_max) Pmmax = bls.PMavgpred(Zmin) ngmax = np.amax(ng) dUdtmax = np.amax(np.abs(np.diff(U) / np.diff(t)**2)) # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': a * 1e9, 'D': d * 1e6, 'E': Fdrive * 1e-3, 'F': Adrive * 1e-3, 'G': Qm * 1e5, 'H': t.size, 'I': tcomp, 'J': bls.kA + bls.kA_tissue, 'K': Zmax * 1e9, 'L': eAmax, 'M': Tmax * 1e3, 'N': (ngmax - bls.ng0) / bls.ng0, 'O': Pmmax * 1e-3, 'P': dUdtmax } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths diff --git a/sim/ASTIM_lookups.py b/sim/ASTIM_lookups.py index a41f6d9..a7264a5 100644 --- a/sim/ASTIM_lookups.py +++ b/sim/ASTIM_lookups.py @@ -1,42 +1,42 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-06-02 17:50:10 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2018-03-12 19:45:18 +# @Last Modified time: 2018-03-13 14:32:29 """ Create lookup tables for different acoustic frequencies. """ import logging import numpy as np import PointNICE from PointNICE.utils import logger from PointNICE.neurons import * # Set logging level logger.setLevel(logging.DEBUG) # BLS diameter (m) a = 32e-9 # Channel mechanisms neurons = [LeechPressure()] # Stimulation parameters freqs = np.arange(100, 1001, 100) * 1e3 # Hz amps = np.logspace(np.log10(0.1), np.log10(600), num=50) * 1e3 # Pa amps = np.insert(amps, 0, 0.0) # adding amplitude 0 logger.info('Starting batch lookup creation') -for ch_mech in neurons: +for neuron in neurons: # Create a SolverUS instance (with dummy frequency parameter) - solver = PointNICE.SolverUS(a, ch_mech, 0.0) - charges = np.arange(np.round(ch_mech.Vm0 - 10.0), 50.0 + 1.0, 1.0) * 1e-5 # C/m2 + solver = PointNICE.SolverUS(a, neuron, 0.0) + charges = np.arange(np.round(neuron.Vm0 - 10.0), 50.0 + 1.0, 1.0) * 1e-5 # C/m2 # Create lookup file - solver.createLookup(ch_mech, freqs, amps, charges) + solver.createLookup(neuron, freqs, amps, charges) logger.info('Lookup tables successfully created')