Page MenuHomec4science

ECG_beats_division.py
No OneTemporary

File Metadata

Created
Tue, Jun 11, 17:04

ECG_beats_division.py

import os
import pickle
import sys
from time import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
scripts = '../helper_scripts'
if scripts not in sys.path:
sys.path.insert(0,scripts)
import multiprocessing
import shutil
from multiprocessing import Pool, Process
from csvManager import csvManager
#SETUP
# Input/output locations, relative to this script's working directory.
data_raw = "../data/extracted_data"  # raw extracted ECG signals
data_lvl = "../data/level_crossing"  # level-crossing-sampled signals, one subfolder per bit depth
annot_folder = "../data/extracted_annotation"  # per-record beat annotation times (.annot files)
out_dir = "../data/beats"  # destination for the per-record beat pickles
bits = 6  # NOTE(review): not referenced below — presumably a default bit depth; confirm
max_num_bits = 11  # resolutions 0..max_num_bits are processed for every file
beat_to_plot = 253  # NOTE(review): not referenced below — likely used by a plotting script; confirm
file_to_extract = None #'16265' #None for all files
#MAIN FUNCTIONS
def extract_beats(data, boundaries):
    """Slice the signal in *data* into one segment per annotated beat.

    data: {'t': [...], 'v': [...]} — time stamps and sample values.
    boundaries: ordered list of (start, annotation_time, stop) tuples.
    Returns {annotation_time: {'t': [...], 'v': [...]}, ...}.
    """
    times = data['t']
    values = data['v']
    beats = {}
    search_hint = 0  # boundaries are time-ordered, so resume each search near the last hit
    for start, annot, stop in boundaries:
        first = find_time_index_in_eb(times, start, prior=search_hint, mode="before")
        last = find_time_index_in_eb(times, stop, prior=first, mode="after")
        search_hint = last - 1
        beats[annot] = {'t': times[first:last + 1], 'v': values[first:last + 1]}
    return beats
def get_boundaries(file_name):
    """Compute a (start, annotation, stop) window around each beat annotation.

    Reads annot_folder/<base>.annot (one integer time stamp per line).
    A window spans 40% of the preceding RR interval before the annotation
    and 60% of the following RR interval after it.  The first annotation is
    skipped (no preceding interval); the last one reuses the previous
    "after" delta since no following interval exists.
    """
    base = os.path.basename(file_name).split(".")[0]
    annot_file = os.path.join(annot_folder, base + '.annot')
    with open(annot_file) as fh:
        annotations = [int(line) for line in fh.readlines()]
    boundaries = []  # list of tuples (start, annotation, stop)
    delta_aft = 0
    last_i = len(annotations) - 1
    for i in range(1, len(annotations)):
        delta_bef = (annotations[i] - annotations[i - 1]) * 0.4
        if i != last_i:
            delta_aft = (annotations[i + 1] - annotations[i]) * 0.6
        boundaries.append((annotations[i] - delta_bef,
                           annotations[i],
                           annotations[i] + delta_aft))
    return boundaries
def get_data(file_name, bits):
    """Load one signal file as {'t': [...], 'v': [...]}.

    bits == 0 selects the raw extracted data; any other value selects the
    level-crossing output sampled with that many bits.
    """
    if bits == 0:
        path = os.path.join(data_raw, file_name)
    else:
        path = os.path.join(data_lvl, f"{bits}bits", file_name)
    t, v = csvManager().read(path)
    return {'t': t, 'v': v}
def get_beats(file_name, bits):
    """Load *file_name* at the given bit resolution and split it into beats."""
    signal = get_data(file_name, bits)
    return extract_beats(signal, get_boundaries(file_name))
# --------------------------- HELPER FUNCTIONS ---------------------------
def get_files(folder, pattern=None):
    """List data files in *folder* whose names contain "bin".

    If *pattern* is given, the result is further restricted to names
    containing that substring.  Order follows os.listdir (unspecified).
    """
    names = [name for name in os.listdir(folder) if "bin" in name]
    if pattern is not None:
        # Plain membership test instead of the original lambda/filter dance.
        names = [name for name in names if pattern in name]
    return names
def find_time_index_in_eb(data_time, time, prior=0, mode="before"):
    """Locate the index of *time* within the sorted sequence *data_time*.

    Times at or before the first sample return 0; at or after the last
    sample return len(data_time)-1.  Otherwise the bracketing interval
    [data_time[i], data_time[i+1]) is found and i ("before") or i+1
    ("after") is returned; an exact match always returns its own index.
    *prior* is a search hint: scanning starts there and wraps around to
    the beginning when the target lies before the hint.
    """
    if time <= data_time[0]:
        return 0
    if time >= data_time[-1]:
        return len(data_time) - 1
    idx = None
    for this_idx in range(prior, len(data_time) - 1):
        if data_time[this_idx] <= time < data_time[this_idx + 1]:
            if time == data_time[this_idx] or mode == "before":
                idx = this_idx
            elif mode == "after":
                idx = this_idx + 1
            break
    if idx is None:  # fix: identity test, was `idx == None`; wrap-around scan
        for this_idx in range(0, prior):
            if data_time[this_idx] <= time < data_time[this_idx + 1]:
                if time == data_time[this_idx] or mode == "before":
                    idx = this_idx
                elif mode == "after":
                    idx = this_idx + 1
                break
    return idx
def invert_inner_outer_keys(data):
    """Swap the two key levels of a nested dict: {a: {x: v}} -> {x: {a: v}}.

    The set of inner keys is taken from the first outer entry; every entry
    is assumed to share it.  An empty input yields {} (robustness fix:
    previously this raised IndexError).
    """
    if not data:
        return {}
    inner_keys = next(iter(data.values())).keys()
    return {inner: {outer: data[outer][inner] for outer in data}
            for inner in inner_keys}
def extract_beats_file(file_name):
    """Divide one record into beats at every bit resolution and pickle it.

    Resolutions 0..max_num_bits are processed (0 = raw signal); the result
    is re-keyed as {beat_time: {bits: beat}} and written to
    out_dir/<base>.pickle.
    """
    print(f"----------------------- Analyzing file {file_name} ----------------------- ")
    per_bits = {b: get_beats(file_name, b) for b in range(max_num_bits + 1)}
    # Serialize with beat time as the outer key and bit depth as the inner one.
    inverted = invert_inner_outer_keys(per_bits)
    target = os.path.join(out_dir, file_name.split(".")[0] + ".pickle")
    with open(target, 'wb') as handle:
        pickle.dump(inverted, handle, protocol=pickle.HIGHEST_PROTOCOL)
def process(multi=True, cores=1):
    """Split every extracted record into beats, writing pickles to out_dir.

    The output directory is wiped and recreated first.  With multi=True the
    files are spread over a pool of *cores* worker processes; otherwise they
    are handled sequentially in this process.
    """
    files = get_files(data_raw, file_to_extract)
    if os.path.isdir(out_dir):
        shutil.rmtree(out_dir)
    os.mkdir(out_dir)
    if not multi:
        for name in files:
            extract_beats_file(name)
        return
    print(f"Executing beats subdivision with {cores} cores...")
    with Pool(cores) as pool:
        pool.map(extract_beats_file, files)
def test_performances():
    """Benchmark process() with 1..cpu_count()-1 worker cores.

    Prints the elapsed time per core count, plots the series, and saves
    the figure to ../data/logs/perf2.png.
    """
    times = []
    print("~"*85)
    print("Analyzing multicore performances in sampling signals with level crossing algorithm and saving in binary.\n"
          "Usefull? No, interesting, yes.")
    print("-"*85)
    print("-"*85)
    print("\n")
    core_range = range(1, multiprocessing.cpu_count())
    for core_num in core_range:
        print("-"*85)
        print(f"Using {core_num} cores...")
        start = time()
        process(multi=True, cores=core_num)
        stop = time()
        # BUG FIX: was `start - stop`, which recorded negative durations.
        times.append(stop - start)
    print("\n\n")
    # BUG FIX: the report loop used range(1, cpu_count()-1), one element
    # shorter than `times`, silently dropping the last measurement.
    for core_num, t in zip(core_range, times):
        minutes, seconds = divmod(int(t), 60)
        print(f"Elapsed time using {core_num} cores: {minutes}:{seconds}")
    plt.figure()
    plt.plot(times)
    plt.savefig("../data/logs/perf2.png")
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--test", help="test multicore capabilities",
                        action="store_true")
    # type=int: argparse now rejects non-numeric values instead of crashing
    # later on int(); help string fixed (was missing its closing paren).
    parser.add_argument("--cores", type=int,
                        help="Force used number of cores (default: half of the available ones)")
    args = parser.parse_args()
    if args.test:
        print("TEST MULTICORE...")
        test_performances()
    else:
        if args.cores is not None:
            used_cores = args.cores
        else:
            # max(1, ...) guards single-core machines, where cpu_count()//2
            # would be 0 and Pool(0) would raise.
            used_cores = max(1, multiprocessing.cpu_count() // 2)
        process(multi=True, cores=used_cores)

Event Timeline