Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F98276306
uploader_tasks.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Jan 11, 17:03
Size
13 KB
Mime Type
text/x-python
Expires
Mon, Jan 13, 17:03 (1 d, 21 h)
Engine
blob
Format
Raw Data
Handle
23547734
Attached To
R3600 invenio-infoscience
uploader_tasks.py
View Options
# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2014 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
Uploader workflow tasks.
Those are the main/common tasks that the uploader will use, they are used
inside the workflows defined in :py:mod:`~invenio.modules.uploader.workflows`.
See: `Simple workflows for Python <https://pypi.python.org/pypi/workflow/1.0>`_
"""
import
os
from
invenio.base.globals
import
cfg
from
invenio.modules.pidstore.models
import
PersistentIdentifier
from
.errors
import
UploaderWorkflowException
###########################################################
############## Pre tasks #################
###########################################################
def create_records_for_workflow(records, **kwargs):
    """Turn the JSON half of every entry into a ``Record`` object, in place.

    Each entry of ``records`` is indexed as ``entry[0]`` (kept untouched) and
    ``entry[1]`` (passed to ``Record(json=...)``); the entry is replaced by
    the 2-tuple ``(entry[0], Record(json=entry[1]))``.

    :param records: List of records to be processed; modified in place.
    :param kwargs: Accepted but not used by this task.
    """
    from invenio.modules.records.api import Record

    for index, entry in enumerate(records):
        original, json_data = entry[0], entry[1]
        records[index] = (original, Record(json=json_data))
###########################################################
############## Post tasks #################
###########################################################
def return_recordids_only(records, **kwargs):
    """Replace every processed entry by its record identifier, in place.

    For each entry the second element is asked for its ``'recid'`` key via
    ``.get``; entries without that key therefore become ``None``.

    :param records: Processed list of records; modified in place.
    :param kwargs: Accepted but not used by this task.
    """
    for index, entry in enumerate(records):
        processed = entry[1]
        records[index] = processed.get('recid')
###########################################################
############## Workflow tasks #################
###########################################################
def raise_(ex):
    """Build a workflow task whose only effect is to raise ``ex``.

    :param ex: the exception to raise when the task is executed.
    :returns: a task callable taking ``(obj, eng)``.
    """
    def _raise_(obj, eng):
        # Neither the workflow object nor the engine is consulted.
        raise ex

    return _raise_
def validate(step):
    """Validate the record.

    Validate the record using the `validate` method present in each record and
    the validation mode, either from the command line options or from
    `UPLOADER_VALIDATION_MODE`.

    For the validation the `schema` information from the field definition
    is used, see `invenio.modules.jsonalchemy.jsonext.parsers.schema_parser`.

    :param step: step identifier handed to `UploaderWorkflowException` when
        the task fails.
    :returns: the task callable ``_validate(obj, eng)``.
    :raises UploaderWorkflowException: (from the task) if the record has no
        `validate` attribute, or if the mode is ``'strict'`` and validation
        reported errors.
    """
    def _validate(obj, eng):
        record = obj[1]
        # The configured default is only consulted when the option is absent
        # or explicitly None, so an unset option no longer breaks on
        # ``mode.lower()`` below.
        mode = eng.getVar('options', {}).get('validation_mode')
        if mode is None:
            mode = cfg['UPLOADER_VALIDATION_MODE']
        # Format arguments are passed directly: wrapping them in a tuple makes
        # the logger print the tuple's repr instead of the value.
        eng.log.info("Validating record using mode: '%s'", mode)
        if not hasattr(record, 'validate'):
            raise UploaderWorkflowException(
                step, msg="An 'validate' method is needed")

        validator_errors = record.validate()
        eng.log.info('Validation errors: %s', str(validator_errors))
        if mode.lower() == 'strict' and validator_errors:
            raise UploaderWorkflowException(
                step, msg="One or more validation errors have occurred, please"
                " check them or change the 'validation_mode' to "
                "'permissive'.\n%s" % (str(validator_errors), ))
        eng.log.info('Finish validating the current record')

    return _validate
def retrieve_record_id_from_pids(step):
    """Retrieve the record identifier from a record using its PIDS.

    If any PID matches with any in the DB then the record id found is set to
    the current `record` under the ``'recid'`` key, and the remaining PID
    groups are not inspected.

    :param step: step identifier handed to `UploaderWorkflowException` when
        the task fails.
    :returns: the task callable ``_retrieve_record_id_from_pids(obj, eng)``.
    :raises UploaderWorkflowException: (from the task) if the record has no
        `persistent_identifiers` attribute, or if the PIDs of one group match
        more than one distinct object value in the DB.
    """
    def _retrieve_record_id_from_pids(obj, eng):
        record = obj[1]
        eng.log.info('Look for PIDs inside the current record')
        if not hasattr(record, 'persistent_identifiers'):
            raise UploaderWorkflowException(
                step, msg="An 'persistent_identifiers' method is needed")

        for pid_name, pid_values in record.persistent_identifiers:
            # Log arguments are passed directly; a wrapping tuple would be
            # rendered as its repr in the message.
            eng.log.info("Found PID '%s' trying to match it", pid_name)
            matching_recids = set()
            for possible_pid in pid_values:
                eng.log.info("Looking for PID %s", possible_pid)
                pid = PersistentIdentifier.get(
                    possible_pid.get('type'), possible_pid.get('value'),
                    possible_pid.get('provider'))
                if pid:
                    eng.log.info("PID found in the data base %s",
                                 pid.object_value)
                    matching_recids.add(pid.object_value)
            if len(matching_recids) > 1:
                raise UploaderWorkflowException(
                    step, msg="Found multiple match in the database, %s "
                    "for '%s'" % (repr(matching_recids), pid_name))
            if matching_recids:
                record['recid'] = matching_recids.pop()
                # The closing message is logged once, after the loop.
                break
        eng.log.info('Finish looking for PIDs inside the current record')

    return _retrieve_record_id_from_pids
def reserve_record_id(step):
    """Reserve a new record id for the current object and set it inside.

    :param step: step identifier handed to `UploaderWorkflowException` when
        the task fails.
    :returns: the task callable ``_reserve_record_id(obj, eng)``.
    :raises UploaderWorkflowException: (from the task) wrapping any exception
        raised while creating or reserving the PID.
    """
    # TODO: manage exceptions in a better way

    def _reserve_record_id(obj, eng):
        record = obj[1]
        eng.log.info('Reserve a recid for the new record')
        try:
            pid = PersistentIdentifier.create('recid', pid_value=None,
                                              pid_provider='invenio')
            record['recid'] = int(pid.pid_value)
            pid.reserve()
            eng.log.info("Finish reserving a recid '%s' for the new record",
                         pid.pid_value)
        except Exception as e:
            # ``e.message`` is not available on every exception (and not at
            # all on Python 3); fall back to the string form so the original
            # error is not masked by an AttributeError.
            raise UploaderWorkflowException(
                step, getattr(e, 'message', None) or str(e))

    return _reserve_record_id
def save_record(step):
    """Save the record to the DB using the `_save` method from it.

    :param step: step identifier handed to `UploaderWorkflowException` when
        the task fails.
    :returns: the task callable ``_save(obj, eng)``.
    :raises UploaderWorkflowException: (from the task) if the record has no
        `_save` attribute, or wrapping any exception raised while saving.
    """
    def _save(obj, eng):
        record = obj[1]
        eng.log.info('Saving record to DB')
        if not hasattr(record, '_save'):
            raise UploaderWorkflowException(
                step, msg="An '_save' method is needed")
        try:
            record._save()
            eng.log.info('Record saved to DB')
        except Exception as e:
            # ``e.message`` is not available on every exception (and not at
            # all on Python 3); fall back to the string form so the original
            # error is not masked by an AttributeError.
            raise UploaderWorkflowException(
                step, getattr(e, 'message', None) or str(e))

    return _save
def save_master_format(step):
    """Put the master format into the `bfmt` DB table.

    The stored value is the compressed, utf8ified original input
    (``obj[0]``) when the record's master format is ``'marc'``; otherwise it
    is the record exported through ``legacy_export_as_marc()``.

    :param step: step identifier; not used by this task.
    :returns: the task callable ``_save_master_format(obj, eng)``.
    """
    def _save_master_format(obj, eng):
        from invenio.base.helpers import utf8ifier
        from invenio.modules.formatter.models import Bibfmt
        from invenio.ext.sqlalchemy import db
        from zlib import compress

        eng.log.info('Saving master record to DB')

        record = obj[1]
        recid = record['recid']
        master_format = record.additional_info.master_format
        modification_date = record['modification_date']

        if master_format == 'marc':
            payload = obj[0]
        else:
            payload = record.legacy_export_as_marc()

        row = Bibfmt(
            id_bibrec=recid,
            format=master_format,
            kind='master',
            last_updated=modification_date,
            value=compress(utf8ifier(payload)),
        )
        session = db.session
        session.add(row)
        session.commit()

        eng.log.info('Master record saved to DB')

    return _save_master_format
def update_pidstore(step):
    """Save each PID present in the record to the PID storage.

    Every PID found in ``record.persistent_identifiers`` is looked up in the
    store, created when the lookup returns ``None``, and assigned to the
    record (object type ``'rec'``, value ``record['recid']``) unless it is
    already attached to it.

    :param step: step identifier handed to `UploaderWorkflowException` when
        the task fails.
    :returns: the task callable ``_update_pidstore(obj, eng)``.
    :raises UploaderWorkflowException: (from the task) if the record has no
        `persistent_identifiers` attribute.
    """
    # TODO: manage exceptions

    def _update_pidstore(obj, eng):
        record = obj[1]
        eng.log.info('Look for PIDs inside the current record and register '
                     'them in the DB')
        if not hasattr(record, 'persistent_identifiers'):
            raise UploaderWorkflowException(
                step, msg="An 'persistent_identifiers' method is needed")

        # Log arguments are passed directly; a wrapping tuple would be
        # rendered as its repr in the message.
        eng.log.info("Found PIDs '%s'", record.persistent_identifiers)

        for pid_name, pid_values in record.persistent_identifiers:
            eng.log.info("Found PID '%s'", pid_name)
            for pid_value in pid_values:
                pid = PersistentIdentifier.get(
                    pid_value.get('type'), pid_value.get('value'),
                    pid_value.get('provider'))
                if pid is None:
                    pid = PersistentIdentifier.create(
                        pid_value.get('type'), pid_value.get('value'),
                        pid_value.get('provider'))
                if not pid.has_object('rec', record['recid']):
                    pid.assign('rec', record['recid'])
        eng.log.info('Finish looking for PIDs inside the current record and '
                     'register them in the DB')

    return _update_pidstore
def manage_attached_documents(step):
    """Attach and treat all the documents embedded in the input file.

    Entries under the record's ``'files_to_upload'`` key get a document
    created and their contents scheduled through
    ``set_document_contents.delay``; entries under ``'files_to_link'`` only
    get a document created. Both temporary keys are deleted from the record
    once handled, and every created document is appended to
    ``record['_documents']`` as a ``(title, _id)`` tuple.

    :param step: step identifier; not used by this task.
    :returns: the task callable ``_manage_attached_documents(obj, eng)``.
    """
    from invenio.modules.documents import api
    from invenio.modules.documents.tasks import set_document_contents
    from invenio.modules.records.utils import name_generator

    def _manage_attached_documents(obj, eng):
        record = obj[1]

        # Default to an empty dict so a run without options does not fail on
        # ``.get`` (same access pattern as the validation task).
        filename = eng.getVar('options', {}).get('filename')
        dirname = os.path.abspath(os.path.dirname(filename)) \
            if filename is not None else os.curdir

        def _check_path(source):
            """Resolve a relative ``source`` against the input's directory.

            If it is a relative path then the directory path of the original
            blob filename, if defined, or the current directory will be
            prepended. The joined path is returned only when it exists;
            otherwise an error is logged and ``source`` is returned as is.
            """
            if not os.path.isabs(source):
                new_source = os.path.join(dirname, source)
                if os.path.exists(new_source):
                    return new_source
                # Log arguments are passed directly; a wrapping tuple would
                # be rendered as its repr in the message.
                eng.log.error('File %s does not exist.', new_source)
            return source

        eng.log.info('Look documents to manage')

        def _create_document(metadata, record):
            """Create a document from ``metadata`` and link it to ``record``."""
            metadata['source'] = _check_path(metadata['source'])

            if '_documents' not in record:
                record['_documents'] = []

            model = metadata.pop('model', 'record_document_base')

            if 'recids' not in metadata:
                metadata['recids'] = list()
            # -1 is used when the record has no 'recid' key.
            if record.get('recid', -1) not in metadata['recids']:
                metadata['recids'].append(record.get('recid', -1), )

            document = api.Document.create(metadata, model=model)
            eng.log.info('Document %s created', document['_id'])

            record['_documents'].append((document['title'], document['_id']))
            return document

        if 'files_to_upload' in record:
            eng.log.info('Documents to upload found')
            files_to_upload = record.get('files_to_upload', [])

            for file_to_upload in files_to_upload:
                document = _create_document(file_to_upload, record)

                set_document_contents.delay(
                    document['_id'],
                    document['source'],
                    name_generator(document)
                )

            eng.log.info('Finish creating documents, delete temporary key')
            del record['files_to_upload']

        if 'files_to_link' in record:
            eng.log.info('Documents to link found')
            files_to_link = record.get('files_to_link', [])

            for file_to_link in files_to_link:
                _create_document(file_to_link, record)

            eng.log.info('Finish linking documents, delete temporary key')
            del record['files_to_link']

    return _manage_attached_documents
def legacy(step):
    """Update legacy bibxxx tables.

    The task does nothing unless the record's master format is ``'marc'``.
    For MARC records it refreshes the ``'xm'`` format, optionally the
    serialized ``'recstruct'`` format and the revision history (both driven
    by the bibupload configuration flags), and finally pushes the record
    structure to the database.

    :param step: step identifier; not used by this task.
    :returns: the task callable ``_legacy(obj, eng)``.
    """
    def _legacy(obj, eng):
        record = obj[1]
        is_marc = record.additional_info.master_format == 'marc'
        if not is_marc:
            return

        # Imported lazily, only once it is known that legacy work is needed.
        import marshal

        from invenio.legacy.bibupload.engine import (
            CFG_BIBUPLOAD_DISABLE_RECORD_REVISIONS,
            CFG_BIBUPLOAD_SERIALIZE_RECORD_STRUCTURE,
            archive_marcxml_for_history,
            update_bibfmt_format,
            update_database_with_metadata,
        )

        modification_date = record['modification_date'].strftime(
            '%Y-%m-%d %H:%M:%S')
        recid = record['recid']

        marcxml = record.legacy_export_as_marc()
        update_bibfmt_format(recid, marcxml, 'xm', modification_date)

        if CFG_BIBUPLOAD_SERIALIZE_RECORD_STRUCTURE:
            serialized = marshal.dumps(record.legacy_create_recstruct())
            update_bibfmt_format(recid, serialized, 'recstruct',
                                 modification_date)

        if not CFG_BIBUPLOAD_DISABLE_RECORD_REVISIONS:
            archive_marcxml_for_history(recid, affected_fields={})

        recstruct = record.legacy_create_recstruct()
        update_database_with_metadata(recstruct, recid)

        message = 'Finishing legacy task for record {0}'.format(recid)
        eng.log.info(message)

    return _legacy
Event Timeline
Log In to Comment