utils.py
import numpy as np
from matplotlib import pyplot as plt
#pyeddl
import pyeddl._core.eddl as eddl
import pyeddl._core.eddlT as eddlT
from pyeddl._core.eddl import BatchNormalization, L2, sgd, Input, Dense, Dropout, Activation, Reshape, MaxPool, Flatten, Conv, GlorotUniform
#sklearn
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, roc_auc_score
NB_CHNS = 4          # number of EEG channels
L2_K = 0.1           # L2 penalty on kernel weights
L2_B = 0.1           # L2 penalty on biases (not applied in getModel)
L2_A = 0.0           # L2 penalty on activations (not applied in getModel)
DROPOUT_RATE = 0.5
def getModel():
    # Input: 1 x NB_CHNS x 1280, i.e. 5 s of 4-channel EEG sampled at 256 Hz
    in_ = Input([1, NB_CHNS, 1280])
    l = in_
    # Convolutional block 1
    l = L2(GlorotUniform(Conv(l, 16, [3, 5], [1, 1], "same")), L2_K)
    l = BatchNormalization(l, 0.99, 0.001)
    l = Activation(l, "relu")
    l = MaxPool(l, [1, 2], [1, 2], "same")
    # Convolutional block 2
    l = L2(GlorotUniform(Conv(l, 32, [3, 3], [1, 1], "same")), L2_K)
    l = BatchNormalization(l, 0.99, 0.001)
    l = Activation(l, "relu")
    l = MaxPool(l, [1, 2], [1, 2], "same")
    # Convolutional block 3 (strided convolution instead of pooling)
    l = L2(GlorotUniform(Conv(l, 32, [3, 3], [2, 2], "same")), L2_K)
    l = BatchNormalization(l, 0.99, 0.001)
    l = Activation(l, "relu")
    l = Dropout(l, DROPOUT_RATE)
    # Classifier head: 2-class softmax output
    l = Flatten(l)
    l = L2(GlorotUniform(Dense(l, 64)), L2_K)
    l = Activation(l, "relu")
    l = Dropout(l, DROPOUT_RATE)
    l = GlorotUniform(Dense(l, 2))
    l = Activation(l, "softmax")
    out_ = l
    model = eddl.Model([in_], [out_])
    eddl.build(model,
               sgd(0.01, 0.9, 0.0, nesterov=True),
               ["cross_entropy"],
               ["categorical_accuracy"],
               eddl.CS_CPU(1, 'full_mem'))
    eddl.summary(model)
    return model
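# Example use of getModel() (a minimal sketch; the input/label shapes below are
# assumptions based on the Input([1, NB_CHNS, 1280]) layer and the 2-unit softmax):
#
#   net = getModel()
#   x = eddlT.create(np.zeros((8, 1, NB_CHNS, 1280), dtype=np.float32))
#   y = eddlT.create(np.zeros((8, 2), dtype=np.float32))   # one-hot labels
#   eddl.evaluate(net, [x], [y])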
# train_set and test_set are pandas DataFrames where the signal is in the column
# labeled "signal" and the label in the column labeled "label"
def cut_signal_data(train_set, test_set, size_train=1280, stride_train=640, size_test=1280, stride_test=1280):
    x_train = []
    y_train = []
    x_test = []
    y_test = []
    # For each training seizure/pre-ictal signal
    for row in train_set.itertuples(index=False):
        signal = row.signal
        label = row.label
        # Cut the signal into chunks of 5 seconds
        signals = sliding_window(signal, size_train, stride_train)
        labels = np.ones(len(signals)) * label
        x_train += list(signals)
        y_train += list(labels)
    # For each testing seizure/pre-ictal signal
    for row in test_set.itertuples(index=False):
        signal = row.signal
        label = row.label
        # Cut the signal into chunks of 5 seconds
        signals = sliding_window(signal, size_test, stride_test)
        labels = np.ones(len(signals)) * label
        x_test += list(signals)
        y_test += list(labels)
    return x_train, y_train, x_test, y_test
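# Example use of cut_signal_data (hypothetical data; assumes each 'signal' entry
# is a (NB_CHNS, n_samples) array sampled at 256 Hz, as in the rest of this file):
#
#   import pandas as pd
#   train_df = pd.DataFrame({"signal": [np.zeros((NB_CHNS, 5120))], "label": [1]})
#   test_df = pd.DataFrame({"signal": [np.zeros((NB_CHNS, 2560))], "label": [0]})
#   x_tr, y_tr, x_te, y_te = cut_signal_data(train_df, test_df)
#   # -> 7 overlapping 5 s training windows and 2 non-overlapping test windows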
# input: EEG signal, size: size of the window, stride: step between windows
# Default: 1280-sample windows with 50% overlap
# output: list of signal cuts according to the window size
def sliding_window(signal, size=1280, stride=640):
    out = []
    numOfChunks = int(((signal.shape[1] - size) / stride) + 1)
    for i in range(0, numOfChunks * stride, stride):
        out.append(signal[:, i:i + size])
    return out
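# Worked example for sliding_window (hypothetical 20 s, 4-channel signal at 256 Hz):
#
#   sig = np.zeros((NB_CHNS, 20 * 256))      # shape (4, 5120)
#   chunks = sliding_window(sig)             # 1280-sample windows, 640-sample stride
#   # numOfChunks = (5120 - 1280) / 640 + 1 = 7 chunks, each of shape (4, 1280)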
def prepare_standardplot(title, xlabel):
    fig, (ax1, ax2) = plt.subplots(1, 2)
    fig.set_size_inches(12, 6)
    fig.suptitle(title)
    ax1.set_ylabel('binary cross entropy')
    ax1.set_xlabel(xlabel)
    ax1.set_yscale('log')
    ax2.set_ylabel('accuracy [% correct]')
    ax2.set_xlabel(xlabel)
    return fig, ax1, ax2
def finalize_standardplot(fig, ax1, ax2):
    ax1handles, ax1labels = ax1.get_legend_handles_labels()
    if len(ax1labels) > 0:
        ax1.legend(ax1handles, ax1labels)
    ax2handles, ax2labels = ax2.get_legend_handles_labels()
    if len(ax2labels) > 0:
        ax2.legend(ax2handles, ax2labels)
    fig.tight_layout()
    plt.subplots_adjust(top=0.9)
# Plot training/validation loss and accuracy from a history object exposing a .history dict
def plot_history(history, title):
    fig, ax1, ax2 = prepare_standardplot(title, 'epoch')
    ax1.plot(history.history['loss'], label="training")
    ax1.plot(history.history['val_loss'], label="validation")
    ax2.plot(history.history['acc'], label="training")
    ax2.plot(history.history['val_acc'], label="validation")
    finalize_standardplot(fig, ax1, ax2)
    return fig
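# Example use of plot_history (a minimal sketch; `history` only needs to expose a
# .history dict with the four keys read above, as a Keras History object does):
#
#   class _History:                          # hypothetical stand-in
#       history = {"loss": [0.7, 0.5], "val_loss": [0.8, 0.6],
#                  "acc": [0.6, 0.7], "val_acc": [0.55, 0.65]}
#   fig = plot_history(_History(), "Example run")
#   fig.savefig("history.png")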
# Display model performance on the test set
def show_confusion_matrix(y_true_, y_pred_, title):
    print("Generating Confusion Matrix")
    cm = confusion_matrix(y_pred=np.rint(y_pred_), y_true=np.array(y_true_))
    print(cm)
    plt.imshow(cm, cmap="inferno_r")
    plt.title(title)
    plt.show()
    print(classification_report(y_pred=np.rint(y_pred_), y_true=np.array(y_true_)))
# Compute accuracy on the test set from 2-column softmax predictions and 0/1 labels
def compute_accuracy(y_pred_, y_true_):
    # argmax recovers the predicted class (0 or 1) for each window
    return 1 - np.sum(np.abs(np.argmax(y_pred_, axis=1) - np.array(y_true_))) / len(y_true_)
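# Worked example for compute_accuracy (assuming 2-column softmax predictions
# and 0/1 ground-truth labels):
#
#   y_pred = np.array([[0.9, 0.1], [0.2, 0.8], [0.4, 0.6]])   # argmax -> [0, 1, 1]
#   y_true = [0, 1, 0]
#   compute_accuracy(y_pred, y_true)                          # -> 1 - 1/3 ≈ 0.667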
def false_positive_rate(y_test, y_pred, detect_rule):
    # Duration covered by the predictions, in hours (1280 samples per window at 256 Hz)
    time_hr = len(y_pred) * 1280 / (256 * 60 * 60)
    # Predicted class per window (class 1 is treated as ictal), assuming 2-column softmax output
    preds = np.argmax(y_pred, axis=1)
    fp = 0
    x_ = np.zeros(detect_rule[1])          # shift register holding the last detect_rule[1] predictions
    alarm_triggered = False
    counter_alarm = 23                     # refractory period, in prediction windows, between consecutive alarms
    false_alarms = []
    for idx, x in enumerate(preds):
        if counter_alarm == 0:
            alarm_triggered = False
        else:
            counter_alarm -= 1
        if alarm_triggered == False:
            # Shift the register by one position and insert the current prediction
            for j, y in enumerate(x_[::-1]):
                if j == len(x_) - 1:
                    x_[1] = x_[0]
                    x_[0] = 0
                else:
                    x_[len(x_) - 1 - j] = x_[len(x_) - 2 - j]
            if x == 1:
                x_[0] = 1
            # Alarm when at least detect_rule[0] of the last detect_rule[1] windows are ictal
            if np.sum(x_) >= detect_rule[0]:
                fp += 1
                alarm_triggered = True
                counter_alarm = 23
                false_alarms.append(idx)
    fpr = fp / time_hr
    return fpr, fp, false_alarms
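# Worked example for false_positive_rate (hypothetical inter-ictal predictions;
# detect_rule = (2, 3) means "2 of the last 3 windows ictal" raises an alarm):
#
#   y_pred = np.array([[1, 0]] * 10 + [[0, 1]] * 3 + [[1, 0]] * 10)
#   fpr, fp, alarms = false_positive_rate(None, y_pred, (2, 3))
#   # -> fp == 1, with the single false alarm raised at window index 11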
# Compute the detection time: index of the first window at which detect_rule[0]
# out of the last detect_rule[1] windows are classified as ictal (e.g. 2 out of 3)
def compute_detect_time(preds, test, detect_rule):
    x_ = np.zeros(detect_rule[1])          # shift register holding the last detect_rule[1] predictions
    for idx, x in enumerate(preds):
        # Shift the register by one position and insert the current prediction
        for j, y in enumerate(x_[::-1]):
            if j == len(x_) - 1:
                x_[1] = x_[0]
                x_[0] = 0
            else:
                x_[len(x_) - 1 - j] = x_[len(x_) - 2 - j]
        if x == 1:
            x_[0] = 1
        if np.sum(x_) >= detect_rule[0]:
            return idx
    # No detection
    return -1
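# Worked example for compute_detect_time (hypothetical per-window predictions,
# detect_rule = (2, 3); the `test` argument is not used by the function):
#
#   preds = [0, 0, 1, 0, 1, 1, 0]
#   compute_detect_time(preds, None, (2, 3))   # -> 4 (windows 2 and 4 satisfy the rule)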
def train(temp_data_folder, model_file, epochs, batch_size, learning_rate, x_train, y_train, x_val, y_val):
    x_train = eddlT.create(x_train)
    y_train = eddlT.create(y_train)
    x_val = eddlT.create(x_val)
    y_val = eddlT.create(y_val)
    net = getModel()
    eddl.setlr(net, [learning_rate])
    for e in range(epochs):
        print("Real Epoch number: {} of {}".format(e + 1, epochs))
        # One pass over the training set, then evaluate on the validation set
        eddl.fit(net, [x_train], [y_train], batch_size, 1)
        eddl.evaluate(net, [x_val], [y_val])
    eddl.save(net, model_file)
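# Example use of train (a minimal sketch; hypothetical paths, and the shapes below
# assume float32 inputs of shape (N, 1, NB_CHNS, 1280) with one-hot (N, 2) labels):
#
#   x_tr = np.zeros((32, 1, NB_CHNS, 1280), dtype=np.float32)
#   y_tr = np.zeros((32, 2), dtype=np.float32)
#   train("/tmp", "model.bin", epochs=2, batch_size=8, learning_rate=0.01,
#         x_train=x_tr, y_train=y_tr, x_val=x_tr, y_val=y_tr)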
def evaluate(temp_data_folder, model_file, x_test, y_test):
    net = getModel()
    eddl.load(net, model_file)
    x_test = eddlT.create(x_test)
    y_test = eddlT.create(y_test)
    eddl.evaluate(net, [x_test], [y_test])
