"""Build, cache and load heartbeat-like signal datasets.

Three sources are supported (see ``get_signal``): a synthetic sum-of-sines
signal, beats produced by the external ``ecgsyn`` executable, and a dataset
previously saved to ``SAVE_PATH``.
"""
import os
from typing import List, Optional

import pandas as pd
import numpy as np
import jax.numpy as jnp

DATA_PATH = "../ecg_syn/ecgsyn.dat"
SAVE_PATH = "../dataset/beats.npy"
OPT_FILE = "../ecg_syn/ecgsyn.opt"
HR = 60
T_SPAN_RANDOM_SIGNAL_SECONDS = 2


def run_ECGSYN(data_path: str, freq: int, num_samples: int) -> pd.DataFrame:
    """Run the external ``ecgsyn`` executable and load its output table.

    Any existing ``OPT_FILE`` is removed first. The exit status of the
    command is not checked; a failed run surfaces when ``data_path`` is read.

    Args:
        data_path: Path of the space-delimited file to read after the run.
        freq: Value passed to both the ``-s`` and ``-S`` options.
        num_samples: Number of beats wanted; ``num_samples + 2`` is requested.

    Returns:
        The content of ``data_path`` as a header-less DataFrame.
    """
    if os.path.isfile(OPT_FILE):
        os.remove(OPT_FILE)
    # Two extra beats are requested because the first and the last heartbeat
    # might be corrupted.
    # NOTE(review): the trailing '%%' is kept verbatim from the original
    # command -- confirm it is the option terminator ecgsyn expects.
    command = f'cd ../ecg_syn/ ; ./ecgsyn -n {num_samples+2} -s {freq} -S {freq} -h {HR} %%'
    os.system(command)
    return pd.read_csv(data_path, delimiter=" ", header=None)


def separate_beats(vs: np.ndarray, ms: List) -> List[np.ndarray]:
    """Split a signal into windows, cutting at minima found between markers.

    While scanning, the index of the smallest value seen after a marker ``5``
    and before the next marker ``1`` is tracked; each marker ``1`` closes the
    window that ends at the most recently tracked minimum.
    (Presumably 5 and 1 label the T and P waves, as the flag name ``in_t_p``
    suggests -- confirm against the ecgsyn output format.)

    NOTE(review): in the reviewed source the text between ``if v`` and the
    return annotation of ``normalize_length`` was lost. The comparison and the
    ``return`` below are reconstructed from the surrounding code; confirm
    against the repository (in particular whether the original returned
    ``out`` or a slice of it).

    Args:
        vs: Signal values.
        ms: Marker labels, one per entry of ``vs``.

    Returns:
        One slice of ``vs`` per marker ``1`` encountered.
    """
    out: List[np.ndarray] = []
    min_value_idx: int = 0
    min_value_idx_old: int = 0
    min_value: float = np.inf
    in_t_p: bool = False
    for i, (v, m) in enumerate(zip(vs, ms)):
        if m == 5:
            in_t_p = True
        if m == 1:
            in_t_p = False
            out.append(vs[min_value_idx_old:min_value_idx])
            min_value_idx_old = min_value_idx
            min_value = np.inf
        if in_t_p:
            if v < min_value:
                min_value = v
                min_value_idx = i
    return out


def normalize_length(windows: List[np.ndarray]) -> List[np.ndarray]:
    """Trim every window to the shortest length and shift its minimum to 0.

    Windows are trimmed from the front, so the last ``small_len`` values of
    each one are kept.

    Args:
        windows: Windows of possibly different lengths.

    Returns:
        Equal-length windows whose minimum value is 0; an empty list when
        ``windows`` is empty.
    """
    if len(windows) == 0:
        return []
    small_len = min(len(w) for w in windows)
    out: List[np.ndarray] = []
    for w in windows:
        tail = w[len(w) - small_len:]
        out.append(tail - tail.min())
    return out


def load_signal(num_pts: int, freq: int) -> Optional[np.ndarray]:
    """Load the saved dataset if ``OPT_FILE`` matches the requested settings.

    The ``-s`` and ``-n`` values are read from characters 3..11 of the
    matching lines of ``OPT_FILE``.

    Args:
        num_pts: Number of rows wanted; the recorded ``-n`` must be >= this.
        freq: Frequency that must equal the recorded ``-s`` value.

    Returns:
        The first ``num_pts`` rows of the array stored at ``SAVE_PATH``, or
        ``None`` (after printing a message) when ``OPT_FILE`` is missing or
        its values do not match.
    """
    if not os.path.isfile(OPT_FILE):
        print("No signal to load/ Missing config file")
        return None

    tot_num_pts = 0
    freq_this_file = 0
    with open(OPT_FILE) as f:
        for line in f:
            if "-s" in line:
                freq_this_file = int(line[3:12])
            if "-n" in line:
                tot_num_pts = int(line[3:12])

    if tot_num_pts >= num_pts and freq == freq_this_file:
        return np.load(SAVE_PATH)[:num_pts]

    print(f"Present dataset do not respect given parameter (f: {freq_this_file}, pts: {tot_num_pts})")
    return None


def create_random_signal(coefs, ws, dt: float) -> np.ndarray:
    """Return ``sum(c * sin(w * t))`` sampled over the configured time span.

    Args:
        coefs: Amplitude of each sine term.
        ws: Angular frequency of each sine term, paired with ``coefs``.
        dt: Sampling period; the signal has
            ``int(T_SPAN_RANDOM_SIGNAL_SECONDS / dt)`` samples.

    Returns:
        The sampled signal as a 1-D float array.
    """
    t = np.linspace(0, T_SPAN_RANDOM_SIGNAL_SECONDS, int(T_SPAN_RANDOM_SIGNAL_SECONDS/dt))
    x = np.zeros(len(t))
    for w, c in zip(ws, coefs):
        x += c*np.sin(w*t)
    return x


def create_positive_random_dataset(num_pts: int, freq: int) -> np.ndarray:
    """Build ``num_pts`` rows of a seeded sum-of-sines signal.

    NOTE(review): ``ws`` and ``coefs`` are drawn once, before the loop, so
    every row is identical -- confirm this is intended.
    NOTE(review): subtracting ``min(x) + 0.01`` leaves the minimum at -0.01,
    not above zero as the function name suggests -- confirm the sign.

    Args:
        num_pts: Number of rows to generate.
        freq: Sampling frequency; ``int(freq * 10)`` must be at least 100
            because 100 distinct values are drawn without replacement.

    Returns:
        Array with one signal per row.
    """
    rng = np.random.default_rng(31415926514)
    # Draw from freq*10 integers so rng.choice has a large enough search
    # space, then divide by 100 so the largest value is 1/10 of the
    # sampling frequency.
    ws = rng.choice(int(freq*10), size=100, replace=False)/100
    dt = 1/freq
    coefs = rng.choice(3000, size=100)
    out = []
    for _ in range(num_pts):
        x = create_random_signal(coefs, ws, dt)
        x -= x.min()+0.01
        out.append(x)
    return np.array(out)


def create_ECG_emulated_dataset(num_pts: int, freq: int) -> np.ndarray:
    """Generate beats with ecgsyn, normalise them and save to ``SAVE_PATH``.

    Column 1 of the ecgsyn output is used as the signal and column 2 as the
    marker labels.

    Args:
        num_pts: Number of beats to keep.
        freq: Sampling frequency passed to ecgsyn.

    Returns:
        The normalised beats, one per row.
    """
    data = run_ECGSYN(data_path=DATA_PATH, freq=freq, num_samples=num_pts)
    v = data[1].to_numpy()
    marks = data[2].to_list()
    windows = separate_beats(v, marks)[:num_pts]
    dataset_np = np.array(normalize_length(windows))
    # Derive the directory from SAVE_PATH and create it without a separate
    # existence check (no check-then-create race).
    os.makedirs(os.path.dirname(SAVE_PATH), exist_ok=True)
    np.save(SAVE_PATH, dataset_np)
    return dataset_np


def get_signal(type: str = 'load', num_pts: int = 1000, freq: int = 256) -> jnp.ndarray:
    """Return a dataset from the requested source as a JAX array.

    Args:
        type: ``'random'`` (sum of sines), ``'load'`` (read the saved file)
            or ``'create'`` (run ecgsyn and save the result).
        num_pts: Number of signals wanted.
        freq: Sampling frequency.

    Returns:
        The dataset converted with ``jnp.array``. For ``'load'`` the result of
        ``load_signal`` is converted as-is, even when it is ``None``.

    Raises:
        ValueError: If ``type`` is not one of the supported values.
    """
    if type == 'random':
        dataset = create_positive_random_dataset(num_pts, freq)
    elif type == 'load':
        dataset = load_signal(num_pts, freq)
    elif type == 'create':
        dataset = create_ECG_emulated_dataset(num_pts, freq)
    else:
        # Previously this only printed the message and then failed on the
        # unbound ``dataset`` variable below.
        raise ValueError("Dataset type not recognized in 'get_signal()'")
    return jnp.array(dataset)


def main() -> None:
    """Script entry point; intentionally does nothing."""
    pass


if __name__ == "__main__":
    main()