Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F83476840
compute_srf.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Tue, Sep 17, 08:44
Size
5 KB
Mime Type
text/x-python
Expires
Thu, Sep 19, 08:44 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
20807414
Attached To
rEBECGSMARTRESAMPLER EB_ECG_Smart_Resampler
compute_srf.py
View Options
import
numpy
as
np
import
os
from
scipy.interpolate
import
CubicSpline
,
interp1d
from
tqdm
import
tqdm
import
linecache
import
tracemalloc
DATA_DIR
=
"../data/extracted_data/"
import
pickle
from
multiprocessing
import
Pool
import
sys
# Signal freqeuncy variables
FREQ
=
128
MULTIPLIER_FREQ
=
50
#Final freq: 128*20 = 2.56 KHz
def
open_file
(
file_name
):
file_name_full
=
os
.
path
.
join
(
DATA_DIR
,
os
.
path
.
basename
(
file_name
))
data
=
{}
with
open
(
file_name_full
,
"rb"
)
as
f
:
data
=
pickle
.
load
(
f
)
f
.
flush
()
f
.
close
()
return
data
def
find_num_events_ADC
(
signal
,
nBits
,
frequency_multiplier
=
MULTIPLIER_FREQ
,
hist
=
5
,
original_bits
=
11
):
#ADC stats
up_samp_signal
=
upsample_uniform_signal
(
signal
,
frequency_multiplier
)
delta
=
2
**
original_bits
dV
=
(
delta
)
/
(
2
**
nBits
)
hist
=
(
hist
/
100
)
*
dV
min_val
=
-
delta
//
2
#init value, first sample (we assume we always sample the nearest level at start) and ADC status
v_0
=
up_samp_signal
[
'v'
][
0
]
lowTh
=
min_val
+
((
v_0
-
min_val
)
//
dV
)
*
dV
highTh
=
lowTh
+
dV
num_events_ADC
=
0
for
val
in
tqdm
(
up_samp_signal
[
'v'
],
desc
=
f
"ADC with {nBits} bits"
):
#print(f"Value: {val}, time: {time}, low_th = {lowTh - hist}, high_th = { highTh + hist}")
if
val
>
highTh
+
hist
or
val
<
lowTh
-
hist
:
lowTh
=
min_val
+
((
val
-
min_val
)
//
dV
)
*
dV
#Delta from the bottom: (val-min_val)//dV*dV then compute the actual level summin min_val
highTh
=
lowTh
+
dV
num_events_ADC
+=
1
return
num_events_ADC
def
ADC
(
signal
,
nBits
,
frequency_multiplier
=
MULTIPLIER_FREQ
,
hist
=
5
,
original_bits
=
11
):
#ADC stats
up_samp_signal
=
upsample_uniform_signal
(
signal
,
frequency_multiplier
)
delta
=
2
**
original_bits
dV
=
(
delta
)
/
(
2
**
nBits
)
hist
=
(
hist
/
100
)
*
dV
min_val
=
-
delta
//
2
events
=
{
't'
:[],
'v'
:[]}
#init value, first sample (we assume we always sample the nearest level at start) and ADC status
v_0
=
up_samp_signal
[
'v'
][
0
]
lowTh
=
min_val
+
((
v_0
-
min_val
)
//
dV
)
*
dV
highTh
=
lowTh
+
dV
events
[
't'
]
.
append
(
up_samp_signal
[
't'
][
0
]
/
frequency_multiplier
)
events
[
'v'
]
.
append
(
int
(
lowTh
if
v_0
-
lowTh
<
highTh
-
v_0
else
highTh
))
i
=
0
for
val
in
tqdm
(
up_samp_signal
[
'v'
],
desc
=
f
"ADC with {nBits} bits"
):
#print(f"Value: {val}, time: {time}, low_th = {lowTh - hist}, high_th = { highTh + hist}")
if
val
>
highTh
+
hist
or
val
<
lowTh
-
hist
:
direction
=
1
if
val
>
highTh
else
-
1
lowTh
=
min_val
+
((
val
-
min_val
)
//
dV
)
*
dV
#Delta from the bottom: (val-min_val)//dV*dV then compute the actual level summin min_val
highTh
=
lowTh
+
dV
events
[
't'
]
.
append
(
up_samp_signal
[
't'
][
i
]
/
frequency_multiplier
)
events
[
'v'
]
.
append
(
int
(
lowTh
if
direction
==
1
else
highTh
))
i
+=
1
if
events
[
't'
][
-
1
]
<
up_samp_signal
[
't'
][
-
1
]
/
frequency_multiplier
:
#final value, last sample (we assume we always sample the nearest level at end)
v_end
=
up_samp_signal
[
'v'
][
-
1
]
lowTh
=
min_val
+
((
v_end
-
min_val
)
//
dV
)
*
dV
highTh
=
lowTh
+
dV
events
[
't'
]
.
append
(
up_samp_signal
[
't'
][
-
1
]
/
frequency_multiplier
)
events
[
'v'
]
.
append
(
int
(
lowTh
if
v_end
-
lowTh
<
highTh
-
v_end
else
highTh
))
return
events
def
upsample_uniform_signal
(
signal
,
multiplier
):
out_signal
=
{}
upsampled
=
[]
v
=
signal
[
'v'
]
last_new_t
=
(
len
(
v
)
-
1
)
*
multiplier
t
=
np
.
arange
(
0
,
last_new_t
+
1
,
multiplier
,
dtype
=
np
.
int64
)
new_base
=
np
.
arange
(
0
,
last_new_t
,
1
,
dtype
=
np
.
int64
)
f
=
interp1d
(
t
,
v
)
upsampled
=
f
(
new_base
)
out_signal
=
{
't'
:
new_base
,
'v'
:
upsampled
}
return
out_signal
def
prof_one_lvl
(
b
):
tracemalloc
.
start
()
original_num_points
=
0
EB_num_points
=
0
snapshot_start
=
tracemalloc
.
take_snapshot
()
for
f_name
in
os
.
listdir
(
DATA_DIR
):
print
(
f
"File {f_name}"
)
data
=
open_file
(
f_name
)
EB_num_points
+=
find_num_events_ADC
(
data
,
b
)
original_num_points
+=
len
(
data
[
't'
])
data
=
None
snapshot_end
=
tracemalloc
.
take_snapshot
()
top_stats
=
snapshot_end
.
statistics
(
'lineno'
)
top_stats_diff
=
snapshot_end
.
compare_to
(
snapshot_start
,
'lineno'
)
print
(
"
\n\n
########################################################
\n\n
"
)
print
(
"[ Top 10 memory usage]"
)
for
stat
in
top_stats
[:
10
]:
print
(
stat
)
print
(
"
\n\n
########################################################
\n\n
"
)
print
(
"[ Top 10 differences ]"
)
for
stat
in
top_stats_diff
[:
10
]:
print
(
stat
)
print
(
"
\n\n
########################################################
\n\n
"
)
print
(
f
'{b} Bits, SRF = {(original_num_points-EB_num_points)/original_num_points}
\n\n
'
)
snapshot_start
=
snapshot_end
SRF
=
(
original_num_points
-
EB_num_points
)
/
original_num_points
print
(
f
'{b} Bits, SRF = {SRF}'
)
with
open
(
'../results/SRFs_3.txt'
,
'a+'
)
as
f
:
f
.
write
(
f
'{b} Bits, SRF = {SRF}
\n
'
)
def
process_one_level
(
lvl
):
original_num_points
=
0
EB_num_points
=
0
for
f_name
in
os
.
listdir
(
DATA_DIR
):
tracemalloc
.
start
()
tracemalloc
.
stop
()
print
(
f
"File {f_name}"
)
data
=
open_file
(
f_name
)
EB_num_points
+=
find_num_events_ADC
(
data
,
lvl
)
original_num_points
+=
len
(
data
[
't'
])
data
=
None
SRF
=
(
original_num_points
-
EB_num_points
)
/
original_num_points
print
(
f
'{lvl} Bits, SRF = {SRF}'
)
with
open
(
'../results/SRFs_3.txt'
,
'a+'
)
as
f
:
f
.
write
(
f
'{lvl} Bits, SRF = {SRF}
\n
'
)
def
main
():
cores
=
int
(
sys
.
argv
[
2
])
with
Pool
(
cores
)
as
p
:
if
sys
.
argv
[
1
]
==
"proces"
:
p
.
map
(
process_one_level
,
list
(
range
(
2
,
11
)))
elif
sys
.
argv
[
1
]
==
"profile"
:
p
.
map
(
prof_one_lvl
,
list
(
range
(
2
,
11
)))
else
:
print
(
f
"YOU STUPID FUCK!"
)
exit
if
__name__
==
"__main__"
:
main
()
Event Timeline
Log In to Comment