Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F86422761
utils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, Oct 6, 09:59
Size
7 KB
Mime Type
text/x-python
Expires
Tue, Oct 8, 09:59 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
21420645
Attached To
R9868 DeepHealth_UC13_seizure_detection
utils.py
View Options
import
numpy
as
np
from
matplotlib
import
pyplot
as
plt
#pyeddl
import
pyeddl._core.eddl
as
eddl
import
pyeddl._core.eddlT
as
eddlT
from
pyeddl._core.eddl
import
BatchNormalization
,
L2
,
sgd
,
Input
,
Dense
,
Dropout
,
Activation
,
Reshape
,
MaxPool
,
Flatten
,
Conv
,
GlorotUniform
#sklearn
from
sklearn.metrics
import
confusion_matrix
,
classification_report
,
roc_curve
,
roc_auc_score
# Second dimension of the network input built in getModel().
NB_CHNS = 4

# L2 factor passed to the L2(...) wrapper of every regularised Conv/Dense layer.
L2_K = 0.1
# NOTE(review): L2_B and L2_A are not referenced anywhere in the visible code.
L2_B = 0.1
L2_A = 0.0

# Rate passed to both Dropout layers in getModel().
DROPOUT_RATE = 0.5
def getModel():
    """Build, compile and return the EDDL convolutional classifier.

    The network takes an input of shape [1, NB_CHNS, 1280], applies three
    Conv -> BatchNormalization -> ReLU stages (the first two followed by a
    MaxPool), then Dropout, Flatten, a 64-unit Dense layer with ReLU and
    Dropout, and a final 2-unit Dense layer with softmax.

    The model is built with SGD, the "cross_entropy" loss and the
    "categorical_accuracy" metric on a CPU computing service, and its
    summary is printed before it is returned.
    """

    def conv_bn_relu(layer, filters, kernel, strides):
        # Glorot-uniform initialised, L2-regularised convolution, then
        # batch normalisation and ReLU.
        layer = L2(
            GlorotUniform(Conv(layer, filters, kernel, strides, "same")),
            L2_K,
        )
        layer = BatchNormalization(layer, 0.99, 0.001)
        return Activation(layer, "relu")

    def pool_1x2(layer):
        # MaxPool with a [1, 2] pool size and [1, 2] strides.
        return MaxPool(layer, [1, 2], [1, 2], "same")

    in_ = Input([1, NB_CHNS, 1280])

    features = pool_1x2(conv_bn_relu(in_, 16, [3, 5], [1, 1]))
    features = pool_1x2(conv_bn_relu(features, 32, [3, 3], [1, 1]))
    # The last convolutional stage uses [2, 2] strides and has no pooling.
    features = conv_bn_relu(features, 32, [3, 3], [2, 2])
    features = Flatten(Dropout(features, DROPOUT_RATE))

    hidden = L2(GlorotUniform(Dense(features, 64)), L2_K)
    hidden = Dropout(Activation(hidden, "relu"), DROPOUT_RATE)

    # The output layer is Glorot-initialised but not L2-regularised.
    out_ = Activation(GlorotUniform(Dense(hidden, 2)), "softmax")

    model = eddl.Model([in_], [out_])
    eddl.build(
        model,
        # presumably (learning rate, momentum, weight decay) -- confirm
        # against the pyeddl sgd signature.
        sgd(0.01, 0.9, 0.0, nesterov=True),
        ["cross_entropy"],
        ["categorical_accuracy"],
        eddl.CS_CPU(1, 'full_mem'),
    )
    eddl.summary(model)
    return model
# train_set and test_set are pandas dataframes where the signal is in the
# column labeled "signal" and the label in the column labeled "label".
def cut_signal_data(train_set, test_set, size_train=1280, stride_train=640,
                    size_test=1280, stride_test=1280):
    """Cut every recording of both sets into fixed-size labelled windows.

    Each row's ``signal`` is split with ``sliding_window`` and every
    resulting window receives that row's ``label``.

    Args:
        train_set: dataframe of training recordings.
        test_set: dataframe of testing recordings.
        size_train: window length used for the training set.
        stride_train: step between training windows.
        size_test: window length used for the testing set.
        stride_test: step between testing windows.

    Returns:
        ``(x_train, y_train, x_test, y_test)`` as four Python lists; the
        ``x`` lists hold the windows and the ``y`` lists one label per
        window.
    """

    def windows_and_labels(frame, size, stride):
        # Accumulate the windows and matching labels of every row of `frame`.
        windows, targets = [], []
        for record in frame.itertuples(index=False):
            recording = record.signal
            target = record.label
            chunks = sliding_window(recording, size, stride)
            windows.extend(chunks)
            # One copy of the row label per window produced.
            targets.extend(np.ones(len(chunks)) * target)
        return windows, targets

    x_train, y_train = windows_and_labels(train_set, size_train, stride_train)
    x_test, y_test = windows_and_labels(test_set, size_test, stride_test)
    return x_train, y_train, x_test, y_test
# input: 2-D signal (sliced along its second axis), window size, stride (step)
# default: 1280-sample windows with 50% overlap
# output: list of window slices, each of width `size`
def sliding_window(signal, size=1280, stride=640):
    """Split ``signal`` into windows of width ``size`` taken every ``stride``.

    Only complete windows are produced; a signal shorter than ``size``
    yields an empty list.

    Args:
        signal: array indexed as ``signal[:, start:stop]``; its length is
            read from ``signal.shape[1]``.
        size: number of samples per window.
        stride: offset between the starts of consecutive windows.

    Returns:
        A list of slices ``signal[:, start:start + size]``.
    """
    window_count = int(((signal.shape[1] - size) / stride) + 1)
    starts = range(0, window_count * stride, stride)
    return [signal[:, start:start + size] for start in starts]
def prepare_standardplot(title, xlabel):
    """Create the two-panel figure used for training curves.

    The left axis is labelled 'binary cross entropy' and uses a log
    y-scale; the right axis is labelled 'accuracy [% correct]'. Both share
    the given x-axis label.

    Args:
        title: figure-level title.
        xlabel: label applied to the x-axis of both panels.

    Returns:
        ``(fig, ax1, ax2)``: the figure, the loss axis and the accuracy axis.
    """
    fig, (loss_axis, accuracy_axis) = plt.subplots(1, 2)
    fig.set_size_inches(12, 6)
    fig.suptitle(title)

    panels = (
        (loss_axis, 'binary cross entropy'),
        (accuracy_axis, 'accuracy [% correct]'),
    )
    for axis, ylabel in panels:
        axis.set_ylabel(ylabel)
        axis.set_xlabel(xlabel)
    loss_axis.set_yscale('log')

    return fig, loss_axis, accuracy_axis
def finalize_standardplot(fig, ax1, ax2):
    """Add legends where there are labelled artists and tidy the layout.

    Args:
        fig: figure returned by ``prepare_standardplot``.
        ax1: first axis of the figure.
        ax2: second axis of the figure.
    """
    for axis in (ax1, ax2):
        handles, labels = axis.get_legend_handles_labels()
        # Only draw a legend when at least one artist carries a label.
        if labels:
            axis.legend(handles, labels)
    fig.tight_layout()
    # Applied after tight_layout so the top margin is set to 0.9.
    plt.subplots_adjust(top=0.9)
def plot_history(history, title):
    """Plot training and validation loss/accuracy curves.

    Args:
        history: object whose ``history`` attribute maps 'loss',
            'val_loss', 'acc' and 'val_acc' to per-epoch sequences.
        title: figure title.

    Returns:
        The matplotlib figure holding both panels.
    """
    fig, ax1, ax2 = prepare_standardplot(title, 'epoch')

    # (axis, key in history.history, legend label), in drawing order.
    curves = (
        (ax1, 'loss', "training"),
        (ax1, 'val_loss', "validation"),
        (ax2, 'acc', "training"),
        (ax2, 'val_acc', "validation"),
    )
    for axis, key, legend_label in curves:
        axis.plot(history.history[key], label=legend_label)

    finalize_standardplot(fig, ax1, ax2)
    return fig
# Display model performance on the test set
def show_confusion_matrix(y_true_, y_pred_, title):
    """Print and display the confusion matrix and classification report.

    Predictions are rounded to the nearest integer with ``np.rint`` before
    being compared with the true labels.

    Args:
        y_true_: true labels.
        y_pred_: predicted scores, rounded to obtain class labels.
        title: title of the displayed matrix image.
    """
    print("Generating Confusion Matrix")
    predicted = np.rint(y_pred_)
    actual = np.array(y_true_)

    cm = confusion_matrix(y_true=actual, y_pred=predicted)
    print(cm)

    plt.imshow(cm, cmap="inferno_r")
    plt.title(title)
    plt.show()

    print(classification_report(y_true=actual, y_pred=predicted))
# Calculate accuracy on the test set
def compute_accuracy(y_pred_, y_true_):
    """Return the fraction of predictions that match the true labels.

    Each row of ``y_pred_`` is rounded and reduced with a row-wise maximum
    to obtain one predicted label, which is compared with ``y_true_``.

    NOTE(review): the row-wise max of rounded scores only yields a class
    label when each row holds a single score -- presumably ``y_pred_`` has
    shape (n, 1); confirm against callers.

    Args:
        y_pred_: 2-D array-like of predicted scores, one row per sample.
        y_true_: sequence of true labels, one per sample.

    Returns:
        ``1 - (sum of absolute label differences) / len(y_true_)``.
    """
    predicted = np.max(np.round(y_pred_), axis=1)
    total_error = np.sum(np.abs(predicted - np.array(y_true_)))
    return 1 - total_error / len(y_true_)
def false_positive_rate(y_test, y_pred, detect_rule, *, window_samples=1280,
                        sampling_rate=256, refractory_windows=23):
    """Count alarms raised on ``y_pred`` and express them per hour.

    Predictions are rounded and reduced row-wise to one value per window.
    A buffer of the last ``detect_rule[1]`` window decisions is kept; an
    alarm is raised whenever at least ``detect_rule[0]`` of them equal 1.
    After an alarm, the following ``refractory_windows`` windows are
    ignored (the buffer is not updated during that period).

    Args:
        y_test: unused; kept for interface compatibility.
        y_pred: 2-D array-like of predicted scores, one row per window.
        detect_rule: pair ``(k, n)`` -- alarm when k of the last n windows
            are positive.
        window_samples: samples accounted for per window when computing
            the recording duration (default 1280).
        sampling_rate: samples per second used for the duration
            (default 256).
        refractory_windows: number of windows skipped after an alarm
            (default 23).

    Returns:
        ``(fpr, fp, false_alarms)``: alarms per hour, the alarm count and
        the list of window indices at which alarms were raised.

    Raises:
        ZeroDivisionError: if ``y_pred`` is empty (zero recording time).
    """
    # Recording duration in hours covered by the predictions.
    time_hr = len(y_pred) * window_samples / (sampling_rate * 60 * 60)
    preds = np.max(np.rint(y_pred), axis=1)

    fp = 0
    false_alarms = []
    # recent[0] is the newest window decision, recent[-1] the oldest.
    recent = np.zeros(detect_rule[1])
    alarm_triggered = False
    counter_alarm = refractory_windows

    for idx, pred in enumerate(preds):
        # Count down the refractory period; re-arm once it reaches zero.
        if counter_alarm == 0:
            alarm_triggered = False
        else:
            counter_alarm -= 1

        if alarm_triggered:
            continue

        # Shift the history by one slot and store the newest decision.
        # Slice-based shifting also works for a one-slot buffer, where the
        # previous element-wise shift wrote to recent[1] and raised
        # IndexError.
        recent[1:] = recent[:-1].copy()
        recent[0] = 1 if pred == 1 else 0

        if np.sum(recent) >= detect_rule[0]:
            fp += 1
            alarm_triggered = True
            counter_alarm = refractory_windows
            false_alarms.append(idx)

    fpr = fp / time_hr
    return fpr, fp, false_alarms
# Compute detection time: index of the first window at which at least
# detect_rule[0] of the last detect_rule[1] windows are classified positive
# (e.g. 2 out of 3).
def compute_detect_time(preds, test, detect_rule):
    """Return the index of the first window that satisfies the detect rule.

    Args:
        preds: iterable of per-window decisions; a value equal to 1 counts
            as positive.
        test: unused; kept for interface compatibility.
        detect_rule: pair ``(k, n)`` -- detection when k of the last n
            windows are positive.

    Returns:
        The zero-based index of the first window at which the rule holds,
        or -1 if it never does.
    """
    # recent[0] is the newest window decision, recent[-1] the oldest.
    recent = np.zeros(detect_rule[1])
    for idx, pred in enumerate(preds):
        # Shift the history by one slot and store the newest decision.
        # Slice-based shifting also works for a one-slot buffer, where the
        # previous element-wise shift wrote to recent[1] and raised
        # IndexError.
        recent[1:] = recent[:-1].copy()
        recent[0] = 1 if pred == 1 else 0
        if np.sum(recent) >= detect_rule[0]:
            return idx
    return -1
def train(temp_data_folder, model_file, epochs, batch_size, learning_rate,
          x_train, y_train, x_val, y_val):
    """Train a fresh model and save it to ``model_file``.

    The data are converted to EDDL tensors, a model is created with
    ``getModel`` and its learning rate is overridden with
    ``learning_rate``. Training runs one epoch at a time so the model can
    be evaluated on the validation data after every epoch.

    Args:
        temp_data_folder: unused in this function.
        model_file: path the trained model is saved to.
        epochs: number of single-epoch fit/evaluate rounds.
        batch_size: batch size passed to ``eddl.fit``.
        learning_rate: learning rate set on the network before training.
        x_train, y_train: training inputs and targets.
        x_val, y_val: validation inputs and targets.
    """
    train_inputs = eddlT.create(x_train)
    train_targets = eddlT.create(y_train)
    val_inputs = eddlT.create(x_val)
    val_targets = eddlT.create(y_val)

    net = getModel()
    eddl.setlr(net, [learning_rate])

    # NOTE(review): evaluation is assumed to run inside the epoch loop and
    # the save once after it -- confirm against the upstream file.
    for epoch_number in range(1, epochs + 1):
        print("Real Epoch number: {} of {}".format(epoch_number, epochs))
        eddl.fit(net, [train_inputs], [train_targets], batch_size, 1)
        eddl.evaluate(net, [val_inputs], [val_targets])

    eddl.save(net, model_file)
def evaluate(temp_data_folder, model_file, x_test, y_test):
    """Load saved weights into a fresh model and evaluate it on test data.

    Args:
        temp_data_folder: unused in this function.
        model_file: path of the saved model loaded with ``eddl.load``.
        x_test: test inputs, converted to an EDDL tensor.
        y_test: test targets, converted to an EDDL tensor.
    """
    net = getModel()
    eddl.load(net, model_file)

    test_inputs = eddlT.create(x_test)
    test_targets = eddlT.create(y_test)
    eddl.evaluate(net, [test_inputs], [test_targets])
Event Timeline
Log In to Comment