Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F65153880
ECG_lvlCrossing.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Jun 1, 06:51
Size
4 KB
Mime Type
text/x-python
Expires
Mon, Jun 3, 06:51 (2 d)
Engine
blob
Format
Raw Data
Handle
18013475
Attached To
R9636 Event based gQRS for ECG signals
ECG_lvlCrossing.py
View Options
"""
Created on Fri Feb 22 09:07:30 2019
Simple script to perform a non-uniform subsampling of an ECG record in the MIT-BIH format, using
a level-crossing sampling scheme (see the ADC function below). As output, it produces a 2-column
csv file with the sample times and values.
Dependencies:
- numpy
- tqdm (https://pypi.org/project/tqdm/)
- wfdb-python (https://pypi.org/project/wfdb/)
@author: T. Teijeiro
"""
import
numpy
as
np
from
tqdm
import
trange
import
os
def ADC(v, nBits=5, hist=0, delta=2048):
    """
    Level-crossing subsampling of a sequence of sample values.

    The range starting at 0 is divided into bands of width
    ``dV = delta / 2**nBits``. Tracking starts in the lowest band
    (``lowTh = 0``, ``highTh = dV``). For every sample, the current band is
    widened on both sides by the hysteresis margin; when the sample falls
    outside that widened band, its position in ``v`` is recorded and the band
    is shifted, one level at a time, until it reaches the sample or hits the
    first/last level.

    Parameters
    ----------
    v : iterable of numbers
        Sample values, visited in order.
    nBits : int
        The number of levels is ``2**nBits``.
    hist : float
        Hysteresis, as a percentage of ``dV``. The resulting margin is
        truncated with ``int()`` before being applied.
    delta : number
        Full-scale span covered by the levels. The default (2048) is the
        value that was previously hard-coded.

    Returns
    -------
    list of int
        Positions (0-based) in ``v`` of the samples that left the current
        band.

    Notes
    -----
    The band cannot move below level 0 or above level ``2**nBits``. A sample
    lying beyond those limits (plus the margin) is still recorded every time
    it is seen, even though the band stays where it is.
    """
    n_levels = 2 ** nBits
    dV = delta / n_levels
    # The margin only depends on hist and dV, so compute it once.
    margin = int(hist / 100 * dV)

    lowTh = 0
    highTh = dV
    pos = 0          # index of the level the band currently sits on
    index = []

    for k, vn in enumerate(v):
        # Both limits are taken from the band as it was *before* this sample
        # moves it, exactly as in the two independent checks below.
        upper = highTh + margin
        lower = lowTh - margin

        if vn > upper:
            index.append(k)
            # Climb until the sample is strictly below the band's top
            # (no hysteresis here) or the last level is reached.
            for _ in range(pos, n_levels):
                lowTh += dV
                highTh += dV
                pos += 1
                if vn < highTh:
                    break

        if vn < lower:
            index.append(k)
            # Descend until the sample is strictly above the band's bottom
            # or level 0 is reached.
            for _ in range(pos, 0, -1):
                lowTh -= dV
                highTh -= dV
                pos -= 1
                if vn > lowTh:
                    break

    return index
if __name__ == "__main__":
    # Command-line entry point.
    #
    # * If -i is a directory, every record in it (any file whose base name
    #   has a matching ".hea" file) is subsampled and written to
    #   <-o>/lvlCrossingBits<b>Hist<hist>/<record>.csv
    # * Otherwise -i is read as a single record and the result is written
    #   to the file given by -o.
    import argparse
    import sys

    import wfdb

    # Records with these base names are never processed.
    # NOTE(review): presumably because they do not provide the default
    # 'MLII' lead -- confirm; the skip applies whatever lead is requested.
    SKIPPED_RECORDS = ('102', '104')

    def _subsample_record(record, out_path, lead, n_bits, hist):
        """Read one lead of `record`, subsample it and save it as csv.

        The signal is read in digital (ADC) units. With ``n_bits == 0`` every
        sample is kept; otherwise the kept samples are chosen by ``ADC``.
        The output has two columns: sample position and sample value.
        """
        rec = wfdb.rdrecord(record, channel_names=[lead], physical=False)
        # Input signal as a plain 1-D array
        s = rec.d_signal.reshape((1, -1))[0]
        if n_bits == 0:
            subsampled = list(range(len(s)))
        else:
            subsampled = ADC(s, nBits=n_bits, hist=hist)
        np.savetxt(out_path, np.column_stack([subsampled, s[subsampled]]),
                   fmt='%d', delimiter=', ')

    parser = argparse.ArgumentParser(
        description='Performs a nonuniform subsampling of one lead '
                    'of a MIT-BIH ECG record, generating as output a two-column '
                    'csv file with the samples time and value.')
    parser.add_argument('-i', metavar='input', required=True,
                        help='Input record to be processed')
    parser.add_argument('-o', metavar='output', required=True,
                        help='Name of the output csv file')
    parser.add_argument('-l', metavar='lead', default='MLII',
                        help='Name of the lead to subsample')
    # argparse %-formats help strings, so the literal '%' must be written
    # as '%%'; unescaped, it makes "-h" fail with a ValueError.
    parser.add_argument('--hist', type=float, default=0,
                        help='Hysteresys (perccentage:0%%100) for the level overlapping')
    parser.add_argument('-b', required=True, type=int,
                        help=('Number of bits to use in the subsampler, the number of level is given by 2^b'))
    args = parser.parse_args()

    if os.path.isdir(args.i):
        # Directory mode: one csv per record, grouped in a sub-folder named
        # after the sampler settings. makedirs also creates args.o if needed.
        subFolder = os.path.join(
            args.o, 'lvlCrossingBits{}Hist{}'.format(args.b, args.hist))
        os.makedirs(subFolder, exist_ok=True)

        # Find records: several files (.hea, .dat, ...) share one base name,
        # so collect the base names in a set to process each record once.
        records = set()
        for entry in os.listdir(args.i):
            thisFile = os.path.join(args.i, entry)
            thisFileNoExt = os.path.splitext(thisFile)[0]
            if os.path.isfile(thisFile) and os.path.exists(thisFileNoExt + ".hea"):
                records.add(thisFileNoExt)

        # Sorted so that the processing order is reproducible between runs.
        listOfFiles = sorted(records)
        for n, f in enumerate(listOfFiles, start=1):
            print("working on file: ", f, "( ", n, "/", len(listOfFiles), ")")
            if os.path.basename(f) in SKIPPED_RECORDS:
                continue
            fileName = os.path.basename(os.path.normpath(f)) + ".csv"
            _subsample_record(f, os.path.join(subFolder, fileName),
                              args.l, args.b, args.hist)
    else:
        # Single-record mode: -o is the output csv file itself.
        if os.path.basename(args.i) in SKIPPED_RECORDS:
            # sys.exit() instead of the interactive-only exit() helper.
            sys.exit()
        _subsample_record(args.i, args.o, args.l, args.b, args.hist)
Event Timeline
Log In to Comment