diff --git a/PointNICE/plt/pltutils.py b/PointNICE/plt/pltutils.py index 9e32ef5..7bfe4c6 100644 --- a/PointNICE/plt/pltutils.py +++ b/PointNICE/plt/pltutils.py @@ -1,533 +1,557 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-23 14:55:37 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-08-25 17:18:54 +# @Last Modified time: 2017-08-25 17:38:50 ''' Plotting utilities ''' import pickle import ntpath import re import logging import inspect import tkinter as tk from tkinter import filedialog import numpy as np import matplotlib.pyplot as plt from matplotlib.patches import Rectangle from .. import channels from ..bls import BilayerSonophore from .pltvars import pltvars # Get package logger logger = logging.getLogger('PointNICE') # Define global variables neuron = None bls = None # Regular expression for input files rgxp = re.compile('(ESTIM|ASTIM)_([A-Za-z]*)_(.*).pkl') rgxp_mech = re.compile('(MECH)_(.*).pkl') # Figure naming conventions ESTIM_CW_title = '{} neuron: CW E-STIM {:.2f}mA/m2, {:.0f}ms' ESTIM_PW_title = '{} neuron: PW E-STIM {:.2f}mA/m2, {:.0f}ms, {:.2f}kHz PRF, {:.0f}% DC' ASTIM_CW_title = '{} neuron: CW A-STIM {:.0f}kHz, {:.0f}kPa, {:.0f}ms' ASTIM_PW_title = '{} neuron: PW A-STIM {:.0f}kHz, {:.0f}kPa, {:.0f}ms, {:.2f}kHz PRF, {:.0f}% DC' MECH_title = '{:.0f}nm BLS structure: MECH-STIM {:.0f}kHz, {:.0f}kPa' class InteractiveLegend(object): """ Class defining an interactive matplotlib legend, where lines visibility can be toggled by simply clicking on the corresponding legend label. Other graphic objects can also be associated to the toggle of a specific line Adapted from: http://stackoverflow.com/questions/31410043/hiding-lines-after-showing-a-pyplot-figure """ def __init__(self, legend, aliases): self.legend = legend self.fig = legend.axes.figure self.lookup_artist, self.lookup_handle = self._build_lookups(legend) self._setup_connections() self.handles_aliases = aliases self.update() def _setup_connections(self): for artist in self.legend.texts + self.legend.legendHandles: artist.set_picker(10) # 10 points tolerance self.fig.canvas.mpl_connect('pick_event', self.on_pick) def _build_lookups(self, legend): ''' Method of the InteractiveLegend class building the legend lookups. ''' labels = [t.get_text() for t in legend.texts] handles = legend.legendHandles label2handle = dict(zip(labels, handles)) handle2text = dict(zip(handles, legend.texts)) lookup_artist = {} lookup_handle = {} for artist in legend.axes.get_children(): if artist.get_label() in labels: handle = label2handle[artist.get_label()] lookup_handle[artist] = handle lookup_artist[handle] = artist lookup_artist[handle2text[handle]] = artist lookup_handle.update(zip(handles, handles)) lookup_handle.update(zip(legend.texts, handles)) return lookup_artist, lookup_handle def on_pick(self, event): handle = event.artist if handle in self.lookup_artist: artist = self.lookup_artist[handle] artist.set_visible(not artist.get_visible()) self.update() def update(self): for artist in self.lookup_artist.values(): handle = self.lookup_handle[artist] if artist.get_visible(): handle.set_visible(True) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(True) else: handle.set_visible(False) if artist in self.handles_aliases: for al in self.handles_aliases[artist]: al.set_visible(False) self.fig.canvas.draw() def show(self): ''' showing the interactive legend ''' plt.show() def getPatchesLoc(t, states): ''' Determine the location of stimulus patches. :param t: simulation time vector (s). :param states: a vector of stimulation state (ON/OFF) at each instant in time. :return: 3-tuple with number of patches, timing of STIM-ON an STIM-OFF instants. ''' # Compute states derivatives and identify bounds indexes of pulses dstates = np.diff(states) ipatch_on = np.insert(np.where(dstates > 0.0)[0] + 1, 0, 0) ipatch_off = np.where(dstates < 0.0)[0] # Get time instants for pulses ON and OFF npatches = ipatch_on.size tpatch_on = t[ipatch_on] tpatch_off = t[ipatch_off] # return 3-tuple with #patches, pulse ON and pulse OFF instants return (npatches, tpatch_on, tpatch_off) def SaveFigDialog(dirname, filename): """ Open a FileSaveDialogBox to set the directory and name of the figure to be saved. The default directory and filename are given, and the default extension is ".pdf" :param dirname: default directory :param filename: default filename :return: full path to the chosen filename """ root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename(defaultextension=".pdf", initialdir=dirname, initialfile=filename) return filename_out def plotComp(yvars, filepaths, fs=15, show_patches=True): ''' Compare profiles of several specific output variables of NICE simulations. :param yvars: list of variables names to extract and compare :param filepaths: list of full paths to output data files to be compared :param fs: labels fontsize :param show_patches: boolean indicating whether to indicate periods of stimulation with colored rectangular patches ''' nvars = len(yvars) - # x and y variables plotting information - t_plt = pltvars['t'] + # y variables plotting information y_pltvars = [pltvars[key] for key in yvars] # Dictionary of neurons neurons = {} for _, obj in inspect.getmembers(channels): if inspect.isclass(obj) and isinstance(obj.name, str): neurons[obj.name] = obj - # Initialize figure and axes if nvars == 1: _, ax = plt.subplots(figsize=(11, 4)) axes = [ax] else: _, axes = plt.subplots(nvars, 1, figsize=(11, min(3 * nvars, 9))) for i in range(nvars): ax = axes[i] pltvar = y_pltvars[i] if 'min' in pltvar and 'max' in pltvar: ax.set_ylim(pltvar['min'], pltvar['max']) if pltvar['unit']: ax.set_ylabel('${}\ ({})$'.format(pltvar['label'], pltvar['unit']), fontsize=fs) else: ax.set_ylabel('${}$'.format(pltvar['label']), fontsize=fs) if i < nvars - 1: ax.get_xaxis().set_ticklabels([]) else: - ax.set_xlabel('${}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) for tick in ax.xaxis.get_major_ticks(): tick.label.set_fontsize(fs) ax.locator_params(axis='y', nbins=2) for tick in ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) - - # Loop through data files - tstim_ref = 0.0 - nstim = 0 j = 0 aliases = {} for filepath in filepaths: pkl_filename = ntpath.basename(filepath) - # Retrieve neuron name - mo = rgxp.fullmatch(pkl_filename) - if not mo: + # Retrieve sim type + mo1 = rgxp.fullmatch(pkl_filename) + mo2 = rgxp_mech.fullmatch(pkl_filename) + if mo1: + mo = mo1 + elif mo2: + mo = mo2 + else: print('Error: PKL file does not match regular expression pattern') quit() sim_type = mo.group(1) - neuron_name = mo.group(2) + assert sim_type in ['MECH', 'ASTIM', 'ESTIM'], 'invalid stimulation type' + + if j == 0: + sim_type_ref = sim_type + else: + assert sim_type == sim_type_ref, 'Error: comparing different simulation types' # Load data print('Loading data from "' + pkl_filename + '"') with open(filepath, 'rb') as pkl_file: data = pickle.load(pkl_file) - # Extract useful variables + # Extract variables + print('Extracting variables') t = data['t'] states = data['states'] - tstim = data['tstim'] + nsamples = t.size - # Initialize BLS and channels mechanism - global neuron - neuron = neurons[neuron_name]() - neuron_states = [data[sn] for sn in neuron.states_names] + # Initialize channel mechanism + if sim_type in ['ASTIM', 'ESTIM']: + neuron_name = mo.group(2) + global neuron + neuron = neurons[neuron_name]() + neuron_states = [data[sn] for sn in neuron.states_names] + Cm0 = neuron.Cm0 + Qm0 = Cm0 * neuron.Vm0 * 1e-3 + t_plt = pltvars['t_ms'] + else: + Cm0 = data['Cm0'] + Qm0 = data['Qm0'] + t_plt = pltvars['t_us'] # Initialize BLS - if sim_type == 'A': + if sim_type in ['MECH', 'ASTIM']: global bls params = data['params'] Fdrive = data['Fdrive'] a = data['a'] d = data['d'] geom = {"a": a, "d": d} - Qm0 = neuron.Cm0 * neuron.Vm0 * 1e-3 - bls = BilayerSonophore(geom, params, Fdrive, neuron.Cm0, Qm0) + bls = BilayerSonophore(geom, params, Fdrive, Cm0, Qm0) + + # Determine patches location + npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) - # Get data of variables to plot + # Adding onset to time and states vectors + if t_plt['onset'] > 0.0: + t = np.insert(t, 0, -t_plt['onset']) + states = np.insert(states, 0, 0) + + # Extract variables to plot vrs = [] for i in range(nvars): pltvar = y_pltvars[i] if 'alias' in pltvar: var = eval(pltvar['alias']) elif 'key' in pltvar: var = data[pltvar['key']] + elif 'constant' in pltvar: + var = eval(pltvar['constant']) * np.ones(nsamples) else: var = data[yvars[i]] + if var.size == t.size - 1: + var = np.insert(var, 0, var[0]) vrs.append(var) - # Determine patches location - npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) + # # Determine patches location + # npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) - # Adding onset to all signals - if t_plt['onset'] > 0.0: - t = np.insert(t + t_plt['onset'], 0, 0.0) - for i in range(nvars): - vrs[i] = np.insert(vrs[i], 0, vrs[i][0]) - tpatch_on += t_plt['onset'] - tpatch_off += t_plt['onset'] + # # Adding onset to all signals + # if t_plt['onset'] > 0.0: + # t = np.insert(t + t_plt['onset'], 0, 0.0) + # for i in range(nvars): + # vrs[i] = np.insert(vrs[i], 0, vrs[i][0]) + # tpatch_on += t_plt['onset'] + # tpatch_off += t_plt['onset'] # Legend label - if sim_type == 'E': - label = ESTIM_CW_title.format(neuron.name, data['Astim'], data['tstim'] * 1e3) - elif sim_type == 'A': + if sim_type == 'ESTIM': + if data['DF'] == 1.0: + label = ESTIM_CW_title.format(neuron.name, data['Astim'], data['tstim'] * 1e3) + else: + label = ESTIM_PW_title.format(neuron.name, data['Astim'], data['tstim'] * 1e3, + data['PRF'] * 1e-3, data['DF'] * 1e2) + elif sim_type == 'ASTIM': if data['DF'] == 1.0: label = ASTIM_CW_title.format(neuron.name, Fdrive * 1e-3, data['Adrive'] * 1e-3, data['tstim'] * 1e3) else: label = ASTIM_PW_title.format(neuron.name, Fdrive * 1e-3, data['Adrive'] * 1e-3, data['tstim'] * 1e3, data['PRF'] * 1e-3, data['DF'] * 1e2) + elif sim_type == 'MECH': + label = MECH_title.format(a * 1e9, Fdrive * 1e-3, data['Adrive'] * 1e-3) # Plotting handles = [axes[i].plot(t * t_plt['factor'], vrs[i] * y_pltvars[i]['factor'], linewidth=2, label=label) for i in range(nvars)] if show_patches: k = 0 # stimulation patches for ax in axes: handle = handles[k] (ybottom, ytop) = ax.get_ylim() la = [] for i in range(npatches): la.append(ax.add_patch(Rectangle((tpatch_on[i] * t_plt['factor'], ybottom), (tpatch_off[i] - tpatch_on[i]) * t_plt['factor'], ytop - ybottom, color=handle[0].get_color(), alpha=0.2))) aliases[handle[0]] = la k += 1 - if tstim != tstim_ref: - if nstim == 0: - nstim += 1 - tstim_ref = tstim - else: - print('Warning: comparing different stimulation durations') - j += 1 + # set x-axis label + axes[-1].set_xlabel('${}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) + plt.tight_layout() iLegends = [] for k in range(nvars): axes[k].legend(loc='upper left', fontsize=fs) iLegends.append(InteractiveLegend(axes[k].legend_, aliases)) plt.show() def plotBatch(vars_dict, directory, filepaths, plt_show=True, plt_save=False, ask_before_save=1, fig_ext='png', tag='fig', fs=15, lw=4, title=True, show_patches=True): ''' Plot a figure with profiles of several specific NICE output variables, for several NICE simulations. :param vars_dict: dict of lists of variables names to extract and plot together :param positions: subplot indexes of each variable :param filepaths: list of full paths to output data files to be compared :param fs: labels fontsize :param show_patches: boolean indicating whether to indicate periods of stimulation with colored rectangular patches ''' labels = list(vars_dict.keys()) naxes = len(vars_dict) # Dictionary of neurons neurons = {} for _, obj in inspect.getmembers(channels): if inspect.isclass(obj) and isinstance(obj.name, str): neurons[obj.name] = obj # Loop through data files for filepath in filepaths: # Get code from file name pkl_filename = ntpath.basename(filepath) filecode = pkl_filename[0:-4] - # Retrieve neuron name + # Retrieve sim type mo1 = rgxp.fullmatch(pkl_filename) mo2 = rgxp_mech.fullmatch(pkl_filename) if mo1: mo = mo1 elif mo2: mo = mo2 else: print('Error: PKL file does not match regular expression pattern') quit() sim_type = mo.group(1) assert sim_type in ['MECH', 'ASTIM', 'ESTIM'], 'invalid stimulation type' # Load data print('Loading data from "' + pkl_filename + '"') with open(filepath, 'rb') as pkl_file: data = pickle.load(pkl_file) # Extract variables print('Extracting variables') t = data['t'] states = data['states'] nsamples = t.size # Initialize channel mechanism if sim_type in ['ASTIM', 'ESTIM']: neuron_name = mo.group(2) global neuron neuron = neurons[neuron_name]() neuron_states = [data[sn] for sn in neuron.states_names] Cm0 = neuron.Cm0 Qm0 = Cm0 * neuron.Vm0 * 1e-3 t_plt = pltvars['t_ms'] else: Cm0 = data['Cm0'] Qm0 = data['Qm0'] t_plt = pltvars['t_us'] # Initialize BLS if sim_type in ['MECH', 'ASTIM']: global bls params = data['params'] Fdrive = data['Fdrive'] a = data['a'] d = data['d'] geom = {"a": a, "d": d} bls = BilayerSonophore(geom, params, Fdrive, Cm0, Qm0) # Determine patches location npatches, tpatch_on, tpatch_off = getPatchesLoc(t, states) - # Adding onset to time vector and patches + # Adding onset to time and states vectors if t_plt['onset'] > 0.0: t = np.insert(t, 0, -t_plt['onset']) states = np.insert(states, 0, 0) - # tpatch_on += t_plt['onset'] - # tpatch_off += t_plt['onset'] # Plotting if naxes == 1: _, ax = plt.subplots(figsize=(11, 4)) axes = [ax] else: _, axes = plt.subplots(naxes, 1, figsize=(11, min(3 * naxes, 9))) for i in range(naxes): ax = axes[i] ax_pltvars = [pltvars[j] for j in vars_dict[labels[i]]] nvars = len(ax_pltvars) # X-axis if i < naxes - 1: ax.get_xaxis().set_ticklabels([]) else: ax.set_xlabel('${}\ ({})$'.format(t_plt['label'], t_plt['unit']), fontsize=fs) for tick in ax.xaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Y-axis if ax_pltvars[0]['unit']: ax.set_ylabel('${}\ ({})$'.format(labels[i], ax_pltvars[0]['unit']), fontsize=fs) else: ax.set_ylabel('${}$'.format(labels[i]), fontsize=fs) if 'min' in ax_pltvars[0] and 'max' in ax_pltvars[0]: ax_min = min([ap['min'] for ap in ax_pltvars]) ax_max = max([ap['max'] for ap in ax_pltvars]) ax.set_ylim(ax_min, ax_max) ax.locator_params(axis='y', nbins=2) for tick in ax.yaxis.get_major_ticks(): tick.label.set_fontsize(fs) # Time series icolor = 0 for j in range(nvars): # Extract variable pltvar = ax_pltvars[j] if 'alias' in pltvar: var = eval(pltvar['alias']) elif 'key' in pltvar: var = data[pltvar['key']] elif 'constant' in pltvar: var = eval(pltvar['constant']) * np.ones(nsamples) else: var = data[vars_dict[labels[i]][j]] if var.size == t.size - 1: var = np.insert(var, 0, var[0]) # Plot variable if 'constant' in pltvar: ax.plot(t * t_plt['factor'], var * pltvar['factor'], '--', c='black', lw=lw, label='${}$'.format(pltvar['label'])) else: ax.plot(t * t_plt['factor'], var * pltvar['factor'], c='C{}'.format(icolor), lw=lw, label='${}$'.format(pltvar['label'])) icolor += 1 # Patches if show_patches == 1: (ybottom, ytop) = ax.get_ylim() for j in range(npatches): ax.add_patch(Rectangle((tpatch_on[j] * t_plt['factor'], ybottom), (tpatch_off[j] - tpatch_on[j]) * t_plt['factor'], ytop - ybottom, color='#8A8A8A', alpha=0.1)) # Legend if nvars > 1: ax.legend(fontsize=fs, loc=7, ncol=nvars // 4 + 1) # Title if title: if sim_type == 'ESTIM': if data['DF'] == 1.0: fig_title = ESTIM_CW_title.format(neuron.name, data['Astim'], data['tstim'] * 1e3) else: fig_title = ESTIM_PW_title.format(neuron.name, data['Astim'], data['tstim'] * 1e3, data['PRF'] * 1e-3, data['DF'] * 1e2) elif sim_type == 'ASTIM': if data['DF'] == 1.0: fig_title = ASTIM_CW_title.format(neuron.name, Fdrive * 1e-3, data['Adrive'] * 1e-3, data['tstim'] * 1e3) else: fig_title = ASTIM_PW_title.format(neuron.name, Fdrive * 1e-3, data['Adrive'] * 1e-3, data['tstim'] * 1e3, data['PRF'] * 1e-3, data['DF'] * 1e2) elif sim_type == 'MECH': fig_title = MECH_title.format(a * 1e9, Fdrive * 1e-3, data['Adrive'] * 1e-3) axes[0].set_title(fig_title, fontsize=fs) plt.tight_layout() # Save figure if needed (automatic or checked) if plt_save == 1: if ask_before_save == 1: plt_filename = SaveFigDialog(directory, '{}_{}.{}'.format(filecode, tag, fig_ext)) else: plt_filename = '{}/{}_{}.{}'.format(directory, filecode, tag, fig_ext) if plt_filename: plt.savefig(plt_filename) print('Saving figure as "{}"'.format(plt_filename)) plt.close() # Show all plots if needed if plt_show == 1: plt.show() diff --git a/PointNICE/solvers/simutils.py b/PointNICE/solvers/simutils.py index bc003d6..53a9efc 100644 --- a/PointNICE/solvers/simutils.py +++ b/PointNICE/solvers/simutils.py @@ -1,1143 +1,1143 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-22 14:33:04 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-08-25 16:47:56 +# @Last Modified time: 2017-08-25 17:50:57 """ Utility functions used in simulations """ import os import time import logging import pickle import shutil import tkinter as tk from tkinter import filedialog import numpy as np from openpyxl import load_workbook from ..bls import BilayerSonophore from .SolverUS import SolverUS from .SolverElec import SolverElec from ..constants import * # Get package logger logger = logging.getLogger('PointNICE') # Naming and logging settings MECH_code = 'MECH_{:.0f}nm_{:.0f}kHz_{:.0f}kPa_{:.1f}nCcm2' MECH_log = ('Mechanical simulation %u/%u (a = %.1f nm, d = %.1f um, f = %.2f kHz, ' 'A = %.2f kPa, Q = %.1f nC/cm2)') ESTIM_CW_code = 'ESTIM_{}_CW_{:.1f}mA_per_m2_{:.0f}ms' ESTIM_PW_code = 'ESTIM_{}_PW_{:.1f}mA_per_m2_{:.0f}ms_PRF{:.2f}kHz_DF{:.2f}' ESTIM_CW_log = '%s neuron - CW E-STIM simulation %u/%u (A = %.1f mA/m2, t = %.1f ms)' ESTIM_PW_log = ('%s neuron - PW E-STIM simulation %u/%u (A = %.1f mA/m2, t = %.1f ms, ' 'PRF = %.2f kHz, DF = %.2f)') ASTIM_CW_code = 'ASTIM_{}_CW_{:.0f}nm_{:.0f}kHz_{:.0f}kPa_{:.0f}ms_{}' ASTIM_PW_code = 'ASTIM_{}_PW_{:.0f}nm_{:.0f}kHz_{:.0f}kPa_{:.0f}ms_PRF{:.2f}kHz_DF{:.2f}_{}' ASTIM_CW_log = ('%s neuron - CW A-STIM %s simulation %u/%u (a = %.1f nm, f = %.2f kHz, ' 'A = %.2f kPa, t = %.2f ms') ASTIM_PW_log = ('%s neuron - PW A-STIM %s simulation %u/%u (a = %.1f nm, f = %.2f kHz, ' 'A = %.2f kPa, t = %.2f ms, PRF = %.2f kHz, DF = %.2f)') ASTIM_titration_log = '%s neuron - A-STIM titration %u/%u (a = %.1f nm, %s)' ESTIM_titration_log = '%s neuron - E-STIM titration %u/%u (%s)' # Define parameters units ASTIM_params = { 'f': {'index': 0, 'factor': 1e-3, 'unit': 'kHz'}, 'A': {'index': 1, 'factor': 1e-3, 'unit': 'kPa'}, 't': {'index': 2, 'factor': 1e3, 'unit': 'ms'}, 'PRF': {'index': 4, 'factor': 1e-3, 'unit': 'kHz'}, 'DF': {'index': 5, 'factor': 1e2, 'unit': '%'} } ESTIM_params = { 'A': {'index': 0, 'factor': 1e0, 'unit': 'mA/m2'}, 't': {'index': 1, 'factor': 1e3, 'unit': 'ms'}, 'PRF': {'index': 3, 'factor': 1e-3, 'unit': 'kHz'}, 'DF': {'index': 4, 'factor': 1e2, 'unit': '%'} } def checkBatchLog(batch_type): ''' Determine batch directory, and add a log file to the directory if it is absent. :param batch_type: name of the log file to search for :return: 2-tuple with full paths to batch directory and log file ''' # Get batch directory from user root = tk.Tk() root.withdraw() batch_dir = filedialog.askdirectory() assert batch_dir, 'No batch directory chosen' # Determine log template from batch type if batch_type == 'mech': logfile = 'log_mech.xlsx' elif batch_type == 'A-STIM': logfile = 'log_ASTIM.xlsx' elif batch_type == 'E-STIM': logfile = 'log_ESTIM.xlsx' else: raise ValueError('Unknown batch type', batch_type) this_dir, _ = os.path.split(__file__) parent_dir = os.path.abspath(os.path.join(this_dir, os.pardir)) logsrc = parent_dir + '/templates/' + logfile assert os.path.isfile(logsrc), 'template log file "{}" not found'.format(logsrc) # Copy template in batch directory if no appropriate log file logdst = batch_dir + '/' + logfile if not os.path.isfile(logdst): shutil.copy2(logsrc, logdst) return (batch_dir, logdst) def createSimQueue(amps, durations, offsets, PRFs, DFs): ''' Create a serialized 2D array of all parameter combinations for a series of individual parameter sweeps, while avoiding repetition of CW protocols for a given PRF sweep. :param amps: list (or 1D-array) of acoustic amplitudes :param durations: list (or 1D-array) of stimulus durations :param offsets: list (or 1D-array) of stimulus offsets (paired with durations array) :param PRFs: list (or 1D-array) of pulse-repetition frequencies :param DFs: list (or 1D-array) of duty cycle values :return: 2D-array with (amplitude, duration, offset, PRF, DF) for each stimulation protocol ''' # Convert input to 1D-arrays amps = np.array(amps) durations = np.array(durations) offsets = np.array(offsets) PRFs = np.array(PRFs) DFs = np.array(DFs) # Create index arrays iamps = range(len(amps)) idurs = range(len(durations)) # Create empty output matrix queue = np.empty((1, 5)) # Continuous protocols if 1.0 in DFs: nCW = len(amps) * len(durations) arr1 = np.ones(nCW) iCW_queue = np.array(np.meshgrid(iamps, idurs)).T.reshape(nCW, 2) - CW_queue = np.hstack((amps[iCW_queue[:, 0]], durations[iCW_queue[:, 1]], - offsets[iCW_queue[:, 1]], PRFs.min() * arr1, arr1)) + CW_queue = np.vstack((amps[iCW_queue[:, 0]], durations[iCW_queue[:, 1]], + offsets[iCW_queue[:, 1]], PRFs.min() * arr1, arr1)).T queue = np.vstack((queue, CW_queue)) # Pulsed protocols if np.any(DFs != 1.0): pulsed_DFs = DFs[DFs != 1.0] iPRFs = range(len(PRFs)) ipulsed_DFs = range(len(pulsed_DFs)) nPW = len(amps) * len(durations) * len(PRFs) * len(pulsed_DFs) iPW_queue = np.array(np.meshgrid(iamps, idurs, iPRFs, ipulsed_DFs)).T.reshape(nPW, 4) - PW_queue = np.hstack((amps[iPW_queue[:, 0]], durations[iPW_queue[:, 1]], + PW_queue = np.vstack((amps[iPW_queue[:, 0]], durations[iPW_queue[:, 1]], offsets[iPW_queue[:, 1]], PRFs[iPW_queue[:, 2]], - pulsed_DFs[iPW_queue[:, 3]])) + pulsed_DFs[iPW_queue[:, 3]])).T queue = np.vstack((queue, PW_queue)) # Return return queue[1:, :] def xlslog(filename, sheetname, data): """ Append log data on a new row to specific sheet of excel workbook. :param filename: absolute or relative path to the Excel workbook :param sheetname: name of the Excel spreadsheet to which data is appended :param data: data structure to be added to specific columns on a new row :return: boolean indicating success (1) or failure (0) of operation """ try: wb = load_workbook(filename) ws = wb[sheetname] keys = data.keys() i = 1 row_data = {} for k in keys: row_data[k] = data[k] i += 1 ws.append(row_data) wb.save(filename) return 1 except PermissionError: # If file cannot be accessed for writing because already opened logger.error('Cannot write to "%s". Close the file and type "Y"', filename) user_str = input() if user_str in ['y', 'Y']: return xlslog(filename, sheetname, data) else: return 0 def detectPeaks(x, mph=None, mpd=1, threshold=0, edge='rising', kpsh=False, valley=False, ax=None): """ Detect peaks in data based on their amplitude and inter-peak distance. """ x = np.atleast_1d(x).astype('float64') if x.size < 3: return np.array([], dtype=int) if valley: x = -x # find indices of all peaks dx = x[1:] - x[:-1] # handle NaN's indnan = np.where(np.isnan(x))[0] if indnan.size: x[indnan] = np.inf dx[np.where(np.isnan(dx))[0]] = np.inf ine, ire, ife = np.array([[], [], []], dtype=int) if not edge: ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0] else: if edge.lower() in ['rising', 'both']: ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0] if edge.lower() in ['falling', 'both']: ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0] ind = np.unique(np.hstack((ine, ire, ife))) # handle NaN's if ind.size and indnan.size: # NaN's and values close to NaN's cannot be peaks ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True)] # first and last values of x cannot be peaks if ind.size and ind[0] == 0: ind = ind[1:] if ind.size and ind[-1] == x.size - 1: ind = ind[:-1] # remove peaks < minimum peak height if ind.size and mph is not None: ind = ind[x[ind] >= mph] # remove peaks - neighbors < threshold if ind.size and threshold > 0: dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0) ind = np.delete(ind, np.where(dx < threshold)[0]) # detect small peaks closer than minimum peak distance if ind.size and mpd > 1: ind = ind[np.argsort(x[ind])][::-1] # sort ind by peak height idel = np.zeros(ind.size, dtype=bool) for i in range(ind.size): if not idel[i]: # keep peaks with the same height if kpsh is True idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \ & (x[ind[i]] > x[ind] if kpsh else True) idel[i] = 0 # Keep current peak # remove the small peaks and sort back the indices by their occurrence ind = np.sort(ind[~idel]) return ind def detectPeaksTime(t, y, mph, mtd): """ Extension of the detectPeaks function to detect peaks in data based on their amplitude and time difference, with a non-uniform time vector. :param t: time vector (not necessarily uniform) :param y: signal :param mph: minimal peak height :param mtd: minimal time difference :return: array of peak indexes """ # Detect peaks on signal with no restriction on inter-peak distance raw_indexes = detectPeaks(y, mph, mpd=1) if raw_indexes.size > 0: # Filter relevant peaks with temporal distance n_raw = raw_indexes.size filtered_indexes = np.array([raw_indexes[0]]) for i in range(1, n_raw): i1 = filtered_indexes[-1] i2 = raw_indexes[i] if t[i2] - t[i1] < mtd: if y[i2] > y[i1]: filtered_indexes[-1] = i2 else: filtered_indexes = np.append(filtered_indexes, i2) # Return peak indexes return filtered_indexes else: return None def detectSpikes(t, Qm, min_amp, min_dt): ''' Detect spikes on a charge density signal, and return their number, latency and rate. :param t: time vector (s) :param Qm: charge density vector (C/m2) :param min_amp: minimal charge amplitude to detect spikes (C/m2) :param min_dt: minimal time interval between 2 spikes (s) :return: 3-tuple with number of spikes, latency (s) and spike rate (sp/s) ''' i_spikes = detectPeaksTime(t, Qm, min_amp, min_dt) if i_spikes is not None: latency = t[i_spikes[0]] # s n_spikes = i_spikes.size if n_spikes > 1: first_to_last_spike = t[i_spikes[-1]] - t[i_spikes[0]] # s spike_rate = n_spikes / first_to_last_spike # spikes/s else: spike_rate = 'N/A' else: latency = 'N/A' spike_rate = 'N/A' n_spikes = 0 return (n_spikes, latency, spike_rate) def runEStimBatch(batch_dir, log_filepath, neurons, stim_params): ''' Run batch E-STIM simulations of the system for various neuron types and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: array of channel mechanisms :param stim_params: dictionary containing sweeps for all stimulation parameters ''' logger.info("Starting E-STIM simulation batch") # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DFs']) nqueue = sim_queue.shape[0] # Initialize solver solver = SolverElec() # Run simulations nsims = len(neurons) * nqueue simcount = 0 filepaths = [] for ch_mech in neurons: try: for i in range(nqueue): simcount += 1 Astim, tstim, toffset, PRF, DF = sim_queue[i, :] # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log and define naming if DF == 1.0: logger.info(ESTIM_CW_log, ch_mech.name, simcount, nsims, Astim, tstim * 1e3) simcode = ESTIM_CW_code.format(ch_mech.name, Astim, tstim * 1e3) else: logger.info(ESTIM_PW_log, ch_mech.name, simcount, nsims, Astim, tstim * 1e3, PRF * 1e-3, DF) simcode = ESTIM_PW_code.format(ch_mech.name, Astim, tstim * 1e3, PRF * 1e-3, DF) # Run simulation tstart = time.time() (t, y, states) = solver.run(ch_mech, Astim, tstim, toffset, PRF, DF) Vm, *channels = y tcomp = time.time() - tstart logger.info('completed in %.2f seconds', tcomp) # Store data in dictionary data = { 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DF': DF, 't': t, 'states': states, 'Vm': Vm } for j in range(len(ch_mech.states_names)): data[ch_mech.states_names[j]] = channels[j] # Export data to PKL file output_filepath = batch_dir + '/' + simcode + ".pkl" with open(output_filepath, 'wb') as fh: pickle.dump(data, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Vm signal n_spikes, lat, sr = detectSpikes(t, Vm, SPIKE_MIN_VAMP, SPIKE_MIN_DT) logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': ch_mech.name, 'D': Astim, 'E': tstim * 1e3, 'F': PRF * 1e-3 if DF < 1 else 'N/A', 'G': DF, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths def runAStimBatch(batch_dir, log_filepath, neurons, bls_params, geom, stim_params, sim_type='effective'): ''' Run batch simulations of the system for various neuron types, sonophore and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: array of channel mechanisms :param bls_params: BLS biomechanical and biophysical parameters dictionary :param geom: BLS geometric constants dictionary :param stim_params: dictionary containing sweeps for all stimulation parameters :param sim_type: selected integration method ''' logger.info("Starting A-STIM simulation batch") # Unpack geometrical parameters a = geom['a'] d = geom['d'] # Generate simulations queue sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], stim_params['offsets'], stim_params['PRFs'], stim_params['DFs']) nqueue = sim_queue.shape[0] # Run simulations nsims = len(neurons) * len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for ch_mech in neurons: for Fdrive in stim_params['freqs']: try: # Create SolverUS instance (modulus of embedding tissue depends on frequency) solver = SolverUS(geom, bls_params, ch_mech, Fdrive) for i in range(nqueue): simcount += 1 Adrive, tstim, toffset, PRF, DF = sim_queue[i, :] # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log and define naming if DF == 1.0: logger.info(ASTIM_CW_log, ch_mech.name, sim_type, simcount, nsims, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3) simcode = ASTIM_CW_code.format(ch_mech.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, sim_type) else: logger.info(ASTIM_PW_log, ch_mech.name, sim_type, simcount, nsims, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF * 1e-3, DF) simcode = ASTIM_PW_code.format(ch_mech.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF * 1e-3, DF, sim_type) # Run simulation tstart = time.time() (t, y, states) = solver.run(ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DF, sim_type) Z, ng, Qm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.info('completed in %.2f seconds', tcomp) # Store data in dictionary bls_params['biophys']['Qm0'] = solver.Qm0 data = { 'a': a, 'd': d, 'params': bls_params, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DF': DF, 't': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Qm * 1e3 / np.array([solver.Capct(ZZ) for ZZ in Z]) } for j in range(len(ch_mech.states_names)): data[ch_mech.states_names[j]] = channels[j] # Export data to PKL file output_filepath = batch_dir + '/' + simcode + ".pkl" with open(output_filepath, 'wb') as fh: pickle.dump(data, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal n_spikes, lat, sr = detectSpikes(t, Qm, SPIKE_MIN_QAMP, SPIKE_MIN_DT) logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': ch_mech.name, 'D': a * 1e9, 'E': d * 1e6, 'F': Fdrive * 1e-3, 'G': Adrive * 1e-3, 'H': tstim * 1e3, 'I': PRF * 1e-3 if DF < 1 else 'N/A', 'J': DF, 'K': sim_type, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths def titrateEStim(solver, ch_mech, Astim, tstim, toffset, PRF=1.5e3, DF=1.0): """ Use a dichotomic recursive search to determine the threshold value of a specific electric stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. This function is called recursively until an accurate threshold is found. :param solver: solver instance :param ch_mech: channels mechanism object :param Astim: injected current density amplitude (mA/m2) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DF: pulse duty factor (-) :return: 5-tuple with the determined amplitude threshold, time profile, solution matrix, state vector and response latency """ # Determine titration type if isinstance(Astim, tuple): t_type = 'A' interval = Astim thr = TITRATION_ESTIM_DA_MAX elif isinstance(tstim, tuple): t_type = 't' interval = tstim thr = TITRATION_DT_THR elif isinstance(DF, tuple): t_type = 'DF' interval = DF thr = TITRATION_DDF_THR else: logger.error('Invalid titration type') return 0. t_var = ESTIM_params[t_type] # Check amplitude interval and define current value assert interval[0] < interval[1], '{} interval must be defined as (lb, ub)'.format(t_type) value = (interval[0] + interval[1]) / 2 # Define stimulation parameters if t_type == 'A': stim_params = [value, tstim, toffset, PRF, DF] elif t_type == 't': stim_params = [Astim, value, toffset, PRF, DF] elif t_type == 'DF': stim_params = [Astim, tstim, toffset, PRF, value] # Run simulation and detect spikes (t, y, states) = solver.run(ch_mech, *stim_params) n_spikes, latency, _ = detectSpikes(t, y[0, :], SPIKE_MIN_VAMP, SPIKE_MIN_DT) logger.info('%.2f %s ---> %u spike%s detected', value * t_var['factor'], t_var['unit'], n_spikes, "s" if n_spikes > 1 else "") # If accurate threshold is found, return simulation results if (interval[1] - interval[0]) <= thr and n_spikes == 1: return (value, t, y, states, latency) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: new_interval = (value, interval[1]) else: new_interval = (interval[0], value) stim_params[t_var['index']] = new_interval return titrateEStim(solver, ch_mech, *stim_params) def titrateAStim(solver, ch_mech, Fdrive, Adrive, tstim, toffset, PRF=1.5e3, DF=1.0, sim_type='effective'): """ Use a dichotomic recursive search to determine the threshold value of a specific acoustic stimulation parameter needed to obtain neural excitation, keeping all other parameters fixed. The titration parameter can be stimulation amplitude, duration or any variable for which the number of spikes is a monotonically increasing function. This function is called recursively until an accurate threshold is found. :param solver: solver instance :param ch_mech: channels mechanism object :param Fdrive: acoustic drive frequency (Hz) :param Adrive: acoustic drive amplitude (Pa) :param tstim: duration of US stimulation (s) :param toffset: duration of the offset (s) :param PRF: pulse repetition frequency (Hz) :param DF: pulse duty factor (-) :param sim_type: selected integration method :return: 5-tuple with the determined amplitude threshold, time profile, solution matrix, state vector and response latency """ # Determine titration type if isinstance(Adrive, tuple): t_type = 'A' interval = Adrive thr = TITRATION_ASTIM_DA_MAX elif isinstance(tstim, tuple): t_type = 't' interval = tstim thr = TITRATION_DT_THR elif isinstance(DF, tuple): t_type = 'DF' interval = DF thr = TITRATION_DDF_THR else: logger.error('Invalid titration type') return 0. t_var = ASTIM_params[t_type] # Check amplitude interval and define current value assert interval[0] < interval[1], '{} interval must be defined as (lb, ub)'.format(t_type) value = (interval[0] + interval[1]) / 2 # Define stimulation parameters if t_type == 'A': stim_params = [Fdrive, value, tstim, toffset, PRF, DF] elif t_type == 't': stim_params = [Fdrive, Adrive, value, toffset, PRF, DF] elif t_type == 'DF': stim_params = [Fdrive, Adrive, tstim, toffset, PRF, value] # Run simulation and detect spikes (t, y, states) = solver.run(ch_mech, *stim_params, sim_type) n_spikes, latency, _ = detectSpikes(t, y[2, :], SPIKE_MIN_QAMP, SPIKE_MIN_DT) logger.info('%.2f %s ---> %u spike%s detected', value * t_var['factor'], t_var['unit'], n_spikes, "s" if n_spikes > 1 else "") # If accurate threshold is found, return simulation results if (interval[1] - interval[0]) <= thr and n_spikes == 1: return (value, t, y, states, latency) # Otherwise, refine titration interval and iterate recursively else: if n_spikes == 0: new_interval = (value, interval[1]) else: new_interval = (interval[0], value) stim_params[t_var['index']] = new_interval return titrateAStim(solver, ch_mech, *stim_params, sim_type) def titrateAStimBatch(batch_dir, log_filepath, neurons, bls_params, geom, stim_params): ''' Run batch acoustic titrations of the system for various neuron types, sonophore and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: array of channel mechanisms :param bls_params: BLS biomechanical and biophysical parameters dictionary :param geom: BLS geometric constants dictionary :param stim_params: dictionary containing sweeps for all stimulation parameters ''' logger.info("Starting A-STIM titration batch") # Unpack geometrical parameters a = geom['a'] d = geom['d'] # Define default parameters sim_type = 'effective' offset = 30e-3 # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [offset], stim_params['PRFs'], stim_params['DFs']) # sim_queue = np.delete(sim_queue, 1, axis=1) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [offset] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DFs']) elif 'DF' not in stim_params: t_type = 'DF' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [offset] * len(stim_params['durations']), stim_params['PRFs'], [None]) nqueue = sim_queue.shape[0] t_var = ASTIM_params[t_type] # Run titrations nsims = len(neurons) * len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for ch_mech in neurons: for Fdrive in stim_params['freqs']: try: # Create SolverUS instance (modulus of embedding tissue depends on frequency) solver = SolverUS(geom, bls_params, ch_mech, Fdrive) for i in range(nqueue): simcount += 1 # Extract parameters Adrive, tstim, toffset, PRF, DF = sim_queue[i, :] if Adrive is None: Adrive = (0., 2 * TITRATION_ASTIM_A_MAX) elif tstim is None: tstim = (0., 2 * TITRATION_T_MAX) elif DF is None: DF = (0., 2 * TITRATION_DF_MAX) curr_params = [Fdrive, Adrive, tstim, PRF, DF] # Generate log str log_str = '' pnames = list(ASTIM_params.keys()) j = 0 for cp in curr_params: pn = pnames[j] pi = ASTIM_params[pn] if not isinstance(cp, tuple): if log_str: log_str += ', ' log_str += '{} = {:.2f} {}'.format(pn, pi['factor'] * cp, pi['unit']) j += 1 # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log logger.info(ASTIM_titration_log, ch_mech.name, simcount, nsims, a * 1e9, log_str) # Run titration tstart = time.time() (output_thr, t, y, states, lat) = titrateAStim(solver, ch_mech, Fdrive, Adrive, tstim, toffset, PRF, DF) Z, ng, Qm, *channels = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.info('completed in %.2f s, threshold = %.2f %s', tcomp, output_thr * t_var['factor'], t_var['unit']) # Determine output variable if t_type == 'A': Adrive = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DF': DF = output_thr # Define output naming if DF == 1.0: simcode = ASTIM_CW_code.format(ch_mech.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, sim_type) else: simcode = ASTIM_PW_code.format(ch_mech.name, a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, PRF * 1e-3, DF, sim_type) # Store data in dictionary bls_params['biophys']['Qm0'] = solver.Qm0 data = { 'a': a, 'd': d, 'params': bls_params, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DF': DF, 't': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng, 'Qm': Qm, 'Vm': Qm * 1e3 / np.array([solver.Capct(ZZ) for ZZ in Z]) } for j in range(len(ch_mech.states_names)): data[ch_mech.states_names[j]] = channels[j] # Export data to PKL file output_filepath = batch_dir + '/' + simcode + ".pkl" with open(output_filepath, 'wb') as fh: pickle.dump(data, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal n_spikes, lat, sr = detectSpikes(t, Qm, SPIKE_MIN_QAMP, SPIKE_MIN_DT) logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': ch_mech.name, 'D': a * 1e9, 'E': d * 1e6, 'F': Fdrive * 1e-3, 'G': Adrive * 1e-3, 'H': tstim * 1e3, 'I': PRF * 1e-3 if DF < 1 else 'N/A', 'J': DF, 'K': sim_type, 'L': t.size, 'M': round(tcomp, 2), 'N': n_spikes, 'O': lat * 1e3 if isinstance(lat, float) else 'N/A', 'P': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths def titrateEStimBatch(batch_dir, log_filepath, neurons, stim_params): ''' Run batch electrical titrations of the system for various neuron types and stimulation parameters, to determine the threshold of a specific stimulus parameter for neural excitation. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param neurons: array of channel mechanisms :param stim_params: dictionary containing sweeps for all stimulation parameters ''' logger.info("Starting E-STIM titration batch") # Define default parameters offset = 30e-3 # Determine titration parameter and titrations list if 'durations' not in stim_params: t_type = 't' sim_queue = createSimQueue(stim_params['amps'], [None], [offset], stim_params['PRFs'], stim_params['DFs']) elif 'amps' not in stim_params: t_type = 'A' sim_queue = createSimQueue([None], stim_params['durations'], [offset] * len(stim_params['durations']), stim_params['PRFs'], stim_params['DFs']) elif 'DF' not in stim_params: t_type = 'DF' sim_queue = createSimQueue(stim_params['amps'], stim_params['durations'], [offset] * len(stim_params['durations']), stim_params['PRFs'], [None]) nqueue = sim_queue.shape[0] t_var = ESTIM_params[t_type] # Create SolverElec instance solver = SolverElec() # Run titrations nsims = len(neurons) * nqueue simcount = 0 filepaths = [] for ch_mech in neurons: try: for i in range(nqueue): simcount += 1 # Extract parameters Astim, tstim, toffset, PRF, DF = sim_queue[i, :] if Astim is None: Astim = (0., 2 * TITRATION_ESTIM_A_MAX) elif tstim is None: tstim = (0., 2 * TITRATION_T_MAX) elif DF is None: DF = (0., 2 * TITRATION_DF_MAX) curr_params = [Astim, tstim, PRF, DF] # Generate log str log_str = '' pnames = list(ESTIM_params.keys()) j = 0 for cp in curr_params: pn = pnames[j] pi = ESTIM_params[pn] if not isinstance(cp, tuple): if log_str: log_str += ', ' log_str += '{} = {:.2f} {}'.format(pn, pi['factor'] * cp, pi['unit']) j += 1 # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log logger.info(ESTIM_titration_log, ch_mech.name, simcount, nsims, log_str) # Run titration tstart = time.time() (output_thr, t, y, states, lat) = titrateEStim(solver, ch_mech, Astim, tstim, toffset, PRF, DF) Vm, *channels = y tcomp = time.time() - tstart logger.info('completed in %.2f s, threshold = %.2f %s', tcomp, output_thr * t_var['factor'], t_var['unit']) # Determine output variable if t_type == 'A': Astim = output_thr elif t_type == 't': tstim = output_thr elif t_type == 'DF': DF = output_thr # Define output naming if DF == 1.0: simcode = ESTIM_CW_code.format(ch_mech.name, Astim, tstim * 1e3) else: simcode = ESTIM_PW_code.format(ch_mech.name, Astim, tstim * 1e3, PRF * 1e-3, DF) # Store data in dictionary data = { 'Astim': Astim, 'tstim': tstim, 'toffset': toffset, 'PRF': PRF, 'DF': DF, 't': t, 'states': states, 'Vm': Vm } for j in range(len(ch_mech.states_names)): data[ch_mech.states_names[j]] = channels[j] # Export data to PKL file output_filepath = batch_dir + '/' + simcode + ".pkl" with open(output_filepath, 'wb') as fh: pickle.dump(data, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Detect spikes on Qm signal n_spikes, lat, sr = detectSpikes(t, Vm, SPIKE_MIN_VAMP, SPIKE_MIN_DT) logger.info('%u spike%s detected', n_spikes, "s" if n_spikes > 1 else "") # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': ch_mech.name, 'D': Astim, 'E': tstim * 1e3, 'F': PRF * 1e-3 if DF < 1 else 'N/A', 'G': DF, 'H': t.size, 'I': round(tcomp, 2), 'J': n_spikes, 'K': lat * 1e3 if isinstance(lat, float) else 'N/A', 'L': sr * 1e-3 if isinstance(sr, float) else 'N/A' } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths def runMechBatch(batch_dir, log_filepath, bls_params, geom, Cm0, Qm0, stim_params): ''' Run batch simulations of the mechanical system with imposed values of charge density, for various sonophore spans and stimulation parameters. :param batch_dir: full path to output directory of batch :param log_filepath: full path log file of batch :param bls_params: BLS biomechanical and biophysical parameters dictionary :param geom: BLS geometric constants dictionary :param Cm0: membrane resting capacitance (F/m2) :param Qm0: membrane resting charge density (C/m2) :param stim_params: dictionary containing sweeps for all stimulation parameters ''' logger.info("Starting mechanical simulation batch") # Unpack geometrical and stimulation parameters a = geom['a'] d = geom['d'] amps = np.array(stim_params['amps']) charges = np.array(stim_params['charges']) # Generate simulations queue nA = len(amps) nQ = len(charges) sim_queue = np.array(np.meshgrid(amps, charges)).T.reshape(nA * nQ, 2) nqueue = sim_queue.shape[0] # Run simulations nsims = len(stim_params['freqs']) * nqueue simcount = 0 filepaths = [] for Fdrive in stim_params['freqs']: try: # Create BilayerSonophore instance (modulus of embedding tissue depends on frequency) bls = BilayerSonophore(geom, bls_params, Fdrive, Cm0, Qm0) for i in range(nqueue): simcount += 1 Adrive, Qm = sim_queue[i, :] # Get date and time info date_str = time.strftime("%Y.%m.%d") daytime_str = time.strftime("%H:%M:%S") # Log and define naming logger.info(MECH_log, simcount, nsims, a * 1e9, d * 1e6, Fdrive * 1e-3, Adrive * 1e-3, Qm * 1e5) simcode = MECH_code.format(a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, Qm * 1e5) # Run simulation tstart = time.time() (t, y, states) = bls.runMech(Fdrive, Adrive, Qm) (Z, ng) = y U = np.insert(np.diff(Z) / np.diff(t), 0, 0.0) tcomp = time.time() - tstart logger.info('completed in %.2f seconds', tcomp) # Store data in dictionary data = { 'a': a, 'd': d, 'Cm0': Cm0, 'Qm0': Qm0, 'params': bls_params, 'Fdrive': Fdrive, 'Adrive': Adrive, 'phi': np.pi, 'Qm': Qm, 't': t, 'states': states, 'U': U, 'Z': Z, 'ng': ng } # Export data to PKL file output_filepath = batch_dir + '/' + simcode + ".pkl" with open(output_filepath, 'wb') as fh: pickle.dump(data, fh) logger.info('simulation data exported to "%s"', output_filepath) filepaths.append(output_filepath) # Compute key output metrics Zmax = np.amax(Z) Zmin = np.amin(Z) Zabs_max = np.amax(np.abs([Zmin, Zmax])) eAmax = bls.arealstrain(Zabs_max) Tmax = bls.TEtot(Zabs_max) Pmmax = bls.PMavgpred(Zmin) ngmax = np.amax(ng) dUdtmax = np.amax(np.abs(np.diff(U) / np.diff(t)**2)) # Export key metrics to log file log = { 'A': date_str, 'B': daytime_str, 'C': a * 1e9, 'D': d * 1e6, 'E': Fdrive * 1e-3, 'F': Adrive * 1e-3, 'G': Qm * 1e5, 'H': t.size, 'I': tcomp, 'J': bls.kA + bls.kA_tissue, 'K': Zmax * 1e9, 'L': eAmax, 'M': Tmax * 1e3, 'N': (ngmax - bls.ng0) / bls.ng0, 'O': Pmmax * 1e-3, 'P': dUdtmax } if xlslog(log_filepath, 'Data', log) == 1: logger.info('log exported to "%s"', log_filepath) else: logger.error('log export to "%s" aborted', log_filepath) except AssertionError as err: logger.error(err) return filepaths diff --git a/plot/plot_comp.py b/plot/plot_comp.py index 36dc5be..4e6d3a7 100644 --- a/plot/plot_comp.py +++ b/plot/plot_comp.py @@ -1,25 +1,26 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 12:41:26 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-08-25 17:09:22 +# @Last Modified time: 2017-08-25 17:52:39 """ Compare profiles of several specific output variables of NICE simulations. """ from PointNICE.utils import OpenFilesDialog from PointNICE.plt import plotComp # List of variables to plot -yvars = ['Pac', 'Pmavg', 'Telastic', 'Vm', 'iL', 'iNet'] -vars_mech = ['Pac', 'Z', 'ng'] +ASTIM_vars = ['Pac', 'Pmavg', 'Telastic', 'Vm', 'iL', 'iNet'] +ESTIM_vars = ['Vm', 'iL', 'iNet'] +MECH_vars = ['Pac', 'Z', 'ng'] # Select data files pkl_filepaths, _ = OpenFilesDialog('pkl') if not pkl_filepaths: print('error: no input file') quit() # Comparative plot -plotComp(vars_mech, pkl_filepaths) +plotComp(ESTIM_vars, pkl_filepaths) diff --git a/sim/ASTIM_batch.py b/sim/ASTIM_batch.py index 4d9506e..9272492 100644 --- a/sim/ASTIM_batch.py +++ b/sim/ASTIM_batch.py @@ -1,97 +1,97 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-02-13 18:16:09 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-08-25 14:48:38 +# @Last Modified time: 2017-08-25 17:39:24 """ Run batch acoustic simulations of specific "point-neuron" models. """ import os import logging import numpy as np from PointNICE.solvers import checkBatchLog, runAStimBatch from PointNICE.channels import * from PointNICE.utils import LoadParams from PointNICE.plt import plotBatch # Set logging options logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:') logger = logging.getLogger('PointNICE') logger.setLevel(logging.DEBUG) # BLS parameters bls_params = LoadParams() # Geometry of BLS structure a = 32e-9 # in-plane radius (m) d = 0.0e-6 # embedding tissue thickness (m) geom = {"a": a, "d": d} # Channels mechanisms neurons = [CorticalLTS()] # Stimulation parameters stim_params = { 'freqs': [3.5e5], # Hz - 'amps': [100e3], # Pa + 'amps': [150e3], # Pa 'durations': [50e-3], # s 'PRFs': [1e2], # Hz 'DFs': [0.5] } stim_params['offsets'] = [30e-3] * len(stim_params['durations']) # s # Select output directory try: (batch_dir, log_filepath) = checkBatchLog('A-STIM') except AssertionError as err: logger.error(err) quit() # Run A-STIM batch pkl_filepaths = runAStimBatch(batch_dir, log_filepath, neurons, bls_params, geom, stim_params) pkl_dir, _ = os.path.split(pkl_filepaths[0]) vars_RS_FS = { 'Z': ['Z'], 'Q_m': ['Qm'], 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_M\ kin.': ['p'], 'I': ['iNa', 'iK', 'iM', 'iL', 'iNet'] } vars_LTS = { 'Z': ['Z'], 'Q_m': ['Qm'], 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_M\ kin.': ['p'], 'i_T\ kin.': ['s', 'u'], 'I': ['iNa', 'iK', 'iM', 'iT', 'iL', 'iNet'] } vars_RE = { 'Z': ['Z'], 'Q_m': ['Qm'], 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_{TS}\ kin.': ['s', 'u'], 'I': ['iNa', 'iK', 'iTs', 'iL', 'iNet'] } vars_TC = { 'Z': ['Z'], 'Q_m': ['Qm'], 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_{T}\ kin.': ['s', 'u'], 'i_{H}\ kin.': ['O', 'OL', 'O + 2OL'], 'I': ['iNa', 'iK', 'iT', 'iH', 'iKL', 'iL', 'iNet'] } plotBatch(vars_LTS, pkl_dir, pkl_filepaths) diff --git a/sim/ESTIM_batch.py b/sim/ESTIM_batch.py index cc655b5..375764a 100644 --- a/sim/ESTIM_batch.py +++ b/sim/ESTIM_batch.py @@ -1,86 +1,86 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2017-08-24 11:55:07 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2017-08-25 14:48:58 +# @Last Modified time: 2017-08-25 17:51:22 """ Run batch electrical simulations of specific "point-neuron" models. """ import os import logging from PointNICE.solvers import checkBatchLog, runEStimBatch from PointNICE.channels import * from PointNICE.plt import plotBatch # Set logging options logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:') logger = logging.getLogger('PointNICE') logger.setLevel(logging.DEBUG) # Channels mechanisms neurons = [LeechTouch()] # Stimulation parameters stim_params = { 'amps': [20.0], # mA/m2 'durations': [0.5], # s 'PRFs': [1e2], # Hz - 'DFs': [1.0] + 'DFs': [.5, 1.] } stim_params['offsets'] = [1.0] * len(stim_params['durations']) # s # Select output directory try: (batch_dir, log_filepath) = checkBatchLog('E-STIM') except AssertionError as err: logger.error(err) quit() # Run E-STIM batch pkl_filepaths = runEStimBatch(batch_dir, log_filepath, neurons, stim_params) pkl_dir, _ = os.path.split(pkl_filepaths[0]) vars_RS_FS = { 'V_m': ['Vm'], 'i_{Na}\ kin.': ['m', 'h'], 'i_L\ kin.': ['n'], 'i_M\ kin.': ['p'], 'I': ['iNa', 'iK', 'iM', 'iL', 'iNet'] } vars_LTS = { 'V_m': ['Vm'], 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_M\ kin.': ['p'], 'i_T\ kin.': ['s', 'u'], 'I': ['iNa', 'iK', 'iM', 'iT', 'iL', 'iNet'] } vars_RE = { 'V_m': ['Vm'], 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_{TS}\ kin.': ['s', 'u'], 'I': ['iNa', 'iK', 'iTs', 'iL', 'iNet'] } vars_TC = { 'V_m': ['Vm'], 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_{T}\ kin.': ['s', 'u'], 'i_{H}\ kin.': ['O', 'OL', 'O + 2OL'], 'I': ['iNa', 'iK', 'iT', 'iH', 'iKL', 'iL', 'iNet'] } vars_LeechT = { 'V_m': ['Vm'], 'i_{Na}\ kin.': ['m', 'h'], 'i_K\ kin.': ['n'], 'i_{Ca}\ kin.': ['s'], 'pools': ['C_Na_arb', 'C_Na_arb_activation', 'C_Ca_arb', 'C_Ca_arb_activation'], 'I': ['iNa', 'iK', 'iCa', 'iKCa', 'iPumpNa', 'iL', 'iNet'] } plotBatch(vars_LeechT, pkl_dir, pkl_filepaths, lw=2)