diff --git a/pre_proc/ECG_lvlCrossing.py b/pre_proc/ECG_lvlCrossing.py
index ffb9460..e2776a6 100644
--- a/pre_proc/ECG_lvlCrossing.py
+++ b/pre_proc/ECG_lvlCrossing.py
@@ -1,131 +1,160 @@
 """
 Created on Fri Feb 22 09:07:30 2019
 
 Simple script to perform a non-uniform subsampling of an ECG record in the
 MIT-BIH format, using the Wall-Danielsson algorithm. As output, it produces
 a 2-column csv file with the sample times and values.
 
 Dependencies:
  - numpy
  - tqdm (https://pypi.org/project/tqdm/)
  - wfdb-python (https://pypi.org/project/wfdb/)
 
 @authors: T. Teijeiro S.Zanoli
 """
 import numpy as np
 import pandas as pd
 from tqdm import trange
 import os
-from multiprocessing import Pool
+from multiprocessing import Pool, Lock, Process
 import multiprocessing
 import shutil
 
 data_folder = "../data/extracted_data"
 out = "../data/level_crossing"
 log_dir = "../data/logs/"
 bits = range(2,13,1)
 pos = False
 bits_data = 12
 hister = 5
-FREE_CORES = 11
+FREE_CORES = 2
 MULTI = True
 
 def ADC(values, original_bits = 12, all_pos = True, nBits = 5, hist = 0):
     """
     To write
     """
     delta = 2**original_bits
     dV = (delta)/(2**nBits)
     hist = hist/100*dV
     if all_pos:
-        max_val = delta
+        #max_val = delta
         min_val = 0
     else:
-        max_val = delta//2
+        #max_val = delta//2
         min_val = -delta//2
     lowTh = min_val
     highTh = lowTh + dV
     index = []
     for val,time in zip(values,range(len(values))):
         if val > highTh + hist or val < lowTh - hist:
             index.append(time)
             lowTh = min_val+((val-min_val)//dV)*dV #Delta from the bottom: (val-min_val)//dV*dV then compute the actual level summing min_val
             highTh = lowTh + dV
     return index
 
-def sample(file):
+def sample(file,lock = None):
     data = pd.read_csv(file, header = None)
     vals = list(data[1])
     print("------------------------- Sub-sampling file: {} -------------------------".format(file))
     for b in bits:
         print("\nFile: {}, subsampling with {} bits".format(file,b))
-        out_dir = os.path.join(out,str(b)+"bits")
-        if not os.path.isdir(out_dir):
-            os.mkdir(out_dir)
-        file_name = os.path.join(out_dir,os.path.basename(file))
         idxs = np.array(ADC(vals, original_bits = bits_data, all_pos = pos, nBits = b, hist = hister))
         vals_samp = np.array(vals)[idxs]
         out_array = np.array((idxs,vals_samp))
         out_df = pd.DataFrame(out_array, index = None)
         compression = 1-len(vals_samp)/len(vals)
+
+        #---------- SAVE BACK ----------
+        out_dir = os.path.join(out,str(b)+"bits")
+        file_name = os.path.join(out_dir,os.path.basename(file))
         out_df.to_csv(file_name)
+
+        if lock is not None:
+            lock.acquire()
         log(file_name,b,compression)
+        if lock is not None:
+            lock.release()
 
 def log (source_file, bits, compression):
-    if not os.path.isdir(log_dir):
-        os.mkdir(log_dir)
     name = os.path.basename(source_file).split('.')[0]
     file_name = os.path.join(log_dir,str(bits)+"Bits")
     str_to_write = name + ": "+str(compression)+"\n"
     with open(file_name,"a") as f:
         f.write(str_to_write)
 
 def log_resume():
     resume = {}
     if not os.path.isdir(log_dir):
         return
     for l in os.listdir(log_dir):
         if "resume" in l:
             continue
         bits = int(l[0:l.find("Bits")])
         resume[bits] = {"avg":None,"std":None, "num_files":None}
         compressions = []
         text = ""
         num_file = 0
         with open(os.path.join(log_dir,l)) as f:
             text = f.readlines()
         for line in text:
             num_file += 1
             compr = float(line[line.find(": ")+len(": "):])
             compressions.append(compr)
         resume[bits]["avg"] = np.average(compressions)
         resume[bits]["std"] = np.std(compressions)
         resume[bits]["num_files"] = num_file
     with open(os.path.join(log_dir,"resume.txt"),"w") as f:
         keys = sorted(list(resume.keys()))
         for k in keys:
-            line = "Bits: {}\tAvg: {}\tStD: {} (Total number of files:{})\n".format(str(k),resume[k]["avg"],resume[k]["std"],resume[k]["num_files"])
+            line = "Bits: {}\t\tAvg: {}\tStD: {} (Total number of files:{})\n".format(str(k),resume[k]["avg"],resume[k]["std"],resume[k]["num_files"])
             f.write(line)
-
+def init_multiproc(l):
+    global lock
+    lock = l
 
 if __name__ == "__main__":
+    #lock = multiprocessing.Lock()
+    lock = Lock()
+
     names = [os.path.join(data_folder,name) for name in os.listdir(data_folder)]
     if os.path.isdir(log_dir):
         shutil.rmtree(log_dir)
     os.mkdir(log_dir)
     if os.path.isdir(out):
         shutil.rmtree(out)
     os.mkdir(out)
+    for b in bits:
+        out_dir = os.path.join(out,str(b)+"bits")
+        os.mkdir(out_dir)
     #sample(names[0])
     if MULTI:
-        with Pool(multiprocessing.cpu_count()-FREE_CORES) as pool:
-            print(f"using {multiprocessing.cpu_count()-FREE_CORES} cores")
-            pool.map(sample, names)
+        used_cores = multiprocessing.cpu_count()-FREE_CORES
+        batch_size = int(np.ceil(len(names)/used_cores))
+        name_idx = 0
+        for i in range(batch_size):
+            p = []
+            for j in range(used_cores):
+                proc = Process(target=sample, args=(names[name_idx], lock))
+                proc.start()
+                p.append(proc)
+                name_idx += 1
+                if name_idx == len(names):
+                    break
+            for j in range(len(p)):
+                p[j].join()
+                p[j].terminate()
+
+        #with Pool(multiprocessing.cpu_count()-FREE_CORES,initializer=init_multiproc, initargs=(lock,)) as pool:
+            #print(f"using {multiprocessing.cpu_count()-FREE_CORES} cores")
+            #pool.map(sample, names)
     else:
-        for n in names:
-            sample(n)
+        for arg in names:
+            sample(arg)
+    lock.acquire()
     log_resume()
+    lock.release()
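
Note on the patch: `sample()` now receives the `Lock` explicitly because a `multiprocessing.Lock` cannot be passed through `Pool.map` arguments (it can only be shared through inheritance, e.g. via the `initializer=init_multiproc` variant left commented out above), which is presumably why the `Pool` was replaced with hand-batched `Process` objects. Below is a minimal standalone sketch of the level-crossing sampler for reviewers who want to try it without the MIT-BIH CSVs; `ADC()` is transcribed from the patched file, while the synthetic sine input and the 4-bit/5%-hysteresis settings are illustrative assumptions, not values mandated by the patch.

    import numpy as np

    def ADC(values, original_bits=12, all_pos=True, nBits=5, hist=0):
        """Return indices of samples that cross out of the current
        quantization level, with a hysteresis band of `hist` percent of
        one level (same algorithm as in the patched file)."""
        delta = 2**original_bits
        dV = delta / (2**nBits)            # width of one quantization level
        hist = hist / 100 * dV             # hysteresis, as a fraction of dV
        min_val = 0 if all_pos else -delta // 2
        lowTh = min_val
        highTh = lowTh + dV
        index = []
        for time, val in enumerate(values):
            if val > highTh + hist or val < lowTh - hist:
                index.append(time)
                # Snap the lower threshold to the level containing val.
                lowTh = min_val + ((val - min_val) // dV) * dV
                highTh = lowTh + dV
        return index

    # Toy 12-bit positive signal: one second of a 5 Hz sine sampled at 360 Hz.
    t = np.arange(360) / 360
    vals = (2047 * (1 + np.sin(2 * np.pi * 5 * t))).astype(int)
    idxs = ADC(vals, original_bits=12, all_pos=True, nBits=4, hist=5)
    print("kept {} of {} samples (compression {:.2f})".format(
        len(idxs), len(vals), 1 - len(idxs) / len(vals)))

The printed compression figure is computed the same way as in `sample()` (1 minus the fraction of samples kept), so the sketch can be used to sanity-check the per-record values written to the log files.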