Page MenuHomec4science

ECG_lvlCrossing.py
No OneTemporary

File Metadata

Created
Sat, Jun 1, 06:51

ECG_lvlCrossing.py

"""
Created on Fri Feb 22 09:07:30 2019
Simple script to perform a non-uniform subsampling of an ECG record in the MIT-BIH format, using
a level-crossing sampler (see the ADC function). As output, it produces a 2-column csv file with the
sample indices and values. If the input is a directory, every record found in it is processed.
Dependencies:
- numpy
- tqdm (https://pypi.org/project/tqdm/)
- wfdb-python (https://pypi.org/project/wfdb/)
@author: T. Teijeiro
"""
import numpy as np
from tqdm import trange
import os
def ADC(v, nBits = 5, hist = 0, delta = 2048):
    """
    Level-crossing sampler: return the indices of the samples at which the signal
    leaves the quantization level it currently occupies.

    The amplitude range starting at 0 and spanning `delta` is divided into 2**nBits
    levels of equal width dV = delta/2**nBits. Tracking starts in the lowest level,
    [0, dV]. A sample is selected when it rises above the upper threshold of the
    current level or falls below its lower threshold; the current level is then moved
    step by step until it contains the sample, or until the end of the range is hit.

    Parameters:
    - v: iterable with the signal samples (digital values).
    - nBits: number of bits of the sampler, giving 2**nBits levels.
    - hist: hysteresis, as a percentage of dV. Both thresholds are pushed outwards by
      int(hist/100*dV), i.e. the margin is truncated to an integer.
    - delta: full-scale amplitude covered by the levels. The default, 2048, is the
      value that was previously hard-coded.

    Returns:
    A list with the (0-based) positions in `v` of the selected samples.

    Note: the level index is limited to 0..2**nBits, but samples that stay beyond the
    outermost threshold are still selected, one index per sample.
    """
    nLevels = 2**nBits
    dV = delta/nLevels
    #The hysteresis margin does not depend on the sample, so it is computed only once
    margin = int(hist/100*dV)
    lowTh = 0
    highTh = dV
    pos = 0
    index = []
    for n, vn in enumerate(v):
        #Both thresholds are fixed before the level is moved for this sample
        upper = highTh + margin
        lower = lowTh - margin
        if vn > upper:
            index.append(n)
            #Climb until the sample is below the top of the level or the range ends
            while pos < nLevels:
                lowTh += dV
                highTh += dV
                pos += 1
                if vn < highTh:
                    break
        if vn < lower:
            index.append(n)
            #Descend until the sample is above the bottom of the level or level 0 is reached
            while pos > 0:
                lowTh -= dV
                highTh -= dV
                pos -= 1
                if vn > lowTh:
                    break
    return index
def _subsampleRecord(recordPath, outPath, lead, nBits, hist):
    """
    Reads one lead of a record, subsamples it and stores the result as a csv file.

    Parameters:
    - recordPath: path of the record, without extension.
    - outPath: path of the csv file to write.
    - lead: name of the lead to read from the record.
    - nBits: number of bits for the ADC function; 0 keeps every sample.
    - hist: hysteresis percentage forwarded to the ADC function.

    Returns:
    True if the csv file was written, False if the record was skipped.
    """
    #wfdb is only needed when a record is actually read
    import wfdb
    #Records 102 and 104 are never processed
    #NOTE(review): presumably because they lack the default MLII lead - confirm
    if os.path.basename(recordPath) in ('102', '104'):
        return False
    #Record reading (digital values, only the requested lead)
    rec = wfdb.rdrecord(recordPath, channel_names=[lead], physical=False)
    #Input signal as a plain array
    s = rec.d_signal.reshape((1, -1))[0]
    #Subsampling
    if nBits == 0:
        subsampled = list(range(len(s)))
    else:
        subsampled = ADC(s, nBits = nBits, hist = hist)
    #Result storage: one row per selected sample, with its index and value
    np.savetxt(outPath, np.column_stack([subsampled, s[subsampled]]), fmt='%d', delimiter=', ')
    return True


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='Performs a nonuniform subsampling of one lead '
                                     'of a MIT-BIH ECG record, generating as output a two-column '
                                     'csv file with the samples time and value.')
    parser.add_argument('-i', metavar='input', required=True, help='Input record to be processed')
    parser.add_argument('-o', metavar='output', required=True, help='Name of the output csv file')
    parser.add_argument('-l', metavar='lead', default='MLII', help='Name of the lead to subsample')
    #argparse applies %-formatting to the help strings, so a literal % has to be written as %%
    parser.add_argument('--hist', type=float, default = 0,
                        help='Hysteresis (percentage: 0-100%%) for the level overlapping')
    parser.add_argument('-b', required=True, type=int,
                        help=('Number of bits to use in the subsampler, the number of level is given by 2^b'))
    args = parser.parse_args()
    if args.b < 0:
        parser.error('-b must be a non-negative integer')
    if os.path.isdir(args.i):
        #Directory mode: the output is a folder, with one subfolder per configuration
        subFolder = os.path.join(args.o,'lvlCrossingBits{}Hist{}'.format(args.b,args.hist))
        os.makedirs(subFolder, exist_ok=True)
        #find files: a record is any file whose name without extension has a .hea header
        files = set()
        for x in os.listdir(args.i):
            thisFile = os.path.join(args.i,x)
            thisFileNoExt = os.path.splitext(thisFile)[0]
            if os.path.isfile(thisFile) and os.path.exists(thisFileNoExt+".hea"):
                files.add(thisFileNoExt)
        #Sorted so that the processing order is the same on every run
        listOfFiles = sorted(files)
        for n, f in enumerate(listOfFiles, start=1):
            print("working on file: ",f,"( ",n,"/",len(listOfFiles),")")
            fileName = os.path.basename(os.path.normpath(f))+".csv"
            outPath = os.path.join(subFolder,fileName)
            _subsampleRecord(f, outPath, args.l, args.b, args.hist)
    else:
        #Single record mode: the output is the csv file itself
        _subsampleRecord(args.i, args.o, args.l, args.b, args.hist)

Event Timeline