Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F77468438
dataset_generator.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, Aug 14, 16:11
Size
5 KB
Mime Type
text/x-python
Expires
Fri, Aug 16, 16:11 (2 d)
Engine
blob
Format
Raw Data
Handle
19875223
Attached To
R12761 LC-Sampling-Theory
dataset_generator.py
View Options
import
os
from
typing
import
List
import
pandas
as
pd
import
numpy
as
np
import
jax.numpy
as
jnp
# Path of the ECGSYN output table; passed as `data_path` to run_ECGSYN, which
# reads it with pandas after each run.
DATA_PATH = "../ecg_syn/ecgsyn.dat"
# Where the length-normalized beats are written (np.save) and reloaded (np.load).
SAVE_PATH = "../dataset/beats.npy"
# ECGSYN options file: removed before each run and parsed by load_signal for
# its "-s" and "-n" entries.
OPT_FILE = "../ecg_syn/ecgsyn.opt"
# Value passed to ECGSYN's `-h` option (presumably heart rate in bpm — TODO confirm).
HR = 60
# Time span, in seconds, of each signal built by create_random_signal.
T_SPAN_RANDOM_SIGNAL_SECONDS = 2
# Budget used to cap the number of beats requested per ECGSYN run
# (presumably bytes of memory — TODO confirm).
MEM_FOR_ECGSYN = 32e9
# Per-sample cost used in the same cap. NOTE(review): "BOINT" is presumably a
# typo for "POINT"; the name is kept because other code references it.
BYTES_ECGSYN_BOINT = 1001
def run_ECGSYN(data_path, freq, num_samples):
    """Run the external ECGSYN generator and load the table it produces.

    Args:
        data_path: Path of the space-delimited file to read once the
            generator has run.
        freq: Value passed to both the `-s` and `-S` options of ECGSYN.
        num_samples: Number of beats wanted; `num_samples + 2` is requested
            because the first and last heartbeat might be corrupted.

    Returns:
        The content of `data_path` as a pandas DataFrame with integer column
        labels (the file is read without a header row).
    """
    # Drop any options file left over from a previous run. EAFP avoids the
    # check-then-remove race of isfile() followed by remove().
    try:
        os.remove(OPT_FILE)
    except FileNotFoundError:
        pass

    # num_samples+2 as the first and last heartbeat might be corrupted.
    command = f'cd ../ecg_syn/ ; ./ecgsyn -n {num_samples+2} -s {freq} -S {freq} -h {HR} %%'
    os.system(command)
    return pd.read_csv(data_path, delimiter=" ", header=None)
def separate_beats(vs: np.ndarray, ms: List) -> List[np.ndarray]:
    """Split a signal into windows cut at the minimum found between marks.

    Tracking of the running minimum starts at a sample whose mark is 5 and
    stops at a sample whose mark is 1. Every time a mark 1 is met, the slice
    going from the previous cut index to the current cut index is emitted.

    Args:
        vs: Signal samples.
        ms: Per-sample marks, aligned with `vs`.

    Returns:
        The emitted windows, without the first one.
    """
    windows: List[np.ndarray] = []
    cut_prev: int = 0
    cut_curr: int = 0
    lowest: float = np.inf
    tracking: bool = False

    for idx, (value, mark) in enumerate(zip(vs, ms)):
        if mark == 5:
            tracking = True
        elif mark == 1:
            tracking = False
            windows.append(vs[cut_prev:cut_curr])
            cut_prev = cut_curr
            lowest = np.inf  # restart the minimum search for the next beat
        if tracking and value < lowest:
            lowest, cut_curr = value, idx

    # We don't want the first window: it might not be a full window
    # (the recording can start in the middle of a beat).
    return windows[1:]
def find_len_left(windows: List[np.ndarray]) -> int:
    """Return the smallest number of samples lying before a window's maximum.

    The position of the maximum is taken with `np.argmax`; the search starts
    from the length of the first window as an upper bound.
    """
    candidates = [len(windows[0])]
    candidates.extend(np.argmax(window) for window in windows)
    return min(candidates)
def find_len_right(windows: List[np.ndarray]) -> int:
    """Return the smallest number of samples lying after a window's maximum.

    The position of the maximum is taken with `np.argmax`; the search starts
    from the length of the first window as an upper bound.
    """
    candidates = [len(windows[0])]
    candidates.extend(len(window) - np.argmax(window) - 1 for window in windows)
    return min(candidates)
def normalize_length(windows: List[np.ndarray]) -> List[np.ndarray]:
    """Crop every window to a common length, aligned on its maximum.

    Each window keeps the same number of samples before its maximum (the
    smallest such count over all windows) and the same number after it, so
    all returned windows have equal length and their maxima share one index.

    Args:
        windows: Windows to crop; each must be non-empty.

    Returns:
        The cropped windows, in input order. An empty input gives an empty
        list.
    """
    if not windows:
        return []

    # Locate each maximum once and reuse it for both bounds and the crop.
    peaks = [int(np.argmax(w)) for w in windows]
    len_left = min(peaks)
    len_right = min(len(w) - peak - 1 for w, peak in zip(windows, peaks))

    return [
        w[peak - len_left:peak + len_right + 1]
        for w, peak in zip(windows, peaks)
    ]
def load_signal(num_pts, freq):
    """Load the saved dataset if the options file matches the requested frequency.

    The options file (OPT_FILE) is scanned line by line: for a line containing
    "-s" or "-n", characters 3 to 11 are parsed as an integer giving the
    frequency or the number of points respectively.

    Args:
        num_pts: Maximum number of entries to keep from the saved dataset.
        freq: Frequency the saved dataset is expected to have.

    Returns:
        The first `num_pts` entries of the array stored at SAVE_PATH, or None
        when the options file is missing or its frequency differs from `freq`.
    """
    if not os.path.isfile(OPT_FILE):
        print("No signal to load/ Missing config file")
        return None

    tot_num_pts = 0
    freq_this_file = 0
    with open(OPT_FILE) as f:
        for line in f:
            if "-s" in line:
                freq_this_file = int(line[3:12])
            if "-n" in line:
                tot_num_pts = int(line[3:12])

    if freq != freq_this_file:
        print(f"Present dataset do not respect given parameter (f: {freq_this_file}, pts: {tot_num_pts})")
        return None

    dataset = np.load(SAVE_PATH)[:num_pts]
    if tot_num_pts > num_pts:
        # Report the number of entries actually loaded, not the total twice.
        print(f"Loaded {len(dataset)} points, the dataset contains {tot_num_pts} points")
    elif tot_num_pts < num_pts:
        print(f"Incoherent info. about number of pints, dataset length: {len(dataset)}")
    return dataset
def create_random_signal(coefs, ws, dt):
    """Build a weighted sum of sines over T_SPAN_RANDOM_SIGNAL_SECONDS.

    Args:
        coefs: Weights of the sine terms.
        ws: Factors multiplying the time axis inside each sine, paired with
            `coefs` in order.
        dt: Time step; the number of samples is int(time span / dt).

    Returns:
        1-D array holding the sum of `c * sin(w * t)` over all pairs.
    """
    n_samples = int(T_SPAN_RANDOM_SIGNAL_SECONDS / dt)
    t = np.linspace(0, T_SPAN_RANDOM_SIGNAL_SECONDS, n_samples)
    signal = np.zeros(n_samples)
    for amplitude, omega in zip(coefs, ws):
        signal += amplitude * np.sin(omega * t)
    return signal
def create_positive_random_dataset(num_pts, freq):
    """Build a dataset of strictly positive sum-of-sines signals.

    Args:
        num_pts: Number of signals (rows) in the returned array.
        freq: Sampling frequency; the time step is 1 / freq. It must be at
            least 10 so that 100 distinct values can be drawn without
            replacement from `int(freq * 10)` candidates.

    Returns:
        Array with one signal per row, each shifted so that its minimum
        equals 0.01.
    """
    rng = np.random.default_rng(31415926514)
    # Draw from freq*10 integers so that rng.choice has a large enough search
    # space, then divide by 100: the largest value is 1/10 of the sampling
    # frequency.
    ws = rng.choice(int(freq * 10), size=100, replace=False) / 100
    dt = 1 / freq
    coefs = rng.choice(3000, size=100)

    # NOTE(review): coefs and ws are drawn once, so every row of the dataset
    # is the same signal — confirm this is intended.
    out = []
    for _ in range(num_pts):
        x = create_random_signal(coefs, ws, dt)
        # Shift so the minimum sits at +0.01. Subtracting (min + 0.01) would
        # leave the minimum at -0.01, i.e. not positive.
        x = x - x.min() + 0.01
        out.append(x)
    return np.array(out)
def _generate_beats(freq, num_beats):
    """Run ECGSYN once and return at most `num_beats` separated beat windows."""
    data = run_ECGSYN(data_path=DATA_PATH, freq=freq, num_samples=num_beats)
    v = data[1].to_numpy()      # column 1 is used as the signal
    marks = data[2].to_list()   # column 2 is used as the per-sample marks
    return separate_beats(v, marks)[:num_beats]


def create_ECG_emulated_dataset(num_pts, freq):
    """Generate `num_pts` beats with ECGSYN, normalize their length and save them.

    ECGSYN is run as many times as needed, each run being capped to a number
    of beats derived from MEM_FOR_ECGSYN, BYTES_ECGSYN_BOINT, HR and `freq`;
    a last, shorter run produces the remaining beats.

    Args:
        num_pts: Number of beats wanted.
        freq: Frequency forwarded to ECGSYN.

    Returns:
        Array of the length-normalized beats, also saved at SAVE_PATH.
    """
    max_num_beat_this_freq = int(MEM_FOR_ECGSYN / (freq * BYTES_ECGSYN_BOINT) * 60 / HR)
    num_full_runs, num_beats_remaining = divmod(num_pts, max_num_beat_this_freq)

    print(f"Maximum number of beats at this freq (per ECGSYN run): {max_num_beat_this_freq}")
    print(f"Beats desired: {num_pts}")
    print(f"Running ECGSYN {num_full_runs} times")

    windows = []
    for i in range(num_full_runs):
        print("\n###########################")
        print(f"#Generating {i+1}/{num_full_runs} datassets")
        print("###########################\n")
        windows.extend(_generate_beats(freq, max_num_beat_this_freq))

    # Tail: beats left over after the full-size runs.
    if num_beats_remaining != 0:
        print("\n###########################")
        print(f"#Running ECGSYN. Tail beats number: {num_beats_remaining}")
        print("###########################\n")
        windows.extend(_generate_beats(freq, num_beats_remaining))

    dataset_np = np.array(normalize_length(windows))

    # Create the output directory from SAVE_PATH; exist_ok avoids the
    # check-then-create race.
    os.makedirs(os.path.dirname(SAVE_PATH), exist_ok=True)
    np.save(SAVE_PATH, dataset_np)
    return dataset_np
def get_signal(type='load', num_pts=1000, freq=256):
    """Return a dataset as a JAX array, built according to `type`.

    Args:
        type: 'random' (sum-of-sines dataset), 'load' (previously saved
            dataset) or 'create' (new ECGSYN dataset). The name shadows the
            builtin but is kept because callers may pass it by keyword.
        num_pts: Number of signals/beats requested.
        freq: Sampling frequency forwarded to the selected builder.

    Returns:
        The selected dataset converted with `jnp.array`.

    Raises:
        ValueError: If `type` is not one of the recognized values.
    """
    if type == 'random':
        dataset = create_positive_random_dataset(num_pts, freq)
    elif type == 'load':
        dataset = load_signal(num_pts, freq)
    elif type == 'create':
        dataset = create_ECG_emulated_dataset(num_pts, freq)
    else:
        # Fail with an explicit error; falling through would hit the return
        # below with `dataset` never assigned.
        raise ValueError("Dataset type not recognized in 'get_signal()'")
    return jnp.array(dataset)
def main() -> None:
    """Script entry point; currently performs no work."""
    return None


if __name__ == "__main__":
    main()
Event Timeline
Log In to Comment