Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F61070846
interactive_merger.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, May 4, 08:39
Size
29 KB
Mime Type
text/x-python
Expires
Mon, May 6, 08:39 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17455893
Attached To
rNIETZSCHEPYTHON nietzsche-python
interactive_merger.py
View Options
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" This script contains a interactive shell for merging faksimile positions and words.
"""
# Copyright (C) University of Basel 2019 {{{1
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/> 1}}}
import
abc
from
colorama
import
Fore
,
Style
from
deprecated
import
deprecated
from
functools
import
cmp_to_key
import
getopt
import
inspect
import
lxml.etree
as
ET
import
re
import
shutil
import
string
import
sys
import
tempfile
from
operator
import
attrgetter
import
os
from
os
import
listdir
,
sep
,
path
,
setpgrp
,
devnull
from
os.path
import
exists
,
isfile
,
isdir
,
dirname
,
basename
from
pathlib
import
Path
from
progress.bar
import
Bar
import
warnings
if
dirname
(
__file__
)
not
in
sys
.
path
:
sys
.
path
.
append
(
dirname
(
__file__
))
from
datatypes.faksimile
import
FaksimilePage
,
get_paths_inside_rect
from
datatypes.word_position
import
WordPosition
from
datatypes.faksimile_position
import
FaksimilePosition
from
datatypes.word
import
Word
from
datatypes.lineNumber
import
LineNumber
from
datatypes.page
import
Page
,
STATUS_MERGED_OK
from
datatypes.transkriptionField
import
TranskriptionField
from
join_faksimileAndTranskription
import
get_filelist_and_manuscript_file
,
sort_faksimile_positions
,
sort_words
from
process_files
import
update_svgposfile_status
from
process_words_post_merging
import
post_merging_processing_and_saving
from
util
import
ExternalViewer
,
create_highlighted_svg_file
,
get_empty_node_ids
,
record_changes
,
\
record_changes_on_svg_file_to_page
,
record_changes_on_xml_file_to_page
,
get_mismatching_ids
,
\
replace_chars
sys
.
path
.
append
(
'shared_util'
)
from
myxmlwriter
import
write_pretty
,
FILE_TYPE_SVG_WORD_POSITION
,
FILE_TYPE_XML_MANUSCRIPT
# Module metadata.
__author__ = "Christian Steiner"
__maintainer__ = __author__
__copyright__ = 'University of Basel'
__email__ = "christian.steiner@unibas.ch"
__status__ = "Development"
__license__ = "GPL v3"
__version__ = "0.0.1"

# While this flag is true, InteractiveMergerShell._save does not write the page to disk.
UNITTESTING = False

# Regular expression sources built around string.punctuation.
PUNCTUATION_PATTERN = rf"[{string.punctuation}]"
PUNCTUATION_EOW_PATTERN = r'\w+["]$'
SINGLE_PUNCTUATION_PATTERN = rf"^[{string.punctuation}–]$"
SINGLE_WORD_PATTERN = rf"^[\w{string.punctuation}]$"

HIGHLIGHT_COLOR = 'red'
OPACITY = '0.5'
MIN_THRESHOLD = 2
DO_DEBUG = False

# File the shells append their '#'-joined command history to.
HISTORY_FILE = '.interactive_history'
class LineComposer(metaclass=abc.ABCMeta):
    """Abstract interface of the object that does the actual merging.

    The interactive shell only decides *what* should be merged; every
    merge operation is delegated to an implementation of this interface.
    """

    @abc.abstractmethod
    def get_lines_of_faksimile_positions(self) -> list:
        """Return the lines of faksimile positions.
        """

    @abc.abstractmethod
    def get_line(self, line_of_words, index=-1, offset=2, interactive=False) -> list:
        """Return the line that corresponds to the line_of_words.
        """

    @abc.abstractmethod
    def create_faksimile_dictionary(self, line_of_faksimile_positions, mergeables_only=False) -> dict:
        """Create a faksimile_dictionary with fp.text as key and a list of fp as value.
        """

    @abc.abstractmethod
    def merge_mergeables(self, line_of_words, faksimile_text_dictionary, new_words):
        """Merge words with faksimile positions for which there are keys in faksimile_text_dictionary.
        """

    @abc.abstractmethod
    def merge_unmergeables(self, line_of_words, line_of_faksimile_positions, new_words):
        """Merge unmergeable words and faksimile_positions.
        """

    @abc.abstractmethod
    def merge_word_with_fp(self, word, faksimile_position, list_of_new_words, index=-1, mark_for_verification=False):
        """Merge word with faksimile position.
        """
class InteractiveMergerShell:
    """An interactive shell for merging faksimile positions with words.

    The shell prompts the user (or replays commands from ``play_stack``),
    matches each response against the registered command patterns and
    delegates the merging itself to ``line_composer``.  Every response is
    recorded in ``command_history`` so that a session can be replayed.
    """

    # The annotation is a string so that the class does not need LineComposer at definition time.
    def __init__(self, line_composer: 'LineComposer', page=None):
        """Set up the shell state and register the available commands.

        :param line_composer: object implementing the LineComposer interface.
        :param page: optional page; the save command is only offered if a page is given.
        """
        self.line_composer = line_composer
        self.last_response = '0>0'
        self.keep_alive = True
        self.interactivity = True
        self.old_word_new_word_mapping = {}
        self.page = page
        self.word_merge_dialog_list = []
        self.word_merge_response_function_dict = {}
        self.command_history = []
        self.play_stack = []
        # Registration order matters: responses are dispatched to the first matching pattern.
        commands = [
            ('([0-9]+)*a=add new faksimile position to word with index [0-9]+ or to default word',
             r'(\d+)*a$', self._add_new_faksimile_position2word),
            ('q(uit)*=quit interactive mode', r'q(uit)*', self._quit),
            ('f(ind)*=find faksimile position that corresponds to first unmerged word',
             r'f(ind)*', self._find_faksimile_position),
            ('([0-9]+,)*[0-9]j(>[0-9]+)*=join words (and merge with faksimile position [0-9]+)',
             r'(\d+,)*\d+j(>\d+)*$', self._join_words),
            ('([0-9]+>[0-9])*=merge word [0-9]+ with faksimile position [0-9]+',
             r'\d+>\d+(,\d+)*$', self._merge_words),
            ('h(elp)*=help', r'h(elp)*', self._help),
            # The help text used to read "d(elete)*=help", which hid the delete command.
            ('d(elete)*=delete first unmerged word', r'd(elete)*', self._delete_word),
        ]
        if self.page is not None:
            commands.append(('s(ave)*=save changes to page', r's(ave)*', self._save))
        for description, pattern, handler in commands:
            self._add_function(description, re.compile(pattern), handler,
                               self.word_merge_dialog_list, self.word_merge_response_function_dict)

    def _add_function(self, dialog_description, pattern, exec_function, dialog_list, response_function_dictionary):
        """Add function to dialog list and response_function dictionary.
        """
        dialog_list.append(dialog_description)
        response_function_dictionary[pattern] = exec_function

    def _sync_joined_words(self):
        """Copy the joined state of each target word to the words that were joined into it.
        """
        for old_word, new_word in self.old_word_new_word_mapping.items():
            old_word.joined = new_word.joined

    @staticmethod
    def _filter_unmerged(items):
        """Return the items that are neither joined nor mergeable.
        """
        return [item for item in items if not item.joined and not item.mergeable]

    def _add_new_faksimile_position2word(self, response, unmerged_words, unmerged_fps, new_words, line_of_words):
        """Ask for the data of a new faksimile position and merge it with a word.

        The word index is the number in front of "a"; a missing or out of
        range index selects the first unmerged word.  Empty answers keep
        the defaults (0.0 for the geometry, 'manualRect' for the id).
        """
        index_text = response.split('a')[0]
        word_index = 0
        if index_text != '' and int(index_text) < len(unmerged_words):
            word_index = int(index_text)
        new_fp_dict = {'x': 0.0, 'y': 0.0, 'height': 0.0, 'width': 0.0, 'id': 'manualRect'}
        for key in new_fp_dict:
            add_fp_response = input(f'{key}>')
            if key == 'id':
                if add_fp_response != '':
                    new_fp_dict[key] = add_fp_response
            elif re.match(r'\d+(\.\d+)*', add_fp_response):
                # The pattern only checks the start of the answer, hence float() may still fail:
                # keep the default instead of aborting the whole session.
                try:
                    new_fp_dict[key] = float(add_fp_response)
                except ValueError:
                    print(f'Ignoring invalid value "{add_fp_response}" for {key}')
        new_fp = FaksimilePosition(id=new_fp_dict['id'], x=new_fp_dict['x'], y=new_fp_dict['y'],
                                   height=new_fp_dict['height'], width=new_fp_dict['width'])
        target_word = unmerged_words[word_index]
        new_fp.text = target_word.text
        print(f'Faksimile position added to word "{target_word.text}": {new_fp}')
        self.line_composer.merge_word_with_fp(target_word, new_fp, new_words)

    def _delete_word(self, response, unmerged_words, unmerged_fps, new_words, line_of_words):
        """Delete the first unmerged word from the page (if any) and from both word lists.
        """
        current_word = unmerged_words[0]
        # The delete command is registered even without a page, so page may be None here.
        if self.page is not None and current_word in self.page.words:
            self.page.words.remove(current_word)
        unmerged_words.remove(current_word)
        line_of_words.remove(current_word)
        print(f'Word {current_word.id}: "{current_word.text}" deleted')

    def _help(self, response, unmerged_words, unmerged_fps, new_words, line_of_words):
        """Print help dialog.
        """
        for dialog_description in self.word_merge_dialog_list:
            print(dialog_description)

    def _join_words(self, response, unmerged_words, unmerged_fps, new_words, line_of_words):
        """Join consecutive unmerged words and optionally merge the result with a faksimile position.

        "Nj" joins the words 1..N into word 0, "M,Nj" joins the words
        M+1..N into word M.  A trailing ">K" merges the joined word with
        the faksimile position with index K.  Indices that are out of
        range are reported and nothing is changed.
        """
        head = response.split('j')[0]
        if re.match(r'\d+,\d+', head):
            indices = [int(i) for i in head.split(',')]
        else:
            indices = [0, int(head)]
        word_index, last_index = indices[0], indices[1]
        fp_index = None
        if re.match(r'.*j>\d+', response):
            fp_index = int(response.split('>')[1])
        # Validate everything up front: an IndexError would end the session and lose all work.
        if word_index >= len(unmerged_words) or last_index >= len(unmerged_words):
            print(f'Word index out of range: there are {len(unmerged_words)} unmerged words')
            return
        if fp_index is not None and fp_index >= len(unmerged_fps):
            print(f'Faksimile index out of range: there are {len(unmerged_fps)} unmerged faksimile positions')
            return
        target_word = unmerged_words[word_index]
        old_words = []
        for i in range(word_index + 1, last_index + 1):
            target_word.join(unmerged_words[i])
            self.old_word_new_word_mapping[unmerged_words[i]] = target_word
            old_words.append(unmerged_words[i])
        print(f'Word {target_word.id} joined! New text: {target_word.text}')
        if fp_index is not None:
            self.line_composer.merge_word_with_fp(target_word, unmerged_fps[fp_index], new_words)
        for word in old_words:
            word.joined = True

    def _merge_words(self, response, unmerged_words, unmerged_fps, new_words, line_of_words):
        """Merge a word with one faksimile position or with a range of faksimile positions.

        "W>F" merges word W with faksimile position F; "W>F,G" additionally
        appends the faksimile positions F+1..G to the word.  Indices that
        are out of range are ignored.
        """
        raw_word_fp_indices = response.split('>')
        if len(raw_word_fp_indices) > 1 and re.match(r'^\d+$', raw_word_fp_indices[0]):
            word_index = int(raw_word_fp_indices[0])
            indices = [int(i) for i in raw_word_fp_indices[1].split(',')]
            if word_index < len(unmerged_words) and indices[0] < len(unmerged_fps):
                self.line_composer.merge_word_with_fp(unmerged_words[word_index], unmerged_fps[indices[0]], new_words)
                if len(indices) == 2:
                    for i in range(indices[0] + 1, indices[1] + 1):
                        if i < len(unmerged_fps):
                            unmerged_words[word_index].faksimile_positions.append(unmerged_fps[i])
                            unmerged_fps[i].joined = True
        self._sync_joined_words()

    def _find_faksimile_position(self, response, unmerged_words, unmerged_fps, new_words, line_of_words):
        """Find the equivalent of the first word of unmerged_words in all not joined
        faksimile positions and merge the word with the first hit.
        """
        target_word = unmerged_words[0]
        # Only faksimile positions that are still free qualify: a joined one already belongs to a word.
        results = [fp for line in self.line_composer.get_lines_of_faksimile_positions()
                   for fp in line
                   if not fp.joined and fp.text == target_word.text]
        print(f'Results found for {target_word.text}: {[fp.id for fp in results]}')
        if len(results) > 0:
            print(f'Merging {target_word.text} with {results[0].id}')
            self.line_composer.merge_word_with_fp(target_word, results[0], new_words)
        self._sync_joined_words()

    def _quit(self, response, unmerged_words, unmerged_fps, new_words, line_of_words):
        """Quit interactive mode.
        """
        self.keep_alive = False
        self.interactivity = False

    def _save(self, response, unmerged_words, unmerged_fps, new_words, line_of_words):
        """Save all new_words to page and write it to xml file.

        Each new word is marked as verified and replaces the page word
        with the same id.  Words that were joined into other words are
        removed from the page.  The file is not written while UNITTESTING
        is set.
        """
        print(f'Saving changes to {self.page.page_tree.docinfo.URL} ...{len(new_words)}')
        for new_word in new_words:
            new_word.verified = True
            old_words = [word for word in self.page.words if word.id == new_word.id]
            if len(old_words) > 0:
                old_word = min(old_words)
                index = self.page.words.index(old_word)
                self.page.words[index] = new_word
            else:
                print(f'{new_word.id}: {new_word.text}: {new_word.faksimile_positions[0].text}')
        for old_word in self.old_word_new_word_mapping:
            # Saving twice must not fail because the word is already gone.
            if old_word in self.page.words:
                self.page.words.remove(old_word)
        self.page.unlock()
        self.page.update_and_attach_words2tree()
        if not UNITTESTING:
            write_pretty(xml_element_tree=self.page.page_tree, script_name=__file__,
                         file_type=FILE_TYPE_SVG_WORD_POSITION)

    def interactive_merge_lines(self, line_of_words, new_words, index, offset) -> bool:
        """Interactively merge a line of words with the corresponding line of faksimile positions.

        If the line composer proposes more than one faksimile line, the
        user selects a line ("3"), a range of lines ("3,5", "3 5" or "3+"),
        browses ("n"/"p") or quits ("q").  Any other answer keeps the
        default, i.e. the middle one of the proposed lines.

        [:return:] interactive: False as soon as the user has quit interactive mode.
        """
        # NOTE(review): each entry is unpacked as (line index, line of faksimile positions);
        # the nesting of this method was reconstructed from its control flow.
        interactive_line = self.line_composer.get_line(line_of_words, index, offset=offset, interactive=True)
        default_line_index = int(len(interactive_line) / 2)
        current_line = [line for _, line in interactive_line][default_line_index]
        if len(interactive_line) > 1:
            line_indices = [i for i, _ in interactive_line]
            default_inner_index = 0
            print(Fore.WHITE + f'Current line of words: index: {index}: ', end='')
            for word in line_of_words:
                color = Fore.WHITE if word.joined else Fore.MAGENTA
                print(color + f'{word.text} ', end='')
            print(Fore.RESET)
            for line_index, line in enumerate(interactive_line):
                if line_index == default_line_index:
                    default_inner_index = line[0]
                    print('[' + Fore.WHITE + f'{line[0]}' + Fore.RESET + f']: {[fp.text for fp in line[1]]}')
                else:
                    print(f'[{line[0]}]: {[fp.text for fp in line[1]]}')
            print(Fore.WHITE + 'select index of corresponding faksimile line ...' + Fore.RESET)
            if len(self.play_stack) == 0:
                response = input('[format:startIndex[,endIndex]|default=' + Fore.WHITE + f'{default_inner_index}'
                                 + Fore.RESET + '|n=next lines|p=previous lines|q=stop interactive mode]>')
            else:
                response = self.play_stack.pop(0)
            self.command_history.append(response)
            if response.startswith('n'):
                new_index = max(line_indices) + offset
                if new_index >= len(self.line_composer.get_lines_of_faksimile_positions()):
                    new_index = max(line_indices)
                return self.interactive_merge_lines(line_of_words, new_words, new_index, offset)
            elif response.startswith('p'):
                new_index = max(min(line_indices) - offset, 0)
                return self.interactive_merge_lines(line_of_words, new_words, new_index, offset)
            elif response.startswith('q'):
                # This used to return the tuple (new_words, False), which is truthy and
                # therefore never told the caller that interactive mode had been stopped.
                self.interactivity = False
                return False
            elif re.match(r'\d+(\+|(,\s*|\s)\d+)$', response):
                if re.match(r'\d+\+$', response):
                    first_index = int(response.replace('+', ''))
                    indices = [first_index, first_index + 1]
                else:
                    # The pattern above allows any amount of whitespace after the comma.
                    indices = [int(i) for i in re.split(r'[,\s]+', response.strip())]
                current_line = []
                for current_index in range(indices[0], indices[1] + 1):
                    current_lines = [line for i, line in interactive_line if i == current_index]
                    if len(current_lines) > 0:
                        current_line += current_lines[0]
                print(Fore.MAGENTA + f'{[fp.text for fp in current_line]}' + Fore.RESET)
            elif re.match(r'\d+$', response):
                faksimile_line_index = int(response)
                selected_lines = [line for i, line in interactive_line if i == faksimile_line_index]
                if len(selected_lines) > 0:
                    current_line = selected_lines[0]
        faksimile_text_dictionary = self.line_composer.create_faksimile_dictionary(current_line, mergeables_only=True)
        self.line_composer.merge_mergeables(line_of_words, faksimile_text_dictionary, new_words)
        self.interactive_merge_unmergeables(line_of_words, current_line, new_words, index)
        return self.interactivity

    def interactive_merge_unmergeables(self, line_of_words, line_of_faksimile_positions, new_words, index):
        """Merge unmergeable words and faksimile_positions interactively.

        As long as there are as many unmerged words as unmerged faksimile
        positions the merge is left to the line composer; otherwise the
        user is prompted until no unmerged word is left or the user quits.
        """
        unmerged_words = self._filter_unmerged(line_of_words)
        unmerged_fps = self._filter_unmerged(line_of_faksimile_positions)
        if len(unmerged_words) > 0:
            if len(unmerged_words) == 1 and len(unmerged_fps) == 1:
                self.line_composer.merge_unmergeables(line_of_words, line_of_faksimile_positions, new_words)
            current_word_index = 0
            current_fp_index = 0
            unmerged_words = self._filter_unmerged(unmerged_words)
            unmerged_fps = self._filter_unmerged(unmerged_fps)
            while len(unmerged_words) > 0 and self.keep_alive:
                if len(unmerged_words) == len(unmerged_fps):
                    self.line_composer.merge_unmergeables(line_of_words, line_of_faksimile_positions, new_words)
                    break
                if len(self.play_stack) == 0:
                    response = self.print_merge_unmergeable_dialog(line_of_words, line_of_faksimile_positions,
                                                                   unmerged_words, unmerged_fps,
                                                                   current_word_index, current_fp_index)
                else:
                    response = self.play_stack.pop(0)
                self.command_history.append(response)
                self.process_merge_unmergeable_response(response, unmerged_words, unmerged_fps, new_words, line_of_words)
                unmerged_words = self._filter_unmerged(unmerged_words)
                unmerged_fps = self._filter_unmerged(unmerged_fps)
                self._sync_joined_words()
                self.last_response = response
        else:
            print('Nothing to do')
        self._sync_joined_words()

    def print_command_history(self):
        """Print the '#'-joined command history and append it to HISTORY_FILE.
        """
        history_string = '#'.join(self.command_history)
        print(history_string)
        with open(HISTORY_FILE, 'a+') as f:
            f.write(f'{history_string}\n')

    def print_merge_unmergeable_dialog(self, line_of_words, line_of_faksimile_positions, unmerged_words, unmerged_fps, current_word_index, current_fp_index) -> str:
        """Print head dialog of merge_unmergeable and prompt the user.

        [:return:] user response; an empty answer repeats the last response.
        """
        print('Word line:', [word.text for word in line_of_words])
        print('Faksimile:', [fp.text for fp in line_of_faksimile_positions])
        print(Fore.WHITE + f'Select word/faksimile position from line {unmerged_words[0].line_number}:' + Fore.RESET)
        print('words: ', end='')
        for index, word in enumerate(unmerged_words):
            current_color = Fore.MAGENTA if index == current_word_index else Fore.WHITE
            print(current_color + f'[{index}]: {word.text}', end='')
        print(Fore.RESET)
        print('faksimile positions: ', end='')
        for index, fp in enumerate(unmerged_fps):
            current_color = Fore.MAGENTA if index == current_fp_index else Fore.WHITE
            print(current_color + f'[{index}]: {fp.text}', end='')
        print(Fore.RESET)
        print('Choose word(s) and faksimile position(s):')
        response = input('default=' + Fore.WHITE + f'{self.last_response}' + Fore.RESET + '|h=help>')
        if response == '':
            return self.last_response
        return response

    def process_merge_unmergeable_response(self, response, unmerged_words, unmerged_fps, new_words, line_of_words):
        """Process user response: call the function of the first command pattern that matches.

        Responses that match no pattern are ignored.
        """
        for pattern, exec_function in self.word_merge_response_function_dict.items():
            if re.match(pattern, response):
                exec_function(response, unmerged_words, unmerged_fps, new_words, line_of_words)
                break

    def set_command_history(self):
        """Ask for a '#'-joined command history and queue its commands for replay.
        """
        command_history_string = input('Start the interactive shell with command history ...>')
        if command_history_string != '':
            self.play_stack = command_history_string.split('#')
class ManualMergerShell:
    """This class provides an interactive shell for merging words and faksimile positions manually.

    The user first selects a word (optionally joining further words into
    it) by id and then enters the ids of the faksimile positions the
    word should be merged with.  Each dispatched response is recorded in
    ``command_history``; a '#'-joined history can be replayed.
    """

    def __init__(self, unmerged_words, unmerged_fps, new_words, page=None):
        """Store the lists to work on and register the response patterns.

        :param unmerged_words: words that still lack a faksimile position.
        :param unmerged_fps: faksimile positions that still lack a word.
        :param new_words: list of merged words; merged words are appended to it.
        :param page: optional page that receives new_words once everything is merged.
        """
        self.command_history = []
        self.keep_alive = True
        self.new_words = new_words
        self.old_words = []
        self.page = page
        self.play_stack = []
        self.prompt4faksimile = False
        self.unmerged_fps = unmerged_fps
        self.unmerged_words = unmerged_words
        self.word = None
        # Responses are dispatched to the first matching pattern, so the order matters.
        self.response_function_dictionary = {
            re.compile(r'^(\d+(\s)*)+$'): self._join_word_with_ids,
            re.compile(r'(^\(\d+,\d+,.*\)\s(\d+(\s)*)+$|^(\d+(\s)*)+\(\d+,\d+,.*\)$)'):
                self._join_word_with_other_word,
            re.compile(r'(rect\d+(\s)*)+'): self._merge_word_with_faksimile_positions,
            re.compile(r'^q(uit)*$'): self._quit,
        }

    def _find_unmerged_word(self, word_id):
        """Return the unmerged word with id word_id or None if there is no such word.
        """
        matches = [word for word in self.unmerged_words if word.id == word_id]
        return min(matches) if matches else None

    def _resolve_word_ids(self, word_ids):
        """Return the unmerged words for word_ids, or None (after reporting it) if an id is unknown.
        """
        words = [self._find_unmerged_word(word_id) for word_id in word_ids]
        unknown_ids = [word_id for word_id, word in zip(word_ids, words) if word is None]
        if unknown_ids:
            print(f'No unmerged word with id(s): {unknown_ids}')
            return None
        return words

    def _release_faksimile_positions(self, word):
        """Hand all faksimile positions of word back to the unmerged faksimile positions.
        """
        # Iterate over a copy: removing entries from the list that is being iterated
        # skips every second faksimile position.
        for fp in list(word.faksimile_positions):
            fp.joined = False
            self.unmerged_fps.append(fp)
        word.faksimile_positions.clear()

    def _join_word_with_ids(self, response):
        """Select the word with the first id from response and join the words with the other ids into it.

        Nothing is changed if one of the ids does not belong to an unmerged word.
        """
        print(response)
        word_ids = [int(word_id) for word_id in response.split()]
        words = self._resolve_word_ids(word_ids)
        if not words:
            return
        self.word = words[0]
        for next_word in words[1:]:
            self.word.join(next_word)
            self.old_words.append(next_word)

    def _join_word_with_other_word(self, response):
        """Join unmerged words with an already merged word.

        The merged word is given as "(id,line_number,text)", the unmerged
        words by id.  If the response starts with the parenthesis, the
        unmerged words are joined into the merged word, otherwise the
        merged word is joined into the first unmerged word.  The merged
        word is taken out of new_words and its faksimile positions become
        unmerged again.
        """
        word_info_string = re.split(r'\).*', re.split(r'.*\(', response)[1])[0]
        # maxsplit keeps commas that belong to the text of the word.
        id_text, line_number_text, word_text = word_info_string.split(',', 2)
        test_dictionary = {'id': int(id_text), 'line_number': int(line_number_text)}
        word_ids = [int(word_id) for word_id in response.split(' ') if re.match(r'^\d+$', word_id)]
        result_words = [word for word in self.new_words
                        if test_word_vs_dict(word, test_dictionary) and word.text == word_text]
        if len(result_words) > 1:
            print(f'Found {len(result_words)}: {[(word.id,word.line_number, word.text) for word in result_words]}')
        elif len(result_words) == 1:
            other_word = result_words[0]
            other_word_first = response.startswith('(' + word_info_string)
            # Resolve all ids before touching any state, so a typo leaves everything as it was.
            words = self._resolve_word_ids(word_ids)
            if words is None:
                return
            if not other_word_first and not words:
                print('No word id found in response')
                return
            self.new_words.remove(other_word)
            self._release_faksimile_positions(other_word)
            if other_word_first:
                self.word = other_word
                for next_word in words:
                    self.word.join(next_word)
                    self.old_words.append(next_word)
            else:
                self.word = words[0]
                for next_word in words[1:]:
                    self.word.join(next_word)
                    self.old_words.append(next_word)
                self.old_words.append(other_word)
                self.word.join(other_word)

    def _merge_extra_ids(self, extra_ids, faksimile_ids):
        """Take faksimile positions that are already merged back from their words.

        For each id of extra_ids that belongs to a faksimile position of
        a word in new_words, the faksimile position is moved to the
        unmerged faksimile positions and its id is appended to
        faksimile_ids.  A word that loses its last faksimile position is
        moved from new_words back to the unmerged words.
        """
        for extra_id in extra_ids:
            words = [word for word in self.new_words
                     if any(fp.id == extra_id for fp in word.faksimile_positions)]
            if len(words) > 0:
                word = words[0]
                fp = min([fp for fp in word.faksimile_positions if fp.id == extra_id])
                self.unmerged_fps.append(fp)
                faksimile_ids.append(fp.id)
                word.faksimile_positions.remove(fp)
                if len(word.faksimile_positions) == 0:
                    word.joined = False
                    self.new_words.remove(word)
                    self.unmerged_words.append(word)

    def _merge_word_with_faksimile_positions(self, response):
        """Merge the current word with the faksimile positions whose ids are specified by response.

        Ids of faksimile positions that are already merged with another
        word are taken back from that word first.  Nothing happens if no
        word has been selected yet.
        """
        if self.word is None:
            # Without this check a faksimile id typed at the word prompt raised an AttributeError.
            print('Select a word before entering faksimile ids')
            return
        requested_ids = [fp_id for fp_id in response.split(' ') if fp_id != '']
        unmerged_ids = [fp.id for fp in self.unmerged_fps]
        faksimile_ids = [fp_id for fp_id in requested_ids if fp_id in unmerged_ids]
        extra_ids = [fp_id for fp_id in requested_ids if fp_id not in faksimile_ids]
        if len(extra_ids) > 0:
            self._merge_extra_ids(extra_ids, faksimile_ids)
        for fp_id in faksimile_ids:
            fp = min([fp for fp in self.unmerged_fps if fp.id == fp_id])
            self.word.faksimile_positions.append(fp)
            fp.joined, self.word.joined = True, True
            print(f'{self.word.id}: {self.word.text} merged with {fp.id}')
        # NOTE(review): the nesting of the next two statements is reconstructed; they are
        # taken to run even if none of the ids was found -- confirm against the repository.
        if self.word.joined:
            self.new_words.append(self.word)
        self.word = None

    def _process_response(self, response):
        """Process response: call the function of the first matching pattern and record the response.
        """
        for pattern, exec_function in self.response_function_dictionary.items():
            if re.match(pattern, response):
                exec_function(response)
                self.command_history.append(response)
                break

    def _quit(self, response):
        """Quit manual merger.
        """
        self.keep_alive = False

    def run(self) -> int:
        """Manually merge words and faksimile positions.

        [:return:] exit code: number of words and faksimile positions that
        are still unmerged, or -1 if the user declined to merge manually.
        """
        response = input(f'Manually merge {len(self.unmerged_words)} words with {len(self.unmerged_fps)} faksimile positions? Y/n/history>')
        if response == 'n':
            return -1
        if response != 'Y' and '#' in response:
            self.play_stack = response.split('#')
        while len(self.unmerged_words + self.unmerged_fps) > 0 and self.keep_alive:
            print(f'Not merged words : {[(word.id,word.line_number,word.text) for word in self.unmerged_words if not word.joined]}')
            print(f'Not merged faksimile: {[(fp.id,fp.text) for fp in self.unmerged_fps if not fp.joined]}')
            input_text = 'Input word id(s)>'
            if self.word is not None:
                print(f'Current word to merge with faksimile position: {self.word.id} {self.word.line_number} "{self.word.text}"')
                input_text = 'Input faksimile id(s)>'
            if len(self.play_stack) == 0:
                response = input(input_text)
            else:
                response = self.play_stack.pop(0)
            if '#' in response:
                # A history string is only queued; dispatching it unsplit as well could
                # trigger a command with garbage arguments and duplicate it in the history.
                self.play_stack += response.split('#')
                continue
            self._process_response(response)
            for word in self.old_words:
                word.joined = True
            self.unmerged_words = [word for word in self.unmerged_words if not word.joined]
            self.unmerged_fps = [fp for fp in self.unmerged_fps if not fp.joined]
        if self.page is not None and len(self.unmerged_words + self.unmerged_fps) == 0:
            self.page.words = self.new_words
        self.print_history()
        return len(self.unmerged_words + self.unmerged_fps)

    def print_history(self):
        """Print the '#'-joined command history and append it to HISTORY_FILE.
        """
        history_string = '#'.join(self.command_history)
        print(history_string)
        with open(HISTORY_FILE, 'a+') as f:
            f.write(f'{history_string}\n')
def test_word_vs_dict(word, dictionary) -> bool:
    """Return true if for each key of dictionary word has same value in __dict__ as dictionary.

    Dictionary values that are not greater than -1 act as wildcards: the
    corresponding attribute of word is not looked at.

    :param word: object whose instance attributes are compared.
    :param dictionary: attribute names mapped to the expected values.
    [:return:] True if no attribute contradicts the dictionary.
    """
    attributes = vars(word)
    return not any(expected > -1 and attributes[key] != expected
                   for key, expected in dictionary.items())


# The name matches pytest's default "test_*" pattern although this is a helper, not a test:
# keep pytest from collecting it in test modules that import it.
test_word_vs_dict.__test__ = False
Event Timeline
Log In to Comment