Page MenuHomec4science

dataset_generator.py
No OneTemporary

File Metadata

Created
Wed, Aug 14, 16:11

dataset_generator.py

import os
import subprocess
from typing import List

import jax.numpy as jnp
import numpy as np
import pandas as pd
# File read back (space-delimited, no header) after each ECGSYN run.
DATA_PATH = "../ecg_syn/ecgsyn.dat"
# Where the length-normalized beat array is written by np.save and read by np.load.
SAVE_PATH = "../dataset/beats.npy"
# ECGSYN options file: deleted before every run, parsed by load_signal for its "-s" and "-n" lines.
OPT_FILE = "../ecg_syn/ecgsyn.opt"
# Value handed to ecgsyn's -h option and used when sizing how many beats fit in one run.
# Presumably the mean heart rate in beats per minute -- confirm against the ECGSYN documentation.
HR = 60
# Upper end of the time axis built by create_random_signal.
T_SPAN_RANDOM_SIGNAL_SECONDS = 2
# Numerator of the beats-per-run estimate in create_ECG_emulated_dataset.
# Presumably a memory budget in bytes -- TODO confirm.
MEM_FOR_ECGSYN = 32e9
# Per-sample cost used in the same estimate (presumably bytes per ECGSYN point -- TODO confirm).
# NOTE(review): "BOINT" looks like a typo for "POINT"; the name is kept because other code references it.
BYTES_ECGSYN_BOINT = 1001
def run_ECGSYN(data_path,freq,num_samples):
    """Run the external ``ecgsyn`` binary and load the table it produced.

    Any existing options file (``OPT_FILE``) is deleted first, then
    ``./ecgsyn`` is started with ``../ecg_syn/`` as its working directory.
    The program is invoked with an argument list (no shell), so the values
    are passed through verbatim.

    Args:
        data_path: File read back with pandas once the program has finished.
        freq: Value passed to both the ``-s`` and ``-S`` options.
        num_samples: Number of beats wanted; ``num_samples + 2`` is passed to
            ``-n`` because the first and last heartbeat might be unusable.

    Returns:
        The ``pandas.DataFrame`` parsed from ``data_path`` (space-delimited,
        no header row, so columns are labelled 0, 1, 2, ...).
    """
    if os.path.isfile(OPT_FILE):
        os.remove(OPT_FILE)
    # The trailing "%%" is forwarded untouched; presumably it makes ecgsyn
    # write its options file (load_signal later parses OPT_FILE) -- confirm.
    args = [
        "./ecgsyn",
        "-n", f"{num_samples+2}",
        "-s", f"{freq}",
        "-S", f"{freq}",
        "-h", f"{HR}",
        "%%",
    ]
    # NOTE(review): the exit status is deliberately not enforced, matching the
    # previous behaviour; ecgsyn's exit-code convention is not visible here.
    subprocess.run(args, cwd="../ecg_syn/")
    return pd.read_csv(data_path,delimiter=" ",header=None)
def separate_beats(vs: np.ndarray, ms: List) -> List[np.ndarray]:
    """Cut a signal into windows at the lowest sample of each search interval.

    A search interval opens on a sample marked 5 and closes on the next
    sample marked 1 (presumably the T and P peak labels written by ECGSYN --
    confirm). Inside an open interval the index of the smallest sample is
    tracked; when the interval closes, the slice between the previous cut
    index and the current one is stored as a window.

    Args:
        vs: Signal samples.
        ms: Per-sample marks, aligned with ``vs``.

    Returns:
        The stored windows without the first one.
    """
    windows: List[np.ndarray] = []
    cut_prev: int = 0
    cut_next: int = 0
    lowest: float = np.inf
    searching: bool = False
    for idx, (sample, mark) in enumerate(zip(vs, ms)):
        if mark == 5:
            searching = True
        elif mark == 1:
            # Interval closed: emit the window ending at the tracked minimum
            # and restart the minimum search for the next interval.
            windows.append(vs[cut_prev:cut_next])
            cut_prev = cut_next
            lowest = np.inf
            searching = False
        if searching and sample < lowest:
            lowest, cut_next = sample, idx
    # We don't want the first window: it may not be a full one
    # (the recording can start in the middle of a beat).
    return windows[1:]
def find_len_left(windows: List[np.ndarray]) -> int:
    """Return the fewest samples that precede the maximum in any window.

    The length of the first window is used as the starting upper bound, so an
    empty ``windows`` list raises ``IndexError``.
    """
    peak_positions = map(np.argmax, windows)
    return min([len(windows[0]), *peak_positions])
def find_len_right(windows: List[np.ndarray]) -> int:
    """Return the fewest samples that follow the maximum in any window.

    The length of the first window is used as the starting upper bound, so an
    empty ``windows`` list raises ``IndexError``.
    """
    samples_after_peak = (len(w) - np.argmax(w) - 1 for w in windows)
    return min([len(windows[0]), *samples_after_peak])
def normalize_length(windows: List[np.ndarray]) -> List[np.ndarray]:
    """Crop every window to a common length around its maximum.

    The crop keeps ``find_len_left(windows)`` samples before each window's
    maximum and ``find_len_right(windows)`` samples after it, so every
    returned slice has the same length and its maximum at the same index.
    """
    n_before = find_len_left(windows)
    n_after = find_len_right(windows)
    peaks = map(np.argmax, windows)
    return [
        w[peak - n_before:peak + n_after + 1]
        for w, peak in zip(windows, peaks)
    ]
def load_signal(num_pts,freq):
    """Load up to ``num_pts`` saved beats if they were generated at ``freq``.

    The ECGSYN options file (``OPT_FILE``) is scanned for lines containing
    ``-s`` and ``-n``; on such lines characters 3..11 are parsed as an
    integer (fixed-column parsing, so it relies on that file layout).

    Args:
        num_pts: Maximum number of rows to take from the saved array.
        freq: Sampling frequency the saved data must have been generated at.

    Returns:
        The first ``num_pts`` rows of the array stored at ``SAVE_PATH``, or
        ``None`` when the options file is missing or its ``-s`` value differs
        from ``freq`` (a message is printed in both cases).
    """
    if not os.path.isfile(OPT_FILE):
        print("No signal to load/ Missing config file")
        return None

    tot_num_pts = 0
    freq_this_file = 0
    with open(OPT_FILE) as f:
        for line in f:
            if "-s" in line:
                freq_this_file = int(line[3:12])
            if "-n" in line:
                tot_num_pts = int(line[3:12])

    if freq != freq_this_file:
        print(f"Present dataset do not respect given parameter (f: {freq_this_file}, pts: {tot_num_pts})")
        return None

    dataset = np.load(SAVE_PATH)[:num_pts]
    if tot_num_pts>num_pts:
        # Report how many rows were actually loaded; the message used to print
        # the options-file count in both placeholders.
        print(f"Loaded {len(dataset)} points, the dataset contains {tot_num_pts} points")
    elif tot_num_pts<num_pts:
        print(f"Incoherent info. about number of pints, dataset length: {len(dataset)}")
    return dataset
def create_random_signal(coefs, ws, dt):
    """Return a sum of sinusoids ``c*sin(w*t)`` sampled on a fixed time axis.

    The time axis runs from 0 to ``T_SPAN_RANDOM_SIGNAL_SECONDS`` with
    ``int(T_SPAN_RANDOM_SIGNAL_SECONDS/dt)`` points.

    Args:
        coefs: Amplitudes, paired element-wise with ``ws``.
        ws: Multipliers of the time axis inside the sine.
        dt: Nominal sample spacing used to choose the number of points.
    """
    n_samples = int(T_SPAN_RANDOM_SIGNAL_SECONDS/dt)
    time_axis = np.linspace(0, T_SPAN_RANDOM_SIGNAL_SECONDS, n_samples)
    signal = np.zeros(n_samples)
    # Accumulate term by term, in input order.
    for omega, amplitude in zip(ws, coefs):
        signal += amplitude*np.sin(omega*time_axis)
    return signal
def create_positive_random_dataset(num_pts,freq):
    """Build ``num_pts`` rows of a strictly positive sum-of-sinusoids signal.

    The generator is seeded with a fixed value, so the output is reproducible.
    The signal is shifted so that its minimum is 0.01. The previous shift,
    ``x -= min(x)+0.01``, left the minimum at -0.01, so the data was not
    positive.

    NOTE(review): ``ws`` and ``coefs`` are drawn once, so every row is the
    same signal (this was already the case before); confirm that is intended.

    Args:
        num_pts: Number of rows in the returned array.
        freq: Sampling frequency; the sample spacing is ``1/freq``.

    Returns:
        ``np.ndarray`` with one signal per row.
    """
    rng = np.random.default_rng(31415926514)
    # Draw 100 distinct integers below freq*10 and divide by 100: this gives
    # rng.choice a large enough integer space while keeping every value
    # below freq/10.
    ws = rng.choice(int(freq*10), size=100, replace=False)/100
    dt = 1/freq
    coefs = rng.choice(3000, size=100)
    # The signal does not depend on the row index, so compute it only once.
    signal = create_random_signal(coefs, ws, dt)
    signal -= signal.min() - 0.01
    return np.array([signal] * num_pts)
def _ecgsyn_beats(freq, num_beats):
    """Run ECGSYN once and return at most ``num_beats`` separated beat windows."""
    data = run_ECGSYN(data_path=DATA_PATH,freq=freq, num_samples=num_beats)
    # Column 1 is used as the signal and column 2 as the per-sample marks.
    v = data[1].to_numpy()
    marks = data[2].to_list()
    return separate_beats(v,marks)[:num_beats]


def create_ECG_emulated_dataset(num_pts,freq):
    """Generate ``num_pts`` beats with ECGSYN, normalize, save and return them.

    The work is split into as many full-size ECGSYN runs as needed plus one
    tail run for the remainder. The per-run size is derived from
    ``MEM_FOR_ECGSYN``, ``BYTES_ECGSYN_BOINT``, ``HR`` and ``freq``. All
    windows are cropped to a common length and the resulting array is written
    to ``SAVE_PATH``; its directory is created if missing.

    Args:
        num_pts: Number of beats wanted.
        freq: Sampling frequency passed to ECGSYN.

    Returns:
        ``np.ndarray`` holding the length-normalized beats, one per row.
    """
    max_num_beat_this_freq = int(MEM_FOR_ECGSYN/(freq*BYTES_ECGSYN_BOINT)*60/HR)
    print(f"Maximum number of beats at this freq (per ECGSYN run): {max_num_beat_this_freq}")
    print(f"Beats desired: {num_pts}")
    num_full_runs, num_beats_remaining = divmod(num_pts, max_num_beat_this_freq)
    print(f"Running ECGSYN {num_full_runs} times")

    windows = []
    for i in range(num_full_runs):
        print("\n###########################")
        print(f"#Generating {i+1}/{num_full_runs} datassets")
        print("###########################\n")
        windows.extend(_ecgsyn_beats(freq, max_num_beat_this_freq))

    # Tail: the beats that do not fill a whole run.
    if num_beats_remaining != 0:
        print("\n###########################")
        print(f"#Running ECGSYN. Tail beats number: {num_beats_remaining}")
        print("###########################\n")
        windows.extend(_ecgsyn_beats(freq, num_beats_remaining))

    dataset_np = np.array(normalize_length(windows))

    # Derive the directory from SAVE_PATH so the two cannot drift apart, and
    # create it without an isdir/mkdir race.
    save_dir = os.path.dirname(SAVE_PATH)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    np.save(SAVE_PATH,dataset_np)
    return dataset_np
def get_signal(type = 'load', num_pts = 1000, freq = 256):
    """Return a dataset as a JAX array, produced according to ``type``.

    Args:
        type: ``'random'`` (create_positive_random_dataset), ``'load'``
            (load_signal) or ``'create'`` (create_ECG_emulated_dataset).
        num_pts: Number of signals/beats requested.
        freq: Sampling frequency forwarded to the selected producer.

    Returns:
        ``jnp.array`` built from whatever the selected producer returned.
        Note that load_signal may return ``None``; that value is handed to
        ``jnp.array`` unchanged.

    Raises:
        ValueError: If ``type`` is not one of the supported values. This used
            to print a message and then fail with ``UnboundLocalError``.
    """
    if type == 'random':
        dataset = create_positive_random_dataset(num_pts,freq)
    elif type == 'load':
        dataset = load_signal(num_pts,freq)
    elif type == 'create':
        dataset = create_ECG_emulated_dataset(num_pts,freq)
    else:
        raise ValueError(f"Dataset type not recognized in 'get_signal()': {type!r}")
    return jnp.array(dataset)
def main() -> None:
    """Script entry point; currently performs no work."""
    return None


if __name__ == "__main__":
    main()

Event Timeline