Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F78072803
extract_data.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, Aug 18, 06:25
Size
2 KB
Mime Type
text/x-python
Expires
Tue, Aug 20, 06:25 (2 d)
Engine
blob
Format
Raw Data
Handle
19977348
Attached To
rEBECGSMARTRESAMPLER EB_ECG_Smart_Resampler
extract_data.py
View Options
import
os
import
sys
import
matplotlib.pyplot
as
plt
import
numpy
as
np
import
pandas
as
pd
scripts
=
'../helper_scripts'
if
scripts
not
in
sys
.
path
:
sys
.
path
.
insert
(
0
,
scripts
)
import
multiprocessing
from
multiprocessing
import
Pool
import
MITAnnotation
as
MITA
import
wfdb
from
csvManager
import
csvManager
FREE_CORES
=
0
data_folder
=
"../data/dataRaw"
dest_data
=
"../data/extracted_data"
dest_annotation
=
"../data/extracted_annotation/"
userChannel
=
"ECG1"
def
extract_data
(
args
):
file_name
=
args
[
0
]
last_annot
=
args
[
1
]
print
(
"working on file: {}"
.
format
(
file_name
))
single_name
=
os
.
path
.
basename
(
file_name
)
#Record reading
rec
=
wfdb
.
rdrecord
(
file_name
,
channel_names
=
[
userChannel
],
physical
=
False
)
#Input signal as a plain array
v
=
(
rec
.
d_signal
.
reshape
((
1
,
-
1
))[
0
])
.
tolist
()[
0
:
last_annot
+
10
]
t
=
list
(
range
(
len
(
v
)))[
0
:
last_annot
+
10
]
manager
=
csvManager
()
manager
.
write
(
t
,
v
,
os
.
path
.
join
(
dest_data
,
single_name
+
".bin"
))
def
extract_annot
(
file_name
):
single_name
=
os
.
path
.
basename
(
file_name
)
file_source
=
file_name
+
".atr"
file_dest
=
os
.
path
.
join
(
dest_annotation
,
single_name
+
".annot"
)
times
=
[
x
.
time
for
x
in
MITA
.
read_annotations
(
file_source
)
if
MITA
.
is_qrs_annotation
(
x
)]
df
=
pd
.
DataFrame
(
times
)
df
.
to_csv
(
file_dest
,
index
=
False
)
print
(
"Extracted annotation: {}"
.
format
(
file_name
))
return
times
[
-
1
]
def
process
(
multi
=
True
,
cores
=
1
):
# ------------ INIT ------------
if
not
os
.
path
.
isdir
(
dest_data
):
os
.
mkdir
(
dest_data
)
if
not
os
.
path
.
isdir
(
dest_annotation
):
os
.
mkdir
(
dest_annotation
)
# ------------ Extract DATA & ANNOTATIONS ------------
#find files:
files
=
[]
for
x
in
os
.
listdir
(
data_folder
):
thisFile
=
os
.
path
.
join
(
data_folder
,
x
)
thisFileNoExt
=
os
.
path
.
splitext
(
thisFile
)[
0
]
if
os
.
path
.
isfile
(
thisFile
)
and
os
.
path
.
exists
(
thisFileNoExt
+
".hea"
):
files
.
append
(
thisFileNoExt
)
listOfFiles
=
list
(
set
(
files
))
with
Pool
(
cores
)
as
pool
:
last_annot
=
pool
.
map
(
extract_annot
,
listOfFiles
)
pool
.
map
(
extract_data
,
zip
(
listOfFiles
,
last_annot
))
if
__name__
==
"__main__"
:
import
argparse
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
"--cores"
,
help
=
"Force used number of cores (default, half of the available ones"
)
args
=
parser
.
parse_args
()
if
args
.
cores
is
not
None
:
used_cores
=
int
(
args
.
cores
)
else
:
used_cores
=
multiprocessing
.
cpu_count
()
//
2
print
(
f
"Extracting data with {used_cores} cores..."
)
process
(
multi
=
True
,
cores
=
used_cores
)
Event Timeline
Log In to Comment