diff --git a/PySONIC/constants.py b/PySONIC/constants.py index 2fa08e7..f90eeb8 100644 --- a/PySONIC/constants.py +++ b/PySONIC/constants.py @@ -1,61 +1,61 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-11-04 13:23:31 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-01-10 12:27:22 +# @Last Modified time: 2019-03-08 16:04:54 ''' Algorithmic constants used in the package. ''' # Biophysical constants FARADAY = 9.64853e4 # Faraday constant (C/mol) Rg = 8.31342 # Universal gas constant (Pa.m^3.mol^-1.K^-1 or J.mol^-1.K^-1) Z_Ca = 2 # Calcium valence Z_Na = 1 # Sodium valence Z_K = 1 # Potassium valence Celsius2Kelvin = 273.15 # Celsius to Kelvin conversion constant # Fitting and pre-processing LJFIT_PM_MAX = 1e8 # intermolecular pressure at the deflection lower bound for LJ fitting (Pa) PNET_EQ_MAX = 1e-1 # error threshold for net pressure at computed equilibrium position (Pa) PMAVG_STD_ERR_MAX = 3000 # error threshold in nonlinear fit of molecular pressure (Pa) # Mechanical simulations Z_ERR_MAX = 1e-11 # periodic convergence threshold for deflection (m) NG_ERR_MAX = 1e-24 # periodic convergence threshold for gas content (mol) NCYCLES_MAX = 10 # max number of acoustic cycles in mechanical simulations CHARGE_RANGE = (-200e-5, 150e-5) # physiological charge range constraining the membrane (C/m2) # E-STIM simulations DT_ESTIM = 1e-6 # A-STIM simulations SOLVER_NSTEPS = 1000 # maximum number of steps allowed during one call to the LSODA/DOP853 solvers CLASSIC_TARGET_DT = 1e-8 # target temporal resolution for output arrays of classic simulations NPC_FULL = 1000 # nb of samples per acoustic period in full system NPC_HH = 40 # nb of samples per acoustic period in HH system DQ_UPDATE = 1e-5 # charge evolution threshold between two hybrid integrations (C/m2) DT_UPDATE = 5e-4 # time interval between two hybrid integrations (s) DT_EFF = 5e-5 # time step for effective integration (s) MIN_SAMPLES_PER_PULSE_INT = 1 # minimal number of time points per pulse interval (TON of TOFF) # Spike detection -SPIKE_MIN_QAMP = 0.0 # threshold amplitude for spike detection on charge signal (C/m2) -SPIKE_MIN_QPROM = 5e-5 # threshold prominence for spike detection on charge signal (C/m2) +SPIKE_MIN_QAMP = 5e-5 # threshold amplitude for spike detection on charge signal (C/m2) +SPIKE_MIN_QPROM = 20e-5 # threshold prominence for spike detection on charge signal (C/m2) SPIKE_MIN_VAMP = 10.0 # threshold amplitude for spike detection on potential signal (mV) SPIKE_MIN_VPROM = 20.0 # threshold prominence for spike detection on potential signal (mV) SPIKE_MIN_DT = 5e-4 # minimal time interval for spike detection on charge signal (s) MIN_NSPIKES_SPECTRUM = 3 # minimum number of spikes to compute firing rate spectrum # Titrations TITRATION_ASTIM_RHEOBASE_LOG_CONF_INTERVAL = 2 TITRATION_T_OFFSET = 50e-3 # offset period for titration procedures (s) TITRATION_ASTIM_A_MAX = 6e5 - 1 # initial acoustic pressure upper bound for titration (Pa) TITRATION_ASTIM_DA_MAX = 1e3 # acoustic pressure search range threshold for titration (Pa) TITRATION_ESTIM_A_MAX = 50.0 # initial current density upper bound for titration (mA/m2) TITRATION_ESTIM_DA_MAX = 0.1 # current density search range threshold for titration (mA/m2) TITRATION_T_MAX = 2e-1 # initial stimulus duration upper bound for titration (s) TITRATION_DT_THR = 1e-3 # stimulus duration search range threshold for titration (s) TITRATION_DDC_THR = 0.01 # stimulus duty cycle search range threshold for titration (-) TITRATION_DC_MAX = 1.0 # initial stimulus duty cycle upper bound for titration (-) diff --git a/PySONIC/plt/spikeutils.py b/PySONIC/plt/spikeutils.py index 34d06d2..257be99 100644 --- a/PySONIC/plt/spikeutils.py +++ b/PySONIC/plt/spikeutils.py @@ -1,375 +1,375 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-10-01 20:40:28 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-02-27 18:03:21 +# @Last Modified time: 2019-03-08 16:16:57 import pickle import numpy as np import matplotlib.pyplot as plt import matplotlib.cm as cm from ..utils import * from ..constants import * from ..postpro import findPeaks # Plot parameters phaseplotvars = { 'Vm': { 'label': 'V_m\ (mV)', 'dlabel': 'dV/dt\ (V/s)', 'factor': 1e0, 'lim': (-80.0, 50.0), 'dfactor': 1e-3, 'dlim': (-300, 700), 'thr_amp': SPIKE_MIN_VAMP, 'thr_prom': SPIKE_MIN_VPROM }, 'Qm': { 'label': 'Q_m\ (nC/cm^2)', 'dlabel': 'I\ (A/m^2)', 'factor': 1e5, 'lim': (-80.0, 50.0), 'dfactor': 1e0, 'dlim': (-2, 5), 'thr_amp': SPIKE_MIN_QAMP, 'thr_prom': SPIKE_MIN_QPROM } } def plotPhasePlane(filepaths, varname, no_offset=False, no_first=False, labels=None, colors=None, fs=15, lw=2, tbounds=None, pretty=True): ''' Plot phase-plane diagrams of spiking dynamics from simulation results. :param filepaths: list of full paths to data files :param varname: name of output variable of interest ('Qm' or Vm') :param no_offset: boolean stating whether or not to discard post-offset spikes :param no_first: boolean stating whether or not to discard first spike :param tbounds: spike interval bounds (ms) :return: figure handle ''' # Preprocess parameters if tbounds is None: tbounds = (-1.5, 1.5) pltvar = phaseplotvars[varname] # Create figure fig, axes = plt.subplots(1, 2, figsize=(8, 4)) for ax in axes: ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) # 1st axis: variable as function of time ax = axes[0] ax.set_xlabel('$\\rm time\ (ms)$', fontsize=fs) ax.set_ylabel('$\\rm {}$'.format(pltvar['label']), fontsize=fs) ax.set_xlim(tbounds) ax.set_ylim(pltvar['lim']) if pretty: ax.set_xticks(tbounds) ax.set_yticks(pltvar['lim']) ax.set_xticklabels(['{:+.1f}'.format(x) for x in ax.get_xticks()]) ax.set_yticklabels(['{:+.0f}'.format(x) for x in ax.get_yticks()]) # 2nd axis: phase plot (derivative of variable vs variable) ax = axes[1] ax.set_xlabel('$\\rm {}$'.format(pltvar['label']), fontsize=fs) ax.set_ylabel('$\\rm {}$'.format(pltvar['dlabel']), fontsize=fs) ax.set_xlim(pltvar['lim']) ax.set_ylim(pltvar['dlim']) ax.plot([0, 0], [pltvar['dlim'][0], pltvar['dlim'][1]], '--', color='k', linewidth=1) ax.plot([pltvar['lim'][0], pltvar['lim'][1]], [0, 0], '--', color='k', linewidth=1) if pretty: ax.set_xticks(pltvar['lim']) ax.set_yticks(pltvar['dlim']) ax.set_xticklabels(['{:+.0f}'.format(x) for x in ax.get_xticks()]) ax.set_yticklabels(['{:+.0f}'.format(x) for x in ax.get_yticks()]) for ax in axes: for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) handles = [] autolabels = [] # For each file for i, filepath in enumerate(filepaths): # Load data logger.info('loading data from file "{}"'.format(filepath)) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values y = df[varname].values dt = t[1] - t[0] dydt = np.diff(y) / dt # Prominence-based spike detection ispikes, *_, ibounds = findPeaks( y, mph=pltvar['thr_amp'], # mpd=int(np.ceil(SPIKE_MIN_DT / dt)), mpp=pltvar['thr_prom'] ) if len(ispikes) > 0: # Discard potential irrelevant spikes if no_offset: ibounds_right = [x[1] for x in ibounds] inds = np.where(t[ibounds_right] < tstim)[0] ispikes = ispikes[inds] ibounds = ibounds[inds] if no_first: ispikes = ispikes[1:] ibounds = ibounds[1:] # Store spikes in dedicated lists tspikes = [] yspikes = [] dydtspikes = [] for ispike, ibound in zip(ispikes, ibounds): tmin = max(t[ibound[0]], tbounds[0] * 1e-3 + t[ispike]) tmax = min(t[ibound[1]], tbounds[1] * 1e-3 + t[ispike]) inds = np.where((t > tmin) & (t < tmax))[0] tspikes.append(t[inds] - t[ispike]) yspikes.append(y[inds]) dinds = np.hstack(([inds[0] - 1], inds, [inds[-1] + 1])) dydt = np.diff(y[dinds]) / np.diff(t[dinds]) dydtspikes.append((dydt[:-1] + dydt[1:]) / 2) if len(tspikes) == 0: logger.warning('No spikes detected') else: # Plot spikes temporal profiles and phase-plane diagrams for j in range(len(tspikes)): if colors is None: color = 'C{}'.format(i if len(filepaths) > 1 else j % 10) else: color = colors[i] lh = axes[0].plot(tspikes[j] * 1e3, yspikes[j] * pltvar['factor'], linewidth=lw, c=color)[0] axes[1].plot(yspikes[j] * pltvar['factor'], dydtspikes[j] * pltvar['dfactor'], linewidth=lw, c=color) # Populate legend handles.append(lh) autolabels.append(figtitle(meta)) fig.tight_layout() if labels is None: labels = autolabels # Add legend fig.subplots_adjust(top=0.8) if len(filepaths) > 1: axes[0].legend(handles, labels, fontsize=fs, frameon=False, loc='upper center', bbox_to_anchor=(1.0, 1.35)) else: fig.suptitle(labels[0], fontsize=fs) # Return return fig def plotSpikingMetrics(xvar, xlabel, metrics_dict, logscale=False, spikeamp=True, colors=None, fs=8, lw=2, ps=4, figsize=cm2inch(7.25, 5.8)): ''' Plot the evolution of key spiking metrics as function of a specific stimulation parameter. ''' ls = {'full': 'o-', 'sonic': 'o--'} cdefault = {'full': 'silver', 'sonic': 'k'} # Create figure naxes = 3 if spikeamp else 2 fig, axes = plt.subplots(naxes, 1, figsize=figsize) axes[0].set_ylabel('Latency\n (ms)', fontsize=fs, rotation=0, ha='right', va='center') axes[1].set_ylabel('Firing\n rate (Hz)', fontsize=fs, rotation=0, ha='right', va='center') if naxes == 3: axes[2].set_ylabel('Spike amp.\n ($\\rm nC/cm^2$)', fontsize=fs, rotation=0, ha='right', va='center') for ax in axes: ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) if logscale: ax.set_xscale('log') for item in ax.get_yticklabels(): item.set_fontsize(fs) for ax in axes[:-1]: ax.spines['bottom'].set_visible(False) ax.set_xticks([]) plt.setp(ax.get_xticklabels(minor=True), visible=False) ax.get_xaxis().set_tick_params(which='minor', size=0) ax.get_xaxis().set_tick_params(which='minor', width=0) axes[-1].set_xlabel(xlabel, fontsize=fs) if not logscale: axes[-1].set_xticks([min(xvar), max(xvar)]) for item in axes[-1].get_xticklabels(): item.set_fontsize(fs) # Plot metrics for each neuron for i, neuron in enumerate(metrics_dict.keys()): full_metrics = metrics_dict[neuron]['full'] sonic_metrics = metrics_dict[neuron]['sonic'] c = colors[neuron] if colors is not None else cdefault # Latency rf = 10 ax = axes[0] ax.plot(xvar, full_metrics['latencies (ms)'].values, ls['full'], color=c['full'], linewidth=lw, markersize=ps) ax.plot(xvar, sonic_metrics['latencies (ms)'].values, ls['sonic'], color=c['sonic'], linewidth=lw, markersize=ps, label=neuron) # Firing rate rf = 10 ax = axes[1] ax.errorbar(xvar, full_metrics['mean firing rates (Hz)'].values, yerr=full_metrics['std firing rates (Hz)'].values, fmt=ls['full'], color=c['full'], linewidth=lw, markersize=ps) ax.errorbar(xvar, sonic_metrics['mean firing rates (Hz)'].values, yerr=sonic_metrics['std firing rates (Hz)'].values, fmt=ls['sonic'], color=c['sonic'], linewidth=lw, markersize=ps) # Spike amplitudes if spikeamp: ax = axes[2] rf = 10 ax.errorbar(xvar, full_metrics['mean spike amplitudes (nC/cm2)'].values, yerr=full_metrics['std spike amplitudes (nC/cm2)'].values, fmt=ls['full'], color=c['full'], linewidth=lw, markersize=ps) ax.errorbar(xvar, sonic_metrics['mean spike amplitudes (nC/cm2)'].values, yerr=sonic_metrics['std spike amplitudes (nC/cm2)'].values, fmt=ls['sonic'], color=c['sonic'], linewidth=lw, markersize=ps) # Adapt axes y-limits rf = 10 for ax in axes: ax.set_ylim([np.floor(ax.get_ylim()[0] / rf) * rf, np.ceil(ax.get_ylim()[1] / rf) * rf]) ax.set_yticks([max(ax.get_ylim()[0], 0), ax.get_ylim()[1]]) # Legend if len(metrics_dict.keys()) > 1: leg = axes[0].legend(fontsize=fs, frameon=False, bbox_to_anchor=(0., 0.9, 1., .102), loc=8, ncol=2, borderaxespad=0.) for l in leg.get_lines(): l.set_linestyle('-') fig.subplots_adjust(hspace=.3, bottom=0.2, left=0.35, right=0.95, top=0.95) return fig def plotFRProfile(filepaths, varname, no_offset=False, no_first=False, fs=15, lw=2, cmap=None, zscale='lin', zref='A'): ''' Plot spike rate temporal profiles from simulation results. :param filepaths: list of full paths to data files :param no_offset: boolean stating whether or not to discard post-offset spikes :param no_first: boolean stating whether or not to discard first spike :return: figure handle ''' pltvar = phaseplotvars[varname] # Create figure fig, ax = plt.subplots(figsize=(9, 4)) ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.set_xlabel('time (ms)', fontsize=fs) ax.set_ylabel('firing rate (Hz)', fontsize=fs) for item in ax.get_xticklabels() + ax.get_yticklabels(): item.set_fontsize(fs) handles = [] amplitudes = [] intensities = [] # For each file for i, filepath in enumerate(filepaths): # Load data logger.info('loading data from file "{}"'.format(filepath)) with open(filepath, 'rb') as fh: frame = pickle.load(fh) df = frame['data'] meta = frame['meta'] tstim = meta['tstim'] t = df['t'].values y = df[varname].values amplitudes.append(meta['Adrive']) intensities.append(Pressure2Intensity(meta['Adrive'])) # Prominence-based spike detection ispikes, *_ = findPeaks( y, mph=pltvar['thr_amp'], # mpd=int(np.ceil(SPIKE_MIN_DT / dt)), mpp=pltvar['thr_prom'] ) # Discard potential irrelevant spikes if len(ispikes) > 0: if no_offset: inds = np.where(t[ispikes] < tstim)[0] ispikes = ispikes[inds] if no_first: ispikes = ispikes[1:] # Plot firing rate as a function of spikes timing if len(ispikes) > 0: tspikes = t[ispikes][:-1] sr = 1 / np.diff(t[ispikes]) lh = ax.plot(tspikes * 1e3, sr, linewidth=lw)[0] handles.append(lh) else: logger.warning('No spikes detected') intensities = np.array(intensities) amplitudes = np.array(amplitudes) # Define and apply color code if zref == 'I': zref = intensities zlabel = 'Intensity' zunit = 'W/m2' elif zref == 'A': zref = amplitudes * 1e-3 zlabel = 'Amplitude' zunit = 'kPa' fig.tight_layout() if cmap is not None: mymap = plt.get_cmap(cmap) if zscale == 'lin': norm = matplotlib.colors.Normalize(zref.min(), zref.max()) elif zscale == 'log': norm = matplotlib.colors.LogNorm(zref.min(), zref.max()) sm = cm.ScalarMappable(norm=norm, cmap=mymap) sm._A = [] for lh, z in zip(handles, zref): lh.set_color(sm.to_rgba(z)) # Add colorbar fig.subplots_adjust(left=0.1, right=0.8, bottom=0.15, top=0.95, hspace=0.5) cbarax = fig.add_axes([0.85, 0.15, 0.03, 0.8]) fig.colorbar(sm, cax=cbarax, orientation='vertical') cbarax.set_ylabel('{} ({})'.format(zlabel, zunit), fontsize=fs) for item in cbarax.get_yticklabels(): item.set_fontsize(fs) else: for lh, z in zip(handles, zref): lh.set_label('{:.2f} {}'.format(z, zunit)) ax.legend() # Return return fig diff --git a/PySONIC/utils.py b/PySONIC/utils.py index e35c085..cf728b8 100644 --- a/PySONIC/utils.py +++ b/PySONIC/utils.py @@ -1,582 +1,582 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2016-09-19 22:30:46 # @Email: theo.lemaire@epfl.ch # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-03-05 10:38:49 +# @Last Modified time: 2019-03-08 16:09:12 ''' Definition of generic utility functions used in other modules ''' import operator import os import math import pickle import re import tkinter as tk from tkinter import filedialog import numpy as np import colorlog from scipy.interpolate import interp1d import matplotlib from .constants import FARADAY, Rg # Matplotlib parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' # Package logger def setLogger(): log_formatter = colorlog.ColoredFormatter( '%(log_color)s %(asctime)s %(message)s', datefmt='%d/%m/%Y %H:%M:%S:', reset=True, log_colors={ 'DEBUG': 'green', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', }, style='%' ) log_handler = colorlog.StreamHandler() log_handler.setFormatter(log_formatter) color_logger = colorlog.getLogger('PySONIC') color_logger.addHandler(log_handler) return color_logger logger = setLogger() # File naming conventions def ESTIM_filecode(neuron, Astim, tstim, PRF, DC): return 'ESTIM_{}_{}_{:.1f}mA_per_m2_{:.0f}ms{}'.format( neuron, 'CW' if DC == 1 else 'PW', Astim, tstim * 1e3, '_PRF{:.2f}Hz_DC{:.2f}%'.format(PRF, DC * 1e2) if DC < 1. else '') def ASTIM_filecode(neuron, a, Fdrive, Adrive, tstim, PRF, DC, method): return 'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.0f}ms_{}{}'.format( neuron, 'CW' if DC == 1 else 'PW', a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, tstim * 1e3, 'PRF{:.2f}Hz_DC{:.2f}%_'.format(PRF, DC * 1e2) if DC < 1. else '', method) def MECH_filecode(a, Fdrive, Adrive, Qm): return 'MECH_{:.0f}nm_{:.0f}kHz_{:.1f}kPa_{:.1f}nCcm2'.format( a * 1e9, Fdrive * 1e-3, Adrive * 1e-3, Qm * 1e5) rgxp = re.compile('(ESTIM|ASTIM)_([A-Za-z]*)_(.*).pkl') rgxp_mech = re.compile('(MECH)_(.*).pkl') # Figure naming conventions def figtitle(meta): ''' Return appropriate title based on simulation metadata. ''' if 'Cm0' in meta: - return '{:.0f}nm radius BLS structure: MECH-STIM {:.0f}kHz, {:.0f}kPa, {:.1f}nC/cm2'.format( + return '{:.0f}nm radius BLS structure: MECH-STIM {:.0f}kHz, {:.2f}kPa, {:.1f}nC/cm2'.format( meta['a'] * 1e9, meta['Fdrive'] * 1e-3, meta['Adrive'] * 1e-3, meta['Qm'] * 1e5) else: if meta['DC'] < 1: wavetype = 'PW' suffix = ', {:.2f}Hz PRF, {:.0f}% DC'.format(meta['PRF'], meta['DC'] * 1e2) else: wavetype = 'CW' suffix = '' if 'Astim' in meta: return '{} neuron: {} E-STIM {:.2f}mA/m2, {:.0f}ms{}'.format( meta['neuron'], wavetype, meta['Astim'], meta['tstim'] * 1e3, suffix) else: - return '{} neuron ({:.1f}nm): {} A-STIM {:.0f}kHz {:.0f}kPa, {:.0f}ms{} - {} model'.format( + return '{} neuron ({:.1f}nm): {} A-STIM {:.0f}kHz {:.2f}kPa, {:.0f}ms{} - {} model'.format( meta['neuron'], meta['a'] * 1e9, wavetype, meta['Fdrive'] * 1e-3, meta['Adrive'] * 1e-3, meta['tstim'] * 1e3, suffix, meta['method']) timeunits = { 'ASTIM': 't_ms', 'ESTIM': 't_ms', 'MECH': 't_us' } def cm2inch(*tupl): inch = 2.54 if isinstance(tupl[0], tuple): return tuple(i / inch for i in tupl[0]) else: return tuple(i / inch for i in tupl) # SI units prefixes si_prefixes = { 'y': 1e-24, # yocto 'z': 1e-21, # zepto 'a': 1e-18, # atto 'f': 1e-15, # femto 'p': 1e-12, # pico 'n': 1e-9, # nano 'u': 1e-6, # micro 'm': 1e-3, # mili '': 1e0, # None 'k': 1e3, # kilo 'M': 1e6, # mega 'G': 1e9, # giga 'T': 1e12, # tera 'P': 1e15, # peta 'E': 1e18, # exa 'Z': 1e21, # zetta 'Y': 1e24, # yotta } def si_format(x, precision=0, space=' '): ''' Format a float according to the SI unit system, with the appropriate prefix letter. ''' if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\ isinstance(x, np.int32) or isinstance(x, np.int64): if x == 0: factor = 1e0 prefix = '' else: sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1)) vals = [tmp[1] for tmp in sorted_si_prefixes] # vals = list(si_prefixes.values()) ix = np.searchsorted(vals, np.abs(x)) - 1 if np.abs(x) == vals[ix + 1]: ix += 1 factor = vals[ix] prefix = sorted_si_prefixes[ix][0] # prefix = list(si_prefixes.keys())[ix] return '{{:.{}f}}{}{}'.format(precision, space, prefix).format(x / factor) elif isinstance(x, list) or isinstance(x, tuple): return [si_format(item, precision, space) for item in x] elif isinstance(x, np.ndarray) and x.ndim == 1: return [si_format(float(item), precision, space) for item in x] else: print(type(x)) def pow10_format(number, precision=2): ''' Format a number in power of 10 notation. ''' ret_string = '{0:.{1:d}e}'.format(number, precision) a, b = ret_string.split("e") a = float(a) b = int(b) return '{}10^{{{}}}'.format('{} * '.format(a) if a != 1. else '', b) def rmse(x1, x2): ''' Compute the root mean square error between two 1D arrays ''' return np.sqrt(((x1 - x2) ** 2).mean()) def rsquared(x1, x2): ''' compute the R-squared coefficient between two 1D arrays ''' residuals = x1 - x2 ss_res = np.sum(residuals**2) ss_tot = np.sum((x1 - np.mean(x1))**2) return 1 - (ss_res / ss_tot) def Pressure2Intensity(p, rho=1075.0, c=1515.0): ''' Return the spatial peak, pulse average acoustic intensity (ISPPA) associated with the specified pressure amplitude. :param p: pressure amplitude (Pa) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: spatial peak, pulse average acoustic intensity (W/m2) ''' return p**2 / (2 * rho * c) def Intensity2Pressure(I, rho=1075.0, c=1515.0): ''' Return the pressure amplitude associated with the specified spatial peak, pulse average acoustic intensity (ISPPA). :param I: spatial peak, pulse average acoustic intensity (W/m2) :param rho: medium density (kg/m3) :param c: speed of sound in medium (m/s) :return: pressure amplitude (Pa) ''' return np.sqrt(2 * rho * c * I) def OpenFilesDialog(filetype, dirname=''): ''' Open a FileOpenDialogBox to select one or multiple file. The default directory and file type are given. :param dirname: default directory :param filetype: default file type :return: tuple of full paths to the chosen filenames ''' root = tk.Tk() root.withdraw() filenames = filedialog.askopenfilenames(filetypes=[(filetype + " files", '.' + filetype)], initialdir=dirname) if filenames: par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir)) else: par_dir = None return (filenames, par_dir) def selectDirDialog(): ''' Open a dialog box to select a directory. :return: full path to selected directory ''' root = tk.Tk() root.withdraw() return filedialog.askdirectory() def SaveFileDialog(filename, dirname=None, ext=None): ''' Open a dialog box to save file. :param filename: filename :param dirname: initial directory :param ext: default extension :return: full path to the chosen filename ''' root = tk.Tk() root.withdraw() filename_out = filedialog.asksaveasfilename( defaultextension=ext, initialdir=dirname, initialfile=filename) return filename_out def downsample(t_dense, y, nsparse): ''' Decimate periodic signals to a specified number of samples.''' if(y.ndim) > 1: nsignals = y.shape[0] else: nsignals = 1 y = np.array([y]) # determine time step and period of input signal T = t_dense[-1] - t_dense[0] dt_dense = t_dense[1] - t_dense[0] # resample time vector linearly t_ds = np.linspace(t_dense[0], t_dense[-1], nsparse) # create MAV window nmav = int(0.03 * T / dt_dense) if nmav % 2 == 0: nmav += 1 mav = np.ones(nmav) / nmav # determine signals padding npad = int((nmav - 1) / 2) # determine indexes of sampling on convolved signals ids = np.round(np.linspace(0, t_dense.size - 1, nsparse)).astype(int) y_ds = np.empty((nsignals, nsparse)) # loop through signals for i in range(nsignals): # pad, convolve and resample pad_left = y[i, -(npad + 2):-2] pad_right = y[i, 1:npad + 1] y_ext = np.concatenate((pad_left, y[i, :], pad_right), axis=0) y_mav = np.convolve(y_ext, mav, mode='valid') y_ds[i, :] = y_mav[ids] if nsignals == 1: y_ds = y_ds[0, :] return (t_ds, y_ds) def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1): ''' Rescale a value to a specific interval by linear transformation. ''' if lb is None: lb = x.min() if ub is None: ub = x.max() xnorm = (x - lb) / (ub - lb) return xnorm * (ub_new - lb_new) + lb_new def getStimPulses(t, states): ''' Determine the onset and offset times of pulses from a stimulation vector. :param t: time vector (s). :param states: a vector of stimulation state (ON/OFF) at each instant in time. :return: 3-tuple with number of patches, timing of STIM-ON an STIM-OFF instants. ''' # Compute states derivatives and identify bounds indexes of pulses dstates = np.diff(states) ipulse_on = np.insert(np.where(dstates > 0.0)[0] + 1, 0, 0) ipulse_off = np.where(dstates < 0.0)[0] + 1 if ipulse_off.size < ipulse_on.size: ioff = t.size - 1 if ipulse_off.size == 0: ipulse_off = np.array([ioff]) else: ipulse_off = np.insert(ipulse_off, ipulse_off.size - 1, ioff) # Get time instants for pulses ON and OFF npulses = ipulse_on.size tpulse_on = t[ipulse_on] tpulse_off = t[ipulse_off] # return 3-tuple with #pulses, pulse ON and pulse OFF instants return npulses, tpulse_on, tpulse_off def getNeuronLookupsFile(mechname, a=None, Fdrive=None, Adrive=None, fs=False): fpath = os.path.join( os.path.split(__file__)[0], 'neurons', '{}_lookups'.format(mechname) ) if a is not None: fpath += '_{:.0f}nm'.format(a * 1e9) if Fdrive is not None: fpath += '_{:.0f}kHz'.format(Fdrive * 1e-3) if Adrive is not None: fpath += '_{:.0f}kPa'.format(Adrive * 1e-3) if fs is True: fpath += '_fs' return '{}.pkl'.format(fpath) def getLookups4D(mechname): ''' Retrieve 4D lookup tables and reference vectors for a given membrane mechanism. :param mechname: name of membrane density mechanism :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and one other variable), a dictionary of associated 2D lookup numpy arrays, and a dictionnary with information about the other variable. ''' # Check lookup file existence lookup_path = getNeuronLookupsFile(mechname) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) # Load lookups dictionary logger.debug('Loading lookup table') with open(lookup_path, 'rb') as fh: df = pickle.load(fh) inputs = df['input'] lookups4D = df['lookup'] # Retrieve 1D inputs from lookups dictionary aref = inputs['a'] Fref = inputs['f'] Aref = inputs['A'] Qref = inputs['Q'] return aref, Fref, Aref, Qref, lookups4D def getLookupsOff(mechname): ''' Retrieve appropriate US-OFF lookup tables and reference vectors for a given membrane mechanism. :param mechname: name of membrane density mechanism :return: 2-tuple with 1D numpy array of reference charge density and dictionary of associated 1D lookup numpy arrays. ''' # Get 4D lookups and input vectors aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) # Perform 2D projection in appropriate dimensions logger.debug('Interpolating lookups at A = 0') lookups_off = {key: y4D[0, 0, 0, :] for key, y4D in lookups4D.items()} return Qref, lookups_off def getLookups2D(mechname, a=None, Fdrive=None, Adrive=None): ''' Retrieve appropriate 2D lookup tables and reference vectors for a given membrane mechanism, projected at a specific combination of sonophore radius, US frequency and/or acoustic pressure amplitude. :param mechname: name of membrane density mechanism :param a: sonophore radius (m) :param Fdrive: US frequency (Hz) :param Adrive: Acoustic peak pressure ampplitude (Hz) :return: 4-tuple with 1D numpy arrays of reference input vectors (charge density and one other variable), a dictionary of associated 2D lookup numpy arrays, and a dictionnary with information about the other variable. ''' # Get 4D lookups and input vectors aref, Fref, Aref, Qref, lookups4D = getLookups4D(mechname) # Check that inputs are within lookup range if a is not None: a = isWithin('radius', a, (aref.min(), aref.max())) if Fdrive is not None: Fdrive = isWithin('frequency', Fdrive, (Fref.min(), Fref.max())) if Adrive is not None: Adrive = isWithin('amplitude', Adrive, (Aref.min(), Aref.max())) # Determine projection dimensions based on inputs var_a = {'name': 'a', 'label': 'sonophore radius', 'val': a, 'unit': 'm', 'factor': 1e9, 'ref': aref, 'axis': 0} var_Fdrive = {'name': 'f', 'label': 'frequency', 'val': Fdrive, 'unit': 'Hz', 'factor': 1e-3, 'ref': Fref, 'axis': 1} var_Adrive = {'name': 'A', 'label': 'amplitude', 'val': Adrive, 'unit': 'Pa', 'factor': 1e-3, 'ref': Aref, 'axis': 2} if not isinstance(Adrive, float): var1 = var_a var2 = var_Fdrive var3 = var_Adrive elif not isinstance(Fdrive, float): var1 = var_a var2 = var_Adrive var3 = var_Fdrive elif not isinstance(a, float): var1 = var_Fdrive var2 = var_Adrive var3 = var_a # Perform 2D projection in appropriate dimensions logger.debug('Interpolating lookups at (%s = %s%s, %s = %s%s)', var1['name'], si_format(var1['val'], space=' '), var1['unit'], var2['name'], si_format(var2['val'], space=' '), var2['unit']) lookups3D = {key: interp1d(var1['ref'], y4D, axis=var1['axis'])(var1['val']) for key, y4D in lookups4D.items()} if var2['axis'] > var1['axis']: var2['axis'] -= 1 lookups2D = {key: interp1d(var2['ref'], y3D, axis=var2['axis'])(var2['val']) for key, y3D in lookups3D.items()} if var3['val'] is not None: logger.debug('Interpolating lookups at %d new %s values between %s%s and %s%s', len(var3['val']), var3['name'], si_format(min(var3['val']), space=' '), var3['unit'], si_format(max(var3['val']), space=' '), var3['unit']) lookups2D = {key: interp1d(var3['ref'], y2D, axis=0)(var3['val']) for key, y2D in lookups2D.items()} var3['ref'] = np.array(var3['val']) return var3['ref'], Qref, lookups2D, var3 def getLookups2Dfs(mechname, a, Fdrive, fs): # Check lookup file existence lookup_path = getNeuronLookupsFile(mechname, a=a, Fdrive=Fdrive, fs=True) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) # Load lookups dictionary logger.debug('Loading lookup table') with open(lookup_path, 'rb') as fh: df = pickle.load(fh) inputs = df['input'] lookups3D = df['lookup'] # Retrieve 1D inputs from lookups dictionary fsref = inputs['fs'] Aref = inputs['A'] Qref = inputs['Q'] # Check that fs is within lookup range fs = isWithin('coverage', fs, (fsref.min(), fsref.max())) # Perform projection at fs logger.debug('Interpolating lookups at fs = %s%%', fs * 1e2) lookups2D = {key: interp1d(fsref, y3D, axis=2)(fs) for key, y3D in lookups3D.items()} return Aref, Qref, lookups2D def isWithin(name, val, bounds, rel_tol=1e-9): ''' Check if a floating point number is within an interval. If the value falls outside the interval, an error is raised. If the value falls just outside the interval due to rounding errors, the associated interval bound is returned. :param val: float value :param bounds: interval bounds (float tuple) :return: original or corrected value ''' if isinstance(val, list) or isinstance(val, np.ndarray) or isinstance(val, tuple): return [isWithin(name, v, bounds, rel_tol) for v in val] if val >= bounds[0] and val <= bounds[1]: return val elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0]) return bounds[0] elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol): logger.warning('Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1]) return bounds[1] else: raise ValueError('{} value ({}) out of [{}, {}] interval'.format( name, val, bounds[0], bounds[1])) def getLookupsCompTime(mechname): # Check lookup file existence lookup_path = getNeuronLookupsFile(mechname) if not os.path.isfile(lookup_path): raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path)) # Load lookups dictionary logger.debug('Loading comp times') with open(lookup_path, 'rb') as fh: df = pickle.load(fh) tcomps4D = df['tcomp'] return np.sum(tcomps4D) def nernst(z_ion, Cion_in, Cion_out, T): ''' Return the Nernst potential of a specific ion given its intra and extracellular concentrations. :param z_ion: ion valence :param Cion_in: intracellular ion concentration :param Cion_out: extracellular ion concentration :param T: temperature (K) :return: ion Nernst potential (mV) ''' return (Rg * T) / (z_ion * FARADAY) * np.log(Cion_out / Cion_in) * 1e3 def vtrap(x, y): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x / y) - 1) def efun(x): ''' Generic function used to compute rate constants. ''' return x / (np.exp(x) - 1) def ghkDrive(Vm, Z_ion, Cion_in, Cion_out, T): ''' Use the Goldman-Hodgkin-Katz equation to compute the electrochemical driving force of a specific ion species for a given membrane potential. :param Vm: membrane potential (mV) :param Cin: intracellular ion concentration (M) :param Cout: extracellular ion concentration (M) :param T: temperature (K) :return: electrochemical driving force of a single ion particle (mC.m-3) ''' x = Z_ion * FARADAY * Vm / (Rg * T) * 1e-3 # [-] eCin = Cion_in * efun(-x) # M eCout = Cion_out * efun(x) # M return FARADAY * (eCin - eCout) * 1e6 # mC/m3 diff --git a/paper figures/fig9.py b/paper figures/fig9.py index 74c1212..da78432 100644 --- a/paper figures/fig9.py +++ b/paper figures/fig9.py @@ -1,110 +1,110 @@ # -*- coding: utf-8 -*- # @Author: Theo Lemaire # @Date: 2018-12-09 12:06:01 # @Last Modified by: Theo Lemaire -# @Last Modified time: 2019-02-28 11:37:10 +# @Last Modified time: 2019-03-08 16:15:52 ''' Sub-panels of SONIC model validation on an STN neuron (response to CW sonication). ''' import os import logging import numpy as np import matplotlib import matplotlib.pyplot as plt from argparse import ArgumentParser from PySONIC.utils import logger, selectDirDialog, ASTIM_filecode, Intensity2Pressure from PySONIC.plt import plotFRProfile, plotBatch # Plot parameters matplotlib.rcParams['pdf.fonttype'] = 42 matplotlib.rcParams['ps.fonttype'] = 42 matplotlib.rcParams['font.family'] = 'arial' # Figure basename figbase = os.path.splitext(__file__)[0] def main(): ap = ArgumentParser() # Runtime options ap.add_argument('-v', '--verbose', default=False, action='store_true', help='Increase verbosity') ap.add_argument('-i', '--inputdir', type=str, help='Input directory') ap.add_argument('-f', '--figset', type=str, nargs='+', help='Figure set', default='all') ap.add_argument('-s', '--save', default=False, action='store_true', help='Save output figures as pdf') args = ap.parse_args() loglevel = logging.DEBUG if args.verbose is True else logging.INFO logger.setLevel(loglevel) inputdir = selectDirDialog() if args.inputdir is None else args.inputdir if inputdir == '': logger.error('No input directory chosen') return figset = args.figset if figset is 'all': figset = ['a', 'b'] logger.info('Generating panels {} of {}'.format(figset, figbase)) # Parameters neuron = 'STN' a = 32e-9 # m Fdrive = 500e3 # Hz tstim = 1 # s PRF = 1e2 DC = 1. # Range of intensities intensities = np.hstack(( np.arange(10, 101, 10), np.arange(101, 131, 1), np.array([140]) )) # W/m2 # remove levels corresponding to overwritten files (similar Adrive decimals) todelete = [np.argwhere(intensities == x)[0][0] for x in [108, 115, 122, 127]] intensities = np.delete(intensities, todelete) # Levels depicted with individual traces - subset_intensities = [105, 107, 123] # W/m2 + subset_intensities = [112, 114, 123] # W/m2 # convert to amplitudes and get filepaths amplitudes = np.array([Intensity2Pressure(I) for I in intensities]) # Pa fnames = ['{}.pkl'.format(ASTIM_filecode(neuron, a, Fdrive, A, tstim, PRF, DC, 'sonic')) for A in amplitudes] fpaths = [os.path.join(inputdir, 'STN', fn) for fn in fnames] # Generate figures figs = [] if 'a' in figset: fig = plotFRProfile(fpaths, 'Qm', no_offset=True, no_first=False, zref='A', zscale='lin', cmap='Oranges') fig.canvas.set_window_title(figbase + 'a') figs.append(fig) if 'b' in figset: isubset = [np.argwhere(intensities == x)[0][0] for x in subset_intensities] subset_amplitudes = amplitudes[isubset] titles = ['{:.2f} kPa ({:.0f} W/m2)'.format(A * 1e-3, I) for A, I in zip(subset_amplitudes, subset_intensities)] print(titles) figtraces = plotBatch([fpaths[i] for i in isubset], vars_dict={'Q_m': ['Qm']}) for fig, title in zip(figtraces, titles): fig.axes[0].set_title(title) fig.canvas.set_window_title(figbase + 'b {}'.format(title)) figs.append(fig) if args.save: for fig in figs: s = fig.canvas.get_window_title() s = s.replace('(', '- ').replace('/', '_').replace(')', '') figname = '{}.pdf'.format(s) fig.savefig(os.path.join(inputdir, figname), transparent=True) else: plt.show() if __name__ == '__main__': main()