diff --git a/PySONIC/utils.py b/PySONIC/utils.py
index 26f0afe..f7e31b8 100644
--- a/PySONIC/utils.py
+++ b/PySONIC/utils.py
@@ -1,516 +1,516 @@
# -*- coding: utf-8 -*-
# @Author: Theo Lemaire
# @Email: theo.lemaire@epfl.ch
# @Date: 2016-09-19 22:30:46
# @Last Modified by: Theo Lemaire
-# @Last Modified time: 2019-06-12 23:07:29
+# @Last Modified time: 2019-06-13 03:17:15

''' Definition of generic utility functions used in other modules '''

import csv
from functools import wraps
import operator
import time
import os
import lockfile
import math
import pickle
import json
from tqdm import tqdm
import logging
import tkinter as tk
from tkinter import filedialog
import numpy as np
import colorlog


# Package logger
my_log_formatter = colorlog.ColoredFormatter(
    '%(log_color)s %(asctime)s %(message)s',
    datefmt='%d/%m/%Y %H:%M:%S:',
    reset=True,
    log_colors={
        'DEBUG': 'green',
        'INFO': 'white',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'red,bg_white',
    },
    style='%')


def setHandler(logger, handler):
    for h in logger.handlers:
        logger.removeHandler(h)
    logger.addHandler(handler)
    return logger


def setLogger(name, formatter):
    handler = colorlog.StreamHandler()
    handler.setFormatter(formatter)
    logger = colorlog.getLogger(name)
    logger.addHandler(handler)
    return logger


class TqdmHandler(logging.StreamHandler):

    def __init__(self, formatter):
        logging.StreamHandler.__init__(self)
        self.setFormatter(formatter)

    def emit(self, record):
        msg = self.format(record)
        tqdm.write(msg)


logger = setLogger('PySONIC', my_log_formatter)


# SI unit prefixes
si_prefixes = {
    'y': 1e-24,  # yocto
    'z': 1e-21,  # zepto
    'a': 1e-18,  # atto
    'f': 1e-15,  # femto
    'p': 1e-12,  # pico
    'n': 1e-9,   # nano
    'u': 1e-6,   # micro
    'm': 1e-3,   # milli
    '': 1e0,     # None
    'k': 1e3,    # kilo
    'M': 1e6,    # mega
    'G': 1e9,    # giga
    'T': 1e12,   # tera
    'P': 1e15,   # peta
    'E': 1e18,   # exa
    'Z': 1e21,   # zetta
    'Y': 1e24,   # yotta
}


def si_format(x, precision=0, space=' '):
    ''' Format a float according to the SI unit system, with the appropriate prefix letter. '''
    if isinstance(x, float) or isinstance(x, int) or isinstance(x, np.float) or\
            isinstance(x, np.int32) or isinstance(x, np.int64):
        if x == 0:
            factor = 1e0
            prefix = ''
        else:
            sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1))
            vals = [tmp[1] for tmp in sorted_si_prefixes]
            # vals = list(si_prefixes.values())
            ix = np.searchsorted(vals, np.abs(x)) - 1
            if np.abs(x) == vals[ix + 1]:
                ix += 1
            factor = vals[ix]
            prefix = sorted_si_prefixes[ix][0]
            # prefix = list(si_prefixes.keys())[ix]
        return '{{:.{}f}}{}{}'.format(precision, space, prefix).format(x / factor)
    elif isinstance(x, list) or isinstance(x, tuple):
        return [si_format(item, precision, space) for item in x]
    elif isinstance(x, np.ndarray) and x.ndim == 1:
        return [si_format(float(item), precision, space) for item in x]
    else:
        print(type(x))


def pow10_format(number, precision=2):
    ''' Format a number in power of 10 notation. '''
    ret_string = '{0:.{1:d}e}'.format(number, precision)
    a, b = ret_string.split("e")
    a = float(a)
    b = int(b)
    return '{}10^{{{}}}'.format('{} * '.format(a) if a != 1. else '', b)
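
# --- Illustrative usage sketch (not part of the original file): expected outputs of the
# --- two formatting helpers above, assuming default arguments unless stated otherwise.
# >>> si_format(1.5e-8, precision=1)
# '15.0 n'
# >>> si_format(2e3)
# '2 k'
# >>> pow10_format(3e4)
# '3.0 * 10^{4}'
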
def plural(n):
    if n < 0:
        raise ValueError('Cannot format negative integer (n = {})'.format(n))
    if n == 0:
        return ''
    else:
        return 's'


def rmse(x1, x2):
    ''' Compute the root mean square error between two 1D arrays '''
    return np.sqrt(((x1 - x2) ** 2).mean())


def rsquared(x1, x2):
    ''' Compute the R-squared coefficient between two 1D arrays '''
    residuals = x1 - x2
    ss_res = np.sum(residuals**2)
    ss_tot = np.sum((x1 - np.mean(x1))**2)
    return 1 - (ss_res / ss_tot)


def Pressure2Intensity(p, rho=1075.0, c=1515.0):
    ''' Return the spatial peak, pulse average acoustic intensity (ISPPA)
        associated with the specified pressure amplitude.

        :param p: pressure amplitude (Pa)
        :param rho: medium density (kg/m3)
        :param c: speed of sound in medium (m/s)
        :return: spatial peak, pulse average acoustic intensity (W/m2)
    '''
    return p**2 / (2 * rho * c)


def Intensity2Pressure(I, rho=1075.0, c=1515.0):
    ''' Return the pressure amplitude associated with the specified
        spatial peak, pulse average acoustic intensity (ISPPA).

        :param I: spatial peak, pulse average acoustic intensity (W/m2)
        :param rho: medium density (kg/m3)
        :param c: speed of sound in medium (m/s)
        :return: pressure amplitude (Pa)
    '''
    return np.sqrt(2 * rho * c * I)
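
# --- Illustrative sketch (not part of the original file): the two acoustic conversion
# --- helpers above are exact inverses of each other for the default medium parameters.
# >>> Pressure2Intensity(100e3)   # 100 kPa peak pressure
# 3070.07...   (W/m2)
# >>> Intensity2Pressure(Pressure2Intensity(100e3))
# 100000.0   (Pa, round trip recovered)
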
def convertPKL2JSON():
    pkl_fpaths = OpenFilesDialog('pkl')[0]
    if len(pkl_fpaths) == 0:
        logger.error('No input files selected')
        return
    for pkl_filepath in pkl_fpaths:
        logger.info('Processing {} ...'.format(pkl_filepath))
        json_filepath = '{}.json'.format(os.path.splitext(pkl_filepath)[0])
        with open(pkl_filepath, 'rb') as fpkl, open(json_filepath, 'w') as fjson:
            data = pickle.load(fpkl)
            json.dump(data, fjson, ensure_ascii=False, sort_keys=True, indent=4)
    logger.info('All done!')


def OpenFilesDialog(filetype, dirname=''):
    ''' Open a FileOpenDialogBox to select one or multiple files.

        The default directory and file type are given.

        :param dirname: default directory
        :param filetype: default file type
        :return: tuple of full paths to the chosen filenames
    '''
    root = tk.Tk()
    root.withdraw()
    filenames = filedialog.askopenfilenames(
        filetypes=[(filetype + " files", '.' + filetype)],
        initialdir=dirname
    )
    if filenames:
        par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir))
    else:
        par_dir = None
    return (filenames, par_dir)


def selectDirDialog(title='Select directory'):
    ''' Open a dialog box to select a directory.

        :return: full path to selected directory
    '''
    root = tk.Tk()
    root.withdraw()
    return filedialog.askdirectory(title=title)


def SaveFileDialog(filename, dirname=None, ext=None):
    ''' Open a dialog box to save file.

        :param filename: filename
        :param dirname: initial directory
        :param ext: default extension
        :return: full path to the chosen filename
    '''
    root = tk.Tk()
    root.withdraw()
    filename_out = filedialog.asksaveasfilename(
        defaultextension=ext, initialdir=dirname, initialfile=filename)
    return filename_out


def loadData(fpath, frequency=1):
    ''' Load dataframe and metadata dictionary from pickle file. '''
    logger.info('Loading data from "%s"', os.path.basename(fpath))
    with open(fpath, 'rb') as fh:
        frame = pickle.load(fh)
        df = frame['data'].iloc[::frequency]
        meta = frame['meta']
    return df, meta


def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1):
    ''' Rescale a value to a specific interval by linear transformation. '''
    if lb is None:
        lb = x.min()
    if ub is None:
        ub = x.max()
    xnorm = (x - lb) / (ub - lb)
    return xnorm * (ub_new - lb_new) + lb_new


def isIterable(x):
    for t in [list, tuple, np.ndarray]:
        if isinstance(x, t):
            return True
    return False


def isWithin(name, val, bounds, rel_tol=1e-9):
    ''' Check if a floating point number is within an interval.

        If the value falls outside the interval, an error is raised.

        If the value falls just outside the interval due to rounding errors,
        the associated interval bound is returned.

        :param val: float value
        :param bounds: interval bounds (float tuple)
        :return: original or corrected value
    '''
    if isIterable(val):
        return [isWithin(name, v, bounds, rel_tol) for v in val]
    if val >= bounds[0] and val <= bounds[1]:
        return val
    elif val < bounds[0] and math.isclose(val, bounds[0], rel_tol=rel_tol):
        logger.warning('Rounding %s value (%s) to interval lower bound (%s)', name, val, bounds[0])
        return bounds[0]
    elif val > bounds[1] and math.isclose(val, bounds[1], rel_tol=rel_tol):
        logger.warning('Rounding %s value (%s) to interval upper bound (%s)', name, val, bounds[1])
        return bounds[1]
    else:
        raise ValueError('{} value ({}) out of [{}, {}] interval'.format(
            name, val, bounds[0], bounds[1]))


def getDistribution(xmin, xmax, nx, scale='lin'):
    if scale == 'log':
        xmin, xmax = np.log10(xmin), np.log10(xmax)
    return {'lin': np.linspace, 'log': np.logspace}[scale](xmin, xmax, nx)


def getDistFromList(xlist):
    if not isinstance(xlist, list):
        raise TypeError('Input must be a list')
    if len(xlist) != 4:
        raise ValueError('List must contain exactly 4 arguments ([type, min, max, n])')
    scale = xlist[0]
    if scale not in ('log', 'lin'):
        raise ValueError('Unknown distribution type (must be "lin" or "log")')
    xmin, xmax = [float(x) for x in xlist[1:-1]]
    if xmin >= xmax:
        raise ValueError('Specified minimum is greater than or equal to specified maximum')
    nx = int(xlist[-1])
    if nx < 2:
        raise ValueError('Specified number must be at least 2')
    return getDistribution(xmin, xmax, nx, scale=scale)


def getIndex(container, value):
    ''' Return the index of a float / string value in a list / array.

        :param container: list / 1D-array of elements
        :param value: value to search for
        :return: index of value (if found)
    '''
    if isinstance(value, float):
        container = np.array(container)
        imatches = np.where(np.isclose(container, value, rtol=1e-9, atol=1e-16))[0]
        if len(imatches) == 0:
            raise ValueError('{} not found in {}'.format(value, container))
        return imatches[0]
    elif isinstance(value, str):
        return container.index(value)


def debug(func):
    ''' Print the function signature and return value. '''
    @wraps(func)
    def wrapper_debug(*args, **kwargs):
        args_repr = [repr(a) for a in args]
        kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
        signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr))
        print('Calling {}'.format(signature))
        value = func(*args, **kwargs)
        print(f"{func.__name__!r} returned {value!r}")
        return value
    return wrapper_debug


def timer(func):
    ''' Monitor and return the runtime of the decorated function. '''
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        end_time = time.perf_counter()
        run_time = end_time - start_time
        return value, run_time
    return wrapper
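
# --- Illustrative sketch (not part of the original file): a function decorated with
# --- `timer` returns a (value, runtime) tuple instead of its bare value.
# >>> @timer
# ... def slow_square(x):
# ...     time.sleep(0.1)
# ...     return x**2
# >>> value, run_time = slow_square(3)   # value == 9, run_time is roughly 0.1 s
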
def logCache(fpath, delimiter='\t', out_type=float):
    ''' Add file-based (I/O) memoization to a function, to avoid repeating
        tedious computations with identical inputs.
    '''
    def wrapper_with_args(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # If function has history -> do not log
            if 'history' in kwargs:
                return func(*args, **kwargs)

            # Translate function arguments into string signature
            args_repr = [repr(a) for a in args]
            kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
            signature = '{}({})'.format(func.__name__, ', '.join(args_repr + kwargs_repr))

            # If entry present in log, return corresponding output
            if os.path.isfile(fpath):
                with open(fpath, 'r', newline='') as f:
                    reader = csv.reader(f, delimiter=delimiter)
                    for row in reader:
                        if row[0] == signature:
                            logger.info('entry found in "{}"'.format(os.path.basename(fpath)))
                            return out_type(row[1])

            # Otherwise, compute output and log it into file before returning
            out = func(*args, **kwargs)
            lock = lockfile.FileLock(fpath)
            lock.acquire()
            with open(fpath, 'a', newline='') as csvfile:
                writer = csv.writer(csvfile, delimiter=delimiter)
                writer.writerow([signature, str(out)])
            lock.release()
            return out
        return wrapper
    return wrapper_with_args


def fileCache(root, fcode_func, ext='json'):

    def wrapper_with_args(func):

        @wraps(func)
        def wrapper(*args, **kwargs):

            # Get load and dump functions from file extension
            try:
                load_func = {
                    'json': json.load,
                    'pkl': pickle.load,
                    'csv': lambda f: np.loadtxt(f, delimiter=',')
                }[ext]
                dump_func = {
                    'json': json.dump,
                    'pkl': pickle.dump,
                    'csv': lambda x, f: np.savetxt(f, x, delimiter=',')
                }[ext]
            except KeyError:
                raise ValueError('Unknown file extension')

            # Get read and write mode (text or binary) from file extension
            mode = 'b' if ext == 'pkl' else ''

            # Get file path from root and function arguments, using fcode function
            fpath = os.path.join(os.path.abspath(root), '{}.{}'.format(fcode_func(*args), ext))

            # If file exists, load output from it
            if os.path.isfile(fpath):
                logger.info('loading data from "{}"'.format(fpath))
                with open(fpath, 'r' + mode) as f:
                    out = load_func(f)

            # Otherwise, execute function and create the file to dump the output
            else:
                logger.warning('reference data file not found: "{}"'.format(fpath))
                out = func(*args, **kwargs)
                logger.info('dumping data in "{}"'.format(fpath))
                lock = lockfile.FileLock(fpath)
                lock.acquire()
                with open(fpath, 'w' + mode) as f:
                    dump_func(out, f)
                lock.release()

            return out

        return wrapper

    return wrapper_with_args
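
# --- Illustrative sketch (not part of the original file): caching a computation with
# --- `fileCache`. The cache directory and the file-naming lambda are hypothetical, and
# --- the directory is assumed to already exist.
# >>> @fileCache('/tmp/cache', lambda n: 'squares_up_to_{}'.format(n), ext='json')
# ... def computeSquares(n):
# ...     return [i**2 for i in range(n)]
# >>> computeSquares(5)   # first call: computes and dumps to /tmp/cache/squares_up_to_5.json
# >>> computeSquares(5)   # second call: loads the cached output from that file
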
def binarySearch(bool_func, args, ix, xbounds, dx_thr, history=None):
    ''' Use a binary search to determine the threshold satisfying a given condition
        within a continuous search interval.

        :param bool_func: boolean function returning whether condition is satisfied
        :param args: list of function arguments other than refined value
        :param ix: index at which the refined value is inserted into the arguments list
        :param xbounds: search interval for threshold (progressively refined)
        :param dx_thr: accuracy criterion for threshold
        :return: excitation threshold
    '''
    # If first function call: check that condition changes within the interval
    if history is None:
        sim_args = args[:]

        # If condition not satisfied even for upper bound -> return nan
        sim_args.insert(ix, xbounds[1])
        if not bool_func(sim_args):
            logger.warning('titration does not converge within this interval')
            return np.nan

        # If condition satisfied even for lower bound -> return nan
        sim_args[ix] = xbounds[0]
        if bool_func(sim_args):
            logger.warning('titration does not converge within this interval')
            return np.nan

        # Assign empty history
        history = []

    # Compute function output at interval mid-point
    x = (xbounds[0] + xbounds[1]) / 2
    sim_args = args[:]
    sim_args.insert(ix, x)
    history.append(bool_func(sim_args))

    # If titration interval is small enough
    conv = False
    if (xbounds[1] - xbounds[0]) <= dx_thr:
        logger.debug('titration interval smaller than defined threshold')

        # If both conditions have been encountered during titration process,
        # we're going towards convergence
        if (0 in history and 1 in history):
            logger.debug('converging around threshold')

            # If current value satisfies condition, convergence is achieved
            # -> return threshold
            if history[-1]:
                logger.debug('currently satisfying condition -> convergence')
                return x

        # If only one condition has been encountered during titration process,
        # then titration is impossible within the defined interval -> return NaN
        else:
            logger.warning('titration does not converge within this interval')
            return np.nan

    # Return threshold if convergence is reached, otherwise refine interval and iterate
    if conv:
        return x
    else:
        if x > 0.:
            xbounds = (xbounds[0], x) if history[-1] else (x, xbounds[1])
        else:
            xbounds = (x, xbounds[1]) if history[-1] else (xbounds[0], x)
        return binarySearch(bool_func, args, ix, xbounds, dx_thr, history=history)
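
# --- Illustrative sketch (not part of the original file): finding the threshold of a
# --- simple, hypothetical condition with `binarySearch`. The refined value is inserted
# --- at index 0 of the argument list before each evaluation.
# >>> def isAboveUnity(sim_args):
# ...     x, offset = sim_args
# ...     return x + offset > 1.
# >>> binarySearch(isAboveUnity, [0.2], 0, (0., 2.), 1e-3)   # converges towards 0.8
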
diff --git a/README.md b/README.md
index 7edc999..3412a46 100644
--- a/README.md
+++ b/README.md
@@ -1,162 +1,162 @@
-## Description
+# Description

This package is a Python implementation of the **multi-Scale Optimized Neuronal Intramembrane Cavitation (SONIC) model [1]**, a computationally efficient and interpretable model of neuronal intramembrane cavitation. It allows you to simulate the responses of various neuron types to ultrasonic (and electrical) stimuli.

This package contains three core model classes:

- `BilayerSonophore` defines the underlying **biomechanical model of intramembrane cavitation**.
- `PointNeuron` defines an abstract generic interface to **conductance-based point-neuron electrical models**. It is inherited by classes defining the different neuron types with specific membrane dynamics.
- `NeuronalBilayerSonophore` defines the **full electromechanical model for any given neuron type**. To do so, it inherits from `BilayerSonophore` and receives a specific `PointNeuron` object at initialization.

All three classes contain a `simulate` method to simulate the underlying model's behavior for a given set of stimulation and physiological parameters. The `NeuronalBilayerSonophore.simulate` method contains an additional `method` argument defining whether to perform a detailed (`full`), coarse-grained (`sonic`) or hybrid (`hybrid`) integration of the differential system.

Numerical integration routines are implemented outside the models, in separate `Simulator` classes.

The package also contains modules for graphing utilities, multiprocessing, results post-processing and command line parsing.

-## Requirements
+# Requirements

- Python 3.6 or higher

-## Installation
+# Installation

- Open a terminal.
- Activate a Python3 environment if needed, e.g. on the tnesrv5 machine: ```$ source /opt/apps/anaconda3/bin activate```
- Check that the appropriate version of pip is activated: ```$ pip --version```
- Go to the package directory (where the setup.py file is located): ```$ cd ```
- Install the package and all its dependencies: ```$ pip install -e .```

-## Usage
+# Usage

This package contains conductance-based point-neuron implementations of several generic neuron types, including:

- cortical regular spiking (RS) neuron
- cortical fast spiking (FS) neuron
- cortical low-threshold spiking (LTS) neuron
- cortical intrinsically bursting (IB) neuron
- thalamic reticular (RE) neuron
- thalamo-cortical (TC) neuron
- subthalamic nucleus (STN) neuron

-### Python scripts
+## Python scripts

You can easily run simulations of any implemented point-neuron model under both electrical and ultrasonic stimuli, and visualize the simulation results, in just a few lines of code:

```
import logging
import matplotlib.pyplot as plt

from PySONIC.core import NeuronalBilayerSonophore
from PySONIC.neurons import getPointNeuron
from PySONIC.utils import logger
from PySONIC.plt import SchemePlot

logger.setLevel(logging.INFO)

# Stimulation parameters
a = 32e-9        # m
Fdrive = 500e3   # Hz
Adrive = 100e3   # Pa
Astim = 10.      # mA/m2
tstim = 250e-3   # s
toffset = 50e-3  # s
PRF = 100.       # Hz
DC = 0.5         # -

# Point-neuron model and corresponding neuronal intramembrane cavitation model
pneuron = getPointNeuron('RS')
nbls = NeuronalBilayerSonophore(a, pneuron)

# Run simulation upon electrical stimulation, and plot results
elec_args = (Astim, tstim, toffset, PRF, DC)
data, tcomp = pneuron.simulate(*elec_args)
logger.info('completed in %.0f ms', tcomp * 1e3)
scheme_plot = SchemePlot([(pneuron.simkey, data, pneuron.meta(*elec_args))])
fig1 = scheme_plot.render()

# Run simulation upon ultrasonic stimulation, and plot results
US_int_method = 'sonic'  # Integration method ('sonic', 'full' or 'hybrid')
US_args = (Fdrive, Adrive, tstim, toffset, PRF, DC, US_int_method)
data, tcomp = nbls.simulate(*US_args)
logger.info('completed in %.0f ms', tcomp * 1e3)
scheme_plot = SchemePlot([(nbls.simkey, data, nbls.meta(*US_args))])
fig2 = scheme_plot.render()

plt.show()
```

-### From the command line
+## From the command line

You can easily run simulations of all 3 model types using the dedicated command line scripts. To do so, open a terminal in the `scripts` directory.

- Use `run_mech.py` for simulations of the **mechanical model** upon **ultrasonic stimulation**. For instance, for a 32 nm radius bilayer sonophore sonicated at 500 kHz and 100 kPa: ```$ python run_mech.py -a 32 -f 500 -A 100 -p Z```
- Use `run_estim.py` for simulations of **point-neuron models** upon **intracellular electrical stimulation**. For instance, a regular-spiking (RS) neuron injected with 10 mA/m2 intracellular current for 30 ms: ```$ python run_estim.py -n RS -A 10 --tstim 30 -p Vm```
- Use `run_astim.py` for simulations of **point-neuron models** upon **ultrasonic stimulation**. For instance, for a coarse-grained simulation of a 32 nm radius bilayer sonophore within a regular-spiking (RS) neuron membrane, sonicated at 500 kHz and 100 kPa for 150 ms: ```$ python run_astim.py -n RS -a 32 -f 500 -A 100 --tstim 150 --method sonic -p Qm```

The simulation results are saved in `.pkl` files. To view these results directly upon simulation completion, you can use the `-p [xxx]` option, where `[xxx]` can be `all` or a given variable name (e.g. `Z` for membrane deflection, `Vm` for membrane potential, `Qm` for membrane charge density).
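
The saved `.pkl` files can also be inspected programmatically, for instance with the `loadData` helper from `PySONIC.utils` shown in the diff above. A minimal sketch (the file path is hypothetical and must point to an existing simulation output):

```python
from PySONIC.utils import loadData

fpath = 'RS_simulation_results.pkl'  # hypothetical path to a simulation output file

df, meta = loadData(fpath)  # pandas DataFrame of time series + metadata dictionary
print(df.head())
print(meta)
```
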
You can also easily run batches of simulations by specifying more than one value for any given stimulation parameter (e.g. `-A 100 200` for sonication with 100 and 200 kPa respectively). These batches can be parallelized using multiprocessing to optimize performance, with the extra argument `--mpi`.

Several more options are available. To view them, type in: ```$ python -h```

-## Extend the package
+# Extend the package

-### Add other neuron types
+## Add other neuron types

You can easily add other neuron types into the package, provided that their ion channel populations and underlying voltage-gated dynamics equations are known.

To add a new point-neuron model, follow this procedure (a schematic sketch of the resulting class structure is given after the list):

1. Create a new file, and save it in the `neurons` sub-folder, with an explicit name (e.g. `my_neuron.py`)
2. Copy-paste the content of the `template.py` file (also located in the `neurons` sub-folder) into your file
3. In your file, change the **class name** from `TemplateNeuron` to something more explicit (e.g. `MyNeuron`), and change the **neuron name** accordingly (e.g. `myneuron`). This name is a keyword used to refer to the model from outside the class
4. Modify/add **biophysical parameters** of your model (resting parameters, reversal potentials, channel conductances, ionic concentrations, temperatures, diffusion constants, etc...) as class attributes
5. Specify a **tuple of names of your different differential states** (i.e. all the differential variables of your model, except for the membrane potential), in the order you want them to appear in the solution
6. Modify/add **gating states kinetics** (`alphax` and `betax` methods) that define the voltage-dependent activation and inactivation rates of the different ion channel gates of your model. Those methods take the membrane potential `Vm` as input and return a rate in `s-1`. **You also need to modify the docstring accordingly, as this information is used by the package**. Alternatively, you can use steady-state open-probabilities (`xinf`) and adaptation time constants (`taux`) methods, but you will need to modify the other class methods accordingly
7. Modify/add **states derivatives** (`derX` methods) that define the derivatives of your different state variables. Those methods must return a derivative in `/s`. **You also need to modify the docstring accordingly, as this information is used by the package**
8. Modify/add **steady-states** (`Xinf` methods) that define the steady-state values of your different state variables. Those methods must return a steady-state value in ``.
9. Modify/add **membrane currents** (`iXX` methods) of your model. Those methods take relevant gating states and the membrane potential `Vm` as inputs, and must return a current density in `mA/m2`. **You also need to modify the docstring accordingly, as this information is used by the package**
10. Modify the other required methods of the class:
    - The `currents` method that takes a membrane potential value `Vm` and a states vector as inputs, and returns a dictionary of membrane currents
    - The `derStates` method that takes the membrane potential `Vm` and a states vector as inputs, and returns a dictionary of states derivatives
    - The `derEffStates` method that takes a membrane charge density value `Qm`, a states vector, and a lookup dictionary as inputs, and returns a dictionary of effective states derivatives
    - The `steadyStates` method that takes a membrane potential value `Vm` as input, and returns a dictionary of steady-states
    - The `computeEffRates` method that takes a membrane potential array `Vm` as input, and returns a dictionary of effective (i.e. averaged over the `Vm` array) voltage-gated states
11. Add the neuron class to the package, by importing it in the `__init__.py` file of the `neurons` sub-folder: ```from .my_neuron import MyNeuron```
12. Verify your point-neuron model by running simulations under various electrical stimuli and comparing the output to the neuron's expected behavior. Implement any required corrections
13. Pre-compute lookup tables required to run coarse-grained simulations of the neuron model upon ultrasonic stimulation. To do so, go to the `scripts` directory and run the `run_lookups.py` script with the neuron's name as command line argument, e.g.: ```$ python run_lookups.py -n myneuron --mpi``` If possible, use the `--mpi` argument to enable multiprocessing, as lookups pre-computation greatly benefits from parallelization
14. That's it! You can now run simulations of your point-neuron model upon ultrasonic stimulation.
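
To make steps 3-9 more concrete, here is a schematic, self-contained sketch of the kind of class structure the procedure produces, with a single gating state, made-up rate constants and a leak current only. It is purely illustrative: it does not inherit from the actual `PointNeuron` base class, uses a plain gating value rather than a states vector for readability, and the `template.py` file remains the authoritative reference.

```python
import numpy as np


class MyNeuron:
    ''' Schematic point-neuron sketch (illustrative parameter values only). '''

    name = 'myneuron'   # keyword used to refer to the model from outside the class

    # Biophysical parameters (step 4)
    Cm0 = 1e-2          # resting membrane capacitance (F/m2)
    Vm0 = -70.0         # resting membrane potential (mV)
    ELeak = -70.0       # leak reversal potential (mV)
    gLeak = 1.0         # leak conductance (S/m2)

    # Names of the differential gating states (step 5)
    states = ('n',)

    # Gating state kinetics: voltage-dependent rates in s-1 (step 6)
    def alphan(self, Vm):
        return 100. / (1. + np.exp(-(Vm + 50.) / 10.))

    def betan(self, Vm):
        return 100. * np.exp(-(Vm + 60.) / 20.)

    # State derivative in 1/s (step 7)
    def dern(self, Vm, n):
        return self.alphan(Vm) * (1. - n) - self.betan(Vm) * n

    # Steady-state value, dimensionless (step 8)
    def ninf(self, Vm):
        return self.alphan(Vm) / (self.alphan(Vm) + self.betan(Vm))

    # Membrane currents in mA/m2 (step 9)
    def iLeak(self, Vm):
        return self.gLeak * (Vm - self.ELeak)

    def currents(self, Vm, states):
        return {'iLeak': self.iLeak(Vm)}
```
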
-## References
+# References

-[1] Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). *Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation*. J. Neural Eng.
+[1] Lemaire, T., Neufeld, E., Kuster, N., and Micera, S. (2019). Understanding ultrasound neuromodulation using a computationally efficient and interpretable model of intramembrane cavitation. J. Neural Eng.