Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F61680697
utils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, May 8, 07:14
Size
29 KB
Mime Type
text/x-python
Expires
Fri, May 10, 07:14 (2 d)
Engine
blob
Format
Raw Data
Handle
17544167
Attached To
R4670 PySONIC (old)
utils.py
View Options
# -*- coding: utf-8 -*-
# @Author: Theo Lemaire
# @Email: theo.lemaire@epfl.ch
# @Date: 2016-09-19 22:30:46
# @Last Modified by: Theo Lemaire
# @Last Modified time: 2020-02-04 15:05:28
''' Definition of generic utility functions used in other modules '''
import
csv
from
functools
import
wraps
import
operator
import
time
from
inspect
import
signature
import
os
import
abc
from
shutil
import
get_terminal_size
import
lockfile
import
math
import
pickle
import
json
from
tqdm
import
tqdm
import
logging
import
tkinter
as
tk
from
tkinter
import
filedialog
import
base64
import
datetime
import
numpy
as
np
from
scipy.optimize
import
brentq
from
scipy
import
linalg
import
colorlog
from
pushbullet
import
Pushbullet
# Package logger
my_log_formatter
=
colorlog
.
ColoredFormatter
(
'
%(log_color)s
%(asctime)s
%(message)s
'
,
datefmt
=
'
%d
/%m/%Y %H:%M:%S:'
,
reset
=
True
,
log_colors
=
{
'DEBUG'
:
'green'
,
'INFO'
:
'white'
,
'WARNING'
:
'yellow'
,
'ERROR'
:
'red'
,
'CRITICAL'
:
'red,bg_white'
,
},
style
=
'%'
)
def
setHandler
(
logger
,
handler
):
for
h
in
logger
.
handlers
:
logger
.
removeHandler
(
h
)
logger
.
addHandler
(
handler
)
return
logger
def
setLogger
(
name
,
formatter
):
handler
=
colorlog
.
StreamHandler
()
handler
.
setFormatter
(
formatter
)
logger
=
colorlog
.
getLogger
(
name
)
logger
.
addHandler
(
handler
)
return
logger
class
TqdmHandler
(
logging
.
StreamHandler
):
def
__init__
(
self
,
formatter
):
logging
.
StreamHandler
.
__init__
(
self
)
self
.
setFormatter
(
formatter
)
def
emit
(
self
,
record
):
msg
=
self
.
format
(
record
)
tqdm
.
write
(
msg
)
logger
=
setLogger
(
'PySONIC'
,
my_log_formatter
)
def
fillLine
(
text
,
char
=
'-'
,
totlength
=
None
):
''' Surround a text with repetitions of a specific character in order to
fill a line to a given total length.
:param text: text to be surrounded
:param char: surrounding character
:param totlength: target number of characters in filled text line
:return: filled text line
'''
if
totlength
is
None
:
totlength
=
get_terminal_size
()
.
columns
-
1
ndashes
=
totlength
-
len
(
text
)
-
2
if
ndashes
<
2
:
return
text
else
:
nside
=
ndashes
//
2
nleft
,
nright
=
nside
,
nside
if
ndashes
%
2
==
1
:
nright
+=
1
return
f
'{char * nleft} {text} {char * nright}'
# SI units prefixes
si_prefixes
=
{
'y'
:
1e-24
,
# yocto
'z'
:
1e-21
,
# zepto
'a'
:
1e-18
,
# atto
'f'
:
1e-15
,
# femto
'p'
:
1e-12
,
# pico
'n'
:
1e-9
,
# nano
'u'
:
1e-6
,
# micro
'm'
:
1e-3
,
# mili
''
:
1e0
,
# None
'k'
:
1e3
,
# kilo
'M'
:
1e6
,
# mega
'G'
:
1e9
,
# giga
'T'
:
1e12
,
# tera
'P'
:
1e15
,
# peta
'E'
:
1e18
,
# exa
'Z'
:
1e21
,
# zetta
'Y'
:
1e24
,
# yotta
}
def
si_format
(
x
,
precision
=
0
,
space
=
' '
):
''' Format a float according to the SI unit system, with the appropriate prefix letter. '''
if
isinstance
(
x
,
float
)
or
isinstance
(
x
,
int
)
or
isinstance
(
x
,
np
.
float
)
or
\
isinstance
(
x
,
np
.
int32
)
or
isinstance
(
x
,
np
.
int64
):
if
x
==
0
:
factor
=
1e0
prefix
=
''
else
:
sorted_si_prefixes
=
sorted
(
si_prefixes
.
items
(),
key
=
operator
.
itemgetter
(
1
))
vals
=
[
tmp
[
1
]
for
tmp
in
sorted_si_prefixes
]
ix
=
np
.
searchsorted
(
vals
,
np
.
abs
(
x
))
-
1
if
np
.
abs
(
x
)
==
vals
[
ix
+
1
]:
ix
+=
1
factor
=
vals
[
ix
]
prefix
=
sorted_si_prefixes
[
ix
][
0
]
return
f
'{x / factor:.{precision}f}{space}{prefix}'
elif
isinstance
(
x
,
list
)
or
isinstance
(
x
,
tuple
):
return
[
si_format
(
item
,
precision
,
space
)
for
item
in
x
]
elif
isinstance
(
x
,
np
.
ndarray
)
and
x
.
ndim
==
1
:
return
[
si_format
(
float
(
item
),
precision
,
space
)
for
item
in
x
]
else
:
print
(
type
(
x
))
def
pow10_format
(
number
,
precision
=
2
):
''' Format a number in power of 10 notation. '''
sci_string
=
f
'{number:.{precision}e}'
value
,
exponent
=
sci_string
.
split
(
"e"
)
value
,
exponent
=
float
(
value
),
int
(
exponent
)
val_str
=
f
'{value} * '
if
value
!=
1.
else
''
return
f
'{val_str}10^{{{exponent}}}'
def
plural
(
n
):
if
n
<
0
:
raise
ValueError
(
f
'Cannot format negative integer (n = {n})'
)
if
n
==
0
:
return
''
else
:
return
's'
def
rmse
(
x1
,
x2
):
''' Compute the root mean square error between two 1D arrays '''
return
np
.
sqrt
(((
x1
-
x2
)
**
2
)
.
mean
())
def
rsquared
(
x1
,
x2
):
''' compute the R-squared coefficient between two 1D arrays '''
residuals
=
x1
-
x2
ss_res
=
np
.
sum
(
residuals
**
2
)
ss_tot
=
np
.
sum
((
x1
-
np
.
mean
(
x1
))
**
2
)
return
1
-
(
ss_res
/
ss_tot
)
def
Pressure2Intensity
(
p
,
rho
=
1075.0
,
c
=
1515.0
):
''' Return the spatial peak, pulse average acoustic intensity (ISPPA)
associated with the specified pressure amplitude.
:param p: pressure amplitude (Pa)
:param rho: medium density (kg/m3)
:param c: speed of sound in medium (m/s)
:return: spatial peak, pulse average acoustic intensity (W/m2)
'''
return
p
**
2
/
(
2
*
rho
*
c
)
def
Intensity2Pressure
(
I
,
rho
=
1075.0
,
c
=
1515.0
):
''' Return the pressure amplitude associated with the specified
spatial peak, pulse average acoustic intensity (ISPPA).
:param I: spatial peak, pulse average acoustic intensity (W/m2)
:param rho: medium density (kg/m3)
:param c: speed of sound in medium (m/s)
:return: pressure amplitude (Pa)
'''
return
np
.
sqrt
(
2
*
rho
*
c
*
I
)
def
convertPKL2JSON
():
for
pkl_filepath
in
OpenFilesDialog
(
'pkl'
)[
0
]:
logger
.
info
(
f
'Processing {pkl_filepath} ...'
)
json_filepath
=
f
'{os.path.splitext(pkl_filepath)[0]}.json'
with
open
(
pkl_filepath
,
'rb'
)
as
fpkl
,
open
(
json_filepath
,
'w'
)
as
fjson
:
data
=
pickle
.
load
(
fpkl
)
json
.
dump
(
data
,
fjson
,
ensure_ascii
=
False
,
sort_keys
=
True
,
indent
=
4
)
logger
.
info
(
'All done!'
)
def
OpenFilesDialog
(
filetype
,
dirname
=
''
):
''' Open a FileOpenDialogBox to select one or multiple file.
The default directory and file type are given.
:param dirname: default directory
:param filetype: default file type
:return: tuple of full paths to the chosen filenames
'''
root
=
tk
.
Tk
()
root
.
withdraw
()
filenames
=
filedialog
.
askopenfilenames
(
filetypes
=
[(
filetype
+
" files"
,
'.'
+
filetype
)],
initialdir
=
dirname
)
if
len
(
filenames
)
==
0
:
raise
ValueError
(
'no input file selected'
)
par_dir
=
os
.
path
.
abspath
(
os
.
path
.
join
(
filenames
[
0
],
os
.
pardir
))
return
filenames
,
par_dir
def
selectDirDialog
(
title
=
'Select directory'
):
''' Open a dialog box to select a directory.
:return: full path to selected directory
'''
root
=
tk
.
Tk
()
root
.
withdraw
()
directory
=
filedialog
.
askdirectory
(
title
=
title
)
if
directory
==
''
:
raise
ValueError
(
'no directory selected'
)
return
directory
def
SaveFileDialog
(
filename
,
dirname
=
None
,
ext
=
None
):
''' Open a dialog box to save file.
:param filename: filename
:param dirname: initial directory
:param ext: default extension
:return: full path to the chosen filename
'''
root
=
tk
.
Tk
()
root
.
withdraw
()
filename_out
=
filedialog
.
asksaveasfilename
(
defaultextension
=
ext
,
initialdir
=
dirname
,
initialfile
=
filename
)
if
len
(
filename_out
)
==
0
:
raise
ValueError
(
'no output filepath selected'
)
return
filename_out
def
loadData
(
fpath
,
frequency
=
1
):
''' Load dataframe and metadata dictionary from pickle file. '''
logger
.
info
(
'Loading data from "
%s
"'
,
os
.
path
.
basename
(
fpath
))
with
open
(
fpath
,
'rb'
)
as
fh
:
frame
=
pickle
.
load
(
fh
)
df
=
frame
[
'data'
]
.
iloc
[::
frequency
]
meta
=
frame
[
'meta'
]
return
df
,
meta
def
rescale
(
x
,
lb
=
None
,
ub
=
None
,
lb_new
=
0
,
ub_new
=
1
):
''' Rescale a value to a specific interval by linear transformation. '''
if
lb
is
None
:
lb
=
x
.
min
()
if
ub
is
None
:
ub
=
x
.
max
()
xnorm
=
(
x
-
lb
)
/
(
ub
-
lb
)
return
xnorm
*
(
ub_new
-
lb_new
)
+
lb_new
def
expandRange
(
xmin
,
xmax
,
exp_factor
=
2
):
if
xmin
>
xmax
:
raise
ValueError
(
'values must be provided in (min, max) order'
)
xptp
=
xmax
-
xmin
xmid
=
(
xmin
+
xmax
)
/
2
xdev
=
xptp
*
exp_factor
/
2
return
(
xmid
-
xdev
,
xmin
+
xdev
)
def
isIterable
(
x
):
for
t
in
[
list
,
tuple
,
np
.
ndarray
]:
if
isinstance
(
x
,
t
):
return
True
return
False
def
isWithin
(
name
,
val
,
bounds
,
rel_tol
=
1e-9
):
''' Check if a floating point number is within an interval.
If the value falls outside the interval, an error is raised.
If the value falls just outside the interval due to rounding errors,
the associated interval bound is returned.
:param val: float value
:param bounds: interval bounds (float tuple)
:return: original or corrected value
'''
if
isIterable
(
val
):
return
np
.
array
([
isWithin
(
name
,
v
,
bounds
,
rel_tol
)
for
v
in
val
])
if
val
>=
bounds
[
0
]
and
val
<=
bounds
[
1
]:
return
val
elif
val
<
bounds
[
0
]
and
math
.
isclose
(
val
,
bounds
[
0
],
rel_tol
=
rel_tol
):
logger
.
warning
(
'Rounding
%s
value (
%s
) to interval lower bound (
%s
)'
,
name
,
val
,
bounds
[
0
])
return
bounds
[
0
]
elif
val
>
bounds
[
1
]
and
math
.
isclose
(
val
,
bounds
[
1
],
rel_tol
=
rel_tol
):
logger
.
warning
(
'Rounding
%s
value (
%s
) to interval upper bound (
%s
)'
,
name
,
val
,
bounds
[
1
])
return
bounds
[
1
]
else
:
raise
ValueError
(
f
'{name} value ({val}) out of [{bounds[0]}, {bounds[1]}] interval'
)
def
getDistribution
(
xmin
,
xmax
,
nx
,
scale
=
'lin'
):
if
scale
==
'log'
:
xmin
,
xmax
=
np
.
log10
(
xmin
),
np
.
log10
(
xmax
)
return
{
'lin'
:
np
.
linspace
,
'log'
:
np
.
logspace
}[
scale
](
xmin
,
xmax
,
nx
)
def
getDistFromList
(
xlist
):
if
not
isinstance
(
xlist
,
list
):
raise
TypeError
(
'Input must be a list'
)
if
len
(
xlist
)
!=
4
:
raise
ValueError
(
'List must contain exactly 4 arguments ([type, min, max, n])'
)
scale
=
xlist
[
0
]
if
scale
not
in
(
'log'
,
'lin'
):
raise
ValueError
(
'Unknown distribution type (must be "lin" or "log")'
)
xmin
,
xmax
=
[
float
(
x
)
for
x
in
xlist
[
1
:
-
1
]]
if
xmin
>=
xmax
:
raise
ValueError
(
'Specified minimum higher or equal than specified maximum'
)
nx
=
int
(
xlist
[
-
1
])
if
nx
<
2
:
raise
ValueError
(
'Specified number must be at least 2'
)
return
getDistribution
(
xmin
,
xmax
,
nx
,
scale
=
scale
)
def
getIndex
(
container
,
value
):
''' Return the index of a float / string value in a list / array
:param container: list / 1D-array of elements
:param value: value to search for
:return: index of value (if found)
'''
if
isinstance
(
value
,
float
):
container
=
np
.
array
(
container
)
imatches
=
np
.
where
(
np
.
isclose
(
container
,
value
,
rtol
=
1e-9
,
atol
=
1e-16
))[
0
]
if
len
(
imatches
)
==
0
:
raise
ValueError
(
f
'{value} not found in {container}'
)
return
imatches
[
0
]
elif
isinstance
(
value
,
str
):
return
container
.
index
(
value
)
def
funcSig
(
func
,
args
,
kwargs
):
args_repr
=
[
repr
(
a
)
for
a
in
args
]
kwargs_repr
=
[
f
"{k}={v!r}"
for
k
,
v
in
kwargs
.
items
()]
return
f
'{func.__name__}({", ".join(args_repr + kwargs_repr)})'
def
debug
(
func
):
''' Print the function signature and return value. '''
@wraps
(
func
)
def
wrapper_debug
(
*
args
,
**
kwargs
):
print
(
f
'Calling {funcSig(func, args, kwargs)}'
)
value
=
func
(
*
args
,
**
kwargs
)
print
(
f
"{func.__name__!r} returned {value!r}"
)
return
value
return
wrapper_debug
def
timer
(
func
):
''' Monitor and return the runtime of the decorated function. '''
@wraps
(
func
)
def
wrapper
(
*
args
,
**
kwargs
):
start_time
=
time
.
perf_counter
()
value
=
func
(
*
args
,
**
kwargs
)
end_time
=
time
.
perf_counter
()
run_time
=
end_time
-
start_time
return
value
,
run_time
return
wrapper
def
alignWithFuncDef
(
func
,
args
,
kwargs
):
''' Align a set of provided positional and keyword arguments with the arguments
signature in a specific function definition.
:param func: function object
:param args: list of provided positional arguments
:param kwargs: dictionary of provided keyword arguments
:return: 2-tuple with the modified arguments and
'''
# Get positional and keyword arguments from function signature
sig_params
=
{
k
:
v
for
k
,
v
in
signature
(
func
)
.
parameters
.
items
()}
sig_args
=
list
(
filter
(
lambda
x
:
x
.
default
==
x
.
empty
,
sig_params
.
values
()))
sig_kwargs
=
{
k
:
v
.
default
for
k
,
v
in
sig_params
.
items
()
if
v
.
default
!=
v
.
empty
}
sig_nargs
=
len
(
sig_args
)
kwarg_keys
=
list
(
sig_kwargs
.
keys
())
# Restrain provided positional arguments to those that are also positional in signature
new_args
=
args
[:
sig_nargs
]
# Construct hybrid keyword arguments dictionary from:
# - remaining positional arguments
# - provided keyword arguments
# - default keyword arguments
new_kwargs
=
sig_kwargs
for
i
,
x
in
enumerate
(
args
[
sig_nargs
:]):
new_kwargs
[
kwarg_keys
[
i
]]
=
x
for
k
,
v
in
kwargs
.
items
():
new_kwargs
[
k
]
=
v
return
new_args
,
new_kwargs
def
alignWithMethodDef
(
method
,
args
,
kwargs
):
args
,
kwargs
=
alignWithFuncDef
(
method
,
[
None
]
+
list
(
args
),
kwargs
)
return
tuple
(
args
[
1
:]),
kwargs
def
logCache
(
fpath
,
delimiter
=
'
\t
'
,
out_type
=
float
):
''' Add an extra IO memoization functionality to a function using file caching,
to avoid repetitions of tedious computations with identical inputs.
'''
def
wrapper_with_args
(
func
):
@wraps
(
func
)
def
wrapper
(
*
args
,
**
kwargs
):
# If function has history -> do not log
if
'history'
in
kwargs
:
return
func
(
*
args
,
**
kwargs
)
# Modify positional and keyword arguments to match function signature, if needed
args
,
kwargs
=
alignWithFuncDef
(
func
,
args
,
kwargs
)
# Translate args and kwargs into string signature
fsignature
=
funcSig
(
func
,
args
,
kwargs
)
# If entry present in log, return corresponding output
if
os
.
path
.
isfile
(
fpath
):
with
open
(
fpath
,
'r'
,
newline
=
''
)
as
f
:
reader
=
csv
.
reader
(
f
,
delimiter
=
delimiter
)
for
row
in
reader
:
if
row
[
0
]
==
fsignature
:
logger
.
debug
(
f
'entry found in "{os.path.basename(fpath)}"'
)
return
out_type
(
row
[
1
])
# Otherwise, compute output and log it into file before returning
out
=
func
(
*
args
,
**
kwargs
)
lock
=
lockfile
.
FileLock
(
fpath
)
lock
.
acquire
()
with
open
(
fpath
,
'a'
,
newline
=
''
)
as
csvfile
:
writer
=
csv
.
writer
(
csvfile
,
delimiter
=
delimiter
)
writer
.
writerow
([
fsignature
,
str
(
out
)])
lock
.
release
()
return
out
return
wrapper
return
wrapper_with_args
def
fileCache
(
root
,
fcode_func
,
ext
=
'json'
):
def
wrapper_with_args
(
func
):
@wraps
(
func
)
def
wrapper
(
*
args
,
**
kwargs
):
# Get load and dump functions from file extension
try
:
load_func
=
{
'json'
:
json
.
load
,
'pkl'
:
pickle
.
load
,
'csv'
:
lambda
f
:
np
.
loadtxt
(
f
,
delimiter
=
','
)
}[
ext
]
dump_func
=
{
'json'
:
json
.
dump
,
'pkl'
:
pickle
.
dump
,
'csv'
:
lambda
x
,
f
:
np
.
savetxt
(
f
,
x
,
delimiter
=
','
)
}[
ext
]
except
KeyError
:
raise
ValueError
(
'Unknown file extension'
)
# Get read and write mode (text or binary) from file extension
mode
=
'b'
if
ext
==
'pkl'
else
''
# Get file path from root and function arguments, using fcode function
if
callable
(
fcode_func
):
fcode
=
fcode_func
(
*
args
)
else
:
fcode
=
fcode_func
fpath
=
os
.
path
.
join
(
os
.
path
.
abspath
(
root
),
f
'{fcode}.{ext}'
)
# If file exists, load output from it
if
os
.
path
.
isfile
(
fpath
):
logger
.
info
(
f
'loading data from "{fpath}"'
)
with
open
(
fpath
,
'r'
+
mode
)
as
f
:
out
=
load_func
(
f
)
# Otherwise, execute function and create the file to dump the output
else
:
logger
.
warning
(
f
'reference data file not found: "{fpath}"'
)
out
=
func
(
*
args
,
**
kwargs
)
logger
.
info
(
f
'dumping data in "{fpath}"'
)
lock
=
lockfile
.
FileLock
(
fpath
)
lock
.
acquire
()
with
open
(
fpath
,
'w'
+
mode
)
as
f
:
dump_func
(
out
,
f
)
lock
.
release
()
return
out
return
wrapper
return
wrapper_with_args
def
derivative
(
f
,
x
,
eps
,
method
=
'central'
):
''' Compute the difference formula for f'(x) with perturbation size eps.
:param dfunc: derivatives function, taking an array of states and returning
an array of derivatives
:param x: states vector
:param method: difference formula: 'forward', 'backward' or 'central'
:param eps: perturbation vector (same size as states vector)
:return: numerical approximation of the derivative around the fixed point
'''
if
isIterable
(
x
):
if
not
isIterable
(
eps
)
or
len
(
eps
)
!=
len
(
x
):
raise
ValueError
(
'eps must be the same size as x'
)
elif
np
.
sum
(
eps
!=
0.
)
!=
1
:
raise
ValueError
(
'eps must be zero-valued across all but one dimensions'
)
eps_val
=
np
.
sum
(
eps
)
else
:
eps_val
=
eps
if
method
==
'central'
:
df
=
(
f
(
x
+
eps
)
-
f
(
x
-
eps
))
/
2
elif
method
==
'forward'
:
df
=
f
(
x
+
eps
)
-
f
(
x
)
elif
method
==
'backward'
:
df
=
f
(
x
)
-
f
(
x
-
eps
)
else
:
raise
ValueError
(
"Method must be 'central', 'forward' or 'backward'."
)
return
df
/
eps_val
def
jacobian
(
dfunc
,
x
,
rel_eps
=
None
,
abs_eps
=
None
,
method
=
'central'
):
''' Evaluate the Jacobian maatrix of a (time-invariant) system, given a states vector
and derivatives function.
:param dfunc: derivatives function, taking an array of n states and returning
an array of n derivatives
:param x: n-states vector
:return: n-by-n square Jacobian matrix
'''
if
sum
(
e
is
not
None
for
e
in
[
abs_eps
,
rel_eps
])
!=
1
:
raise
ValueError
(
'one (and only one) of "rel_eps" or "abs_eps" parameters must be provided'
)
# Determine vector size
x
=
np
.
asarray
(
x
)
n
=
x
.
size
# Initialize Jacobian matrix
J
=
np
.
empty
((
n
,
n
))
# Create epsilon vector
if
rel_eps
is
not
None
:
mode
=
'relative'
eps_vec
=
rel_eps
else
:
mode
=
'absolute'
eps_vec
=
abs_eps
if
not
isIterable
(
eps_vec
):
eps_vec
=
np
.
array
([
eps_vec
]
*
n
)
if
mode
==
'relative'
:
eps
=
x
*
eps_vec
else
:
eps
=
eps_vec
# Perturb each state by epsilon on both sides, re-evaluate derivatives
# and assemble Jacobian matrix
ei
=
np
.
zeros
(
n
)
for
i
in
range
(
n
):
ei
[
i
]
=
1
J
[:,
i
]
=
derivative
(
dfunc
,
x
,
eps
*
ei
,
method
=
method
)
ei
[
i
]
=
0
return
J
def
classifyFixedPoint
(
x
,
dfunc
):
''' Characterize the stability of a fixed point by numerically evaluating its Jacobian
matrix and evaluating the sign of the real part of its associated eigenvalues.
:param x: n-states vector
:param dfunc: derivatives function, taking an array of n states and returning
an array of n derivatives
'''
# Compute Jacobian numerically
# print(f'x = {x}, dfunx(x) = {dfunc(x)}')
eps_machine
=
np
.
sqrt
(
np
.
finfo
(
float
)
.
eps
)
J
=
jacobian
(
dfunc
,
x
,
rel_eps
=
eps_machine
,
method
=
'forward'
)
# Compute eigenvalues and eigenvectors
eigvals
,
eigvecs
=
linalg
.
eig
(
J
)
logger
.
debug
(
f
"eigenvalues = {[f'({x.real:.2e} + {x.imag:.2e}j)' for x in eigvals]}"
)
# Determine fixed point stability based on eigenvalues
is_neg_eigvals
=
eigvals
.
real
<
0
if
is_neg_eigvals
.
all
():
# all real parts negative -> stable
key
=
'stable'
elif
is_neg_eigvals
.
any
():
# both posivie and negative real parts -> saddle
key
=
'saddle'
else
:
# all real parts positive -> unstable
key
=
'unstable'
return
eigvals
,
key
def
findModifiedEq
(
x0
,
dfunc
,
*
args
):
''' Find an equilibrium variable in a modified system by searching for its
derivative root within an interval around its original equilibrium.
:param x0: equilibrium value in original system.
:param func: derivative function, taking the variable as first parameter.
:param *args: remaining arguments needed for the derivative function.
:return: variable equilibrium value in modified system.
'''
is_iterable
=
[
isIterable
(
arg
)
for
arg
in
args
]
if
any
(
is_iterable
):
if
not
all
(
is_iterable
):
raise
ValueError
(
'mix of iterables and non-iterables'
)
lengths
=
[
len
(
arg
)
for
arg
in
args
]
if
not
all
(
n
==
lengths
[
0
]
for
n
in
lengths
):
raise
ValueError
(
f
'inputs are not of the same size: {lengths}'
)
n
=
lengths
[
0
]
res
=
[]
for
i
in
range
(
n
):
x
=
[
arg
[
i
]
for
arg
in
args
]
res
.
append
(
findModifiedEq
(
x0
,
dfunc
,
*
x
))
return
np
.
array
(
res
)
else
:
return
brentq
(
lambda
x
:
dfunc
(
x
,
*
args
),
x0
*
1e-4
,
x0
*
1e3
,
xtol
=
1e-16
)
def
swapFirstLetterCase
(
s
):
if
s
[
0
]
.
islower
():
return
s
.
capitalize
()
else
:
return
s
[
0
]
.
lower
()
+
s
[
1
:]
def
getPow10
(
x
,
direction
=
'up'
):
''' Get the power of 10 that is closest to a number, in either direction("down" or "up"). '''
round_method
=
{
'up'
:
np
.
ceil
,
'down'
:
np
.
floor
}[
direction
]
return
np
.
power
(
10
,
round_method
(
np
.
log10
(
x
)))
def
rotAroundPoint2D
(
x
,
theta
,
p
):
''' Rotate a 2D vector around a center point by a given angle.
:param x: 2D coordinates vector
:param theta: rotation angle (in rad)
:param p: 2D center point coordinates
:return: 2D rotated coordinates vector
'''
n1
,
n2
=
x
.
shape
if
n1
!=
2
:
if
n2
==
2
:
x
=
x
.
T
else
:
raise
ValueError
(
'x should be a 2-by-n vector'
)
# Rotation matrix
R
=
np
.
array
([
[
np
.
cos
(
theta
),
-
np
.
sin
(
theta
)],
[
np
.
sin
(
theta
),
np
.
cos
(
theta
)],
])
# Broadcast center point to input vector
ptile
=
np
.
tile
(
p
,
(
x
.
shape
[
1
],
1
))
.
T
# Subtract, rotate and add
return
R
.
dot
(
x
-
ptile
)
+
ptile
def
getKey
(
keyfile
=
'pushbullet.key'
):
dir_path
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
package_root
=
os
.
path
.
abspath
(
os
.
path
.
join
(
dir_path
,
os
.
pardir
))
fpath
=
os
.
path
.
join
(
package_root
,
keyfile
)
if
not
os
.
path
.
isfile
(
fpath
):
raise
FileNotFoundError
(
'pushbullet API key file not found'
)
with
open
(
fpath
)
as
f
:
encoded_key
=
f
.
readlines
()[
0
]
return
base64
.
b64decode
(
str
.
encode
(
encoded_key
))
.
decode
()
def
sendPushNotification
(
msg
):
try
:
key
=
getKey
()
pb
=
Pushbullet
(
key
)
dt
=
datetime
.
datetime
.
now
()
s
=
dt
.
strftime
(
'%Y-%m-
%d
%H:%M:%S'
)
push
=
pb
.
push_note
(
'Code Messenger'
,
f
'{s}
\n
{msg}'
)
except
FileNotFoundError
as
e
:
logger
.
error
(
f
'Could not send push notification: "{msg}"'
)
def
alert
(
func
):
''' Run a function, and send a push notification upon completion,
or if an error is raised during its execution.
'''
@wraps
(
func
)
def
wrapper
(
*
args
,
**
kwargs
):
try
:
out
=
func
(
*
args
,
**
kwargs
)
sendPushNotification
(
f
'completed "{func.__name__}" execution successfully'
)
return
out
except
BaseException
as
e
:
sendPushNotification
(
f
'error during "{func.__name__}" execution: {e}'
)
raise
e
return
wrapper
def
sunflower
(
n
,
radius
=
1
,
alpha
=
1
):
''' Generate a population of uniformly distributed 2D data points
in a unit circle.
:param n: number of data points
:param alpha: coefficient determining evenness of the boundary
:return: 2D matrix of Cartesian (x, y) positions
'''
nbounds
=
np
.
round
(
alpha
*
np
.
sqrt
(
n
))
# number of boundary points
phi
=
(
np
.
sqrt
(
5
)
+
1
)
/
2
# golden ratio
k
=
np
.
arange
(
1
,
n
+
1
)
# index vector
theta
=
2
*
np
.
pi
*
k
/
phi
**
2
# angle vector
r
=
np
.
sqrt
((
k
-
1
)
/
(
n
-
nbounds
-
1
))
# radius vector
r
[
r
>
1
]
=
1
x
=
r
*
np
.
cos
(
theta
)
y
=
r
*
np
.
sin
(
theta
)
return
radius
*
np
.
vstack
((
x
,
y
))
def
filecode
(
model
,
*
args
):
''' Generate file code given a specific combination of model input parameters. '''
# If meta dictionary was passed, generate inputs list from it
if
len
(
args
)
==
1
and
isinstance
(
args
[
0
],
dict
):
meta
=
args
[
0
]
meta_keys
=
list
(
signature
(
model
.
meta
)
.
parameters
.
keys
())
args
=
[
meta
[
k
]
for
k
in
meta_keys
]
# Otherwise, transform args tuple into list
else
:
args
=
list
(
args
)
# If any argument is an iterable -> transform it to a continous string
for
i
in
range
(
len
(
args
)):
if
isIterable
(
args
[
i
]):
args
[
i
]
=
''
.
join
([
str
(
x
)
for
x
in
args
[
i
]])
# Create file code by joining string-encoded inputs with underscores
codes
=
model
.
filecodes
(
*
args
)
.
values
()
return
'_'
.
join
([
x
for
x
in
codes
if
x
is
not
None
])
def
simAndSave
(
model
,
*
args
,
**
kwargs
):
''' Simulate the model and save the results in a specific output directory.
:param *args: list of arguments provided to the simulation function
:param **kwargs: optional arguments dictionary
:return: output filepath
'''
# Extract output directory and overwrite boolean from keyword arguments.
outputdir
=
kwargs
.
pop
(
'outputdir'
,
'.'
)
overwrite
=
kwargs
.
pop
(
'overwrite'
,
True
)
# Set data and meta to None
data
,
meta
=
None
,
None
# Extract drive object from args
drive
,
*
other_args
=
args
# If drive is searchable and not fully resolved
if
drive
.
is_searchable
:
if
not
drive
.
is_resolved
:
# Call simulate to perform titration
out
=
model
.
simulate
(
*
args
)
# If titration yields nothing -> no file produced -> return None
if
out
is
None
:
logger
.
warning
(
'returning None'
)
return
None
# Store data and meta
data
,
meta
=
out
# Update args list with resovled drive
args
=
(
meta
[
'drive'
],
*
other_args
)
# Check if a output file corresponding to sim inputs is found in the output directory
# That check is performed prior to running the simulation, such that
# it is not run if the file is present and overwrite is set ot false.
fname
=
f
'{model.filecode(*args)}.pkl'
fpath
=
os
.
path
.
join
(
outputdir
,
fname
)
existing_file_msg
=
f
'File "{fname}" already present in directory "{outputdir}"'
existing_file
=
os
.
path
.
isfile
(
fpath
)
# If file exists and overwrite is set ot false -> return
if
existing_file
and
not
overwrite
:
logger
.
warning
(
f
'{existing_file_msg} -> preserving'
)
return
fpath
# Run simulation if not already done (for titration cases)
if
data
is
None
:
data
,
meta
=
model
.
simulate
(
*
args
)
# Raise warning if an existing file is overwritten
if
existing_file
:
logger
.
warning
(
f
'{existing_file_msg} -> overwriting'
)
# Save output file and return output filepath
with
open
(
fpath
,
'wb'
)
as
fh
:
pickle
.
dump
({
'meta'
:
meta
,
'data'
:
data
},
fh
)
logger
.
debug
(
'simulation data exported to "
%s
"'
,
fpath
)
return
fpath
def
moveItem
(
l
,
value
,
itarget
):
''' Move a list item to a specific target index.
:param l: list object
:param value: value of the item to move
:param itarget: target index
:return: re-ordered list.
'''
# Get absolute target index
if
itarget
<
0
:
itarget
+=
len
(
l
)
assert
itarget
<
len
(
l
),
f
'target index {itarget} exceeds list size ({len(l)})'
# Get index corresponding to element and delete entry from list
iref
=
l
.
index
(
value
)
new_l
=
l
.
copy
()
del
new_l
[
iref
]
# Return re-organized list
return
new_l
[:
itarget
]
+
[
value
]
+
new_l
[
itarget
:]
class
StimObject
(
metaclass
=
abc
.
ABCMeta
):
''' Generic interface to a simulation object. '''
@staticmethod
@abc.abstractmethod
def
inputs
():
raise
NotImplementedError
def
checkInt
(
self
,
key
,
value
):
if
not
isinstance
(
value
,
int
):
raise
TypeError
(
f
'Invalid {self.inputs()[key]["desc"]} (must be an integer)'
)
return
value
def
checkFloat
(
self
,
key
,
value
):
if
isinstance
(
value
,
int
):
value
=
float
(
value
)
if
not
isinstance
(
value
,
float
):
raise
TypeError
(
f
'Invalid {self.inputs()[key]["desc"]} (must be float typed)'
)
return
value
def
checkStrictlyPositive
(
self
,
key
,
value
):
if
value
<=
0
:
d
=
self
.
inputs
()[
key
]
raise
ValueError
(
f
'Invalid {d["desc"]}: {value * d.get("factor", 1)} {d["unit"]} (must be strictly positive)'
)
def
checkPositiveOrNull
(
self
,
key
,
value
):
if
value
<
0
:
d
=
self
.
inputs
()[
key
]
raise
ValueError
(
f
'Invalid {d["desc"]}: {value * d.get("factor", 1)} {d["unit"]} (must be positive or null)'
)
def
checkStrictlyNegative
(
self
,
key
,
value
):
if
value
>=
0
:
d
=
self
.
inputs
()[
key
]
raise
ValueError
(
f
'Invalid {d["desc"]}: {value * d.get("factor", 1)} {d["unit"]} (must be strictly negative)'
)
def
checkNegativeOrNull
(
self
,
key
,
value
):
if
value
>
0
:
d
=
self
.
inputs
()[
key
]
raise
ValueError
(
f
'Invalid {d["desc"]}: {value * d.get("factor", 1)} {d["unit"]} (must be negative or null)'
)
def
checkBounded
(
self
,
key
,
value
,
bounds
):
if
value
<
bounds
[
0
]
or
value
>
bounds
[
1
]:
d
=
self
.
inputs
()[
key
]
f
,
u
=
d
.
get
(
"factor"
,
1
),
d
[
"unit"
]
bounds_str
=
f
'[{bounds[0] * f}; {bounds[1] * f}] {u}'
raise
ValueError
(
f
'Invalid {d["desc"]}: {value * f} {u} (must be within {bounds_str})'
)
Event Timeline
Log In to Comment