ECG_lvlCrossing.py

"""
Created on Fri Feb 22 09:07:30 2019
Simple script to perform a non-uniform subsampling of an ECG record in the MIT-BIH format, using
the Wall-Danielsson algorithm. As output, it produces a 2-column csv file with the sample times
and values.
Dependencies:
- numpy
- tqdm (https://pypi.org/project/tqdm/)
- wfdb-python (https://pypi.org/project/wfdb/)
@authors: T. Teijeiro S.Zanoli
"""
import numpy as np
import pandas as pd
from tqdm import trange
import os
from multiprocessing import Pool, Process
import multiprocessing
import shutil
from csvManager import csvManager
from time import time
import matplotlib.pyplot as plt
data_folder = "../data/extracted_data"   # input csv records produced by the extraction step
out = "../data/level_crossing"           # output folder for the subsampled records
log_dir = "../data/logs/"                # per-resolution compression logs
pos = False                              # True if the input samples are all non-negative
bits_data = 11                           # resolution of the original recordings, in bits
bits = range(1,bits_data+1,1)            # target resolutions to sweep
hister = 5                               # hysteresis, as a percentage of one quantization step dV
TEST_PERFORMANCES = True
def ADC(values, original_bits = 12, all_pos = True, nBits = 5, hist = 0):
    """
    Level-crossing subsampling of a uniformly sampled signal.
    The full input range (2**original_bits) is split into 2**nBits levels of width dV;
    a sample is retained only when it leaves the current level, with an optional
    hysteresis expressed as a percentage of dV. Returns the indices of the retained samples.
    """
    delta = 2**original_bits
    dV = delta/(2**nBits)
    hist = hist/100*dV
    if all_pos:
        #max_val = delta
        min_val = 0
    else:
        #max_val = delta//2
        min_val = -delta//2
    lowTh = min_val
    highTh = lowTh + dV
    index = []
    for val,time in zip(values,range(len(values))):
        if val > highTh + hist or val < lowTh - hist:
            index.append(time)
            lowTh = min_val+((val-min_val)//dV)*dV #Delta from the bottom: (val-min_val)//dV*dV, then compute the actual level summing min_val
            highTh = lowTh + dV
    return index
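# A minimal worked example of ADC (illustrative, values chosen arbitrarily): with
# original_bits=11 the input range is 2048 and nBits=3 gives levels of width dV=256.
# Starting from the level [0, 256), samples 0 and 100 stay inside it and are dropped;
# 300 crosses up into [256, 512), 200 falls back into [0, 256), and 900 jumps to
# [768, 1024), so those three indices are kept:
#
#   >>> ADC([0, 100, 300, 200, 900], original_bits=11, all_pos=True, nBits=3, hist=0)
#   [2, 3, 4]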
def sample(file):
    manager = csvManager()
    _,vals = manager.read(file)
    #print("------------------------- Sub-sampling file: {} -------------------------".format(file))
    for b in bits:
        #print("\nFile: {}, subsampling with {} bits".format(file,b))
        idxs = ADC(vals, original_bits = bits_data, all_pos = pos, nBits = b, hist = hister)
        vals_samp = (np.array(vals)[idxs]).tolist()
        compression = 1-len(vals_samp)/len(vals)
        #---------- SAVE BACK ----------
        out_dir = os.path.join(out,str(b)+"bits")
        file_name = os.path.join(out_dir,os.path.basename(file))
        manager.write(idxs,vals_samp,file_name)
        log(file_name,b,compression)
def log(source_file, bits, compression):
    name = os.path.basename(source_file).split('.')[0]
    file_name = os.path.join(log_dir,str(bits)+"Bits")
    str_to_write = name + ": " + str(compression) + "\n"
    with open(file_name,"a") as f:
        f.write(str_to_write)
def log_resume():
    resume = {}
    if not os.path.isdir(log_dir):
        return
    for l in os.listdir(log_dir):
        if "resume" in l:
            continue
        bits = int(l[0:l.find("Bits")])
        resume[bits] = {"avg":None, "std":None, "num_files":None}
        compressions = []
        text = ""
        num_file = 0
        with open(os.path.join(log_dir,l)) as f:
            text = f.readlines()
        for line in text:
            num_file += 1
            compr = float(line[line.find(": ")+len(": "):])
            compressions.append(compr)
        resume[bits]["avg"] = np.average(compressions)
        resume[bits]["std"] = np.std(compressions)
        resume[bits]["num_files"] = num_file
    with open(os.path.join(log_dir,"resume.txt"),"w") as f:
        keys = sorted(list(resume.keys()))
        for k in keys:
            line = "Bits: {}\t\tAvg: {}\tStD: {} (Total number of files:{})\n".format(str(k),resume[k]["avg"],resume[k]["std"],resume[k]["num_files"])
            f.write(line)
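# For reference, each line written to resume.txt by log_resume() follows the format string
# above, e.g. (with purely illustrative numbers):
#   Bits: 3    Avg: 0.853    StD: 0.021 (Total number of files:48)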
def process(multi = False, cores = None):
    names = [os.path.join(data_folder,name) for name in os.listdir(data_folder)]
    if os.path.isdir(log_dir):
        shutil.rmtree(log_dir)
    os.mkdir(log_dir)
    if os.path.isdir(out):
        shutil.rmtree(out)
    os.mkdir(out)
    for b in bits:
        out_dir = os.path.join(out,str(b)+"bits")
        os.mkdir(out_dir)
    #sample(names[0])
    if multi:
        used_cores = cores
        with Pool(used_cores) as pool:
            pool.map(sample, names)
    else:
        for arg in names:
            sample(arg)
    log_resume()
def test_performances():
    times = []
    print("~"*85)
    print("Analyzing multicore performances in sampling signals with the level crossing algorithm and saving in binary.\n"
          "Useful? No. Interesting? Yes.")
    print("-"*85)
    print("-"*85)
    print("\nChecking single core (no library used)\n")
    start = time()
    process(multi=False)
    stop = time()
    print(f" Elapsed time for single core: {int((stop - start)//60)}:{int((stop - start) % 60):02d}\n\n")
    print("Using multicores...")
    for core_num in range(1,multiprocessing.cpu_count()+1):
        print("-"*85)
        print(f"Using {core_num} cores...")
        start = time()
        process(multi=True, cores=core_num)
        stop = time()
        times.append(stop - start)   # elapsed seconds for this core count
        print(f" Elapsed time using {core_num} cores: {int((stop - start)//60)}:{int((stop - start) % 60):02d}")
    plt.figure()
    plt.plot(range(1, len(times) + 1), times)   # elapsed time vs. number of cores
    plt.xlabel("Number of cores")
    plt.ylabel("Elapsed time [s]")
    plt.savefig("./perf.png")
if __name__ == "__main__":
    if TEST_PERFORMANCES:
        test_performances()
    else:
        process(multi=True, cores=multiprocessing.cpu_count())
