Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F90471012
process_copX.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Fri, Nov 1, 23:51
Size
10 KB
Mime Type
text/x-python
Expires
Sun, Nov 3, 23:51 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
22082478
Attached To
R10013 cop-mining-participants
process_copX.py
View Options
### Jan Linder
# Import libraries
import
pytesseract
import
sys
from
pdf2image
import
convert_from_path
import
os
import
re
import
pandas
as
pd
from
pathlib
import
Path
import
PyPDF2
# Constants

# Upper-case tokens that are abbreviations and must not be read as family names
uppercaseAbbrev = [
    "US",
    "USA",
    "AO",
    "UK",
    "WWF-US",
    "WWF-UK",
    "EPFL",
]

# Salutations that mark the start of a person entry.
# Must be a tuple because str.startswith only accepts a str or a tuple of str.
salutoryAddresses = (
    "Mr",
    "Ms",
    "Sr",
    "Sra",
    "H.E.",
    "S.E.",
    "M.",
    "Mme",
    "Dr.",
)

# Default first page of the participant list in the PDFs
# TODO add for copN > 4
defaultStartPage = [
    126,
    2,
    3,
    3,
]

# Marker that replaces the line breaks of the extracted text before it is split
SEPERATOR = "#"
# ----------------- Classes -----------------
class COP_Processor:
    """Turn the participant list PDF of one COP into a pandas DataFrame.

    The PDF is expected at ``files/participants-cop<copN>.pdf``. It is first
    converted into an intermediate text file (OCR for COP 1-4, PyPDF2 text
    extraction for every other number) and that text is then parsed. The
    parsed table is printed and kept in ``self.data``.
    """

    def __init__(self, copN, intermediateName, outputName):
        """Store the configuration; no file is touched here.

        copN             -- number of the COP (int); selects the PDF and the parser
        intermediateName -- path of the intermediate text file
        outputName       -- path of the output file (stored only; nothing in this
                            class writes to it yet, see the TODO in __processCOP1)
        """
        self.copN = copN
        self.intermediateName = intermediateName
        self.outputName = outputName
        # Table produced by the last parser that ran to completion, None before that
        self.data = None

    def __getPDFpath(self):
        """Return the path of the PDF that belongs to this COP."""
        return Path("files/participants-cop" + str(self.copN) + ".pdf")

    def __readIntermediateText(self):
        """Return the whole content of the intermediate text file."""
        with open(self.intermediateName, "r") as file:
            print("Read file and replace some words")
            return file.read()

    def __printAnalysis(self, data):
        """Print the people grouped by party followed by the total number of rows."""
        print("do some analysis -------------------------------------------------------------")
        # sort per party and count
        for organization, people in data.groupby('party'):
            print(organization)
            print(people)
        print("The number of detected participants is " + str(len(data.index)))

    def __doOCR(self, startPage, endPage):
        """Run OCR on a page range of the PDF and append the text to the intermediate file.

        startPage and endPage are 1-based and both inclusive. If both are 0, the
        default start page of this COP and the last page of the PDF are used.
        Returns True if the text was written and False if the page range is
        invalid (nothing is written in that case).
        """
        PDF_file = self.__getPDFpath()
        pages = convert_from_path(PDF_file, dpi=200)
        pageCount = len(pages)

        # check indexes
        if startPage == 0 and endPage == 0:
            # defaultStartPage appears to hold one entry per COP, COP 1 first, so the
            # index is shifted by one (indexing with copN itself overflowed for COP 4).
            startPage = defaultStartPage[self.copN - 1]
            endPage = pageCount
        elif pageCount < endPage or endPage < startPage or startPage <= 0:
            return False

        # Open the file in append mode so that
        # all contents of all pages are added to the same file
        with open(self.intermediateName, "a") as f:
            print("Creating images out of the pdf")
            # range() counts from 0 and excludes its end, so only the start is shifted
            for i in range(startPage - 1, endPage):
                print("Reading page " + str(i))
                # Recognize the text as string in image using pytesseract
                f.write(str(pytesseract.image_to_string(pages[i])))
        return True

    def __doPDFtoTxt(self):
        """Extract the text of every PDF page with PyPDF2 and append it to the intermediate file.

        Always returns True; errors surface as exceptions.
        """
        # NOTE(review): PdfFileReader/numPages/getPage/extractText are the legacy
        # PyPDF2 names - confirm they exist in the installed PyPDF2 version.
        with open(self.__getPDFpath(), "rb") as pdfFile:
            pdfReader = PyPDF2.PdfFileReader(pdfFile)
            size = pdfReader.numPages
            # The output is opened only after the PDF could be read, so a broken
            # PDF does not leave an empty intermediate file behind
            with open(self.intermediateName, "a") as f:
                for i in range(size):
                    f.write(pdfReader.getPage(i).extractText())
        return True

    def __processCOP1(self):
        """Parse the OCR text of COP 1 into rows of family name, first name and party.

        A record is: one or more upper-case words (family name), one word (first
        name) and then every word up to the next upper-case word that is not a
        known abbreviation (party). Returns True.
        """
        entire_text = self.__readIntermediateText()
        for unwanted in ('Affilliation/Country', 'Name', '—', '‘', '|', '(', '{', '['):
            entire_text = entire_text.replace(unwanted, '')

        # split it to a list
        print("split the text up to a list")
        entire_text_list = re.split(', | |\n', entire_text)

        # remove the empty slots
        print("remove the empty slots")
        content_list = [el for el in entire_text_list if el]

        # fill in the data row by row
        print("Extract the data")
        rows = []
        list_size = len(content_list)
        i = 0
        while i < list_size:
            # go to the next word that is a last name (uppercase)
            while i < list_size and not content_list[i].isupper():
                i += 1
            if i >= list_size:
                break
            familyName = content_list[i]
            i += 1

            # if family name is more than one word
            while i < list_size and content_list[i].isupper():
                familyName += " " + content_list[i]
                i += 1
            if i >= list_size:
                # the text ends inside a family name: incomplete record, drop it
                break

            # assume that first name is only one word
            firstName = content_list[i]
            i += 1

            # assume that the rest is the organization
            party = ""
            while i < list_size and not (content_list[i].isupper() and content_list[i] not in uppercaseAbbrev):
                party = party + " " + content_list[i]
                i += 1

            rows.append({'family name': familyName, 'first name': firstName, 'party': party})
            # print the row
            print(familyName + ", " + firstName + ": " + party)

        # the resulting dataframe (built once; DataFrame.append no longer exists in pandas 2)
        self.data = pd.DataFrame(rows, columns=["family name", "first name", "party"])

        ##TODO output a csv file or similar
        return True

    def __processCOP2to4(self):
        """Parse the OCR text of COP 2-4 into rows of name, party and description.

        The text is read line by line: an upper-case line that is not a known
        abbreviation starts a new party, a line starting with a salutation starts
        a new person, and every other non-empty line is added to the description
        of the current person. Returns True.
        """
        entire_text = self.__readIntermediateText()
        # NOTE(review): as written this replaces the empty string by itself, which
        # changes nothing - presumably a non-printable character got lost here; confirm.
        entire_text = entire_text.replace('', '')

        # split it to a list
        print("split the text up to a list")
        content_list = re.split('\n', entire_text)

        rows = []
        name = ""
        description = ""
        party = ""

        # fill in the data row by row
        print("Extract the data")
        list_size = len(content_list)
        i = 0
        while i < list_size:
            elem = content_list[i]

            if elem.isupper() and elem not in uppercaseAbbrev:
                # a new party: store the last person (if there is one)
                if name != "":
                    rows.append({'name': name, 'party': party, 'description': description})
                name = ""
                # lines collected while no person was open are discarded
                description = ""

                # set the new party
                party = elem.lower()
                # a party name can run over several lines: merge the directly
                # following upper-case lines and step over them
                while i + 1 < list_size and content_list[i + 1].isupper():
                    i += 1
                    party += " " + content_list[i].lower()
            elif elem.startswith(salutoryAddresses):
                # a new person: store the last person
                if name != "":
                    rows.append({'name': name, 'party': party, 'description': description})
                description = ""
                # set the new one
                name = elem
            elif elem != "":
                # add it to the actual persons description
                description += elem + "; "
            i += 1

        # the person that was still open when the text ended
        if name != "":
            rows.append({'name': name, 'party': party, 'description': description})

        # the resulting dataframe
        self.data = pd.DataFrame(rows, columns=["name", "party", "description"])
        self.__printAnalysis(self.data)
        return True

    def __processCOPnewer(self):
        """Parse the text extracted from the PDFs that are not handled by OCR.

        Everything before the entry "parties" is skipped. After it, each block is
        a party name followed by persons (entries starting with a salutation) and
        their description entries. Returns True, or False if the text contains no
        "parties" entry.
        """
        entire_text = self.__readIntermediateText()
        # replace the bad special characters here
        for old, new in (('\\r\\n', SEPERATOR), ('\\x0c', ''), (' (continued)', ''), ('(continued)', '')):
            entire_text = entire_text.replace(old, new)

        # split it to a list (SEPERATOR is taken literally)
        print("Split the text up to a list")
        entire_list = entire_text.split(SEPERATOR)

        rows = []
        print("Extract the data")
        size = len(entire_list)

        # skip everything until the real participant list begins
        i = 0
        while i < size and entire_list[i].lower() != "parties":
            i += 1
        if i >= size:
            # no participant list in this text
            return False
        i += 1

        # extract the list
        while i < size:
            party = entire_list[i]
            i += 1

            # extract all the names of this party
            while i < size and entire_list[i].startswith(salutoryAddresses):
                name = entire_list[i]
                i += 1

                # the rest is description
                # NOTE(review): the bound size - 1 keeps the very last entry out of
                # any description; kept as in the original - confirm it is intended.
                description = ""
                while i < size - 1 and entire_list[i] != "" and not entire_list[i].startswith(salutoryAddresses):
                    description += entire_list[i] + " "
                    i += 1

                # add the person to the table
                rows.append({'name': name, 'party': party, 'description': description})

                # empty entries between two persons
                while i < size and entire_list[i] == "":
                    i += 1

            # empty entries between two parties
            while i < size and entire_list[i] == "":
                i += 1

        # the resulting dataframe
        self.data = pd.DataFrame(rows, columns=["name", "party", "description"])
        self.__printAnalysis(self.data)
        return True

    # The method that will be called from outside to convert the page
    def pdfToData(self, startPage=0, endPage=0):
        """Convert the PDF of this COP to text (if not done yet) and parse it.

        startPage and endPage (1-based, inclusive) are only used for COP 1-4 and
        only if the intermediate file does not exist yet; 0/0 selects the default
        range. Returns True on success and False if the data could not be extracted.
        """
        if 0 < self.copN <= 4:
            # use OCR if intermediate file does not exist yet
            if not os.path.isfile(self.intermediateName):
                if not self.__doOCR(startPage, endPage):
                    # invalid page range: there is no text file to parse
                    return False

            # we now have it as a txt file
            if self.copN == 1:
                return self.__processCOP1()
            return self.__processCOP2to4()

        # use other method
        if not os.path.isfile(self.intermediateName):
            self.__doPDFtoTxt()

        # the data is now available as a text file
        return self.__processCOPnewer()
# ----------------------------------------------------------
# format: process_copX <numberOfCop> <intermediateFilename> <outputFilename> (<startPage> <endPage>)
# <startPage> and <endPage> are optional. They are passed on to pdfToData, which
# only uses them for cop 1 - 4 when the OCR still has to be done.
def parseArguments(arguments):
    """Turn the command line into (copNumber, intermediateFilename, outputFilename, startPage, endPage).

    arguments is the full argument list including the program name at index 0.
    startPage and endPage are 0 unless exactly two page arguments were given.
    Raises ValueError if arguments are missing or a number cannot be parsed.
    """
    if len(arguments) < 4:
        raise ValueError("expected at least 3 arguments, got " + str(max(len(arguments) - 1, 0)))

    copNumber = int(arguments[1])
    intermediateFilename = arguments[2]
    outputFilename = arguments[3]

    startPage = 0
    endPage = 0
    # the page range is only taken if both of its values are given
    if len(arguments) == 6:
        startPage = int(arguments[4])
        endPage = int(arguments[5])

    return copNumber, intermediateFilename, outputFilename, startPage, endPage


def main(arguments):
    """Run the extraction for the given command line and report the result on stdout."""
    try:
        copNumber, intermediateFilename, outputFilename, startPage, endPage = parseArguments(arguments)
    except ValueError as error:
        # a usage message instead of a bare traceback; the exit status is 1
        sys.exit("usage: process_copX <numberOfCop> <intermediateFilename> <outputFilename> "
                 "(<startPage> <endPage>)\n" + str(error))

    proc = COP_Processor(copNumber, intermediateFilename, outputFilename)
    success = proc.pdfToData(startPage, endPage)

    if success:
        print("The data has successfully been extracted")
    else:
        print("The data couldn't be extracted correctly. Maybe this cop is not implemented yet.")


# Only run when executed as a script, so that importing the module has no side effects
if __name__ == "__main__":
    main(sys.argv)
Event Timeline
Log In to Comment