Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F66597489
ECG_beats_division.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Tue, Jun 11, 17:04
Size
6 KB
Mime Type
text/x-python
Expires
Thu, Jun 13, 17:04 (2 d)
Engine
blob
Format
Raw Data
Handle
18246407
Attached To
rEBECGSMARTRESAMPLER EB_ECG_Smart_Resampler
ECG_beats_division.py
View Options
import
os
import
pickle
import
sys
from
time
import
time
import
matplotlib.pyplot
as
plt
import
numpy
as
np
import
pandas
as
pd
scripts
=
'../helper_scripts'
if
scripts
not
in
sys
.
path
:
sys
.
path
.
insert
(
0
,
scripts
)
import
multiprocessing
import
shutil
from
multiprocessing
import
Pool
,
Process
from
csvManager
import
csvManager
#SETUP
# All paths are relative to the helper-scripts working directory.
data_raw = "../data/extracted_data"            # uniformly-sampled extracted signals
data_lvl = "../data/level_crossing"            # level-crossing resampled signals, one subfolder per bit depth
annot_folder = "../data/extracted_annotation"  # per-record .annot files (one beat timestamp per line)
out_dir = "../data/beats"                      # output folder for per-record beat pickles (recreated by process())
bits = 6            # NOTE(review): not referenced in this file — presumably a default bit depth; confirm
max_num_bits = 11   # beats are extracted for every bit depth in 0..max_num_bits (0 = raw data)
beat_to_plot = 253  # NOTE(review): not referenced in this file — presumably used interactively; confirm
file_to_extract = None
#'16265' #None for all files
#MAIN FUNCTIONS
def extract_beats(data, boundaries):
    """Cut the signal in `data` ({'t': [...], 'v': [...]}) into single beats.

    `boundaries` is a list of (start, annotation, stop) tuples, assumed to be
    in increasing time order.  Returns a dict mapping each annotation time to
    {'t': [...], 'v': [...]} with the samples falling inside its window
    (boundary samples included).
    """
    beats = {}
    # Index hint: beats come in time order, so each search resumes just
    # before where the previous beat ended instead of rescanning from 0.
    search_hint = 0
    for start_t, annot_t, stop_t in boundaries:
        first = find_time_index_in_eb(data['t'], start_t,
                                      prior=search_hint, mode="before")
        last = find_time_index_in_eb(data['t'], stop_t,
                                     prior=first, mode="after")
        search_hint = last - 1
        beats[annot_t] = {'t': data['t'][first:last + 1],
                          'v': data['v'][first:last + 1]}
    return beats
def get_boundaries(file_name):
    """Build a (start, annotation, stop) window around each beat annotation.

    Reads `<basename>.annot` from `annot_folder` (one integer timestamp per
    line).  Each window spans 40% of the previous inter-beat interval before
    the annotation and 60% of the following one after it; the very first
    annotation is skipped (no previous interval) and the last beat reuses the
    previous forward width.
    """
    base = os.path.basename(file_name).split(".")[0]
    annot_file = os.path.join(annot_folder, base + '.annot')
    with open(annot_file) as f:
        annotations = [int(line) for line in f.readlines()]

    boundaries = []
    width_before = 0
    width_after = 0
    for i in range(1, len(annotations)):
        width_before = (annotations[i] - annotations[i - 1]) * 0.4
        if i != len(annotations) - 1:
            width_after = (annotations[i + 1] - annotations[i]) * 0.6
        boundaries.append((annotations[i] - width_before,
                           annotations[i],
                           annotations[i] + width_after))
    return boundaries
def get_data(file_name, bits):
    """Load one recording as {'t': [...], 'v': [...]} via csvManager.

    `bits` == 0 selects the raw uniformly-sampled file in `data_raw`;
    any other value selects the level-crossing version stored under
    `data_lvl/<bits>bits/`.
    """
    if bits == 0:
        data_file = os.path.join(data_raw, file_name)
    else:
        data_file = os.path.join(data_lvl, str(bits) + "bits", file_name)
    t, v = csvManager().read(data_file)
    return {'t': t, 'v': v}
def get_beats(file_name, bits):
    """Load `file_name` at the given bit depth and split it into beats."""
    signal = get_data(file_name, bits)
    return extract_beats(signal, get_boundaries(file_name))
# --------------------------- HELPER FUNCTIONS ---------------------------
def get_files(folder, pattern=None):
    """List the binary data files in `folder`.

    Keeps only names containing "bin"; if `pattern` is given, further
    restricts to names containing that substring.
    """
    names = [name for name in os.listdir(folder) if "bin" in name]
    if pattern is not None:
        # Was: list(filter(lambda strin: True if pattern in strin else False, names))
        # — a comprehension says the same thing directly.
        names = [name for name in names if pattern in name]
    return names
def find_time_index_in_eb(data_time, time, prior=0, mode="before"):
    """Find the index in the sorted time vector `data_time` bracketing `time`.

    Clamps to 0 / len-1 when `time` falls outside the vector.  Otherwise
    locates the interval [data_time[i], data_time[i+1]) containing `time` and
    returns i (mode "before", or an exact hit) or i+1 (mode "after").
    `prior` is a search hint: scanning starts there and wraps around to the
    beginning if nothing is found.  Returns None only if no interval matches.
    """
    if time <= data_time[0]:
        return 0
    if time >= data_time[-1]:
        return len(data_time) - 1

    def _scan(start, stop):
        # One pass over intervals [start, stop); was duplicated inline twice.
        for i in range(start, stop):
            if data_time[i] <= time < data_time[i + 1]:
                if time == data_time[i] or mode == "before":
                    return i
                if mode == "after":
                    return i + 1
        return None

    # Forward from the hint first, then wrap around to the beginning.
    idx = _scan(prior, len(data_time) - 1)
    if idx is None:  # was `idx == None` — identity check is the correct idiom
        idx = _scan(0, prior)
    return idx
def invert_inner_outer_keys(data):
    """Swap the two nesting levels of a dict of dicts.

    {a: {x: v, ...}, ...} becomes {x: {a: v, ...}, ...}.  The inner key set is
    taken from the first outer entry, so every inner dict is expected to share
    the same keys.  Raises IndexError on an empty input.
    """
    inner_keys = data[list(data.keys())[0]].keys()
    return {k1: {k0: data[k0][k1] for k0 in data} for k1 in inner_keys}
def extract_beats_file(file_name):
    """Extract beats for one recording at every bit depth and pickle them.

    The result is re-keyed so the outer key is the annotation time and the
    inner key the bit depth, then written to `<out_dir>/<basename>.pickle`.
    """
    print(f"----------------------- Analyzing file {file_name} ----------------------- ")
    beats_this_file = {b: get_beats(file_name, b) for b in range(max_num_bits + 1)}
    data_inverted_keys = invert_inner_outer_keys(beats_this_file)
    # Serialize next to the other per-record outputs.
    file_out = os.path.join(out_dir, file_name.split(".")[0] + ".pickle")
    with open(file_out, 'wb') as handle:
        pickle.dump(data_inverted_keys, handle, protocol=pickle.HIGHEST_PROTOCOL)
def process(multi=True, cores=1):
    """Split every extracted recording into beats and pickle the results.

    Recreates `out_dir` from scratch on every run.  With `multi` True the
    per-file work is fanned out over `cores` worker processes.
    """
    files = get_files(data_raw, file_to_extract)
    # Start from a clean output directory.
    if os.path.isdir(out_dir):
        shutil.rmtree(out_dir)
    os.mkdir(out_dir)
    if not multi:
        for file_name in files:
            extract_beats_file(file_name)
        return
    print(f"Executing beats subdivision with {cores} cores...")
    with Pool(cores) as pool:
        pool.map(extract_beats_file, files)
def test_performances():
    """Benchmark process() for 1..cpu_count-1 cores, report and plot timings."""
    times = []
    print("~" * 85)
    print("Analyzing multicore performances in sampling signals with level crossing algorithm and saving in binary.\n"
          "Usefull? No, interesting, yes.")
    print("-" * 85)
    print("-" * 85)
    print("\n")
    for core_num in range(1, multiprocessing.cpu_count()):
        print("-" * 85)
        print(f"Using {core_num} cores...")
        start = time()
        process(multi=True, cores=core_num)
        stop = time()
        # BUG FIX: elapsed time is stop - start (was start - stop, which
        # recorded a negative duration for every run).
        times.append(stop - start)
    print("\n\n")
    # FIX: iterate over the measurements themselves — the old
    # zip(range(1, cpu_count()-1), times) silently dropped the last timing.
    for core_num, t in enumerate(times, start=1):
        print(f"Elapsed time using {core_num} cores: {int(t//60)}:{int((t-t//60*60)//1)}")
    plt.figure()
    plt.plot(times)
    plt.savefig("../data/logs/perf2.png")
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--test",
                        help="test multicore capabilities",
                        action="store_true")
    # FIX: the help text was missing its closing parenthesis; `type=int`
    # replaces the manual int() conversion below (same behavior).
    parser.add_argument("--cores", type=int,
                        help="Force used number of cores (default, half of the available ones)")
    args = parser.parse_args()

    if args.test:
        print("TEST MULTICORE...")
        test_performances()
    else:
        # --cores overrides the default of half the available CPUs.
        if args.cores is not None:
            used_cores = args.cores
        else:
            used_cores = multiprocessing.cpu_count() // 2
        process(multi=True, cores=used_cores)
Event Timeline
Log In to Comment