Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F61759599
model.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, May 8, 19:24
Size
14 KB
Mime Type
text/x-python
Expires
Fri, May 10, 19:24 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17527801
Attached To
R11149 PDM-Nicola-Oulu
model.py
View Options
import
os
from
transformers
import
TFGPT2LMHeadModel
from
transformers
import
AutoTokenizer
,
AutoModelForQuestionAnswering
from
abc
import
abstractmethod
import
numpy
as
np
import
tensorflow
as
tf
import
torch
import
copy
from
transformers.generation_tf_utils
import
TFGenerationMixin
import
modif_gpt
as
mod_gpt
class Model:
    """Shared training / evaluation scaffolding for the concrete model wrappers.

    Subclasses must provide ``_train`` (build the fine-tuning shell command),
    ``_generate`` (answer one sample and report a confidence score),
    ``set_proba_mode`` and ``load_model``.

    Evaluation statistics are kept per confidence threshold: ``self.lim`` holds
    the thresholds, ``self.count[i]`` the number of samples whose confidence
    exceeded ``self.lim[i]``, and ``self.score`` the tp/fp/fn/tn counters plus
    the recall/precision/F1 values derived from them.
    """

    def __init__(self, home_path, finetune_path, model_path, probaMode,
                 printStep, buckets):
        """Store the configuration and reset all evaluation counters.

        Args:
            home_path: project root; the process changes directory into it.
            finetune_path: directory from which the training command is run.
            model_path: model directory, resolved relative to ``home_path``.
            probaMode: confidence-score mode, stored for the subclass to use.
            printStep: print intermediate scores every ``printStep`` samples
                during ``generate`` (values <= 0 disable it).
            buckets: number of confidence thresholds / counter slots.
        """
        # Resolve model_path to an absolute path. NOTE: this changes the
        # working directory of the whole process as a side effect.
        os.chdir(home_path)
        os.chdir(model_path)
        model_path = os.getcwd()

        self.home_path = home_path
        self.finetune_path = finetune_path
        self.model_path = model_path
        self.model = None
        self.tokenizer = None
        self.probaMode = probaMode
        self.buckets = buckets
        # self.lim is read by the scoring code but was never assigned in this
        # class, so evaluation crashed unless a caller injected it.
        # NOTE(review): evenly spaced thresholds in [0, 1) are an assumption;
        # confirm against the values callers used to set. A value provided by
        # a subclass (class attribute) is left untouched, and callers may
        # still overwrite it after construction.
        if not hasattr(self, "lim"):
            self.lim = np.arange(buckets) / buckets
        self.__init_counters()
        self.printStep = printStep
        self.sureLim = 0.8

    # private functions
    def __init_counters(self):
        """Reset all per-threshold counters and the list of confident samples."""
        eps = 1e-6  # small number, to avoid division by zero
        self.count = np.full((self.buckets,), eps)
        self.score = {
            key: np.full((self.buckets,), eps)
            for key in ("tp", "fp", "fn", "tn", "f1", "recall", "precision")
        }
        self.xsure = []
        self.ysure = []

    def __printScores(self, hist=True):
        """Print three score tables and return them concatenated as one string.

        Args:
            hist: when true, append ASCII histogram bars to the first and the
                third table.

        Returns:
            The text of the three tables (without the separator lines that are
            printed between them).
        """
        def std_len(s, l=8, side="left", fill=" "):
            # Pad str(s) to at least l characters; side says where the
            # padding goes.
            s = str(s)
            return s.rjust(l, fill) if side == "left" else s.ljust(l, fill)

        def nbTolines(nb1, nb2):
            # Bar of '|' proportional to nb1 / nb2, padded with '.' to 100.
            return std_len("|" * int(100 / nb2 * nb1), l=100, side="right", fill=".")

        def percent(value):
            # Share of `value` relative to the first bucket's count.
            return std_len(int(100 / self.count[0] * value))

        separator = "\n{}\n".format("=" * 100)
        console = ""

        # Table 1: share of samples above each threshold. Each header now
        # ends with a newline; previously it ran into the first data row.
        rows = [
            "Number of exact matches, for each ConfidenceScore threshold\n",
            "trs - used cases [%] - ConfidenceScore distribution\n",
        ]
        for i in range(self.buckets):
            row = "{} - {}%".format(std_len(self.lim[i]), percent(self.count[i]))
            if hist:
                # Bar length: samples counted for this threshold but not for
                # the next one.
                next_bin = 0 if i == self.buckets - 1 else self.count[i + 1]
                row += " - {}".format(
                    nbTolines(self.count[i] - next_bin, self.count[0]))
            rows.append(row + "\n")
        tmp = "".join(rows)
        console += tmp
        print(tmp)
        print(separator)

        # Table 2: confusion counters, as a percentage of count[0].
        rows = ["trs - true positive [%] - false positive [%] - "
                "false negative [%] - true negative [%]\n"]
        for i in range(self.buckets):
            rows.append("{} - {}% - {}% - {}% - {}%\n".format(
                std_len(self.lim[i]),
                percent(self.score["tp"][i]),
                percent(self.score["fp"][i]),
                percent(self.score["fn"][i]),
                percent(self.score["tn"][i])))
        tmp = "".join(rows)
        console += tmp
        print(tmp)
        print(separator)

        # Table 3: recall / precision / F1 per threshold.
        rows = ["\ntrs - recall - precision - F1 - F1 histogram\n"]
        for i in range(self.buckets):
            row = "{} - {} - {} - {}".format(
                std_len(self.lim[i], 2),
                std_len(round(self.score["recall"][i], 2)),
                std_len(round(self.score["precision"][i], 2)),
                std_len(round(self.score["f1"][i], 2)))
            if hist:
                row += " - " + nbTolines(self.score["f1"][i], 1)
            rows.append(row + "\n")
        tmp = "".join(rows)
        console += tmp
        print(tmp)
        print(separator)
        return console

    def __updateF1(self):
        """Recompute recall, precision and F1 for every threshold in place."""
        tp = self.score["tp"]
        recall = tp / (tp + self.score["fn"])
        precision = tp / (tp + self.score["fp"])
        # Counters start at eps, so none of the denominators can be zero.
        self.score["recall"][:] = recall
        self.score["precision"][:] = precision
        self.score["f1"][:] = 2 * precision * recall / (precision + recall)

    def __check_answer(self, confidenceScore, corrAnswer, givenAnswer):
        """Update the counters of every threshold with one evaluated sample.

        A sample counts as "positive" for threshold ``l`` when
        ``confidenceScore > l`` and as "true" when the given answer equals the
        correct answer.
        """
        is_correct = corrAnswer == givenAnswer
        for i, l in enumerate(self.lim):
            # calculate true positive, etc.
            if confidenceScore > l:
                self.count[i] += 1
                key = "tp" if is_correct else "fp"
            else:
                key = "fn" if is_correct else "tn"
            self.score[key][i] += 1
        # update recall, precision and F1 score
        self.__updateF1()

    # protected functions
    @abstractmethod
    def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd,
               tokenizerLocaction):
        """Return the shell command that fine-tunes the model (subclass hook)."""
        raise NotImplementedError("_train has to be defined in each child class")

    @abstractmethod
    def _generate(self, x, y):
        """Answer sample ``x``; return (correct answer, given answer, confidence)."""
        raise NotImplementedError("_generate has to be defined in each child class")

    # public functions
    @abstractmethod
    def set_proba_mode(self, mode):
        """Select the confidence-score mode (subclass hook)."""
        raise NotImplementedError("set_proba_mode has to be defined in each child class")

    def getSureGuesses(self):
        """Return ``[inputs, targets]`` of the samples whose confidence exceeded ``sureLim``."""
        return [self.xsure, self.ysure]

    def train(self, nbEpochs, outModelName, startCheckpoint=None, dataEnd="",
              tokenizerLocaction=None):
        """Build the subclass' fine-tuning command and run it in ``finetune_path``.

        Raises:
            NotImplementedError: if ``startCheckpoint`` is None (training from
                scratch is not supported).
        """
        print("train ...")
        if startCheckpoint is None:
            print("... from scratch, with the Tokenizer in {}".format(tokenizerLocaction))
            raise NotImplementedError("Training from Scratch is not implemented yet")

        cmd = self._train(nbEpochs, outModelName, startCheckpoint, dataEnd,
                          tokenizerLocaction)
        print(cmd)
        os.chdir(self.finetune_path)
        # NOTE: the command is a shell string built from the arguments above;
        # only pass trusted model names / paths.
        os.system(cmd)

    def generate(self, data, idxStart=0, idx_End=None):
        """Evaluate the model on ``data[0][idxStart:idx_End]`` and score the answers.

        Args:
            data: pair ``(inputs, targets)`` of equally indexed sequences.
            idxStart: index of the first sample to evaluate.
            idx_End: index one past the last sample; defaults to all of them.

        Returns:
            A tuple ``(score, summary, console)`` where ``score`` is the
            per-threshold score dict, ``console`` the final printed tables and
            ``summary`` is ``[tp_lim0, tp_limsure, prec_lim0, prec_limsure,
            f1_lim0, f1_limsure]``. The ``*_limsure`` entries refer to the
            first threshold >= ``sureLim`` and are ``None`` when there is no
            such threshold.
        """
        self.__init_counters()
        if idx_End is None:
            idx_End = len(data[0])
        X = data[0][idxStart:idx_End]
        Y = data[1][idxStart:idx_End]

        for i, x in enumerate(X):
            corrAnswer, givenAnswer, confScore = self._generate(x, Y[i])
            self.__check_answer(confScore, corrAnswer, givenAnswer)
            if confScore > self.sureLim:
                self.xsure.append(x)
                self.ysure.append(Y[i])
            if self.printStep > 0 and (i + 1) % self.printStep == 0:
                print("({}): Correct Answer / given Answer\n{} / {}".format(
                    i, corrAnswer, givenAnswer))
                self.__printScores()

        print("Final scores")
        console = self.__printScores()

        tp_lim0 = int(100 / self.count[0] * self.score["tp"][0])
        prec_lim0 = self.score["precision"][0]
        f1_lim0 = self.score["f1"][0]

        # Defaults keep the return statement valid when no threshold reaches
        # sureLim (this used to raise UnboundLocalError).
        tp_limsure = prec_limsure = f1_limsure = None
        for i, l in enumerate(self.lim):
            if l >= self.sureLim:
                tp_limsure = int(100 / self.count[0] * self.score["tp"][i])
                prec_limsure = self.score["precision"][i]
                f1_limsure = self.score["f1"][i]
                break

        summary = [tp_lim0, tp_limsure, prec_lim0, prec_limsure,
                   f1_lim0, f1_limsure]
        return self.score, summary, console

    @abstractmethod
    def load_model(self, ModelName):
        """Load model and tokenizer named ``ModelName`` (subclass hook)."""
        raise NotImplementedError("load_model has to be defined in each child class")
class GPTModel(Model):
    """GPT-2 wrapper: fine-tunes through ``run_clm.py`` and answers by text generation."""

    def __init__(self, home_path, model_path="./models", probaMode="longOk",
                 printStep=10, buckets=10):
        """Locate the language-modeling example directory and initialise the base class.

        Args:
            home_path: project root containing ``transformers/examples``.
            model_path: model directory, relative to ``home_path``.
            probaMode: confidence-score mode forwarded to the patched generator.
            printStep: see ``Model``.
            buckets: see ``Model``.
        """
        # The fine-tuning script is run from this directory; resolve it to an
        # absolute path (changes the process working directory).
        os.chdir(home_path)
        os.chdir("./transformers/examples/")
        os.chdir("./language-modeling")
        finetune_path = os.getcwd()
        super().__init__(home_path, finetune_path, model_path, probaMode,
                         printStep, buckets)

    def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd,
               tokenizerLocaction):
        """Build the ``run_clm.py`` command line.

        Training data is read from ``train<dataEnd>.txt`` and
        ``eval<dataEnd>.txt``; the result is written to
        ``<model_path>/<outModelName>``. Any checkpoint other than ``"gpt2"``
        is looked up inside ``model_path``.

        Returns:
            The command as a single shell string.
        """
        parts = [
            "python run_clm.py",
            "--model_type {}".format("gpt2"),
            "--train_file \"{}\"".format("train" + dataEnd + ".txt"),
            "--do_train",
            "--validation_file \"{}\"".format("eval" + dataEnd + ".txt"),
            "--per_gpu_train_batch_size 1",
            "--save_steps -1",
            "--num_train_epochs {}".format(nbEpochs),
            "--fp16",
            "--output_dir=\"{}\"".format(self.model_path + "/" + outModelName),
        ]
        # --do_eval is intentionally not passed

        if startCheckpoint is not None:
            if startCheckpoint not in ["gpt2"]:
                startCheckpoint = self.model_path + "/" + startCheckpoint
            # NOTE(review): the flattened source does not show whether this
            # assignment was nested in the branch above; it is kept
            # unconditional here, i.e. the tokenizer always follows the
            # checkpoint. Confirm against the original file.
            tokenizerLocaction = startCheckpoint
            # Quoted like the other path arguments so that paths containing
            # spaces survive the shell.
            parts.append("--model_name_or_path \"{}\"".format(startCheckpoint))
        if tokenizerLocaction is not None:
            parts.append("--tokenizer_name \"{}\"".format(tokenizerLocaction))
        return " ".join(parts)

    def __set_modif_gpt(self):
        """Monkey-patch the TF generation mixin with the project's modified versions."""
        TFGenerationMixin._generate_no_beam_search = mod_gpt._generate_no_beam_search_modif
        TFGenerationMixin.generate = mod_gpt.generate_modif

    def _generate(self, x, y):
        """Generate an answer for prompt ``x`` and pair it with the reference ``y``.

        Returns:
            ``(corrAnswer, givenAnswer, confidence)`` where ``givenAnswer`` and
            ``confidence`` are elements 0 and 1 of what the patched
            ``generate`` returns, and ``corrAnswer`` is ``y`` with its last
            ``len("<|endoftext|>") + 1`` characters cut off.
        """
        self.__set_modif_gpt()
        input_ids = self.tokenizer.encode(x, return_tensors='tf')

        generated_text_samples = self.model.generate(
            input_ids,
            max_length=len(input_ids[0]) + 50,
            num_return_sequences=1,
            no_repeat_ngram_size=0,
            repetition_penalty=1.0,
            top_p=1.0,
            temperature=1.0,
            do_sample=False,
            top_k=0,
            early_stopping=True,
            tokenizer=self.tokenizer,
            VERBOSE="nothing",  # the alternative value is "nothing_but_score"
            probaMode=self.probaMode,
            num_beams=1,
            force2nd=True,
        )

        givenAnswer = generated_text_samples[0]
        # presumably y ends with "<|endoftext|>" plus one extra character;
        # that many characters are removed unconditionally.
        corrAnswer = y[:-len("<|endoftext|>") - 1]
        return corrAnswer, givenAnswer, generated_text_samples[1]

    def set_proba_mode(self, mode):
        """Select the confidence mode; only ``"mult"`` and ``"longOk"`` are accepted.

        Raises:
            NotImplementedError: for any other mode.
        """
        if mode not in ["mult", "longOk"]:
            raise NotImplementedError("This probability mode is not yet implemented")
        self.probaMode = mode

    def load_model(self, ModelName):
        """Load the model (converted from PyTorch weights) and its tokenizer from ``model_path``."""
        location = self.model_path + "/" + ModelName
        self.model = TFGPT2LMHeadModel.from_pretrained(location, from_pt=True)
        self.tokenizer = AutoTokenizer.from_pretrained(location)
class BertModel(Model):
    """Extractive question-answering wrapper: fine-tunes through ``run_qa.py``."""

    def __init__(self, home_path, model_path="./models", probaMode="mult",
                 printStep=10, buckets=10):
        """Locate the question-answering example directory and initialise the base class.

        Args:
            home_path: project root containing ``transformers/examples``.
            model_path: model directory, relative to ``home_path``.
            probaMode: one of the modes handled in ``_generate``.
            printStep: see ``Model``.
            buckets: see ``Model``.
        """
        # The fine-tuning script is run from this directory; resolve it to an
        # absolute path (changes the process working directory).
        os.chdir(home_path)
        os.chdir("./transformers/examples/")
        os.chdir("./question-answering")
        finetune_path = os.getcwd()
        super().__init__(home_path, finetune_path, model_path, probaMode,
                         printStep, buckets)

    # private functions
    def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd,
               tokenizerLocaction):
        """Build the ``run_qa.py`` command line.

        Training data is read from ``train<dataEnd>.json``; the result is
        written to ``<model_path>/<outModelName>``. Checkpoints other than
        ``"xlm-roberta-base"`` / ``"roberta-base"`` are looked up inside
        ``model_path``. ``tokenizerLocaction`` is not used by this subclass.

        Returns:
            The command as a single shell string.
        """
        parts = [
            "python run_qa.py",
            "--train_file \"{}\"".format("train" + dataEnd + ".json"),
            "--do_train",
            "--num_train_epochs \"{}\"".format(nbEpochs),
            "--output_dir=\"{}\"".format(self.model_path + "/" + outModelName),
            "--fp16",
            "--save_steps -1",
        ]
        # --validation_file / --do_eval are intentionally not passed

        if startCheckpoint is not None:
            if startCheckpoint not in ["xlm-roberta-base", "roberta-base"]:
                startCheckpoint = self.model_path + "/" + startCheckpoint
            parts.append("--model_name_or_path=\"{}\"".format(startCheckpoint))
        return " ".join(parts)

    def _generate(self, x, y):
        """Extract an answer span for one sample and score its confidence.

        Args:
            x: string of the form ``"<question>---<context>"``.
            y: reference answer, returned unchanged.

        Returns:
            ``(y, givenAnswer, prob_ab)`` where ``prob_ab`` is the product of
            the start and end probabilities of the chosen span, as a float.

        Raises:
            NotImplementedError: if ``self.probaMode`` is not one of
                ``"mult"``, ``"forceNon0"``, ``"maxNon0"``.
            IndexError: if ``x`` contains no ``"---"`` separator.
        """
        # Split on the first separator only, so a "---" inside the context no
        # longer truncates it.
        pieces = x.split("---", 1)
        question = pieces[0]
        text = pieces[1]

        # tokenize model input
        inputs = self.tokenizer(question, text, return_tensors='pt')

        # generate network output; no gradients are needed for evaluation
        with torch.no_grad():
            outputs = self.model(**inputs)
        start_logits = outputs.start_logits[0]
        end_logits = outputs.end_logits[0]

        # The logits are PyTorch tensors, so the softmax is computed with
        # torch as well (it used to go through tf.nn.softmax).
        start_probs = torch.softmax(start_logits, dim=-1)

        if self.probaMode == "mult":
            # independent best start and best end
            a = int(torch.argmax(start_logits))
            b = int(torch.argmax(end_logits))
            end_probs = torch.softmax(end_logits, dim=-1)
            prob_ab = float(start_probs[a] * end_probs[b])
        elif self.probaMode == "forceNon0":
            # force b to be at or after a: the end is searched (and its
            # probability normalised) only over positions >= a
            a = int(torch.argmax(start_logits))
            tail = end_logits[a:]
            offset = int(torch.argmax(tail))
            prob_ab = float(start_probs[a] * torch.softmax(tail, dim=-1)[offset])
            b = a + offset
        elif self.probaMode == "maxNon0":
            # search every start position for the most probable span whose
            # end is not before its start
            prob_ab = 0
            a = b = 0
            for a_ in range(len(start_logits)):
                tail = end_logits[a_:]
                b_ = int(torch.argmax(tail))
                prob_ab_ = float(start_probs[a_] * torch.softmax(tail, dim=-1)[b_])
                if prob_ab_ > prob_ab:
                    prob_ab = prob_ab_
                    a = a_
                    b = a_ + b_
        else:
            raise NotImplementedError("this probability mode is not implemented yet")

        givenAnswer = self.tokenizer.decode(inputs['input_ids'][0][a:b + 1])
        # due to the tokenizer the answer often starts with a blank space,
        # which is not part of the answer
        if givenAnswer.startswith(" "):
            givenAnswer = givenAnswer[1:]
        return y, givenAnswer, prob_ab

    # public functions
    def set_proba_mode(self, mode):
        """Select the confidence mode: ``"mult"``, ``"forceNon0"`` or ``"maxNon0"``.

        Raises:
            NotImplementedError: for any other mode.
        """
        if mode not in ["mult", "forceNon0", "maxNon0"]:
            raise NotImplementedError("this probability mode is not implemented yet")
        self.probaMode = mode

    def load_model(self, ModelName):
        """Load the question-answering model and its tokenizer from ``model_path``."""
        location = self.model_path + "/" + ModelName
        self.model = AutoModelForQuestionAnswering.from_pretrained(location)
        self.tokenizer = AutoTokenizer.from_pretrained(location)
Event Timeline
Log In to Comment