Page MenuHomec4science

process_copX.py
No OneTemporary

File Metadata

Created
Fri, Nov 1, 23:51

process_copX.py

### Jan Linder
# Import libraries
import pytesseract
import sys
from pdf2image import convert_from_path
import os
import re
import pandas as pd
from pathlib import Path
import PyPDF2
# ----------------- Constants -----------------
# All-uppercase tokens that are abbreviations rather than family names / party headers.
uppercaseAbbrev = [
    "US",
    "USA",
    "AO",
    "UK",
    "WWF-US",
    "WWF-UK",
    "EPFL",
]
# Forms of address that mark the beginning of a person entry.
# Kept as a tuple because str.startswith() only accepts a str or a tuple of str.
salutoryAddresses = (
    "Mr",
    "Ms",
    "Sr",
    "Sra",
    "H.E.",
    "S.E.",
    "M.",
    "Mme",
    "Dr.",
)
# Default first page for the OCR step when no page range is passed on the command line.
# TODO add for copN > 4
defaultStartPage = [126, 2, 3, 3]
# Marker inserted in place of line breaks before the newer COP lists are split into entries.
SEPERATOR = "#"
# ----------------- Classes -----------------
class COP_Processor():
    """Extract the participant list of one COP meeting from its PDF.

    The PDF is read from ``files/participants-cop<copN>.pdf``. Its text is
    first dumped into an intermediate text file (OCR for COP 1-4, PyPDF2 text
    extraction otherwise); that file is then parsed into a pandas DataFrame.
    An existing intermediate file is reused, so the slow extraction step only
    runs once.
    """

    def __init__(self, copN, intermediateName, outputName):
        """Store the COP number and the intermediate/output file names."""
        self.copN = copN
        self.intermediateName = intermediateName
        # NOTE: stored for later use; nothing is written to this file yet.
        self.outputName = outputName

    def __doOCR(self, startPage, endPage):
        """OCR a page range of the PDF and append the text to the intermediate file.

        startPage and endPage are 1-based page numbers and both are included
        in the range. Passing 0 for both selects the default range: from the
        default start page of this COP up to the last page of the PDF.

        Returns True when the text was written, False when the requested
        range does not fit the document.
        """
        PDF_file = self.__getPDFpath()
        pages = convert_from_path(PDF_file, dpi=200)
        if startPage == 0 and endPage == 0:
            # defaultStartPage holds one entry per COP starting with COP 1,
            # hence the shift by one (indexing with copN itself skipped the
            # first entry and overflowed for COP 4).
            startPage = defaultStartPage[self.copN - 1]
            endPage = len(pages)
        elif len(pages) < endPage or endPage < startPage or startPage <= 0:
            return False
        # Append mode: the text of all pages ends up in the same file.
        with open(self.intermediateName, "a") as f:
            print("Creating images out of the pdf")
            # The page list is 0-based, the page numbers are 1-based.
            for i in range(startPage - 1, endPage):
                print("Reading page " + str(i))
                # Recognize the text of the page image using pytesseract.
                f.write(str(pytesseract.image_to_string(pages[i])))
        return True

    def __doPDFtoTxt(self):
        """Append the embedded text of every PDF page to the intermediate file.

        Returns True once all pages have been written.
        """
        PDF_file = self.__getPDFpath()
        # NOTE(review): PdfFileReader/numPages/getPage/extractText are the
        # legacy PyPDF2 names; confirm they exist in the installed version.
        with open(PDF_file, "rb") as pdf:
            pdfReader = PyPDF2.PdfFileReader(pdf)
            with open(self.intermediateName, "a") as f:
                for i in range(pdfReader.numPages):
                    f.write(pdfReader.getPage(i).extractText())
        return True

    def __readIntermediate(self):
        """Return the whole content of the intermediate text file."""
        with open(self.intermediateName, "r") as file:
            return file.read()

    def __printAnalysis(self, data):
        """Print the participants grouped by party, followed by the total count."""
        print("do some analysis -------------------------------------------------------------")
        # sort per party and count
        for organization, people in data.groupby('party'):
            print(organization)
            print(people)
        print("The number of detected participants is " + str(len(data.index)))

    def __processCOP1(self):
        """Parse the COP 1 list: uppercase family name(s), one first name, then the party.

        Every entry is printed as ``FAMILY, First: party``. Returns True.
        """
        print("Read file and replace some words")
        entire_text = self.__readIntermediate()
        # Column headers and OCR noise characters are dropped, in this order.
        for unwanted in ('Affilliation/Country', 'Name', '—', '‘', '|', '(', '{', '['):
            entire_text = entire_text.replace(unwanted, '')
        # split it to a list
        print("split the text up to a list")
        entire_text_list = re.split(', | |\n', entire_text)
        # remove the empty slots
        print("remove the empty slots")
        content_list = [el for el in entire_text_list if el]
        # Rows are collected first and turned into a DataFrame in one go.
        rows = []
        print("Extract the data")
        list_size = len(content_list)
        i = 0
        while i < list_size:
            # go to the next word that is a family name (uppercase)
            while i < list_size and not content_list[i].isupper():
                i += 1
            if i >= list_size:
                # only trailing non-name tokens were left
                break
            familyWords = [content_list[i]]
            i += 1
            # the family name may span several uppercase words
            while i < list_size and content_list[i].isupper():
                familyWords.append(content_list[i])
                i += 1
            familyName = " ".join(familyWords)
            # assume that the first name is only one word (it may be missing
            # when the text ends right after a family name)
            firstName = ""
            if i < list_size:
                firstName = content_list[i]
                i += 1
            # everything up to the next family name is the organization;
            # known uppercase abbreviations do not start a new entry
            partyWords = []
            while i < list_size and not (content_list[i].isupper() and content_list[i] not in uppercaseAbbrev):
                partyWords.append(content_list[i])
                i += 1
            party = " ".join(partyWords)
            rows.append({'family name': familyName, 'first name': firstName, 'party': party})
            print(familyName + ", " + firstName + ": " + party)
        data = pd.DataFrame(rows, columns=["family name", "first name", "party"])
        ##TODO output a csv file or similar (data is not used any further yet)
        return True

    def __processCOP2to4(self):
        """Parse the COP 2-4 lists: uppercase party headers followed by their people.

        A line starting with a form of address opens a new person; the lines
        after it form that person's description. Prints the result grouped by
        party and returns True.
        """
        print("Read file and replace some words")
        entire_text = self.__readIntermediate()
        # NOTE(review): as written this strips every blank from the text -
        # confirm the replaced character was not a special one originally.
        entire_text = entire_text.replace(' ', '')
        # split it to a list
        print("split the text up to a list")
        content_list = entire_text.split('\n')
        rows = []
        name = ""
        description = ""
        party = ""
        print("Extract the data")
        list_size = len(content_list)
        i = 0
        while i < list_size:
            elem = content_list[i]
            if elem.isupper() and elem not in uppercaseAbbrev:
                # a new party begins: store the last person (if there is one)
                if name != "":
                    rows.append({'name': name, 'party': party, 'description': description})
                    name = ""
                    description = ""
                party = elem.lower()
                # a party name may continue on the following uppercase lines;
                # consume them so they are not taken for a party of their own
                while i + 1 < list_size and content_list[i + 1].isupper():
                    i += 1
                    party += content_list[i].lower()
            elif elem.startswith(salutoryAddresses):
                # a new person begins: store the previous one first
                if name != "":
                    rows.append({'name': name, 'party': party, 'description': description})
                    description = ""
                name = elem
            elif elem != "" and name != "":
                # belongs to the description of the current person; lines
                # seen before any person are not attached to anybody
                description += elem + "; "
            i += 1
        # the last person is not followed by a new header, store it here
        if name != "":
            rows.append({'name': name, 'party': party, 'description': description})
        data = pd.DataFrame(rows, columns=["name", "party", "description"])
        self.__printAnalysis(data)
        return True

    def __processCOPnewer(self):
        """Parse the lists of COP 5 and later from the extracted PDF text.

        The list is expected after an entry reading "parties" (any case).
        Returns True on success and False when that entry does not exist.
        """
        print("Read file and replace some words")
        entire_text = self.__readIntermediate()
        # NOTE(review): these patterns match the literal characters
        # backslash-r-backslash-n and backslash-x-0-c, not real control
        # characters - presumably the file holds an escaped dump; confirm.
        entire_text = entire_text.replace('\\r\\n', SEPERATOR).replace('\\x0c', '').replace(' (continued)', '').replace('(continued)', '')
        # split it to a list
        print("Split the text up to a list")
        entire_list = entire_text.split(SEPERATOR)
        rows = []
        print("Extract the data")
        size = len(entire_list)
        i = 0
        # skip everything until the real participant list begins
        while i < size and entire_list[i].lower() != "parties":
            i += 1
        if i >= size:
            # no participant list in this text
            return False
        i += 1
        while i < size:
            party = entire_list[i]
            i += 1
            # extract all the names of this party
            while i < size and entire_list[i].startswith(salutoryAddresses):
                name = entire_list[i]
                i += 1
                # the rest is description; as in the original bound, the very
                # last entry of the text is never taken as description
                description = ""
                while i < size - 1 and entire_list[i] != "" and not entire_list[i].startswith(salutoryAddresses):
                    description += entire_list[i] + " "
                    i += 1
                rows.append({'name': name, 'party': party, 'description': description})
                while i < size and entire_list[i] == "":
                    i += 1
            while i < size and entire_list[i] == "":
                i += 1
        data = pd.DataFrame(rows, columns=["name", "party", "description"])
        self.__printAnalysis(data)
        return True

    def __getPDFpath(self):
        """Return the path of the PDF that belongs to this COP."""
        return Path("files/participants-cop" + str(self.copN) + ".pdf")

    # The method that will be called from outside to convert the page
    def pdfToData(self, startPage = 0, endPage = 0):
        """Create the intermediate text file if necessary and parse it.

        startPage and endPage (1-based, both included) are only used when the
        OCR step runs, i.e. for COP 1-4 without an existing intermediate
        file; 0/0 selects the default page range.

        Returns True when the data could be extracted, False otherwise.
        """
        if self.copN < 1:
            # there is no COP before the first one
            return False
        if self.copN <= 4:
            # use OCR if the intermediate file does not exist yet
            if not os.path.isfile(self.intermediateName):
                if not self.__doOCR(startPage, endPage):
                    # invalid page range: there is nothing to parse
                    return False
            # we now have it as a txt file
            if self.copN == 1:
                return self.__processCOP1()
            return self.__processCOP2to4()
        # newer lists carry embedded text, no OCR needed
        if not os.path.isfile(self.intermediateName):
            self.__doPDFtoTxt()
        # the data is now available as a txt file
        return self.__processCOPnewer()
# ----------------------------------------------------------
# Command line format:
#   process_copX <numberOfCop> <intermediateFilename> <outputFilename> (<startPage> <endPage>)
# The optional page range is only used when the OCR step runs, i.e. for
# cop 1 - 4 when the intermediate file does not exist yet.
def parseArguments(arguments):
    """Turn the raw command line into the values needed for the extraction.

    arguments is the complete argument list including the program name.
    Returns the tuple (copNumber, intermediateFilename, outputFilename,
    startPage, endPage), with 0/0 as page range when none was given, or None
    when the number of arguments is wrong or a number cannot be parsed.
    """
    if len(arguments) not in (4, 6):
        return None
    startPage = 0
    endPage = 0
    try:
        copNumber = int(arguments[1])
        if len(arguments) == 6:
            startPage = int(arguments[4])
            endPage = int(arguments[5])
    except ValueError:
        return None
    return copNumber, arguments[2], arguments[3], startPage, endPage


def main(arguments):
    """Run the extraction for the given command line and return the exit status."""
    parsed = parseArguments(arguments)
    if parsed is None:
        print("usage: process_copX <numberOfCop> <intermediateFilename> <outputFilename> (<startPage> <endPage>)")
        return 2
    copNumber, intermediateFilename, outputFilename, startPage, endPage = parsed
    proc = COP_Processor(copNumber, intermediateFilename, outputFilename)
    success = proc.pdfToData(startPage, endPage)
    if success:
        print("The data has successfully been extracted")
        return 0
    print("The data couldn't be extracted correctly. Maybe this cop is not implemented yet.")
    return 1


# Only run when executed as a script, so that importing the module has no side effects.
if __name__ == "__main__":
    sys.exit(main(sys.argv))

Event Timeline