#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" This program can be used to process words after they have been merged with faksimile data.
"""
# Copyright (C) University of Basel 2019 {{{1
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/> 1}}}
from colorama import Fore, Style
from deprecated import deprecated
from functools import cmp_to_key
import getopt
import inspect
import lxml.etree as ET
from operator import attrgetter
import os
from os import listdir, sep, path, setpgrp, devnull
from os.path import exists, isfile, isdir, dirname, basename
from pathlib import Path as PathlibPath
from progress.bar import Bar
import re
import shutil
import string
from svgpathtools import svg2paths2, svg_to_paths
from svgpathtools.path import Path as SVGPath
from svgpathtools.path import Line
import sys
import tempfile
import warnings
if dirname(__file__) not in sys.path:
    sys.path.append(dirname(__file__))
from datatypes.box import Box
from datatypes.archival_manuscript import ArchivalManuscriptUnity
from datatypes.mark_foreign_hands import MarkForeignHands
from datatypes.page import Page, STATUS_MERGED_OK, STATUS_POSTMERGED_OK
from datatypes.path import Path
from datatypes.text_connection_mark import TextConnectionMark
from datatypes.transkriptionField import TranskriptionField
from datatypes.word import Word, do_paths_intersect_saveMode, update_transkription_position_ids
from extract_line_continuation import extract_line_continuations
from util import back_up, process_warnings4status
from process_files import update_svgposfile_status
from process_footnotes import categorize_footnotes
sys.path.append('shared_util')
from myxmlwriter import write_pretty, xml_has_type, FILE_TYPE_SVG_WORD_POSITION, FILE_TYPE_XML_MANUSCRIPT
from main_util import extract_paths_on_tf
__author__ = "Christian Steiner"
__maintainer__ = __author__
__copyright__ = 'University of Basel'
__email__ = "christian.steiner@unibas.ch"
__status__ = "Development"
__license__ = "GPL v3"
__version__ = "0.0.1"
UNITTESTING = False
DEBUG_WORD = None
MERGED_DIR = 'merged'
WARNING_FOOTNOTES_ERROR = 'footnotes not processed'
WARNING_LINE_CONTINUATION = 'line continuation fail'
def categorize_paths(page, transkription_field=None):
    """Categorize all paths that are part of the transkription field.
    :return: a dictionary containing a list for each category of path.
    """
    if page.source is not None and isfile(page.source):
        MAX_HEIGHT_LINES = 1
        max_line = sorted(\
                [line_number.bottom-line_number.top for line_number in page.line_numbers if line_number.id % 2 == 0],\
                reverse=True)[0] + 2 if len(page.line_numbers) > 0 else 17
        tr_xmin = 0.0
        tr_ymin = 0.0
        if (page.svg_image is None or page.svg_image.text_field is None)\
                and transkription_field is not None:
            tr_xmin = transkription_field.xmin
            tr_ymin = transkription_field.ymin
        paths, attributes = svg_to_paths.svg2paths(page.source)
        allpaths_outside_tf = []
        attributes_outside_tf = []
        if transkription_field is None:
            transkription_field = TranskriptionField(page.source, multipage_index=page.multipage_index)
        allpaths_on_tf = extract_paths_on_tf(page, outsiders=allpaths_outside_tf, outsider_attributes=attributes_outside_tf, transkription_field=transkription_field)
        path_dict = { 'text_area_deletion_paths': [],\
                      'deletion_or_underline_paths': [],\
                      'box_paths': [],\
                      'dots_paths': [],\
                      'word_connector_paths': [],\
                      'uncategorized_paths': [] }
        for mypath in allpaths_on_tf:
            xmin, xmax, ymin, ymax = mypath.path.bbox()
            start_line_number = page.get_line_number(mypath.path.start.imag-tr_ymin)
            if abs(xmax-xmin) < 1 and abs(ymax-ymin) < 1:
                path_dict.get('dots_paths').append(mypath)
            elif abs(ymax-ymin) > MAX_HEIGHT_LINES and abs(ymax-ymin) < max_line and mypath.path.iscontinuous() and mypath.path.isclosed():
                path_dict.get('box_paths').append(mypath)
            elif abs(ymax-ymin) > MAX_HEIGHT_LINES and abs(ymax-ymin) > max_line and mypath.path.iscontinuous() and not mypath.path.isclosed():
                path_dict.get('word_connector_paths').append(mypath)
            elif abs(ymax-ymin) < MAX_HEIGHT_LINES:
                mypath.start_line_number = start_line_number
                path_dict.get('deletion_or_underline_paths').append(mypath)
            elif start_line_number != -1 and start_line_number != page.get_line_number(mypath.path.end.imag-tr_ymin):
                # Check for "ladder", i.e. a path with 3 segments (seg0 is horizontal on line x, seg1 moves to line x+1, seg2 is horizontal on line x+1)
                if start_line_number + 1 == page.get_line_number(mypath.path.end.imag-tr_ymin)\
                        and len(mypath.path._segments) == 3\
                        and abs(mypath.path._segments[0].bbox()[3]-mypath.path._segments[0].bbox()[2]) < MAX_HEIGHT_LINES\
                        and abs(mypath.path._segments[2].bbox()[3]-mypath.path._segments[2].bbox()[2]) < MAX_HEIGHT_LINES:
                    for index in 0, 2:
                        new_path = Path(parent_path=mypath, path=SVGPath(mypath.path._segments[index]))
                        new_path.start_line_number = page.get_line_number(new_path.path.start.imag-tr_ymin)
                        path_dict.get('deletion_or_underline_paths').append(new_path)
                else:
                    path_dict.get('text_area_deletion_paths').append(mypath)
            else:
                path_dict.get('uncategorized_paths').append(mypath)
        underline_path = mark_words_intersecting_with_paths_as_deleted(page, path_dict.get('deletion_or_underline_paths'), tr_xmin, tr_ymin)
        path_dict.update({'underline_path': underline_path})
        path_dict['uncategorized_paths'] += process_word_boxes(page, path_dict.get('box_paths'), transkription_field,\
                paths=allpaths_outside_tf, attributes=attributes_outside_tf, max_line=max_line)
        return path_dict
    elif not UNITTESTING:
        error_msg = 'Svg source file {} does not exist!'.format(page.source)\
                if page.source is not None else 'Page does not contain a source file!'
        raise FileNotFoundError(error_msg)
    return {}
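# A minimal usage sketch (hedged: the file name is hypothetical and a Page whose
# ``source`` points to an existing svg file is assumed). All values of the
# returned dictionary are lists of datatypes.path.Path:
#
#     page = Page('xml/Mp_XIV_page416.xml')   # hypothetical path
#     path_dict = categorize_paths(page)
#     for category, paths in path_dict.items():
#         print(category, len(paths))
#
# Dot-sized paths land in 'dots_paths', closed paths up to line height in
# 'box_paths', tall open paths in 'word_connector_paths', flat paths in
# 'deletion_or_underline_paths', and the rest in 'uncategorized_paths'.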
def copy_page_to_merged_directory(page, manuscript_file=None):
    """Copy page to the directory that contains the first version of all svg_pos_files that have been
    merged with the faksimile position data. MERGED_DIR is a subfolder of the svg_pos_file's directory.
    """
    svg_pos_file = PathlibPath(page.page_tree.docinfo.URL)
    target_dir = svg_pos_file.parent / MERGED_DIR
    if not target_dir.is_dir():
        target_dir.mkdir()
    target_pos_file = target_dir / svg_pos_file.name
    save_page(page, str(svg_pos_file), target_svg_pos_file=str(target_pos_file), status=STATUS_MERGED_OK, manuscript_file=manuscript_file)
def find_special_words(page, transkription_field=None):
    """Find special words, remove them from words, process their content.
    """
    if page.source is None or not isfile(page.source):
        raise FileNotFoundError('Page does not have a source!')
    if transkription_field is None:
        transkription_field = TranskriptionField(page.source, multipage_index=page.multipage_index)
    set_to_text_field_zero = (page.svg_image is None or page.svg_image.text_field is None)
    special_char_list = MarkForeignHands.get_special_char_list()
    special_char_list += TextConnectionMark.get_special_char_list()
    single_char_words = [ word for word in page.words if len(word.text) == 1 and word.text in special_char_list ]
    if not UNITTESTING:
        bar = Bar('find special words', max=len(single_char_words))
    for word in single_char_words:
        not bool(UNITTESTING) and bar.next()
        if word.text == MarkForeignHands.CLASS_MARK:
            id = len(page.mark_foreign_hands)
            page.mark_foreign_hands.append(MarkForeignHands.create_cls_from_word(word, id=id))
            page.words.remove(word)
        elif word.text in TextConnectionMark.SPECIAL_CHAR_LIST[0]\
                or (word.text in TextConnectionMark.SPECIAL_CHAR_LIST\
                    and any(style in page.sonderzeichen_list for style\
                            in word.transkription_positions[0].positional_word_parts[0].style_class.split(' '))):
            id = len(page.text_connection_marks)
            page.text_connection_marks.append(TextConnectionMark.create_cls_from_word(word, id=id))
            page.words.remove(word)
    not bool(UNITTESTING) and bar.finish()
    svg_tree = ET.parse(page.source)
    page.update_page_type(transkription_field=transkription_field)
    page.update_line_number_area(transkription_field, svg_tree=svg_tree, set_to_text_field_zero=set_to_text_field_zero)
    if page.marginals_source is not None:
        svg_tree = ET.parse(page.marginals_source)
    italic_classes = [ key for key in page.style_dict\
                       if bool(page.style_dict[key].get('font-family')) and page.style_dict[key]['font-family'].endswith('Italic') ]
    if len(page.mark_foreign_hands) > 0:
        MarkForeignHands.find_content(page.mark_foreign_hands, transkription_field, svg_tree, italic_classes=italic_classes,\
                SonderzeichenList=page.sonderzeichen_list, set_to_text_field_zero=set_to_text_field_zero)
    if len(page.text_connection_marks) > 0:
        TextConnectionMark.find_content_in_footnotes(page, transkription_field, svg_tree)
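# Hedged sketch of the effect (assuming page.mark_foreign_hands and
# page.text_connection_marks start out empty): every one-character word whose
# text is one of the special chars is moved out of page.words into one of the
# two lists:
#
#     n_words = len(page.words)
#     find_special_words(page)
#     n_removed = len(page.mark_foreign_hands) + len(page.text_connection_marks)
#     assert len(page.words) == n_words - n_removed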
def mark_words_intersecting_with_paths_as_deleted(page, deletion_paths, tr_xmin=0.0, tr_ymin=0.0):
    """Marks all words that intersect with deletion paths as deleted
    and adds these paths to word_deletion_paths.
    [:return:] list of .path.Path that might be word_underline_paths
    """
    if not UNITTESTING:
        bar = Bar('mark words that intersect with deletion paths', max=len(page.words))
    for word in page.words:
        not bool(UNITTESTING) and bar.next()
        word = mark_word_if_it_intersects_with_paths_as_deleted(word, page, deletion_paths, tr_xmin=tr_xmin, tr_ymin=tr_ymin)
        for part_word in word.word_parts:
            part_word = mark_word_if_it_intersects_with_paths_as_deleted(part_word, page, deletion_paths, tr_xmin=tr_xmin, tr_ymin=tr_ymin)
        word.partition_according_to_deletion()
    not bool(UNITTESTING) and bar.finish()
    # return those paths in deletion_paths that are not in page.word_deletion_paths
    return [ word_underline_path for word_underline_path in set(deletion_paths) - set(page.word_deletion_paths) ]
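# Sketch of the return contract (variable names are illustrative): paths that
# intersected a word have been attached to page.word_deletion_paths; the
# leftover paths are returned as underline candidates, so the two sets are
# disjoint by construction:
#
#     underline_candidates = mark_words_intersecting_with_paths_as_deleted(page, deletion_paths)
#     assert set(underline_candidates).isdisjoint(page.word_deletion_paths)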
def mark_word_if_it_intersects_with_paths_as_deleted(word, page, deletion_paths, tr_xmin=0.0, tr_ymin=0.0):
    """Marks a word as deleted if it intersects with deletion paths
    and adds these paths to word_deletion_paths.
    [:return:] word
    """
    word.deleted = False
    for transkription_position in word.transkription_positions:
        word_path = Path.create_path_from_transkription_position(transkription_position,\
                tr_xmin=tr_xmin, tr_ymin=tr_ymin)
        intersecting_paths = [ deletion_path for deletion_path in deletion_paths\
                               if do_paths_intersect_saveMode(deletion_path, word_path) ]
        if DEBUG_WORD is not None and word.text == DEBUG_WORD.text and word.line_number == DEBUG_WORD.line_number:
            relevant_paths = [ path for path in deletion_paths if path.start_line_number == DEBUG_WORD.line_number ]
            #print(word.line_number, word_path.path.bbox(), [ path.path.bbox() for path in relevant_paths])
        if len(intersecting_paths) > 0:
            #print(f'{word.line_number}: {word.id}, {word.text}: {intersecting_paths}')
            transkription_position.deleted = True
            transkription_position._deletion_paths += intersecting_paths
            for deletion_path in intersecting_paths:
                if deletion_path.parent_path is not None:
                    deletion_path = deletion_path.parent_path
                if deletion_path not in page.word_deletion_paths:
                    deletion_path.tag = Path.WORD_DELETION_PATH_TAG
                    deletion_path.attach_object_to_tree(page.page_tree)
                    page.word_deletion_paths.append(deletion_path)
    return word
def post_merging_processing_and_saving(svg_pos_file=None, new_words=None, page=None, manuscript_file=None, target_svg_pos_file=None):
    """Process words after merging with faksimile word positions.
    """
    if page is None and svg_pos_file is None:
        raise Exception('ERROR: post_merging_processing_and_saving needs either a Page or a svg_pos_file!')
    if page is None:
        page = Page(svg_pos_file)
    if page.source is None or not isfile(page.source):
        raise FileNotFoundError('Page instantiated from {} does not contain an existing source!'.format(svg_pos_file))
    if svg_pos_file is None:
        svg_pos_file = page.page_tree.docinfo.URL
    if new_words is not None:
        page.words = sorted(new_words, key=attrgetter('id'))
        for word_node in page.page_tree.xpath('.//word'):
            word_node.getparent().remove(word_node)
    manuscript = ArchivalManuscriptUnity.create_cls(manuscript_file)\
            if manuscript_file is not None\
            else None
    copy_page_to_merged_directory(page, manuscript_file=manuscript_file)
    transkription_field = TranskriptionField(page.source, multipage_index=page.multipage_index)
    update_faksimile_line_positions(page)
    status = STATUS_MERGED_OK
    page.update_styles(manuscript=manuscript, partition_according_to_styles=True)
    categorize_paths(page, transkription_field=transkription_field)
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter('default')
        try:
            find_special_words(page, transkription_field=transkription_field)
            categorize_footnotes(page)
            extract_line_continuations(page, warning_message=WARNING_LINE_CONTINUATION)
        except Exception:
            warnings.warn(WARNING_FOOTNOTES_ERROR)
    status = process_warnings4status(w, [ WARNING_FOOTNOTES_ERROR, WARNING_LINE_CONTINUATION ], status, STATUS_POSTMERGED_OK)
    save_page(page, svg_pos_file, target_svg_pos_file=target_svg_pos_file, status=status, manuscript_file=manuscript_file)
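# A minimal sketch of calling the post-merging entry point directly, assuming
# an existing svg_pos_file (both paths are hypothetical):
#
#     post_merging_processing_and_saving(svg_pos_file='xml/Mp_XIV_page416.xml',
#                                        manuscript_file='xml/Mp_XIV.xml')
#
# The page is first copied to the MERGED_DIR subfolder, so reset_page can later
# restore this first merged version if the script is rerun.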
def process_word_boxes(page, box_paths, transkription_field, paths=None, attributes=None, max_line=17) -> list:
    """Process word boxes: partition words according to word boxes.
    [:return:] a list of paths that are not boxes
    """
    MAX_HEIGHT_LINES = 1
    not_boxes = []
    if not UNITTESTING:
        bar = Bar('process word boxes', max=len(page.words))
    svg_tree = ET.parse(page.source)
    namespaces = { k if k is not None else 'ns': v for k, v in svg_tree.getroot().nsmap.items() }
    allpaths_on_margin_field = []
    tr_xmin = 0 if page.svg_image is not None and page.svg_image.text_field is not None\
            else transkription_field.xmin
    tr_ymin = 0 if page.svg_image is not None and page.svg_image.text_field is not None\
            else transkription_field.ymin
    if paths is None or attributes is None:
        paths = []
        raw_paths, attributes = svg_to_paths.svg2paths(page.source)
        for index, raw_path in enumerate(raw_paths):
            paths.append(Path.create_cls(id=index, path=raw_path, style_class=attributes[index].get('class'), page=page))
    for index, mypath in enumerate(paths):
        path = mypath.path
        xmin, xmax, ymin, ymax = path.bbox()
        attribute = attributes[index]
        if len(path) > 0\
                and path != transkription_field.path\
                and ((path.bbox()[1] < transkription_field.xmin and transkription_field.is_page_verso())\
                    or (path.bbox()[0] > transkription_field.xmax and not transkription_field.is_page_verso()))\
                and abs(ymax-ymin) < max_line:
            allpaths_on_margin_field.append(mypath) #Path.create_cls(id=index, path=path, style_class=attribute.get('class'), page=page))
    box_line_number_dict = {}
    for box_path in sorted(box_paths, key=lambda path: path.get_median_y()):
        line_number = page.get_line_number(box_path.get_median_y(tr_ymin=tr_ymin))
        if line_number > 0:
            if line_number not in box_line_number_dict.keys():
                box_line_number_dict.update({ line_number: [ box_path ]})
            else:
                box_line_number_dict.get(line_number).append(box_path)
    boxes = []
    for line_number in box_line_number_dict.keys():
        box_paths_on_line = sorted(box_line_number_dict[line_number], key=lambda path: path.get_x())
        margin_boxes_on_line = sorted([ margin_box for margin_box in allpaths_on_margin_field\
                                        if page.get_line_number(margin_box.get_median_y(tr_ymin=tr_ymin)) == line_number ],\
                                      key=lambda path: path.get_x())
        threshold = 3 if line_number % 2 == 0 else 1.5
        if len(margin_boxes_on_line) > 0:
            for box_path in box_paths_on_line:
                #print(line_number, box_path.path.d(), len(margin_boxes_on_line))
                box = Box.create_box(box_path, margin_boxes_on_line, svg_tree=svg_tree,\
                        namespaces=namespaces, threshold=threshold)
                if box is not None:
                    boxes.append(box)
        else:
            not_boxes += box_paths_on_line
    if len(boxes) > 0:
        for word in page.words:
            word.process_boxes(boxes, tr_xmin=tr_xmin, tr_ymin=tr_ymin)
            word.create_correction_history(page)
            if not bool(UNITTESTING):
                bar.next()
            elif word.earlier_version is not None:
                #print(f'{word.text} -> {word.earlier_version.text}')
                if word.earlier_version.earlier_version is not None:
                    print(f'{word.earlier_version.earlier_version.text}')
    not bool(UNITTESTING) and bar.finish()
    return not_boxes
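# Side note on the grouping step above: a behavior-equivalent sketch using
# dict.setdefault (the tr_ymin offset and the line_number > 0 guard are omitted
# here for brevity) collapses the if/else bucketing into one line:
#
#     for box_path in sorted(box_paths, key=lambda path: path.get_median_y()):
#         box_line_number_dict.setdefault(page.get_line_number(box_path.get_median_y()), []).append(box_path)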
def reset_page(page):
    """Reset all words that have word_parts in order to run the script a second time.
    """
    svg_pos_file = PathlibPath(page.page_tree.docinfo.URL)
    first_merge_version = svg_pos_file.parent / MERGED_DIR / svg_pos_file.name
    if first_merge_version.exists():
        page = Page(str(first_merge_version))
    else:
        word_with_wordparts = [ word for word in page.words if len(word.word_parts) > 0 ]
        word_with_wordparts += [ word for word in page.words if word.earlier_version is not None ]
        page_changed = False
        if len(word_with_wordparts) > 0:
            for word in word_with_wordparts:
                word.undo_partitioning()
                update_transkription_position_ids(word)
            page_changed = True
        no_line_numbers = [ word for word in page.words if word.line_number == -1 ]
        if len(no_line_numbers) > 0:
            for word in no_line_numbers:
                if len(word.transkription_positions) > 0:
                    word.line_number = page.get_line_number((word.transkription_positions[0].top+word.transkription_positions[0].bottom)/2)
                else:
                    msg = f'Word {word.id} {word.text} has no transkription_position!'
                    warnings.warn(msg)
            page_changed = True
        if page_changed:
            page.update_and_attach_words2tree()
def save_page(page, svg_pos_file, target_svg_pos_file=None, status=None, manuscript_file=None):
    """Save page to target_svg_pos_file and update the status of the file.
    """
    page.update_and_attach_words2tree()
    if not UNITTESTING:
        if target_svg_pos_file is None:
            target_svg_pos_file = svg_pos_file
        if status is not None:
            update_svgposfile_status(svg_pos_file, manuscript_file=manuscript_file, status=status)
        write_pretty(xml_element_tree=page.page_tree, file_name=target_svg_pos_file, script_name=__file__, file_type=FILE_TYPE_SVG_WORD_POSITION)
def update_faksimile_line_positions(page):
    """Update faksimile_positions of the lines.
    """
    num_lines = len(page.line_numbers)
    ymin = page.text_field.ymin\
            if page.text_field is not None\
            else 0.0
    for line_number in page.line_numbers:
        if len([ word.faksimile_positions[0] for word in page.words\
                 if len(word.faksimile_positions) > 0 and word.line_number == line_number.id ]) > 0:
            line_number.faksimile_inner_top = min([ word.faksimile_positions[0].top for word in page.words\
                    if len(word.faksimile_positions) > 0 and word.line_number == line_number.id ])
            line_number.faksimile_inner_bottom = max([ word.faksimile_positions[0].bottom for word in page.words\
                    if len(word.faksimile_positions) > 0 and word.line_number == line_number.id ])
            if line_number.id % 2 == 0:
                line_number.faksimile_outer_top = line_number.faksimile_inner_top - ymin
                line_number.faksimile_outer_bottom = line_number.faksimile_inner_bottom - ymin
    for index, line_number in enumerate(page.line_numbers):
        if line_number.faksimile_inner_bottom == 0.0\
                or line_number.faksimile_inner_bottom < line_number.faksimile_inner_top:
            if index == 0 and num_lines > 1:
                line_number.faksimile_inner_bottom = page.line_numbers[index+1].top
            elif index == num_lines-1 and page.text_field is not None:
                line_number.faksimile_inner_bottom = round(page.text_field.height + page.text_field.ymin, 3)
            elif index > 0 and index < num_lines-1:
                line_number.faksimile_inner_bottom = page.line_numbers[index+1].faksimile_inner_top\
                        if page.line_numbers[index+1].faksimile_inner_top > page.line_numbers[index-1].faksimile_inner_bottom\
                        else page.line_numbers[index-1].faksimile_inner_bottom
        line_number.attach_object_to_tree(page.page_tree)
def update_writing_process_ids(page):
    """Update the writing_process_ids of the words and split accordingly.
    """
    for word in page.words:
        word.set_writing_process_id_to_transkription_positions(page)
        word.partition_according_to_writing_process_id()
def usage():
    """Prints information on how to use the script.
    """
    print(main.__doc__)
def main(argv):
    """This program can be used to process words after they have been merged with faksimile data.

    svgscripts/process_words_post_merging.py [OPTIONS] <xmlManuscriptFile|svg_pos_file>

    <xmlManuscriptFile>  an xml file about a manuscript, containing information about its pages.
    <svg_pos_file>       an xml file about a page, containing information about svg word positions.

    OPTIONS:
        -h|--help                           show help
        -i|--include-missing-line-number    run the script on files that contain words without line numbers
        -r|--rerun                          rerun the script on a svg_pos_file that has already been processed

    :return: exit code (int)
    """
    status_not_contain = STATUS_POSTMERGED_OK
    include_missing_line_number = False
    try:
        opts, args = getopt.getopt(argv, "hir", ["help", "include-missing-line-number", "rerun" ])
    except getopt.GetoptError:
        usage()
        return 2
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            return 0
        elif opt in ('-i', '--include-missing-line-number'):
            include_missing_line_number = True
        elif opt in ('-r', '--rerun'):
            status_not_contain = ''
    if len(args) < 1:
        usage()
        return 2
    exit_status = 0
    file_a = args[0]
    if isfile(file_a):
        manuscript_file = file_a\
                if xml_has_type(FILE_TYPE_XML_MANUSCRIPT, xml_source_file=file_a)\
                else None
        counter = 0
        for page in Page.get_pages_from_xml_file(file_a, status_contains=STATUS_MERGED_OK, status_not_contain=status_not_contain):
            reset_page(page)
            no_line_numbers = [ word for word in page.words if word.line_number == -1 ]
            if not include_missing_line_number and len(no_line_numbers) > 0:
                not UNITTESTING and print(Fore.RED + f'Page {page.title}, {page.number} has words with no line number!')
                for word in no_line_numbers:
                    not UNITTESTING and print(f'Word {word.id}: {word.text}')
            else:
                back_up(page, page.xml_file)
                not UNITTESTING and print(Fore.CYAN + f'Processing {page.title}, {page.number} ...' + Style.RESET_ALL)
                post_merging_processing_and_saving(page=page, manuscript_file=manuscript_file)
                counter += 1
        not UNITTESTING and print(Style.RESET_ALL + f'[{counter} pages processed]')
    else:
        raise FileNotFoundError('File {} does not exist!'.format(file_a))
    return exit_status
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
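# Hedged shell examples (file names are hypothetical):
#
#     python3 svgscripts/process_words_post_merging.py xml/Mp_XIV.xml
#     python3 svgscripts/process_words_post_merging.py --rerun xml/Mp_XIV_page416.xml
#
# With --rerun, status_not_contain is cleared so that pages already marked
# STATUS_POSTMERGED_OK are processed again.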