Page MenuHomec4science

utils.py
No OneTemporary

File Metadata

Created
Sun, Jun 9, 06:13

utils.py

import numpy as np
from matplotlib import pyplot as plt
#sklearn
from sklearn.metrics import confusion_matrix, classification_report
#running .exe as a subprocess
import subprocess
#Train_set and test_set are pandas dataframes where the signal is in the column labeled "signal" and the label "label"
def cut_signal_data(train_set, test_set, size_train = 1280, stride_train = 640, size_test=1280, stride_test=1280):
x_train = []
y_train = []
x_test = []
y_test = []
#For each training seizure/pre-ictal signals
for row in train_set.itertuples(index = False):
signal = row.signal
label = row.label
#Cut signal in chunks of 5 seconds
signals = sliding_window(signal, size_train, stride_train)
labels = np.ones(len(signals))*label
x_train+=list(signals)
y_train+=list(labels)
#For each testing seizure/pre-ictal signals
for row in test_set.itertuples(index = False):
signal = row.signal
label = row.label
#Cut signal in chunks of 5 seconds
signals = sliding_window(signal, size_test, stride_test)
labels = np.ones(len(signals))*label
x_test+=list(signals)
y_test+=list(labels)
return x_train, y_train, x_test, y_test
#input: eeg signal, size: size of the window, stride: step
#Default : 1280 long windows with 50% overlap
#output : array of signal cuts according to the window size
def sliding_window(signal,size = 1280, stride = 640):
out = []
numOfChunks = int(((signal.shape[1]-size)/stride)+1)
for i in range(0,numOfChunks*stride,stride):
out.append(signal[:,i:i+size])
return out
def prepare_standardplot(title, xlabel):
fig, (ax1, ax2) = plt.subplots(1, 2)
fig.set_size_inches(12,6)
fig.suptitle(title)
ax1.set_ylabel('categorical cross entropy')
ax1.set_xlabel(xlabel)
ax1.set_yscale('log')
ax2.set_ylabel('categorical accuracy [% correct]')
ax2.set_xlabel(xlabel)
return fig, ax1, ax2
def finalize_standardplot(fig, ax1, ax2):
ax1handles, ax1labels = ax1.get_legend_handles_labels()
if len(ax1labels) > 0:
ax1.legend(ax1handles, ax1labels)
ax2handles, ax2labels = ax2.get_legend_handles_labels()
if len(ax2labels) > 0:
ax2.legend(ax2handles, ax2labels)
fig.tight_layout()
plt.subplots_adjust(top=0.9)
def plot_history(history, title):
fig, ax1, ax2 = prepare_standardplot(title, 'epoch')
ax1.plot(history.history['loss'], label = "training")
ax1.plot(history.history['val_loss'], label = "validation")
ax2.plot(history.history['acc'], label = "training")
ax2.plot(history.history['val_acc'], label = "validation")
finalize_standardplot(fig, ax1, ax2)
return fig
#Display model performance on test set
def show_confusion_matrix(y_true_, y_pred_, title):
cm = confusion_matrix(y_pred=np.max(np.round(y_pred_), axis=1), y_true=np.argmax(y_true_, axis=1))
print(cm)
plt.imshow(cm, cmap="inferno_r")
plt.title(title)
plt.show()
print(classification_report(y_pred=np.max(np.round(y_pred_), axis=1), y_true=np.argmax(y_true_, axis=1)))
#Calculate accuracy on test set
def compute_accuracy(y_pred_, y_true_):
return 1 - np.sum(np.abs(np.max(np.round(y_pred_), axis=1) - np.argmax(y_true_, axis=1)))/len(y_true_)
def false_positive_rate(y_test, y_pred, detect_rule):
time_hr = len(y_pred)*1280/(256*60*60)
preds = np.max(np.rint(y_pred), axis=1)
fp=0
x_ = np.zeros(detect_rule[1])
alarm_triggered = False
counter_alarm = 23 #Needs 1mn seconds between seizure onsets
false_alarms = []
for idx, x in enumerate(preds):
if(counter_alarm == 0):
alarm_triggered = False
else:
counter_alarm -= 1
if (alarm_triggered == False):
for j, y in enumerate(x_[::-1]):
if(j == len(x_)-1):
x_[1] = x_[0]
x_[0] = 0
else:
x_[len(x_)-1-j] = x_[len(x_)-2-j]
if(x==1):
x_[0]=1
if(np.sum(x_) >= detect_rule[0]):
fp+=1
alarm_triggered = True
counter_alarm = 23
false_alarms.append(idx)
fpr = fp/time_hr
return fpr, fp, false_alarms
#Compute detection time: first time to have 2 out of 3 segments being classified as ictal
def compute_detect_time(preds, test, detect_rule):
x_ = np.zeros(detect_rule[1])
for idx, x in enumerate(np.max(np.round(preds), axis=1)):
for j, y in enumerate(x_[::-1]):
if(j == len(x_)-1):
x_[1] = x_[0]
x_[0] = 0
else:
x_[len(x_)-1-j] = x_[len(x_)-2-j]
if(x==1):
x_[0]=1
if(np.sum(x_) >= detect_rule[0]):
return idx
return -1
def train(temp_data_folder, model_file, epochs, batch_size, learning_rate, x_train, y_train, x_val, y_val):
np.save(temp_data_folder+"x_train.npy", x_train)
np.save(temp_data_folder+"y_train.npy", y_train)
np.save(temp_data_folder+"x_val.npy", x_val)
np.save(temp_data_folder+"y_val.npy", y_val)
subprocess.check_call(["../build/basic_lopo.x", "fit", "--data_folder", temp_data_folder, "--model_file", model_file, "--epochs", str(epochs), "--batch_size", str(batch_size), "--learning_rate", str(learning_rate)])
def evaluate(temp_data_folder, model_file, x_test, y_test):
np.save(temp_data_folder+"x_test.npy", x_test)
np.save(temp_data_folder+"y_test.npy", y_test)
subprocess.check_call(["../build/basic_lopo.x", "predict", "--data_folder", temp_data_folder, "--model_file", model_file])

Event Timeline