model.py
import os
from transformers import TFGPT2LMHeadModel
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from abc import abstractmethod
import numpy as np
import tensorflow as tf
import torch
import copy
import random
from transformers.generation_tf_utils import TFGenerationMixin
import modif_gpt as mod_gpt


class Model:
    def __init__(self, home_path, finetune_path, model_path, probaMode, printStep, buckets):
        os.chdir(home_path)
        os.chdir(model_path)
        model_path = os.getcwd()
        self.home_path = home_path
        self.finetune_path = finetune_path
        self.model_path = model_path
        self.model = None
        self.tokenizer = None
        self.probaMode = probaMode
        self.buckets = buckets
        self.lim = [x / self.buckets for x in range(self.buckets)]
        self.scores = {"all": self.__init_counters()}
        self.printStep = printStep
        self.sureLim = 0.5
        self.keywords = ["<input>", "<answer>", "<find>", "<|endoftext|>"]
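    # Illustration (added note, not from the original file): with the default
    # buckets=10, self.lim holds the confidence thresholds
    # [0.0, 0.1, 0.2, ..., 0.9]; bucket i of each score counter collects the
    # answers whose confidence score exceeds self.lim[i].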
    # private functions
    def __init_counters(self):
        eps = 1e-6  # small number, to avoid division by zero
        tmp = np.zeros((self.buckets,))
        tmp += eps
        score = {
            "tp": copy.deepcopy(tmp),
            "fp": copy.deepcopy(tmp),
            "fn": copy.deepcopy(tmp),
            "tn": copy.deepcopy(tmp),
            "f1": copy.deepcopy(tmp),
            "recall": copy.deepcopy(tmp),
            "precision": copy.deepcopy(tmp),
        }
        score["count"] = np.zeros((self.buckets,))
        score["count"] += eps
        score["xsure"] = []
        score["ysure"] = []
        return copy.deepcopy(score)
    def __printScores(self, hist=True):
        def nbTolines(nb1, nb2):
            return std_len("|" * int(100 / nb2 * nb1), l=100, side="right", fill=".")

        def std_len(s, l=8, side="left", fill=" "):
            s = str(s)
            while len(s) < l:
                if side == "left":
                    s = fill + s
                else:
                    s += fill
            return s

        def doHist(tmp, var):
            # uses the loop index i from the enclosing scope
            if i == self.buckets - 1:
                next_bin = 0
            else:
                next_bin = var[i + 1]
            tmp += " - {}".format(nbTolines(var[i] - next_bin, var[0]))
            return tmp

        console = ""
        keys = self.scores.keys()
        for key in keys:
            tmp = "{}\n<{}>\n{}\n".format("=" * 50, key, "=" * 50)
            tmp += "Number of exact matches, for each ConfidenceScore threshold\n"
            tmp += "trs - used cases [%] - ConfidenceScore distribution\n"
            for i in range(self.buckets):
                tmp += "{} - {}%".format(
                    std_len(self.lim[i]),
                    std_len(int(100 / self.scores[key]["count"][0] * self.scores[key]["count"][i]))
                )
                if hist:
                    tmp = doHist(tmp, self.scores[key]["count"])
                tmp += "\n"
            console += tmp
            print(tmp)
            print("\n{}\n".format("=" * 100))
            tmp = "\ntrs - true positive - false positive - false negative - true negative\n"
            for i in range(self.buckets):
                tmp += "{} - {}% - {}% - {}% - {}%\n".format(
                    std_len(self.lim[i]),
                    # std_len(int(100 / self.scores[key]["count"][0] * self.scores[key]["tp"][i])),
                    # std_len(int(100 / self.scores[key]["count"][0] * self.scores[key]["fp"][i])),
                    # std_len(int(100 / self.scores[key]["count"][0] * self.scores[key]["fn"][i])),
                    # std_len(int(100 / self.scores[key]["count"][0] * self.scores[key]["tn"][i])),
                    std_len(self.scores[key]["tp"][i]),
                    std_len(self.scores[key]["fp"][i]),
                    std_len(self.scores[key]["fn"][i]),
                    std_len(self.scores[key]["tn"][i])
                )
            console += tmp
            print(tmp)
            print("\n{}\n".format("=" * 100))
            tmp = "\ntrs - recall - precision - F1 - F1 histogram\n"
            for i in range(self.buckets):
                tmp += "{} - {} - {} - {}".format(
                    std_len(self.lim[i], 2),
                    std_len(round(self.scores[key]["recall"][i], 2)),
                    std_len(round(self.scores[key]["precision"][i], 2)),
                    std_len(round(self.scores[key]["f1"][i], 2))
                )
                if hist:
                    tmp += " - " + nbTolines(self.scores[key]["f1"][i], 1)
                tmp += "\n"
            console += tmp
            print(tmp)
            print("\n{}\n".format("=" * 100))
        return console
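    # Example of one report line (illustrative, not actual output): for a
    # threshold of 0.3 under which 72% of cases fall, the first table prints
    # roughly "     0.3 -       72% - ||||||||....", where the trailing bar is
    # the histogram produced by nbTolines.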
    def __updateF1(self, question):
        for key in ["all", question]:
            for i in range(self.buckets):
                self.scores[key]["recall"][i] = \
                    self.scores[key]["tp"][i] / (self.scores[key]["tp"][i] + self.scores[key]["fn"][i])
                self.scores[key]["precision"][i] = \
                    self.scores[key]["tp"][i] / (self.scores[key]["tp"][i] + self.scores[key]["fp"][i])
                self.scores[key]["f1"][i] = \
                    2 * self.scores[key]["precision"][i] * self.scores[key]["recall"][i] / \
                    (self.scores[key]["precision"][i] + self.scores[key]["recall"][i])
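    # Worked example (illustrative values, not from the original file): with
    # tp=30, fp=10 and fn=20 in one bucket, recall = 30 / (30 + 20) = 0.6,
    # precision = 30 / (30 + 10) = 0.75 and
    # f1 = 2 * 0.75 * 0.6 / (0.75 + 0.6) ≈ 0.67.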
    def __check_answer(self, confidenceScore, corrAnswer, givenAnswer, question):
        for i, l in enumerate(self.lim):
            # count true positives, false positives, etc. per threshold
            if confidenceScore > l:
                self._incr_scores(question, "count", i)
                if corrAnswer == givenAnswer:
                    self._incr_scores(question, "tp", i)
                else:
                    self._incr_scores(question, "fp", i)
            else:
                if corrAnswer == givenAnswer:
                    self._incr_scores(question, "fn", i)
                else:
                    self._incr_scores(question, "tn", i)
        """
        print("for <all>:")
        print(self.scores["all"]["count"])
        print(self.scores["all"]["tp"])
        print("for <{}>:".format(question))
        print(self.scores[question]["count"])
        print(self.scores[question]["tp"])
        """
        # update recall, precision and F1 score
        self.__updateF1(question)
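    # Illustration (not from the original file): with buckets=10 and a
    # confidence score of 0.34, the answer counts towards "count" (and
    # "tp"/"fp") for the thresholds 0.0, 0.1, 0.2 and 0.3, and towards
    # "fn"/"tn" for 0.4 and above.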
    # protected functions
    def _incr_scores(self, question, key, i=None, inc=1):
        # create a full subdictionary, if it's an unseen question
        if question not in self.scores.keys():
            print("found a new question: {}".format(question))
            self.scores[question] = self.__init_counters()
        # update the value of "all"
        if question != "all" and key not in ["f1", "recall", "precision"]:
            if i is None:
                self.scores["all"][key] += inc
            else:
                self.scores["all"][key][i] += inc
        # set the main value
        if i is None:
            self.scores[question][key] += inc
        else:
            self.scores[question][key][i] += inc
    @abstractmethod
    def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocation):
        raise NotImplementedError("_train has to be defined in each child class")

    @abstractmethod
    def _generate(self, x, y, x_bert, y_bert):
        raise NotImplementedError("_generate has to be defined in each child class")

    @abstractmethod
    def _generate_demo(self, c, q):
        raise NotImplementedError("_generate_demo has to be defined in each child class")
    def _clean_sure(self):
        keys = self.scores.keys()
        invalid = []
        for key in keys:
            if key != "all" and len(self.scores[key]["ysure"]) > 0:
                inv = []
                occ = [[x, self.scores[key]["ysure"].count(x)]
                       for x in set(self.scores[key]["ysure"])]
                # sort by number of occurrences (ascending, so the most frequent answer ends up last)
                occ_nb = [x[1] for x in occ]
                occ_nb_tot = 0
                for x in occ_nb:
                    occ_nb_tot += x
                occ = [x for _, x in sorted(zip(occ_nb, occ))]
                # one dominant solution (> 90%)
                if occ[-1][1] / occ_nb_tot > 0.9:
                    for x in occ[:-1]:
                        inv.append(x[0])
                # two dominant solutions
                elif (occ[-1][1] + occ[-2][1]) / occ_nb_tot > 0.9:
                    for x in occ[:-2]:
                        inv.append(x[0])
                # no dominant solution
                else:
                    # it looks suspicious if a solution is chosen more often than 10%
                    for x in occ:
                        if x[1] / occ_nb_tot > 0.1:
                            inv.append(x[0])
                print(key)
                print("The invalid answers for the key <{}> are {}".format(key, inv))
                self.scores[key]["xsure"] = \
                    [x for i, x in enumerate(self.scores[key]["xsure"])
                     if self.scores[key]["ysure"][i] not in inv]
                self.scores[key]["ysure"] = \
                    [x for i, x in enumerate(self.scores[key]["ysure"])
                     if self.scores[key]["ysure"][i] not in inv]
                invalid += inv
        print(invalid)
        print("The invalid answers for the key <{}> are {}".format("all", invalid))
        self.scores["all"]["xsure"] = \
            [x for i, x in enumerate(self.scores["all"]["xsure"])
             if self.scores["all"]["ysure"][i] not in invalid]
        self.scores["all"]["ysure"] = \
            [x for i, x in enumerate(self.scores["all"]["ysure"])
             if self.scores["all"]["ysure"][i] not in invalid]
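    # Illustration (not from the original file): if a question's "ysure"
    # answers occur with counts {"yes": 19, "maybe": 1}, then 19/20 > 0.9, so
    # "yes" dominates and "maybe" is marked invalid; with counts
    # {"a": 4, "b": 4, "c": 2} neither one nor two answers reach 90%, so every
    # answer above the 10% mark (here: all three) is dropped as suspicious.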
    # public functions
    @abstractmethod
    def set_proba_mode(self, mode):
        raise NotImplementedError("set_proba_mode has to be defined in each child class")

    def getSureGuesses(self):
        return [self.scores["all"]["xsure"], self.scores["all"]["ysure"]]

    def train(self, nbEpochs, outModelName, startCheckpoint=None, dataEnd="", tokenizerLocation=None):
        print("train ...")
        if startCheckpoint is None:
            print("... from scratch, with the Tokenizer in {}".format(tokenizerLocation))
            raise NotImplementedError("Training from Scratch is not implemented yet")
        else:
            pass
            # tokenizerLocation = startCheckpoint
        cmd = self._train(nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocation)
        print(cmd)
        os.chdir(self.finetune_path)
        os.system(cmd)
    def generate_demo(self, c, q):
        givenAnswer, confScore = self._generate_demo(c, q)
        return givenAnswer, confScore
    def generate(self, data, idxStart=0, idx_End=None, data_bert=None):
        self.scores = {"all": self.__init_counters()}
        if idx_End is None:
            idx_End = len(data[0])
        X = data[0][idxStart:idx_End]
        Y = data[1][idxStart:idx_End]
        if data_bert is not None:
            X_bert = data_bert[0][idxStart:idx_End]
            Y_bert = data_bert[1][idxStart:idx_End]
        else:
            X_bert = X
            Y_bert = Y
        errs = 0
        for i, x in enumerate(X):
            if i % 100 == 0:
                print(i)
            if True:  # try:
                corrAnswer, givenAnswer, confScore, question = \
                    self._generate(x, Y[i], X_bert[i], Y_bert[i])
            else:  # except: (error handling currently disabled)
                print("Model produced some error")
                # e.g. due to a too long input (Roberta)
                # add one additional example
                X.append(data[0][idx_End + errs])
                Y.append(data[1][idx_End + errs])
                if data_bert is not None:
                    X_bert.append(data_bert[0][idx_End + errs])
                    Y_bert.append(data_bert[1][idx_End + errs])
                else:
                    X_bert = X
                    Y_bert = Y
                errs += 1
                continue
            # print out all wrong answers
            if True:
                if corrAnswer != givenAnswer:
                    print("corr:<{}> vs given:<{}> ({})".format(corrAnswer, givenAnswer, question))
            self.__check_answer(confScore, corrAnswer, givenAnswer, question)
            if confScore > self.sureLim:
                self.scores[question]["xsure"].append(x)
                self.scores[question]["ysure"].append(Y[i])
                self.scores["all"]["xsure"].append(x)
                self.scores["all"]["ysure"].append(Y[i])
            if self.printStep > 0 and (i + 1) % self.printStep == 0:
                print("({}): Correct Answer / given Answer\n{} / {}".format(i, corrAnswer, givenAnswer))
                _ = self.__printScores()
        print("Final scores")
        self._clean_sure()
        console = self.__printScores()
        tp_lim0 = int(100 / self.scores["all"]["count"][0] * self.scores["all"]["tp"][0])
        prec_lim0 = self.scores["all"]["precision"][0]
        f1_lim0 = self.scores["all"]["f1"][0]
        for i, l in enumerate(self.lim):
            if l >= self.sureLim:
                tp_limsure = int(100 / self.scores["all"]["count"][0] * self.scores["all"]["tp"][i])
                prec_limsure = self.scores["all"]["precision"][i]
                f1_limsure = self.scores["all"]["f1"][i]
                break
        return self.scores, [tp_lim0, tp_limsure, prec_lim0, prec_limsure, f1_lim0, f1_limsure], console

    @abstractmethod
    def load_model(self, ModelName):
        raise NotImplementedError("load_model has to be defined in each child class")


class GPTModel(Model):
    def __init__(self, home_path, model_path="./models", probaMode="longOk", printStep=10, buckets=10):
        os.chdir(home_path)
        os.chdir("./transformers/examples/")
        os.chdir("./language-modeling")
        finetune_path = os.getcwd()
        super().__init__(home_path, finetune_path, model_path, probaMode, printStep, buckets)
    def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocation):
        cmd = ("python run_clm.py"
               " --model_type {}"
               " --train_file \"{}\""
               " --do_train"
               " --validation_file \"{}\""
               " --per_gpu_train_batch_size 1"
               " --save_steps -1"
               " --num_train_epochs {}"
               " --fp16"
               " --output_dir=\"{}\"").format(
            "gpt2",
            "train" + dataEnd + ".txt",
            "eval" + dataEnd + ".txt",
            nbEpochs,
            self.model_path + "/" + outModelName
        )
        # --do_eval \
        if startCheckpoint is not None:
            if startCheckpoint not in ["gpt2", "gpt2-medium", "gpt2-large", "gpt2-xl"]:
                startCheckpoint = self.model_path + "/" + startCheckpoint
            tokenizerLocation = startCheckpoint  # "gpt2"
            # startCheckpoint = "gpt2"
            cmd += " --model_name_or_path {}".format(startCheckpoint)
        if tokenizerLocation is not None:
            cmd += " --tokenizer_name {}".format(tokenizerLocation)
        return cmd
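    # Example of an assembled command (hypothetical names, for illustration):
    # python run_clm.py --model_type gpt2 --train_file "train_v1.txt" --do_train
    #   --validation_file "eval_v1.txt" --per_gpu_train_batch_size 1
    #   --save_steps -1 --num_train_epochs 3 --fp16
    #   --output_dir="./models/my_gpt2" --model_name_or_path gpt2
    #   --tokenizer_name gpt2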
    def __set_modif_gpt(self):
        # patch the TF generation mixin with the modified generation functions
        TFGenerationMixin._generate_no_beam_search = mod_gpt._generate_no_beam_search_modif
        TFGenerationMixin.generate = mod_gpt.generate_modif
    def _generate_demo(self, c, q):
        self.keywords = ["<input>", "<answer>", "<find>", "<|endoftext|>"]
        query = self.keywords[0] + c + self.keywords[2] + q + self.keywords[1]
        self.__set_modif_gpt()
        input_ids = self.tokenizer.encode(query, return_tensors='tf')
        VERBOSE = "nothing"
        generated_text_samples = self.model.generate(
            input_ids,
            max_length=len(input_ids[0]) + 30,
            num_return_sequences=1,
            no_repeat_ngram_size=0,
            repetition_penalty=1.0,
            top_p=1.0,
            temperature=1.0,
            do_sample=False,
            top_k=0,
            early_stopping=True,
            tokenizer=self.tokenizer,
            VERBOSE=VERBOSE,
            probaMode=self.probaMode,
            num_beams=1,
            force2nd=True
        )
        givenAnswer = generated_text_samples[0]
        return givenAnswer, generated_text_samples[1]
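    # Illustration (not from the original file): for
    # c = "Paris is the capital of France." and
    # q = "What is the capital of France?", the query becomes
    # "<input>Paris is the capital of France.<find>What is the capital of France?<answer>"
    # and the patched generate() returns the answer text and a confidence score.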
    def _generate(self, x, y, x_bert, y_bert):
        i_len = len(self.keywords[0])
        a_len = len(self.keywords[1])
        f_len = len(self.keywords[2])
        e_len = len(self.keywords[3])
        f_start = x.find(self.keywords[2])
        question = x[f_start + f_len:-a_len]
        self.__set_modif_gpt()
        input_ids = self.tokenizer.encode(x, return_tensors='tf')
        VERBOSE = "nothing_but_score"
        VERBOSE = "nothing"
        generated_text_samples = self.model.generate(
            input_ids,
            max_length=len(input_ids[0]) + 30,
            num_return_sequences=1,
            no_repeat_ngram_size=0,
            repetition_penalty=1.0,
            top_p=1.0,
            temperature=1.0,
            do_sample=False,
            top_k=0,
            early_stopping=True,
            tokenizer=self.tokenizer,
            VERBOSE=VERBOSE,
            probaMode=self.probaMode,
            num_beams=1,
            force2nd=True
        )
        givenAnswer = generated_text_samples[0]
        corrAnswer = y[:-len("<|endoftext|>")]
        return corrAnswer, givenAnswer, generated_text_samples[1], question
    def set_proba_mode(self, mode):
        print("Set the proba for gpt2 to {}".format(mode))
        if mode in ["mult", "longOk"]:
            self.probaMode = mode
        else:
            raise NotImplementedError("This probability mode is not yet implemented")

    def load_model(self, ModelName):
        self.model = TFGPT2LMHeadModel.from_pretrained(self.model_path + "/" + ModelName, from_pt=True)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path + "/" + ModelName)


class BertModel(Model):
    def __init__(self, home_path, model_path="./models", probaMode="mult", printStep=10, buckets=10):
        os.chdir(home_path)
        os.chdir("./transformers/examples/")
        os.chdir("./question-answering")
        finetune_path = os.getcwd()
        super().__init__(home_path, finetune_path, model_path, probaMode, printStep, buckets)
    # private functions
    def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocation):
        cmd = ("python run_qa.py"
               " --train_file \"{}\""
               " --do_train"
               " --num_train_epochs \"{}\""
               " --output_dir=\"{}\""
               " --fp16"
               " --save_steps -1").format(
            "train" + dataEnd + ".json",
            nbEpochs,
            self.model_path + "/" + outModelName
        )
        # --validation_file \"{}\" \
        # --do_eval \
        # "eval" + dataEnd + ".json"
        if startCheckpoint is not None:
            if startCheckpoint not in ["xlm-roberta-base", "roberta-base"]:
                startCheckpoint = self.model_path + "/" + startCheckpoint
            cmd += " --model_name_or_path=\"{}\"".format(startCheckpoint)
        return cmd
    def _generate_demo(self, c, q):
        inputs = self.tokenizer(q, c, return_tensors='pt')
        # generate network output
        outputs = self.model(**inputs)
        start_scores = outputs.start_logits
        end_scores = outputs.end_logits
        # this hard-codes the "forceNon0" branch of _generate:
        # force b to be higher than a
        # find start and end of the answer
        a = torch.argmax(start_scores)
        a = int(a)
        b = torch.argmax(end_scores[0, a:])
        b = int(b)
        # get the probability of the answer
        probs_a = tf.nn.softmax(start_scores.detach())
        probs_b = tf.nn.softmax(end_scores[0, a:].detach())
        prob_a = probs_a[0, a]
        prob_b = probs_b[b]
        prob_ab = prob_a * prob_b
        b = a + b
        givenAnswer = self.tokenizer.decode(inputs['input_ids'][0][int(a):int(b) + 1])
        # due to the tokenizer, the answer often starts with a blank space,
        # which is not part of the answer
        if len(givenAnswer) > 0 and givenAnswer[0] == " ":
            givenAnswer = givenAnswer[1:]
        return givenAnswer, float(prob_ab)
    def _generate(self, x, y, x_bert, y_bert):
        tmp = x.split("---")
        text = tmp[1]
        question = tmp[0]
        # tokenize model input
        inputs = self.tokenizer(question, text, return_tensors='pt')
        # generate network output
        outputs = self.model(**inputs)
        start_scores = outputs.start_logits
        end_scores = outputs.end_logits
        if self.probaMode == "mult":
            # find start and end of the answer
            a = torch.argmax(start_scores)
            b = torch.argmax(end_scores)
            a = int(a)
            b = int(b)
            # get the probability of the answer
            probs_a = tf.nn.softmax(start_scores.detach())
            probs_b = tf.nn.softmax(end_scores.detach())
            prob_a = probs_a[0, a]
            prob_b = probs_b[0, b]
            prob_ab = prob_a * prob_b
        elif self.probaMode == "forceNon0":
            # force b to be higher than a
            # find start and end of the answer
            a = torch.argmax(start_scores)
            a = int(a)
            b = torch.argmax(end_scores[0, a:])
            b = int(b)
            # get the probability of the answer
            probs_a = tf.nn.softmax(start_scores.detach())
            probs_b = tf.nn.softmax(end_scores[0, a:].detach())
            prob_a = probs_a[0, a]
            prob_b = probs_b[b]
            prob_ab = prob_a * prob_b
            b = a + b
        elif self.probaMode == "maxNon0":
            # search for the best non-zero sequence
            # find start and end of the answer
            prob_ab = 0
            for a_ in range(len(start_scores[0])):
                b_ = torch.argmax(end_scores[0, a_:])
                b_ = int(b_)
                # get the probability of the answer
                probs_a = tf.nn.softmax(start_scores.detach())
                probs_b = tf.nn.softmax(end_scores[0, a_:].detach())
                prob_a = probs_a[0, a_]
                prob_b = probs_b[b_]
                prob_ab_ = prob_a * prob_b
                if prob_ab_ > prob_ab:
                    prob_ab = prob_ab_
                    a = a_
                    b = a_ + b_
        else:
            raise NotImplementedError("this probability mode is not implemented yet")
        givenAnswer = self.tokenizer.decode(inputs['input_ids'][0][int(a):int(b) + 1])
        # due to the tokenizer, the answer often starts with a blank space,
        # which is not part of the answer
        if len(givenAnswer) > 0 and givenAnswer[0] == " ":
            givenAnswer = givenAnswer[1:]
        return y, givenAnswer, prob_ab, question
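    # Worked example of the probability modes (illustrative values): if the
    # softmax of the start logits peaks at index 12 with probability 0.8, and
    # the softmax of the end logits (restricted to indices >= 12 under
    # "forceNon0") peaks with 0.5, the confidence is prob_ab = 0.8 * 0.5 = 0.4.
    # "mult" takes the unrestricted argmax of both, while "maxNon0" scans every
    # start index for the highest such product.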
    # public functions
    def set_proba_mode(self, mode):
        print("Set the proba for bert to {}".format(mode))
        if mode in ["mult", "forceNon0", "maxNon0"]:
            self.probaMode = mode
        else:
            raise NotImplementedError("this probability mode is not implemented yet")

    def load_model(self, ModelName):
        path = os.path.join(self.model_path, ModelName)
        self.model = AutoModelForQuestionAnswering.from_pretrained(path)
        self.tokenizer = AutoTokenizer.from_pretrained(path)


class ComboModel(Model):
    def __init__(self, home_path, model_path="./models", probaMode="mult", printStep=10,
                 buckets=10, models=None, ModelNames=None, probaModes=None):
        os.chdir(home_path)
        os.chdir("./transformers/examples/")
        os.chdir("./question-answering")
        finetune_path = os.getcwd()
        # avoid mutable default arguments: _train modifies the model names in place
        self.ModelNames = ModelNames if ModelNames is not None else []
        self.probaModes = probaModes if probaModes is not None else []
        self.models = models if models is not None else []
        super().__init__(home_path, finetune_path, model_path, probaMode, printStep, buckets)
    # private functions
    def _train(self, nbEpochs, outModelName, startCheckpoints, dataEnd, tokenizerLocation):
        for i, mm in enumerate(self.models):
            startCheckpoint = self.ModelNames[i]
            self.ModelNames[i] += "_adapt"
            cmd = mm._train(nbEpochs, self.ModelNames[i], startCheckpoint, dataEnd, tokenizerLocation)
            print(cmd)
            os.chdir(mm.finetune_path)
            os.system(cmd)
        return ""
    def _generate_demo(self, c, q):
        # demo generation is not implemented for the combined model
        raise NotImplementedError("_generate_demo is not implemented for ComboModel")
    def _generate(self, x, y, x_bert, y_bert):
        givenAnswers = []
        confScores = []
        for i, mm in enumerate(self.models):
            # print("Predict an answer for the model {}".format(i))
            if self.probaModes[i] == "longOk":
                # print("seems to be gpt2")
                corrAnswer, givenAnswer, confScore, question = mm._generate(x, y, None, None)
            elif self.probaModes[i] == "forceNon0":
                # print("seems to be bert")
                corrAnswer, givenAnswer, confScore, question = mm._generate(x_bert, y_bert, None, None)
            else:
                raise NotImplementedError("this probability mode is not implemented yet")
            givenAnswers.append(givenAnswer)
            confScores.append(confScore)
        # find the maximum-occurring answer
        best_sol = max(set(givenAnswers), key=givenAnswers.count)
        occ = givenAnswers.count(best_sol)
        # the tie-breaking below assumes three models: gpt2 first, then the two Bert-style models
        if occ <= 1 or (occ == 2 and givenAnswers[1] == givenAnswers[2]):
            if random.randint(0, 9) < 6:
                ind = 0  # choose gpt with 60% chance
            else:
                # choose Roberta or XLM-R with 20% chance each
                ind = random.randint(1, len(givenAnswers) - 1)
            confScore = 0.6 * confScores[ind]
            best_sol = givenAnswers[ind]
        elif occ == 2:
            confScore = 0.8
            for i, a in enumerate(givenAnswers):
                if a == best_sol:
                    confScore *= confScores[i]
        else:
            confScore = 1
            for i, a in enumerate(givenAnswers):
                confScore *= confScores[i]
        return corrAnswer, best_sol, confScore, question
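    # Illustration (not from the original file): with three sub-models
    # answering ["Paris", "Paris", "Rome"] at confidences [0.9, 0.8, 0.7],
    # best_sol is "Paris" with occ == 2, so confScore = 0.8 * 0.9 * 0.8 = 0.576;
    # if all three agree, confScore is the product of all three confidences.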
    # public functions
    def set_proba_mode(self, mode):
        for i, mm in enumerate(self.models):
            print("Set proba mode {} for model {}, {}".format(self.probaModes[i], i, mm))
            mm.set_proba_mode(self.probaModes[i])

    def load_model(self, ModelName):
        for i, mm in enumerate(self.models):
            print("Load model {}".format(i))
            mm.load_model(self.ModelNames[i])
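

# Usage sketch (added for illustration; the path, checkpoint name and the
# context/question strings are placeholders, not taken from the original file):
if __name__ == "__main__":
    home = os.path.expanduser("~/project")    # hypothetical project root
    gpt = GPTModel(home_path=home)
    gpt.load_model("my_gpt2_checkpoint")      # hypothetical checkpoint name
    answer, confidence = gpt.generate_demo(
        "Paris is the capital of France.",
        "What is the capital of France?",
    )
    print(answer, confidence)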