Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F64369366
nbls.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, May 26, 10:30
Size
35 KB
Mime Type
text/x-python
Expires
Tue, May 28, 10:30 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17890859
Attached To
R4670 PySONIC (old)
nbls.py
View Options
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: Theo Lemaire
# @Date: 2016-09-29 16:16:19
# @Email: theo.lemaire@epfl.ch
# @Last Modified by: Theo Lemaire
# @Last Modified time: 2019-05-31 10:45:52
from
copy
import
deepcopy
import
time
import
logging
import
pickle
import
progressbar
as
pb
import
numpy
as
np
import
pandas
as
pd
from
scipy.integrate
import
ode
,
odeint
,
solve_ivp
from
scipy.interpolate
import
interp1d
from
.simulators
import
PWSimulator
,
HybridSimulator
from
.bls
import
BilayerSonophore
from
.pneuron
import
PointNeuron
from
..utils
import
*
from
..constants
import
*
from
..postpro
import
findPeaks
,
getFixedPoints
class
NeuronalBilayerSonophore
(
BilayerSonophore
):
''' This class inherits from the BilayerSonophore class and receives an PointNeuron instance
at initialization, to define the electro-mechanical NICE model and its SONIC variant. '''
tscale
=
'ms'
# relevant temporal scale of the model
defvar
=
'Q'
# default plot variable
def
__init__
(
self
,
a
,
neuron
,
Fdrive
=
None
,
embedding_depth
=
0.0
):
''' Constructor of the class.
:param a: in-plane radius of the sonophore structure within the membrane (m)
:param neuron: neuron object
:param Fdrive: frequency of acoustic perturbation (Hz)
:param embedding_depth: depth of the embedding tissue around the membrane (m)
'''
# Check validity of input parameters
if
not
isinstance
(
neuron
,
PointNeuron
):
raise
ValueError
(
'Invalid neuron type: "{}" (must inherit from PointNeuron class)'
.
format
(
neuron
.
name
))
self
.
neuron
=
neuron
# Initialize BilayerSonophore parent object
BilayerSonophore
.
__init__
(
self
,
a
,
neuron
.
Cm0
,
neuron
.
Cm0
*
neuron
.
Vm0
*
1e-3
,
embedding_depth
)
def
__repr__
(
self
):
return
'NeuronalBilayerSonophore({}m, {})'
.
format
(
si_format
(
self
.
a
,
precision
=
1
,
space
=
' '
),
self
.
neuron
)
def
pprint
(
self
):
return
'{}m radius NBLS - {} neuron'
.
format
(
si_format
(
self
.
a
,
precision
=
0
,
space
=
' '
),
self
.
neuron
.
name
)
def
getPltVars
(
self
,
wrapleft
=
'df["'
,
wrapright
=
'"]'
):
pltvars
=
super
()
.
getPltVars
(
wrapleft
,
wrapright
)
pltvars
.
update
(
self
.
neuron
.
getPltVars
(
wrapleft
,
wrapright
))
return
pltvars
def
getPltScheme
(
self
):
return
self
.
neuron
.
getPltScheme
()
def
filecode
(
self
,
Fdrive
,
Adrive
,
tstim
,
PRF
,
DC
,
method
):
return
'ASTIM_{}_{}_{:.0f}nm_{:.0f}kHz_{:.2f}kPa_{:.0f}ms_{}{}'
.
format
(
self
.
neuron
.
name
,
'CW'
if
DC
==
1
else
'PW'
,
self
.
a
*
1e9
,
Fdrive
*
1e-3
,
Adrive
*
1e-3
,
tstim
*
1e3
,
'PRF{:.2f}Hz_DC{:.2f}%_'
.
format
(
PRF
,
DC
*
1e2
)
if
DC
<
1.
else
''
,
method
)
def
fullDerivatives
(
self
,
y
,
t
,
Adrive
,
Fdrive
,
phi
):
''' Compute the derivatives of the (n+3) ODE full NBLS system variables.
:param y: vector of state variables
:param t: specific instant in time (s)
:param Adrive: acoustic drive amplitude (Pa)
:param Fdrive: acoustic drive frequency (Hz)
:param phi: acoustic drive phase (rad)
:return: vector of derivatives
'''
dydt_mech
=
BilayerSonophore
.
derivatives
(
self
,
y
[:
3
],
t
,
Adrive
,
Fdrive
,
y
[
3
],
phi
)
dydt_elec
=
self
.
neuron
.
Qderivatives
(
y
[
3
:],
t
,
self
.
Capct
(
y
[
1
]))
return
dydt_mech
+
dydt_elec
def
effDerivatives
(
self
,
y
,
t
,
lkp
):
''' Compute the derivatives of the n-ODE effective HH system variables,
based on 1-dimensional linear interpolation of "effective" coefficients
that summarize the system's behaviour over an acoustic cycle.
:param y: vector of HH system variables at time t
:param t: specific instant in time (s)
:param lkp: dictionary of 1D data points of "effective" coefficients
over the charge domain, for specific frequency and amplitude values.
:return: vector of effective system derivatives at time t
'''
# Split input vector explicitly
Qm
,
*
states
=
y
# Compute charge and channel states variation
Vmeff
=
self
.
neuron
.
interpVmeff
(
Qm
,
lkp
)
dQmdt
=
-
self
.
neuron
.
iNet
(
Vmeff
,
states
)
*
1e-3
dstates
=
self
.
neuron
.
derEffStates
(
Qm
,
states
,
lkp
)
# Return derivatives vector
return
[
dQmdt
,
*
[
dstates
[
k
]
for
k
in
self
.
neuron
.
states
]]
def
interpEffVariable
(
self
,
key
,
Qm
,
stim
,
lkp_on
,
lkp_off
):
''' Interpolate Q-dependent effective variable along solution.
:param key: lookup variable key
:param Qm: charge density solution vector
:param stim: stimulation state solution vector
:param lkp_on: lookups for ON states
:param lkp_off: lookups for OFF states
:return: interpolated effective variable vector
'''
x
=
np
.
zeros
(
stim
.
size
)
x
[
stim
==
0
]
=
np
.
interp
(
Qm
[
stim
==
0
],
lkp_on
[
'Q'
],
lkp_on
[
key
],
left
=
np
.
nan
,
right
=
np
.
nan
)
x
[
stim
==
1
]
=
np
.
interp
(
Qm
[
stim
==
1
],
lkp_off
[
'Q'
],
lkp_off
[
key
],
left
=
np
.
nan
,
right
=
np
.
nan
)
return
x
def
runFull
(
self
,
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
,
phi
=
np
.
pi
):
''' Compute solutions of the full electro-mechanical system for a specific set of
US stimulation parameters, using a classic integration scheme.
The first iteration uses the quasi-steady simplification to compute
the initiation of motion from a flat leaflet configuration. Afterwards,
the ODE system is solved iteratively until completion.
:param Fdrive: acoustic drive frequency (Hz)
:param Adrive: acoustic drive amplitude (Pa)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:param phi: acoustic drive phase (rad)
:return: 3-tuple with the time profile, the effective solution matrix and a state vector
'''
# Determine time step
dt
=
1
/
(
NPC_FULL
*
Fdrive
)
# Compute non-zero deflection value for a small perturbation (solving quasi-steady equation)
Pac
=
self
.
Pacoustic
(
dt
,
Adrive
,
Fdrive
,
phi
)
Z0
=
self
.
balancedefQS
(
self
.
ng0
,
self
.
Qm0
,
Pac
)
# Set initial conditions
steady_states
=
self
.
neuron
.
steadyStates
(
self
.
neuron
.
Vm0
)
y0
=
np
.
concatenate
((
[
0.
,
Z0
,
self
.
ng0
,
self
.
Qm0
],
[
steady_states
[
k
]
for
k
in
self
.
neuron
.
states
],
))
# Initialize simulator and compute solution
logger
.
debug
(
'Computing detailed solution'
)
simulator
=
PWSimulator
(
lambda
y
,
t
:
self
.
fullDerivatives
(
y
,
t
,
Adrive
,
Fdrive
,
phi
),
lambda
y
,
t
:
self
.
fullDerivatives
(
y
,
t
,
0.
,
0.
,
0.
),
)
t
,
y
,
stim
=
simulator
.
compute
(
y0
,
dt
,
tstim
,
toffset
,
PRF
,
DC
,
print_progress
=
logger
.
getEffectiveLevel
()
<=
logging
.
INFO
,
target_dt
=
CLASSIC_TARGET_DT
)
# Compute membrane potential vector (in mV)
Qm
=
y
[:,
3
]
Z
=
y
[:,
1
]
Vm
=
Qm
/
self
.
v_Capct
(
Z
)
*
1e3
# mV
# Add Vm to solution matrix
y
=
np
.
hstack
((
y
[:,
1
:
4
],
np
.
array
([
Vm
])
.
T
,
y
[:,
4
:]
))
# Return output variables
return
t
,
y
.
T
,
stim
def
runSONIC
(
self
,
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
,
dt
=
DT_EFF
):
''' Compute solutions of the system for a specific set of
US stimulation parameters, using charge-predicted "effective"
coefficients to solve the HH equations at each step.
:param Fdrive: acoustic drive frequency (Hz)
:param Adrive: acoustic drive amplitude (Pa)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:param dt: integration time step (s)
:return: 3-tuple with the time profile, the effective solution matrix and a state vector
'''
# Load appropriate 2D lookups
Aref
,
Qref
,
lookups2D
,
_
=
getLookups2D
(
self
.
neuron
.
name
,
a
=
self
.
a
,
Fdrive
=
Fdrive
)
# Check that acoustic amplitude is within lookup range
Adrive
=
isWithin
(
'amplitude'
,
Adrive
,
(
Aref
.
min
(),
Aref
.
max
()))
# Interpolate 2D lookups at zero and US amplitude
logger
.
debug
(
'Interpolating lookups at A =
%.2f
kPa and A = 0'
,
Adrive
*
1e-3
)
lookups_on
=
{
key
:
interp1d
(
Aref
,
y2D
,
axis
=
0
)(
Adrive
)
for
key
,
y2D
in
lookups2D
.
items
()}
lookups_off
=
{
key
:
interp1d
(
Aref
,
y2D
,
axis
=
0
)(
0.0
)
for
key
,
y2D
in
lookups2D
.
items
()}
# Add reference charge vector to 1D lookup dictionaries
lookups_on
[
'Q'
]
=
Qref
lookups_off
[
'Q'
]
=
Qref
# Set initial conditions
steady_states
=
self
.
neuron
.
steadyStates
(
self
.
neuron
.
Vm0
)
y0
=
np
.
insert
(
np
.
array
([
steady_states
[
k
]
for
k
in
self
.
neuron
.
states
]),
0
,
self
.
Qm0
)
# Initialize simulator and compute solution
logger
.
debug
(
'Computing effective solution'
)
simulator
=
PWSimulator
(
lambda
y
,
t
:
self
.
effDerivatives
(
y
,
t
,
lookups_on
),
lambda
y
,
t
:
self
.
effDerivatives
(
y
,
t
,
lookups_off
)
)
t
,
y
,
stim
=
simulator
.
compute
(
y0
,
dt
,
tstim
,
toffset
,
PRF
,
DC
)
Qm
=
y
[:,
0
]
# Compute effective gas content and membrane potential vectors
ng
,
Vm
=
[
self
.
interpEffVariable
(
key
,
Qm
,
stim
,
lookups_on
,
lookups_off
)
for
key
in
[
'ng'
,
'V'
]
]
# Compute quasi-steady deflection vector
Z
=
np
.
array
([
self
.
balancedefQS
(
x1
,
x2
)
for
x1
,
x2
in
zip
(
ng
,
Qm
)])
# m
# Add Z, ng and Vm to solution matrix
y
=
np
.
hstack
((
np
.
array
([
Z
,
ng
,
Qm
,
Vm
])
.
T
,
y
[:,
1
:]
))
# Return output variables
return
t
,
y
.
T
,
stim
def
runHybrid
(
self
,
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
,
phi
=
np
.
pi
):
''' Compute solutions of the system for a specific set of
US stimulation parameters, using a hybrid integration scheme.
The first iteration uses the quasi-steady simplification to compute
the initiation of motion from a flat leaflet configuration. Afterwards,
the NBLS ODE system is solved iteratively for "slices" of N-microseconds,
in a 2-steps scheme:
- First, the full (n+3) ODE system is integrated for a few acoustic cycles
until Z and ng reach a stable periodic solution (limit cycle)
- Second, the signals of the 3 mechanical variables over the last acoustic
period are selected and resampled to a far lower sampling rate
- Third, the HH n-ODE system is integrated for the remaining time of the
slice, using periodic expansion of the mechanical signals to precompute
the values of capacitance.
:param Fdrive: acoustic drive frequency (Hz)
:param Adrive: acoustic drive amplitude (Pa)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param phi: acoustic drive phase (rad)
:return: 3-tuple with the time profile, the solution matrix and a state vector
.. warning:: This method cannot handle pulsed stimuli
'''
# Determine time step
dt_dense
=
1
/
(
NPC_FULL
*
Fdrive
)
dt_sparse
=
1
/
(
NPC_HH
*
Fdrive
)
# Compute non-zero deflection value for a small perturbation (solving quasi-steady equation)
Pac
=
self
.
Pacoustic
(
dt_dense
,
Adrive
,
Fdrive
,
phi
)
Z0
=
self
.
balancedefQS
(
self
.
ng0
,
self
.
Qm0
,
Pac
)
# Set initial conditions
steady_states
=
self
.
neuron
.
steadyStates
(
self
.
neuron
.
Vm0
)
y0
=
np
.
concatenate
((
[
0.
,
Z0
,
self
.
ng0
,
self
.
Qm0
],
[
steady_states
[
k
]
for
k
in
self
.
neuron
.
states
],
))
is_dense_var
=
np
.
array
([
True
]
*
3
+
[
False
]
*
(
len
(
self
.
neuron
.
states
)
+
1
))
# Initialize simulator and compute solution
logger
.
debug
(
'Computing hybrid solution'
)
simulator
=
HybridSimulator
(
lambda
y
,
t
:
self
.
fullDerivatives
(
y
,
t
,
Adrive
,
Fdrive
,
phi
),
lambda
y
,
t
:
self
.
fullDerivatives
(
y
,
t
,
0.
,
0.
,
0.
),
lambda
t
,
y
,
Cm
:
self
.
neuron
.
Qderivatives
(
y
,
t
,
Cm
),
lambda
yref
:
self
.
Capct
(
yref
[
1
]),
is_dense_var
,
ivars_to_check
=
[
1
,
2
]
)
t
,
y
,
stim
=
simulator
.
compute
(
y0
,
dt_dense
,
dt_sparse
,
Fdrive
,
tstim
,
toffset
,
PRF
,
DC
,
# print_progress=logger.getEffectiveLevel() <= logging.INFO
)
# Compute membrane potential vector (in mV)
Qm
=
y
[:,
3
]
Z
=
y
[:,
1
]
Vm
=
Qm
/
self
.
v_Capct
(
Z
)
*
1e3
# mV
# Add Vm to solution matrix
y
=
np
.
hstack
((
y
[:,
1
:
4
],
np
.
array
([
Vm
])
.
T
,
y
[:,
4
:]
))
# Return output variables
return
t
,
y
.
T
,
stim
def
checkInputs
(
self
,
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
,
method
):
''' Check validity of simulation parameters.
:param Fdrive: acoustic drive frequency (Hz)
:param Adrive: acoustic drive amplitude (Pa)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:param method: selected integration method
:return: 3-tuple with the time profile, the solution matrix and a state vector
'''
BilayerSonophore
.
checkInputs
(
self
,
Fdrive
,
Adrive
,
0.0
,
0.0
)
self
.
neuron
.
checkInputs
(
Adrive
,
tstim
,
toffset
,
PRF
,
DC
)
# Check validity of simulation type
if
method
not
in
(
'full'
,
'hybrid'
,
'sonic'
):
raise
ValueError
(
'Invalid integration method: "{}"'
.
format
(
method
))
def
simulate
(
self
,
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
=
None
,
DC
=
1.0
,
method
=
'sonic'
):
''' Run simulation of the system for a specific set of
US stimulation parameters.
:param Fdrive: acoustic drive frequency (Hz)
:param Adrive: acoustic drive amplitude (Pa)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:param method: selected integration method
:return: 3-tuple with the time profile, the solution matrix and a state vector
'''
# Check validity of stimulation parameters
self
.
checkInputs
(
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
,
method
)
# Call appropriate simulation function
if
method
==
'full'
:
return
self
.
runFull
(
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
)
elif
method
==
'sonic'
:
return
self
.
runSONIC
(
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
)
elif
method
==
'hybrid'
:
# if DC < 1.0:
# raise ValueError('Pulsed protocol incompatible with hybrid integration method')
return
self
.
runHybrid
(
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
)
def
isExcited
(
self
,
Adrive
,
Fdrive
,
tstim
,
toffset
,
PRF
,
DC
,
method
,
return_val
=
False
):
''' Run a simulation and determine if neuron is excited.
:param Adrive: acoustic amplitude (Pa)
:param Fdrive: US frequency (Hz)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:return: boolean stating whether neuron is excited or not
'''
t
,
y
,
_
=
self
.
simulate
(
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
,
method
=
method
)
dt
=
t
[
1
]
-
t
[
0
]
ipeaks
,
*
_
=
findPeaks
(
y
[
2
,
:],
SPIKE_MIN_QAMP
,
int
(
np
.
ceil
(
SPIKE_MIN_DT
/
dt
)),
SPIKE_MIN_QPROM
)
nspikes
=
ipeaks
.
size
logger
.
debug
(
'A =
%s
Pa --->
%s
spike
%s
detected'
,
si_format
(
Adrive
,
2
,
space
=
' '
),
nspikes
,
"s"
if
nspikes
>
1
else
""
)
cond
=
nspikes
>
0
if
return_val
:
return
{
True
:
nspikes
,
False
:
np
.
nan
}[
cond
]
else
:
return
cond
def
isSilenced
(
self
,
Adrive
,
Fdrive
,
tstim
,
toffset
,
PRF
,
DC
,
method
,
return_val
=
False
):
''' Run a simulation and determine if neuron is silenced.
:param Adrive: acoustic amplitude (Pa)
:param Fdrive: US frequency (Hz)
:param tstim: duration of US stimulation (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:return: boolean stating whether neuron is silenced or not
'''
if
tstim
<=
TMIN_STABILIZATION
:
raise
ValueError
(
'stimulus duration must be greater than {:.0f} ms'
.
format
(
TMIN_STABILIZATION
*
1e3
))
# Simulate model without offset
t
,
y
,
_
=
self
.
simulate
(
Fdrive
,
Adrive
,
tstim
,
0.
,
PRF
,
DC
,
method
=
method
)
# Extract charge signal posterior to observation window
Qm
=
y
[
2
,
t
>
TMIN_STABILIZATION
]
# Compute variation range
Qm_range
=
np
.
ptp
(
Qm
)
logger
.
debug
(
'A =
%s
Pa --->
%.2f
nC/cm2 variation range over the last
%.0f
ms'
,
si_format
(
Adrive
,
2
,
space
=
' '
),
Qm_range
*
1e5
,
TMIN_STABILIZATION
*
1e3
)
cond
=
np
.
ptp
(
Qm
)
<
QSS_Q_DIV_THR
if
return_val
:
return
{
True
:
Qm
[
-
1
],
False
:
np
.
nan
}[
cond
]
else
:
return
cond
def
titrate
(
self
,
Fdrive
,
tstim
,
toffset
,
PRF
=
None
,
DC
=
1.0
,
Arange
=
None
,
method
=
'sonic'
):
''' Use a binary search to determine the threshold amplitude needed to obtain
neural excitation for a given frequency, duration, PRF and duty cycle.
:param Fdrive: US frequency (Hz)
:param tstim: duration of US stimulation (s)
:param toffset: duration of the offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: pulse duty cycle (-)
:param Arange: search interval for Adrive, iteratively refined
:return: determined threshold amplitude (Pa)
'''
# Determine amplitude interval if needed
if
Arange
is
None
:
Arange
=
(
0
,
getLookups2D
(
self
.
neuron
.
name
,
a
=
self
.
a
,
Fdrive
=
Fdrive
)[
0
]
.
max
())
# Determine output function
if
self
.
neuron
.
isTitratable
():
xfunc
=
self
.
isExcited
else
:
xfunc
=
self
.
isSilenced
# Titrate
return
titrate
(
xfunc
,
(
Fdrive
,
tstim
,
toffset
,
PRF
,
DC
,
method
),
Arange
,
TITRATION_ASTIM_DA_MAX
)
def
run
(
self
,
Fdrive
,
tstim
,
toffset
,
PRF
=
None
,
DC
=
1.0
,
Adrive
=
None
,
method
=
'sonic'
):
''' Run a simulation of the full electro-mechanical system for a given neuron type
with specific parameters, and return output data and metadata.
:param Fdrive: US frequency (Hz)
:param tstim: stimulus duration (s)
:param toffset: stimulus offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: stimulus duty cycle (-)
:param Adrive: acoustic pressure amplitude (Pa)
:param method: integration method
'''
logger
.
info
(
'
%s
:
%s
@ f =
%s
Hz,
%s
t =
%s
s (
%s
s offset)
%s
'
,
self
,
'titration'
if
Adrive
is
None
else
'simulation'
,
si_format
(
Fdrive
,
0
,
space
=
' '
),
'A = {}Pa, '
.
format
(
si_format
(
Adrive
,
2
,
space
=
' '
))
if
Adrive
is
not
None
else
''
,
*
si_format
([
tstim
,
toffset
],
1
,
space
=
' '
),
(
', PRF = {}Hz, DC = {:.2f}%'
.
format
(
si_format
(
PRF
,
2
,
space
=
' '
),
DC
*
1e2
)
if
DC
<
1.0
else
''
))
# If no amplitude provided, perform titration
if
Adrive
is
None
:
Adrive
=
self
.
titrate
(
Fdrive
,
tstim
,
toffset
,
PRF
,
DC
,
method
=
method
)
if
np
.
isnan
(
Adrive
):
logger
.
error
(
'Could not find threshold excitation amplitude'
)
return
None
# Run simulation
tstart
=
time
.
time
()
t
,
y
,
stimstate
=
self
.
simulate
(
Fdrive
,
Adrive
,
tstim
,
toffset
,
PRF
,
DC
,
method
=
method
)
tcomp
=
time
.
time
()
-
tstart
Z
,
ng
,
Qm
,
Vm
,
*
channels
=
y
logger
.
debug
(
'completed in
%s
s'
,
si_format
(
tcomp
,
1
))
# Store dataframe and metadata
data
=
pd
.
DataFrame
({
't'
:
t
,
'stimstate'
:
stimstate
,
'Z'
:
Z
,
'ng'
:
ng
,
'Qm'
:
Qm
,
'Vm'
:
Vm
})
for
j
in
range
(
len
(
self
.
neuron
.
states
)):
data
[
self
.
neuron
.
states
[
j
]]
=
channels
[
j
]
meta
=
{
'neuron'
:
self
.
neuron
.
name
,
'a'
:
self
.
a
,
'd'
:
self
.
d
,
'Fdrive'
:
Fdrive
,
'Adrive'
:
Adrive
,
'phi'
:
np
.
pi
,
'tstim'
:
tstim
,
'toffset'
:
toffset
,
'PRF'
:
PRF
,
'DC'
:
DC
,
'tcomp'
:
tcomp
,
'method'
:
method
}
# Log number of detected spikes
self
.
neuron
.
logNSpikes
(
data
)
return
data
,
meta
def
runAndSave
(
self
,
outdir
,
Fdrive
,
tstim
,
toffset
,
PRF
=
None
,
DC
=
1.0
,
Adrive
=
None
,
method
=
'sonic'
):
''' Run a simulation of the full electro-mechanical system for a given neuron type
with specific parameters, and save the results in a PKL file.
:param outdir: full path to output directory
:param Fdrive: US frequency (Hz)
:param tstim: stimulus duration (s)
:param toffset: stimulus offset (s)
:param PRF: pulse repetition frequency (Hz)
:param DC: stimulus duty cycle (-)
:param Adrive: acoustic pressure amplitude (Pa)
:param method: integration method
'''
data
,
meta
=
self
.
run
(
Fdrive
,
tstim
,
toffset
,
PRF
,
DC
,
Adrive
,
method
)
simcode
=
self
.
filecode
(
Fdrive
,
Adrive
,
tstim
,
PRF
,
DC
,
method
)
outpath
=
'{}/{}.pkl'
.
format
(
outdir
,
simcode
)
with
open
(
outpath
,
'wb'
)
as
fh
:
pickle
.
dump
({
'meta'
:
meta
,
'data'
:
data
},
fh
)
logger
.
debug
(
'simulation data exported to "
%s
"'
,
outpath
)
return
outpath
# def runIfNone(self, outdir, Fdrive, Adrive, tstim, toffset, PRF=None, DC=1.0, Adrive=None,
# method='sonic'):
# ''' Run a simulation of the full electro-mechanical system for a given neuron type
# with specific parameters, and save the results in a PKL file,
# only if file not present.
# :param outdir: full path to output directory
# :param Fdrive: US frequency (Hz)
# :param tstim: stimulus duration (s)
# :param toffset: stimulus offset (s)
# :param PRF: pulse repetition frequency (Hz)
# :param DC: stimulus duty cycle (-)
# :param Adrive: acoustic pressure amplitude (Pa)
# :param method: integration method
# '''
# fname = self.filecode(Fdrive, Adrive, tstim, PRF, DC, method)
# fpath = os.path.join(outdir, fname)
# if not os.path.isfile(fpath):
# logger.warning('"{}"" file not found'.format(fname))
# self.runAndSave(outdir=Fdrive, tstim, toffset, PRF, DC, Adrive, method)
# return loadData(fpath)
def
getStabPoints
():
# Simulate model without offset
t
,
y
,
_
=
self
.
simulate
(
Fdrive
,
Adrive
,
tstim
,
0.
,
PRF
,
DC
,
method
=
method
)
# Extract charge signal posterior to observation window
Qm
=
y
[
2
,
t
>
TMIN_STABILIZATION
]
# Compute variation range
Qm_range
=
np
.
ptp
(
Qm
)
logger
.
debug
(
'A =
%s
Pa --->
%.2f
nC/cm2 variation range over the last
%.0f
ms'
,
si_format
(
Adrive
,
2
,
space
=
' '
),
Qm_range
*
1e5
,
TMIN_STABILIZATION
*
1e3
)
cond
=
np
.
ptp
(
Qm
)
<
QSS_Q_DIV_THR
if
return_val
:
return
{
True
:
Qm
[
-
1
],
False
:
np
.
nan
}[
cond
]
else
:
return
cond
def
quasiSteadyStates
(
self
,
Fdrive
,
amps
=
None
,
charges
=
None
,
DCs
=
1.0
,
squeeze_output
=
False
):
''' Compute the quasi-steady state values of the neuron's gating variables
for a combination of US amplitudes, charge densities and duty cycles,
at a specific US frequency.
:param Fdrive: US frequency (Hz)
:param amps: US amplitudes (Pa)
:param charges: membrane charge densities (C/m2)
:param DCs: duty cycle value(s)
:return: 4-tuple with reference values of US amplitude and charge density,
as well as interpolated Vmeff and QSS gating variables
'''
# Get DC-averaged lookups interpolated at the appropriate amplitudes and charges
amps
,
charges
,
lookups
=
getLookupsDCavg
(
self
.
neuron
.
name
,
self
.
a
,
Fdrive
,
amps
,
charges
,
DCs
)
# Compute QSS states using these lookups
nA
,
nQ
,
nDC
=
lookups
[
'V'
]
.
shape
QSS
=
{
k
:
np
.
empty
((
nA
,
nQ
,
nDC
))
for
k
in
self
.
neuron
.
states
}
for
iA
in
range
(
nA
):
for
iDC
in
range
(
nDC
):
QSS_1D
=
self
.
neuron
.
quasiSteadyStates
(
{
k
:
v
[
iA
,
:,
iDC
]
for
k
,
v
in
lookups
.
items
()})
for
k
in
QSS
.
keys
():
QSS
[
k
][
iA
,
:,
iDC
]
=
QSS_1D
[
k
]
# Compress outputs if needed
if
squeeze_output
:
QSS
=
{
k
:
v
.
squeeze
()
for
k
,
v
in
QSS
.
items
()}
lookups
=
{
k
:
v
.
squeeze
()
for
k
,
v
in
lookups
.
items
()}
# Return reference inputs and outputs
return
amps
,
charges
,
lookups
,
QSS
def
quasiSteadyStateiNet
(
self
,
Qm
,
Fdrive
,
Adrive
,
DC
):
''' Compute quasi-steady state net membrane current for a given combination
of US parameters and a given membrane charge density.
:param Qm: membrane charge density (C/m2)
:param Fdrive: US frequency (Hz)
:param Adrive: US amplitude (Pa)
:param DC: duty cycle (-)
:return: net membrane current (mA/m2)
'''
_
,
_
,
lookups
,
QSS
=
self
.
quasiSteadyStates
(
Fdrive
,
amps
=
Adrive
,
charges
=
Qm
,
DCs
=
DC
,
squeeze_output
=
True
)
return
self
.
neuron
.
iNet
(
lookups
[
'V'
],
np
.
array
(
list
(
QSS
.
values
())))
# mA/m2
def
evaluateStability
(
self
,
Qm0
,
states0
,
lkp
):
''' Integrate the effective differential system from a given starting point,
until clear convergence or clear divergence is found.
:param Qm0: initial membrane charge density (C/m2)
:param states0: dictionary of initial states values
:param lkp: dictionary of 1D data points of "effective" coefficients
over the charge domain, for specific frequency and amplitude values.
:return: boolean indicating convergence state
'''
# Initialize y0 vector
t0
=
0.
y0
=
np
.
array
([
Qm0
]
+
list
(
states0
.
values
()))
# Initializing empty list to record evolution of charge deviation
n
=
int
(
QSS_HISTORY_INTERVAL
//
QSS_INTEGRATION_INTERVAL
)
# size of history
dQ
=
[]
# As long as there is no clear charge convergence or divergence
conv
,
div
=
False
,
False
tf
,
yf
=
t0
,
y0
while
not
conv
and
not
div
:
# Integrate system for small interval and retrieve final charge deviation
t0
,
y0
=
tf
,
yf
sol
=
solve_ivp
(
lambda
t
,
y
:
self
.
effDerivatives
(
y
,
t
,
lkp
),
[
t0
,
t0
+
QSS_INTEGRATION_INTERVAL
],
y0
,
method
=
'LSODA'
)
tf
,
yf
=
sol
.
t
[
-
1
],
sol
.
y
[:,
-
1
]
dQ
.
append
(
yf
[
0
]
-
Qm0
)
# logger.debug('{:.0f} ms: dQ = {:.5f} nC/cm2, avg dQ = {:.5f} nC/cm2'.format(
# tf * 1e3, dQ[-1] * 1e5, np.mean(dQ[-n:]) * 1e5))
# If last charge deviation is too large -> divergence
if
np
.
abs
(
dQ
[
-
1
])
>
QSS_Q_DIV_THR
:
div
=
True
# If last charge deviation or average deviation in recent history
# is small enough -> convergence
for
x
in
[
dQ
[
-
1
],
np
.
mean
(
dQ
[
-
n
:])]:
if
np
.
abs
(
x
)
<
QSS_Q_CONV_THR
:
conv
=
True
# If max integration duration is been reached -> error
if
tf
>
QSS_MAX_INTEGRATION_DURATION
:
raise
ValueError
(
'too many iterations'
)
logger
.
debug
(
'{}vergence after {:.0f} ms: dQ = {:.5f} nC/cm2'
.
format
(
{
True
:
'con'
,
False
:
'di'
}[
conv
],
tf
*
1e3
,
dQ
[
-
1
]
*
1e5
))
return
conv
def
quasiSteadyStateFixedPoints
(
self
,
Fdrive
,
Adrive
,
DC
,
lkp
,
dQdt
):
''' Compute QSS fixed points along the charge dimension for a given combination
of US parameters, and determine their stability.
:param Fdrive: US frequency (Hz)
:param Adrive: US amplitude (Pa)
:param DC: duty cycle (-)
:param lkp: lookup dictionary for effective variables along charge dimension
:param dQdt: charge derivative profile along charge dimension
:return: 2-tuple with values of stable and unstable fixed points
'''
logger
.
debug
(
'A = {:.2f} kPa, DC = {:.0f}%'
.
format
(
Adrive
*
1e-3
,
DC
*
1e2
))
# Extract stable and unstable fixed points from QSS charge variation profile
dfunc
=
lambda
Qm
:
-
self
.
quasiSteadyStateiNet
(
Qm
,
Fdrive
,
Adrive
,
DC
)
SFP_candidates
=
getFixedPoints
(
lkp
[
'Q'
],
dQdt
,
filter
=
'stable'
,
der_func
=
dfunc
)
.
tolist
()
UFPs
=
getFixedPoints
(
lkp
[
'Q'
],
dQdt
,
filter
=
'unstable'
,
der_func
=
dfunc
)
.
tolist
()
SFPs
=
[]
pltvars
=
self
.
getPltVars
()
# For each candidate SFP
for
i
,
Qm
in
enumerate
(
SFP_candidates
):
logger
.
debug
(
'Q-SFP = {:.2f} nC/cm2'
.
format
(
Qm
*
1e5
))
# Re-compute QSS
*
_
,
QSS_FP
=
self
.
quasiSteadyStates
(
Fdrive
,
amps
=
Adrive
,
charges
=
Qm
,
DCs
=
DC
,
squeeze_output
=
True
)
# Simulate from unperturbed QSS and evaluate stability
if
not
self
.
evaluateStability
(
Qm
,
QSS_FP
,
lkp
):
logger
.
warning
(
'diverging system at ({:.2f} kPa, {:.2f} nC/cm2)'
.
format
(
Adrive
*
1e-3
,
Qm
*
1e5
))
UFPs
.
append
(
Qm
)
else
:
# For each state
unstable_states
=
[]
for
x
in
self
.
neuron
.
states
:
pltvar
=
pltvars
[
x
]
unit_str
=
pltvar
.
get
(
'unit'
,
''
)
factor
=
pltvar
.
get
(
'factor'
,
1
)
is_stable_direction
=
[]
for
sign
in
[
-
1
,
+
1
]:
# Perturb state with small offset
QSS_perturbed
=
deepcopy
(
QSS_FP
)
QSS_perturbed
[
x
]
*=
(
1
+
sign
*
QSS_REL_OFFSET
)
# If gating state, bound within [0., 1.]
if
self
.
neuron
.
isVoltageGated
(
x
):
QSS_perturbed
[
x
]
=
np
.
clip
(
QSS_perturbed
[
x
],
0.
,
1.
)
logger
.
debug
(
'{}: {:.5f} -> {:.5f} {}'
.
format
(
x
,
QSS_FP
[
x
]
*
factor
,
QSS_perturbed
[
x
]
*
factor
,
unit_str
))
# Simulate from perturbed QSS and evaluate stability
is_stable_direction
.
append
(
self
.
evaluateStability
(
Qm
,
QSS_perturbed
,
lkp
))
# Check if system shows stability upon x-state perturbation
# in both directions
if
not
np
.
all
(
is_stable_direction
):
unstable_states
.
append
(
x
)
# Classify fixed point as stable only if all states show stability
is_stable_FP
=
len
(
unstable_states
)
==
0
{
True
:
SFPs
,
False
:
UFPs
}[
is_stable_FP
]
.
append
(
Qm
)
logger
.
info
(
'{}stable fixed-point at ({:.2f} kPa, {:.2f} nC/cm2){}'
.
format
(
''
if
is_stable_FP
else
'un'
,
Adrive
*
1e-3
,
Qm
*
1e5
,
''
if
is_stable_FP
else
', caused by {} states'
.
format
(
unstable_states
)))
return
SFPs
,
UFPs
def
findRheobaseAmps
(
self
,
DCs
,
Fdrive
,
Vthr
):
''' Find the rheobase amplitudes (i.e. threshold acoustic amplitudes of infinite duration
that would result in excitation) of a specific neuron for various duty cycles.
:param DCs: duty cycles vector (-)
:param Fdrive: acoustic drive frequency (Hz)
:param Vthr: threshold membrane potential above which the neuron necessarily fires (mV)
:return: rheobase amplitudes vector (Pa)
'''
# Get threshold charge from neuron's spike threshold parameter
Qthr
=
self
.
neuron
.
Cm0
*
Vthr
*
1e-3
# C/m2
# Get QSS variables for each amplitude at threshold charge
Aref
,
_
,
Vmeff
,
QS_states
=
self
.
quasiSteadyStates
(
Fdrive
,
charges
=
Qthr
,
DCs
=
DCs
)
if
DCs
.
size
==
1
:
QS_states
=
QS_states
.
reshape
((
*
QS_states
.
shape
,
1
))
Vmeff
=
Vmeff
.
reshape
((
*
Vmeff
.
shape
,
1
))
# Compute 2D QSS charge variation array at Qthr
dQdt
=
-
self
.
neuron
.
iNet
(
Vmeff
,
QS_states
)
# Find the threshold amplitude that cancels dQdt for each duty cycle
Arheobase
=
np
.
array
([
np
.
interp
(
0
,
dQdt
[:,
i
],
Aref
,
left
=
0.
,
right
=
np
.
nan
)
for
i
in
range
(
DCs
.
size
)])
# Check if threshold amplitude is found for all DCs
inan
=
np
.
where
(
np
.
isnan
(
Arheobase
))[
0
]
if
inan
.
size
>
0
:
if
inan
.
size
==
Arheobase
.
size
:
logger
.
error
(
'No rheobase amplitudes within [
%s
-
%s
Pa] for the provided duty cycles'
,
*
si_format
((
Aref
.
min
(),
Aref
.
max
())))
else
:
minDC
=
DCs
[
inan
.
max
()
+
1
]
logger
.
warning
(
'No rheobase amplitudes within [
%s
-
%s
Pa] below
%.1f%%
duty cycle'
,
*
si_format
((
Aref
.
min
(),
Aref
.
max
())),
minDC
*
1e2
)
return
Arheobase
,
Aref
def
computeEffVars
(
self
,
Fdrive
,
Adrive
,
Qm
,
fs
):
''' Compute "effective" coefficients of the HH system for a specific
combination of stimulus frequency, stimulus amplitude and charge density.
A short mechanical simulation is run while imposing the specific charge density,
until periodic stabilization. The HH coefficients are then averaged over the last
acoustic cycle to yield "effective" coefficients.
:param Fdrive: acoustic drive frequency (Hz)
:param Adrive: acoustic drive amplitude (Pa)
:param Qm: imposed charge density (C/m2)
:param fs: list of sonophore membrane coverage fractions
:return: list with computation time and a list of dictionaries of effective variables
'''
tstart
=
time
.
time
()
# Run simulation and retrieve deflection and gas content vectors from last cycle
_
,
[
Z
,
ng
],
_
=
BilayerSonophore
.
simulate
(
self
,
Fdrive
,
Adrive
,
Qm
)
Z_last
=
Z
[
-
NPC_FULL
:]
# m
Cm_last
=
self
.
v_Capct
(
Z_last
)
# F/m2
# For each coverage fraction
effvars
=
[]
for
x
in
fs
:
# Compute membrane capacitance and membrane potential vectors
Cm
=
x
*
Cm_last
+
(
1
-
x
)
*
self
.
Cm0
# F/m2
Vm
=
Qm
/
Cm
*
1e3
# mV
# Compute average cycle value for membrane potential and rate constants
effvars
.
append
({
'V'
:
np
.
mean
(
Vm
)})
effvars
[
-
1
]
.
update
(
self
.
neuron
.
computeEffRates
(
Vm
))
tcomp
=
time
.
time
()
-
tstart
# Log process
log
=
'{}: lookups @ {}Hz, {}Pa, {:.2f} nC/cm2'
.
format
(
self
,
*
si_format
([
Fdrive
,
Adrive
],
precision
=
1
,
space
=
' '
),
Qm
*
1e5
)
if
len
(
fs
)
>
1
:
log
+=
', fs = {:.0f} - {:.0f}%'
.
format
(
fs
.
min
()
*
1e2
,
fs
.
max
()
*
1e2
)
log
+=
', tcomp = {:.3f} s'
.
format
(
tcomp
)
logger
.
info
(
log
)
# Return effective coefficients
return
[
tcomp
,
effvars
]
Event Timeline
Log In to Comment