Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F91931768
simulators.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Fri, Nov 15, 20:50
Size
18 KB
Mime Type
text/x-python
Expires
Sun, Nov 17, 20:50 (2 d)
Engine
blob
Format
Raw Data
Handle
22349579
Attached To
R4670 PySONIC (old)
simulators.py
View Options
# -*- coding: utf-8 -*-
# @Author: Theo Lemaire
# @Date: 2019-05-28 14:45:12
# @Last Modified by: Theo Lemaire
# @Last Modified time: 2019-06-01 19:45:34
import
abc
import
numpy
as
np
import
nolds
from
scipy.integrate
import
ode
,
odeint
from
tqdm
import
tqdm
from
..utils
import
*
from
..constants
import
*
class
Simulator
(
metaclass
=
abc
.
ABCMeta
):
''' Generic interface to simulator object. '''
def
initialize
(
self
,
y0
):
''' Initialize global arrays.
:param y0: vector of initial conditions
:return: 3-tuple with the initialized time vector, solution matrix and state vector
'''
t
=
np
.
array
([
0.
])
y
=
np
.
atleast_2d
(
y0
)
stim
=
np
.
array
([
1
])
return
t
,
y
,
stim
def
appendSolution
(
self
,
t
,
y
,
stim
,
tnew
,
ynew
,
is_on
):
''' Append to time vector, solution matrix and state vector.
:param t: preceding time vector
:param y: preceding solution matrix
:param stim: preceding stimulation state vector
:param tnew: integration time vector for current interval
:param ynew: derivative function for current interval
:param is_on: stimulation state for current interval
:return: 3-tuple with the appended time vector, solution matrix and state vector
'''
t
=
np
.
concatenate
((
t
,
tnew
[
1
:]))
y
=
np
.
concatenate
((
y
,
ynew
[
1
:]),
axis
=
0
)
stim
=
np
.
concatenate
((
stim
,
np
.
ones
(
tnew
.
size
-
1
)
*
is_on
))
return
t
,
y
,
stim
def
integrate
(
self
,
t
,
y
,
stim
,
tnew
,
dfunc
,
is_on
):
''' Integrate system for a time interval and append to preceding solution arrays.
:param t: preceding time vector
:param y: preceding solution matrix
:param stim: preceding stimulation state vector
:param tnew: integration time vector for current interval
:param dfunc: derivative function for current interval
:param is_on: stimulation state for current interval
:return: 3-tuple with the appended time vector, solution matrix and state vector
'''
if
tnew
.
size
==
0
:
return
t
,
y
,
stim
ynew
=
odeint
(
dfunc
,
y
[
-
1
],
tnew
)
return
self
.
appendSolution
(
t
,
y
,
stim
,
tnew
,
ynew
,
is_on
)
def
resample
(
self
,
t
,
y
,
stim
,
target_dt
):
''' Resample a solution to a new target time step.
:param t: time vector
:param y: solution matrix
:param stim: stimulation state vector
:target_dt: target time step after resampling
:return: 3-tuple with the resampled time vector, solution matrix and state vector
'''
dt
=
t
[
1
]
-
t
[
0
]
rf
=
int
(
np
.
round
(
target_dt
/
dt
))
assert
rf
>=
1
,
'Hyper-sampling not supported'
logger
.
debug
(
'Downsampling output arrays by factor
%u
(Fs =
%s
Hz)'
,
rf
,
si_format
(
1
/
(
dt
*
rf
),
2
))
return
t
[::
rf
],
y
[::
rf
,
:],
stim
[::
rf
]
@property
@abc.abstractmethod
def
compute
(
self
):
''' Abstract compute method. '''
return
'Should never reach here'
def
__call__
(
self
,
*
args
,
**
kwargs
):
''' Call and return compute method, with conditional time monitoring. '''
monitor_time
=
kwargs
.
pop
(
'monitor_time'
)
if
monitor_time
:
start_time
=
time
.
perf_counter
()
output
=
self
.
compute
(
*
args
,
**
kwargs
)
if
monitor_time
:
end_time
=
time
.
perf_counter
()
run_time
=
end_time
-
start_time
output
=
output
,
run_time
return
output
class
PeriodicSimulator
(
Simulator
):
def
__init__
(
self
,
dfunc
,
stopfunc
=
None
,
ivars_to_check
=
None
):
''' Initialize simulator with specific derivative and stop functions
:param dfunc: derivative function
:param stopfunc: function estimating stopping criterion
:param ivars_to_check: solution indexes of variables to check for stability
'''
self
.
dfunc
=
dfunc
self
.
ivars_to_check
=
ivars_to_check
if
stopfunc
is
not
None
:
self
.
stopfunc
=
stopfunc
else
:
self
.
stopfunc
=
self
.
isPeriodicallyStable
def
getNPerCycle
(
self
,
dt
,
T
):
''' Compute number of samples per cycle given a time step and a specific periodicity.
:param dt: integration time step (s)
:param T: periodicity (s)
:return: number of samples per cycle
'''
return
int
(
np
.
round
(
T
/
dt
))
+
1
def
getTimeReference
(
self
,
dt
,
T
):
''' Compute reference integration time vector for a specific periodicity.
:param dt: integration time step (s)
:param T: periodicity (s)
:return: time vector for 1 periodic cycle
'''
return
np
.
linspace
(
0
,
T
,
self
.
getNPerCycle
(
dt
,
T
))
def
isPeriodicallyStable
(
self
,
t
,
y
,
T
):
''' Assess the periodic stabilization of a solution, by evaluating the deviation
of system variables between the last two periods.
:param t: time vector
:param y: solution matrix
:param T: periodicity (s)
:return: boolean stating whether the solution is periodically stable or not
'''
# Extract the 2 cycles of interest from the solution
n
=
self
.
getNPerCycle
(
t
[
1
]
-
t
[
0
],
T
)
-
1
y_last
=
y
[
-
n
:,
:]
y_prec
=
y
[
-
2
*
n
:
-
n
,
:]
# For each variable of interest, evaluate the RMSE between the two cycles, the
# variation range over the last cycle, and the ratio of these 2 quantities
ratios
=
np
.
array
([
rmse
(
y_last
[:,
ivar
],
y_prec
[:,
ivar
])
/
np
.
ptp
(
y_last
[:,
ivar
])
for
ivar
in
self
.
ivars_to_check
])
# Classify the solution as periodically stable only if all RMSE/PTP ratios
# are below critical threshold
is_periodically_stable
=
np
.
all
(
ratios
<
MAX_RMSE_PTP_RATIO
)
# logger.debug(
# 'step %u: ratios = [%s]', icycle,
# ', '.join(['{:.2e}'.format(r) for r in ratios]))
return
is_periodically_stable
def
isAsymptoticallyStable
(
self
,
t
,
y
,
T
):
''' Assess the asymptotically stabilization of a solution, by evaluating the deviation
of system variables from their initial values.
:param t: time vector
:param y: solution matrix
:param T: periodicity (s)
:return: boolean stating whether the solution is asymptotically stable or not
'''
# For each variable of interest, evaluate the ...
lyapunov_exponents
=
np
.
array
([
nolds
.
lyap_r
(
y
[:,
ivar
])
for
ivar
in
self
.
ivars_to_check
])
print
(
lyapunov_exponents
)
is_asyptotically_stable
=
np
.
all
(
lyapunov_exponents
<
1e-5
)
return
is_asyptotically_stable
def
compute
(
self
,
y0
,
dt
,
T
,
t0
=
0.
,
nmax
=
NCYCLES_MAX
):
''' Simulate system with a specific periodicity until stopping criterion is met.
:param y0: 1D vector of initial conditions
:param dt: integration time step (s)
:param T: periodicity (s)
:param t0: starting time
:return: 3-tuple with the time profile, the effective solution matrix and a state vector
'''
# If none specified, set all variables to be checked for stability
if
self
.
ivars_to_check
is
None
:
self
.
ivars_to_check
=
range
(
y0
.
size
)
# Get reference time vector
tref
=
self
.
getTimeReference
(
dt
,
T
)
# Initialize global arrays
t
,
y
,
stim
=
self
.
initialize
(
y0
)
# Integrate system for a few cycles until stopping criterion is met
icycle
=
0
stop
=
False
while
not
stop
and
icycle
<
nmax
:
t
,
y
,
stim
=
self
.
integrate
(
t
,
y
,
stim
,
tref
+
icycle
*
T
,
self
.
dfunc
,
True
)
if
icycle
>
0
:
stop
=
self
.
stopfunc
(
t
,
y
,
T
)
icycle
+=
1
t
+=
t0
# Log stopping criterion
t_str
=
't = {:.5f} ms'
.
format
(
t
[
-
1
]
*
1e3
)
if
icycle
==
nmax
:
logger
.
warning
(
'
%s
: criterion not met -> stopping after
%u
cycles'
,
t_str
,
icycle
)
else
:
logger
.
debug
(
'
%s
: stopping criterion met after
%u
cycles'
,
t_str
,
icycle
)
# Return output variables
return
t
,
y
,
stim
class
PWSimulator
(
Simulator
):
def
__init__
(
self
,
dfunc_on
,
dfunc_off
):
''' Initialize simulator with specific derivative functions
:param dfunc_on: derivative function for ON periods
:param dfunc_off: derivative function for OFF periods
'''
self
.
dfunc_on
=
dfunc_on
self
.
dfunc_off
=
dfunc_off
def
getTimeReference
(
self
,
dt
,
tstim
,
toffset
,
PRF
,
DC
):
''' Compute reference integration time vectors for a specific stimulus application pattern.
:param dt: integration time step (s)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:return: 3-tuple with time vectors for stimulus ON and OFF periods and stimulus offset
'''
# Compute vector sizes
T_ON
=
DC
/
PRF
T_OFF
=
(
1
-
DC
)
/
PRF
# For high-PRF pulsed protocols: adapt time step to ensure minimal
# number of samples during TON or TOFF
dt_warning_msg
=
'high-PRF protocol: lowering time step to
%.2e
s to properly integrate
%s
'
for
key
,
T
in
{
'TON'
:
T_ON
,
'TOFF'
:
T_OFF
}
.
items
():
if
T
>
0
and
T
/
dt
<
MIN_SAMPLES_PER_PULSE_INT
:
dt
=
T
/
MIN_SAMPLES_PER_PULSE_INT
logger
.
warning
(
dt_warning_msg
,
dt
,
key
)
# Initializing accurate time vectors pulse ON and OFF periods, as well as offset
t_on
=
np
.
linspace
(
0
,
T_ON
,
int
(
np
.
round
(
T_ON
/
dt
))
+
1
)
t_off
=
np
.
linspace
(
T_ON
,
1
/
PRF
,
int
(
np
.
round
(
T_OFF
/
dt
)))
t_offset
=
np
.
linspace
(
tstim
,
tstim
+
toffset
,
int
(
np
.
round
(
toffset
/
dt
)))
return
t_on
,
t_off
,
t_offset
def
adjustPRF
(
self
,
tstim
,
PRF
,
DC
,
print_progress
):
''' Adjust the PRF in case of continuous wave stimulus, in order to obtain the desired
number of integration interval(s) during stimulus.
:param tstim: duration of US stimulation (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:param print_progress: boolean specifying whether to show a progress bar
:return: adjusted PRF value (Hz)
'''
if
DC
<
1.0
:
# if PW stimuli, then no change
return
PRF
else
:
# if CW stimuli, then divide integration according to presence of progress bar
return
{
True
:
100.
,
False
:
1.
}[
print_progress
]
/
tstim
def
getNPulses
(
self
,
tstim
,
PRF
):
''' Calculate number of pulses from stimulus temporal pattern.
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:return: number of pulses during the stimulus
'''
return
int
(
np
.
round
(
tstim
*
PRF
))
def
compute
(
self
,
y0
,
dt
,
tstim
,
toffset
,
PRF
,
DC
,
target_dt
=
None
,
print_progress
=
False
):
''' Simulate system for a specific stimulus application pattern.
:param y0: 1D vector of initial conditions
:param dt: integration time step (s)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:param target_dt: target time step after resampling
:param print_progress: boolean specifying whether to show a progress bar
:return: 3-tuple with the time profile, the effective solution matrix and a state vector
'''
# Adjust PRF and get number of pulses
PRF
=
self
.
adjustPRF
(
tstim
,
PRF
,
DC
,
print_progress
)
npulses
=
self
.
getNPulses
(
tstim
,
PRF
)
# Get reference time vectors
t_on
,
t_off
,
t_offset
=
self
.
getTimeReference
(
dt
,
tstim
,
toffset
,
PRF
,
DC
)
# Initialize global arrays
t
,
y
,
stim
=
self
.
initialize
(
y0
)
# Initialize progress bar
if
print_progress
:
setHandler
(
logger
,
TqdmHandler
(
my_log_formatter
))
ntot
=
int
(
npulses
*
(
tstim
+
toffset
)
/
tstim
)
pbar
=
tqdm
(
total
=
ntot
)
# Integrate ON and OFF intervals of each pulse
for
i
in
range
(
npulses
):
for
j
,
(
tref
,
func
)
in
enumerate
(
zip
([
t_on
,
t_off
],
[
self
.
dfunc_on
,
self
.
dfunc_off
])):
t
,
y
,
stim
=
self
.
integrate
(
t
,
y
,
stim
,
tref
+
i
/
PRF
,
func
,
j
==
0
)
# Update progress bar
if
print_progress
:
pbar
.
update
(
i
)
# Integrate offset interval
t
,
y
,
stim
=
self
.
integrate
(
t
,
y
,
stim
,
t_offset
,
self
.
dfunc_off
,
False
)
# Terminate progress bar
if
print_progress
:
pbar
.
update
(
npulses
)
pbar
.
close
()
# Resample solution if specified
if
target_dt
is
not
None
:
t
,
y
,
stim
=
self
.
resample
(
t
,
y
,
stim
,
target_dt
)
# Return output variables
return
t
,
y
,
stim
class
HybridSimulator
(
PWSimulator
):
def
__init__
(
self
,
dfunc_on
,
dfunc_off
,
dfunc_sparse
,
predfunc
,
is_dense_var
,
stopfunc
=
None
,
ivars_to_check
=
None
):
''' Initialize simulator with specific derivative functions
:param dfunc_on: derivative function for ON periods
:param dfunc_off: derivative function for OFF periods
:param dfunc_sparse: derivative function for sparse integration
:param predfunc: function computing the extra arguments necessary for sparse integration
:param is_dense_var: boolean array stating for each variable if it evolves fast or not
:param stopfunc: function estimating stopping criterion
:param ivars_to_check: solution indexes of variables to check for stability
'''
PWSimulator
.
__init__
(
self
,
dfunc_on
,
dfunc_off
)
self
.
sparse_solver
=
ode
(
dfunc_sparse
)
self
.
sparse_solver
.
set_integrator
(
'dop853'
,
nsteps
=
SOLVER_NSTEPS
,
atol
=
1e-12
)
self
.
predfunc
=
predfunc
self
.
is_dense_var
=
is_dense_var
self
.
is_sparse_var
=
np
.
invert
(
is_dense_var
)
self
.
stopfunc
=
stopfunc
self
.
ivars_to_check
=
ivars_to_check
def
integrate
(
self
,
t
,
y
,
stim
,
tnew
,
dfunc
,
is_on
):
''' Integrate system for a time interval and append to preceding solution arrays,
using a hybrid scheme:
- First, the full ODE system is integrated for a few cycles with a dense time
granularity until a stopping criterion is met
- Second, the profiles of all variables over the last cycle are resampled to a
far lower (i.e. sparse) sampling rate
- Third, a subset of the ODE system is integrated with a sparse time granularity,
for the remaining of the time interval, while the remaining variables are
periodically expanded from their last cycle profile.
:param t: preceding time vector
:param y: preceding solution matrix
:param stim: preceding stimulation state vector
:param tnew: integration time vector for current interval
:param dfunc: derivative function for current interval
:param is_on: stimulation state for current interval
:return: 3-tuple with the appended time vector, solution matrix and state vector
'''
if
tnew
.
size
==
0
:
return
t
,
y
,
stim
# Initialize periodic solver
dense_solver
=
PeriodicSimulator
(
dfunc
,
stopfunc
=
self
.
stopfunc
,
ivars_to_check
=
self
.
ivars_to_check
)
dt_dense
=
tnew
[
1
]
-
tnew
[
0
]
npc_dense
=
dense_solver
.
getNPerCycle
(
dt_dense
,
self
.
T
)
# Until final integration time is reached
while
t
[
-
1
]
<
tnew
[
-
1
]:
logger
.
debug
(
't = {:.5f} ms: starting new hybrid integration'
.
format
(
t
[
-
1
]
*
1e3
))
# Integrate dense system until stopping criterion is met
tdense
,
ydense
,
stimdense
=
dense_solver
.
compute
(
y
[
-
1
],
dt_dense
,
self
.
T
,
t0
=
t
[
-
1
])
t
,
y
,
stim
=
self
.
appendSolution
(
t
,
y
,
stim
,
tdense
,
ydense
,
is_on
)
# Resample signals over last cycle to match sparse time step
tlast
,
ylast
,
stimlast
=
self
.
resample
(
tdense
[
-
npc_dense
:],
ydense
[
-
npc_dense
:],
stimdense
[
-
npc_dense
:],
self
.
dt_sparse
)
npc_sparse
=
tlast
.
size
# Integrate until either the rest of the interval or max update interval is reached
t0
=
tdense
[
-
1
]
tf
=
min
(
tnew
[
-
1
],
tdense
[
0
]
+
DT_UPDATE
)
nsparse
=
int
(
np
.
round
((
tf
-
t0
)
/
self
.
dt_sparse
))
tsparse
=
np
.
linspace
(
t0
,
tf
,
nsparse
)
ysparse
=
np
.
empty
((
nsparse
,
y
.
shape
[
1
]))
ysparse
[
0
]
=
y
[
-
1
]
self
.
sparse_solver
.
set_initial_value
(
y
[
-
1
,
self
.
is_sparse_var
],
t
[
-
1
])
for
j
in
range
(
1
,
tsparse
.
size
):
self
.
sparse_solver
.
set_f_params
(
self
.
predfunc
(
ylast
[
j
%
npc_sparse
]))
self
.
sparse_solver
.
integrate
(
tsparse
[
j
])
if
not
self
.
sparse_solver
.
successful
():
raise
ValueError
(
'integration error at t = {:.5f} ms'
.
format
(
tsparse
[
j
]
*
1e3
))
ysparse
[
j
,
self
.
is_dense_var
]
=
ylast
[
j
%
npc_sparse
,
self
.
is_dense_var
]
ysparse
[
j
,
self
.
is_sparse_var
]
=
self
.
sparse_solver
.
y
t
,
y
,
stim
=
self
.
appendSolution
(
t
,
y
,
stim
,
tsparse
,
ysparse
,
is_on
)
return
t
,
y
,
stim
def
compute
(
self
,
y0
,
dt_dense
,
dt_sparse
,
T
,
tstim
,
toffset
,
PRF
,
DC
,
print_progress
=
False
):
''' Simulate system for a specific stimulus application pattern.
:param y0: 1D vector of initial conditions
:param dt_dense: dense integration time step (s)
:param dt_sparse: sparse integration time step (s)
:param T: periodicity (s)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:param print_progress: boolean specifying whether to show a progress bar
:return: 3-tuple with the time profile, the effective solution matrix and a state vector
'''
# Set periodicity and sparse time step
self
.
T
=
T
self
.
dt_sparse
=
dt_sparse
# Call and return parent compute method
return
PWSimulator
.
compute
(
self
,
y0
,
dt_dense
,
tstim
,
toffset
,
PRF
,
DC
,
target_dt
=
None
,
print_progress
=
print_progress
)
Event Timeline
Log In to Comment