#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: Theo Lemaire
# @Date: 2016-09-19 22:30:46
# @Email: theo.lemaire@epfl.ch
# @Last Modified by: Theo Lemaire
# @Last Modified time: 2018-09-24 23:53:10
""" Definition of generic utility functions used in other modules """
import operator
import os
import pickle
import tkinter as tk
from tkinter import filedialog
import lockfile
import shutil
from openpyxl import load_workbook
import numpy as np
import colorlog
from scipy.interpolate import interp1d
def setLogger():
    ''' Set up and return the package console logger with colored output. '''
log_formatter = colorlog.ColoredFormatter(
'%(log_color)s %(asctime)s %(message)s',
datefmt='%d/%m/%Y %H:%M:%S:',
reset=True,
log_colors={
'DEBUG': 'green',
'INFO': 'white',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
},
style='%'
)
log_handler = colorlog.StreamHandler()
log_handler.setFormatter(log_formatter)
color_logger = colorlog.getLogger('PySONIC')
color_logger.addHandler(log_handler)
return color_logger
# Get package logger
logger = setLogger()
def OpenFilesDialog(filetype, dirname=''):
""" Open a FileOpenDialogBox to select one or multiple file.
The default directory and file type are given.
:param dirname: default directory
:param filetype: default file type
:return: tuple of full paths to the chosen filenames
"""
root = tk.Tk()
root.withdraw()
filenames = filedialog.askopenfilenames(filetypes=[(filetype + " files", '.' + filetype)],
initialdir=dirname)
if filenames:
par_dir = os.path.abspath(os.path.join(filenames[0], os.pardir))
else:
par_dir = None
return (filenames, par_dir)
def selectDirDialog():
""" Open a dialog box to select a directory.
:return: full path to selected directory
"""
root = tk.Tk()
root.withdraw()
return filedialog.askdirectory()
def ImportExcelCol(filename, sheetname, colstr, startrow):
""" Load a specific column of an excel workbook as a numpy array.
:param filename: absolute or relative path to the Excel workbook
:param sheetname: name of the Excel spreadsheet to which data is appended
:param colstr: string of the column to import
:param startrow: index of the first row to consider
:return: 1D numpy array with the column data
"""
wb = load_workbook(filename, read_only=True)
ws = wb[sheetname]
range_start_str = colstr + str(startrow)
range_stop_str = colstr + str(ws.max_row)
tmp = np.array([[i.value for i in j] for j in ws[range_start_str:range_stop_str]])
return tmp[:, 0]
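# Usage sketch for ImportExcelCol (file, sheet and column names below are hypothetical):
#   ImportExcelCol('data.xlsx', 'Sheet1', 'C', 2)
#   -> 1D numpy array with the values of column C, from row 2 down to the last row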
def ConstructMatrix(serialized_inputA, serialized_inputB, serialized_output):
""" Construct a 2D output matrix from serialized input.
:param serialized_inputA: serialized input variable A
:param serialized_inputB: serialized input variable B
:param serialized_output: serialized output variable
:return: 4-tuple with vectors of unique values of A (m) and B (n),
output variable 2D matrix (m,n) and number of holes in the matrix
"""
As = np.unique(serialized_inputA)
Bs = np.unique(serialized_inputB)
nA = As.size
nB = Bs.size
    output = np.full((nA, nB), np.nan)
nholes = 0
for i in range(nA):
iA = np.where(serialized_inputA == As[i])
for j in range(nB):
iB = np.where(serialized_inputB == Bs[j])
iMatch = np.intersect1d(iA, iB)
if iMatch.size == 0:
nholes += 1
elif iMatch.size > 1:
logger.warning('Identical serialized inputs with values (%f, %f)', As[i], Bs[j])
else:
iMatch = iMatch[0]
output[i, j] = serialized_output[iMatch]
return (As, Bs, output, nholes)
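# Illustrative check for ConstructMatrix (hypothetical values):
#   ConstructMatrix(np.array([1, 1, 2, 2]), np.array([10, 20, 10, 20]),
#                   np.array([0.1, 0.2, 0.3, 0.4]))
#   -> As = [1, 2], Bs = [10, 20], output = [[0.1, 0.2], [0.3, 0.4]], 0 holes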
def rmse(x1, x2):
""" Compute the root mean square error between two 1D arrays """
return np.sqrt(((x1 - x2) ** 2).mean())
def rsquared(x1, x2):
    ''' Compute the R-squared coefficient between two 1D arrays '''
residuals = x1 - x2
ss_res = np.sum(residuals**2)
ss_tot = np.sum((x1 - np.mean(x1))**2)
return 1 - (ss_res / ss_tot)
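# Illustrative check for rmse and rsquared (hypothetical values): identical arrays
# give a zero error and a unit determination coefficient:
#   rmse(np.array([1., 2., 3.]), np.array([1., 2., 3.]))      -> 0.0
#   rsquared(np.array([1., 2., 3.]), np.array([1., 2., 3.]))  -> 1.0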
def DownSample(t_dense, y, nsparse):
""" Decimate periodic signals to a specified number of samples."""
    if y.ndim > 1:
nsignals = y.shape[0]
else:
nsignals = 1
y = np.array([y])
# determine time step and period of input signal
T = t_dense[-1] - t_dense[0]
dt_dense = t_dense[1] - t_dense[0]
# resample time vector linearly
t_ds = np.linspace(t_dense[0], t_dense[-1], nsparse)
    # create moving-average (MAV) window spanning ~3% of the signal period
    nmav = int(0.03 * T / dt_dense)
if nmav % 2 == 0:
nmav += 1
mav = np.ones(nmav) / nmav
# determine signals padding
npad = int((nmav - 1) / 2)
# determine indexes of sampling on convolved signals
ids = np.round(np.linspace(0, t_dense.size - 1, nsparse)).astype(int)
y_ds = np.empty((nsignals, nsparse))
# loop through signals
for i in range(nsignals):
# pad, convolve and resample
pad_left = y[i, -(npad + 2):-2]
pad_right = y[i, 1:npad + 1]
y_ext = np.concatenate((pad_left, y[i, :], pad_right), axis=0)
y_mav = np.convolve(y_ext, mav, mode='valid')
y_ds[i, :] = y_mav[ids]
if nsignals == 1:
y_ds = y_ds[0, :]
return (t_ds, y_ds)
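# Usage sketch for DownSample (the t and Vm traces are hypothetical):
#   t_ds, V_ds = DownSample(t, Vm, 1000)
#   -> moving-average-smoothed Vm trace resampled onto 1000 evenly spaced time points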
def Pressure2Intensity(p, rho=1075.0, c=1515.0):
""" Return the spatial peak, pulse average acoustic intensity (ISPPA)
associated with the specified pressure amplitude.
:param p: pressure amplitude (Pa)
:param rho: medium density (kg/m3)
:param c: speed of sound in medium (m/s)
:return: spatial peak, pulse average acoustic intensity (W/m2)
"""
return p**2 / (2 * rho * c)
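# Illustrative check for Pressure2Intensity: I = p^2 / (2 * rho * c), so a 100 kPa
# amplitude in the default medium (rho = 1075 kg/m3, c = 1515 m/s) gives
#   Pressure2Intensity(100e3)  -> ~3.07e3 W/m2 (i.e. ~0.31 W/cm2)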
def Intensity2Pressure(I, rho=1075.0, c=1515.0):
""" Return the pressure amplitude associated with the specified
spatial peak, pulse average acoustic intensity (ISPPA).
:param I: spatial peak, pulse average acoustic intensity (W/m2)
:param rho: medium density (kg/m3)
:param c: speed of sound in medium (m/s)
:return: pressure amplitude (Pa)
"""
return np.sqrt(2 * rho * c * I)
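# Illustrative check for Intensity2Pressure: inverse of Pressure2Intensity with the same
# medium parameters, so the round trip recovers the input amplitude (up to round-off):
#   Intensity2Pressure(Pressure2Intensity(100e3))  -> ~100e3 Pa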
def find_nearest(array, value):
''' Find nearest element in 1D array. '''
idx = (np.abs(array - value)).argmin()
return (idx, array[idx])
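# Illustrative check for find_nearest (hypothetical values):
#   find_nearest(np.array([1.0, 2.0, 4.0]), 2.7)  -> (index 1, value 2.0)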
def rescale(x, lb=None, ub=None, lb_new=0, ub_new=1):
''' Rescale a value to a specific interval by linear transformation. '''
if lb is None:
lb = x.min()
if ub is None:
ub = x.max()
xnorm = (x - lb) / (ub - lb)
return xnorm * (ub_new - lb_new) + lb_new
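# Illustrative check for rescale (hypothetical values): with default bounds, the array
# extrema are mapped onto the unit interval:
#   rescale(np.array([2.0, 4.0, 6.0]))  -> [0.0, 0.5, 1.0]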
def LennardJones(x, beta, alpha, C, m, n):
""" Generic expression of a Lennard-Jones function, adapted for the context of
symmetric deflection (distance = 2x).
:param x: deflection (i.e. half-distance)
:param beta: x-shifting factor
:param alpha: x-scaling factor
:param C: y-scaling factor
:param m: exponent of the repulsion term
:param n: exponent of the attraction term
:return: Lennard-Jones potential at given distance (2x)
"""
return C * (np.power((alpha / (2 * x + beta)), m) - np.power((alpha / (2 * x + beta)), n))
def extractCompTimes(filenames):
''' Extract computation times from a list of simulation files. '''
tcomps = np.empty(len(filenames))
for i, fn in enumerate(filenames):
logger.info('Loading data from "%s"', fn)
with open(fn, 'rb') as fh:
frame = pickle.load(fh)
meta = frame['meta']
tcomps[i] = meta['tcomp']
return tcomps
def computeMeshEdges(x, scale='lin'):
    ''' Compute the appropriate edges of a mesh that quads a linear or logarithmic distribution.
    :param x: the input vector
    :param scale: the type of distribution ('lin' for linear, 'log' for logarithmic)
:return: the edges vector
'''
    if scale == 'log':
        x = np.log10(x)
    dx = x[1] - x[0]
    if scale == 'lin':
        y = np.linspace(x[0] - dx / 2, x[-1] + dx / 2, x.size + 1)
    elif scale == 'log':
        y = np.logspace(x[0] - dx / 2, x[-1] + dx / 2, x.size + 1)
return y
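# Illustrative check for computeMeshEdges (hypothetical values): edges bracket each node
# of a linear distribution by half a step:
#   computeMeshEdges(np.array([1.0, 2.0, 3.0]))  -> [0.5, 1.5, 2.5, 3.5]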
si_prefixes = {
'y': 1e-24, # yocto
'z': 1e-21, # zepto
'a': 1e-18, # atto
'f': 1e-15, # femto
'p': 1e-12, # pico
'n': 1e-9, # nano
'u': 1e-6, # micro
    'm': 1e-3,  # milli
'': 1e0, # None
'k': 1e3, # kilo
'M': 1e6, # mega
'G': 1e9, # giga
'T': 1e12, # tera
'P': 1e15, # peta
'E': 1e18, # exa
'Z': 1e21, # zetta
'Y': 1e24, # yotta
}
def si_format(x, precision=0, space=''):
''' Format a float according to the SI unit system, with the appropriate prefix letter. '''
    if isinstance(x, (float, int, np.floating, np.integer)):
if x == 0:
factor = 1e0
prefix = ''
else:
sorted_si_prefixes = sorted(si_prefixes.items(), key=operator.itemgetter(1))
vals = [tmp[1] for tmp in sorted_si_prefixes]
# vals = list(si_prefixes.values())
ix = np.searchsorted(vals, np.abs(x)) - 1
if np.abs(x) == vals[ix + 1]:
ix += 1
factor = vals[ix]
prefix = sorted_si_prefixes[ix][0]
# prefix = list(si_prefixes.keys())[ix]
return '{{:.{}f}}{}{}'.format(precision, space, prefix).format(x / factor)
elif isinstance(x, list) or isinstance(x, tuple):
return [si_format(item, precision, space) for item in x]
elif isinstance(x, np.ndarray) and x.ndim == 1:
return [si_format(float(item), precision, space) for item in x]
else:
print(type(x))
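# Illustrative checks for si_format (hypothetical values):
#   si_format(1.5e-3, precision=1)  -> '1.5m'
#   si_format(500e3, space=' ')     -> '500 k'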
def getCycleAverage(t, y, T):
''' Compute the cycle-averaged profile of a signal given a specific periodicity.
:param t: time vector (s)
:param y: signal vector
:param T: period (s)
:return: cycle-averaged signal vector
'''
nsamples = y.size
ncycles = int((t[-1] - t[0]) // T)
npercycle = int(nsamples // ncycles)
return np.mean(np.reshape(y[:ncycles * npercycle], (ncycles, npercycle)), axis=1)
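# Usage sketch for getCycleAverage (the t and Qm traces and Fdrive value are hypothetical):
#   Qm_avg = getCycleAverage(t, Qm, 1 / Fdrive)
#   -> one average Qm value per acoustic cycle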
def getNeuronLookupsFile(mechname):
return os.path.join(
os.path.split(__file__)[0],
'neurons',
'{}_lookups.pkl'.format(mechname))
def getLookups2D(mechname, a, Fdrive):
''' Retrieve appropriate 2D lookup tables and reference vectors
for a given membrane mechanism, sonophore diameter and US frequency.
:param mechname: name of membrane density mechanism
:param a: sonophore diameter (m)
:param Fdrive: US frequency (Hz)
:return: 3-tuple with 1D numpy arrays of reference acoustic amplitudes and charge densities,
and a dictionary of 2D lookup numpy arrays
'''
# Check lookup file existence
lookup_path = getNeuronLookupsFile(mechname)
if not os.path.isfile(lookup_path):
raise FileNotFoundError('Missing lookup file: "{}"'.format(lookup_path))
# Load lookups dictionary
logger.debug('Loading lookup table')
with open(lookup_path, 'rb') as fh:
lookups4D = pickle.load(fh)
# Retrieve 1D inputs from lookups dictionary
aref = lookups4D.pop('a')
Fref = lookups4D.pop('f')
Aref = lookups4D.pop('A')
Qref = lookups4D.pop('Q')
# Check that sonophore diameter is within lookup range
arange = (aref.min() - 1e-12, aref.max() + 1e-12)
if a < arange[0] or a > arange[1]:
raise ValueError('Invalid sonophore diameter: {}m (must be within {}m - {}m lookup interval)'
.format(*si_format([a, *arange], precision=2, space=' ')))
# Check that US frequency is within lookup range
Frange = (Fref.min() - 1e-9, Fref.max() + 1e-9)
if Fdrive < Frange[0] or Fdrive > Frange[1]:
raise ValueError('Invalid frequency: {}Hz (must be within {}Hz - {}Hz lookup interval)'
.format(*si_format([Fdrive, *Frange], precision=2, space=' ')))
# Interpolate 4D lookups at sonophore diameter and then at US frequency
logger.debug('Interpolating lookups at a = {}m'.format(si_format(a, space=' ')))
lookups3D = {key: interp1d(aref, y4D, axis=0)(a) for key, y4D in lookups4D.items()}
logger.debug('Interpolating lookups at f = {}Hz'.format(si_format(Fdrive, space=' ')))
lookups2D = {key: interp1d(Fref, y3D, axis=0)(Fdrive) for key, y3D in lookups3D.items()}
return Aref, Qref, lookups2D
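# Usage sketch for getLookups2D (the mechanism name and parameter values are hypothetical,
# and assume the corresponding lookup file exists in the package 'neurons' folder):
#   Aref, Qref, lkp2d = getLookups2D('RS', 32e-9, 500e3)
#   -> reference amplitude and charge vectors, plus a dict of (nA, nQ) lookup matrices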
def pow10_format(number, precision=2):
''' Format a number in power of 10 notation. '''
ret_string = '{0:.{1:d}e}'.format(number, precision)
a, b = ret_string.split("e")
a = float(a)
b = int(b)
return '{}10^{{{}}}'.format('{} * '.format(a) if a != 1. else '', b)
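# Illustrative checks for pow10_format (hypothetical values):
#   pow10_format(3e-5)  -> '3.0 * 10^{-5}'
#   pow10_format(1e6)   -> '10^{6}'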
def checkNumBounds(values, bounds):
    ''' Check that a set of numbers lies within predefined bounds, raising a ValueError otherwise. '''
    for x, bound in zip(values, bounds):
        if x < bound[0] or x > bound[1]:
            raise ValueError('Input value {} out of [{}, {}] range'.format(x, bound[0], bound[1]))
def getDefaultIndexes(params, defaults):
''' Return the indexes of default values found in lists of parameters.
:param params: dictionary of parameter arrays
:param defaults: dictionary of default values
:return: dictionary of resolved default indexes
'''
idefs = {}
for key, default in defaults.items():
if default not in params[key]:
raise Exception('default {} ({}) not found in parameter values'.format(key, default))
idefs[key] = np.where(params[key] == default)[0][0]
return idefs
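# Illustrative check for getDefaultIndexes (hypothetical parameter set):
#   getDefaultIndexes({'freq': np.array([100e3, 500e3, 1e6])}, {'freq': 500e3})
#   -> resolves the 'freq' default to index 1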
def xlslog(filename, sheetname, data):
""" Append log data on a new row to specific sheet of excel workbook, using a lockfile
to avoid read/write errors between concurrent processes.
:param filename: absolute or relative path to the Excel workbook
:param sheetname: name of the Excel spreadsheet to which data is appended
:param data: data structure to be added to specific columns on a new row
:return: boolean indicating success (1) or failure (0) of operation
"""
try:
lock = lockfile.FileLock(filename)
lock.acquire()
wb = load_workbook(filename)
ws = wb[sheetname]
        # append the log entry as a new row (dict keys map to column letters / indexes)
        ws.append(dict(data))
wb.save(filename)
lock.release()
return 1
except PermissionError:
# If file cannot be accessed for writing because already opened
logger.warning('Cannot write to "%s". Close the file and type "Y"', filename)
user_str = input()
if user_str in ['y', 'Y']:
return xlslog(filename, sheetname, data)
else:
return 0
def checkBatchLog(batch_dir, batch_type):
''' Check for appropriate log file in batch directory, and create one if it is absent.
:param batch_dir: full path to batch directory
:param batch_type: type of simulation batch
    :return: 2-tuple with full path to log file and boolean stating if log file was created
'''
# Check for directory existence
if not os.path.isdir(batch_dir):
raise IOError('"{}" output directory does not exist'.format(batch_dir))
# Determine log template from batch type
if batch_type == 'MECH':
logfile = 'log_MECH.xlsx'
elif batch_type == 'A-STIM':
logfile = 'log_ASTIM.xlsx'
elif batch_type == 'E-STIM':
logfile = 'log_ESTIM.xlsx'
else:
        raise ValueError('Unknown batch type: {}'.format(batch_type))
# Get template in package subdirectory
this_dir, _ = os.path.split(__file__)
# parent_dir = os.path.abspath(os.path.join(this_dir, os.pardir))
    logsrc = os.path.join(this_dir, 'templates', logfile)
assert os.path.isfile(logsrc), 'template log file "{}" not found'.format(logsrc)
# Copy template in batch directory if no appropriate log file
    logdst = os.path.join(batch_dir, logfile)
is_log = os.path.isfile(logdst)
if not is_log:
shutil.copy2(logsrc, logdst)
return (logdst, not is_log)
