diff --git a/invenio/celery/config.py b/invenio/celery/config.py index 73aa9c6dd..ae148c9a1 100644 --- a/invenio/celery/config.py +++ b/invenio/celery/config.py @@ -1,93 +1,93 @@ # -*- coding: utf-8 -*- ## ## This file is part of Invenio. ## Copyright (C) 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA def default_config(config): """ Provide default configuration for Celery """ ## Broker settings ## --------------- config.setdefault("BROKER_URL", "redis://localhost:6379/1") # Extra modules with tasks which should be loaded # The Invenio Celery loader automatically takes care of loading tasks # defined in *_tasks.py files in 'invenio' package. config.setdefault("CELERY_INCLUDE", [ #"invenio.celery.tasks", - #"invenio.bibworkflow_workers.worker_celery", + #"invenio.modules.workflows.workers.worker_celery", ]) ## Result backend ## -------------- config.setdefault("CELERY_RESULT_BACKEND", "redis://localhost:6379/1") config.setdefault("CELERY_RESULT_SERIALIZER", "pickle") ## Routing ## ------- # ... ## Task execution ## -------------- config.setdefault("CELERY_ALWAYS_EAGER", False) config.setdefault("CELERY_IGNORE_RESULT", False) config.setdefault("CELERY_TASK_SERIALIZER", "pickle") ## Worker ## ------ config.setdefault("CELERYD_MAX_TASKS_PER_CHILD", 1000) ## Error emails ## ------------ config.setdefault("CELERY_SEND_TASK_ERROR_EMAILS", False) if "CFG_SITE_EMERGENCY_EMAIL_ADDRESSES" in config: try: ADMINS = [ ('', x.strip()) for x in config["CFG_SITE_EMERGENCY_EMAIL_ADDRESSES"]['*'].split(",") ] config.setdefault("ADMINS", ADMINS) except Exception: pass config.setdefault( "SERVER_EMAIL", config.get("CFG_SITE_ADMIN_EMAIL", "celery@localhost") ) config.setdefault( "EMAIL_HOST", config.get("CFG_MISCUTIL_SMTP_HOST", "localhost") ) config.setdefault( "EMAIL_HOST_USER", config.get("CFG_MISCUTIL_SMTP_USER", "") ) config.setdefault( "EMAIL_HOST_PASSWORD", config.get("CFG_MISCUTIL_SMTP_PASS", "") ) config.setdefault( "EMAIL_PORT", config.get("CFG_MISCUTIL_SMTP_PORT", "25") ) config.setdefault( "EMAIL_USE_TLS", config.get("CFG_MISCUTIL_SMTP_TLS", False) ) ## Scheduler ## --------- config.setdefault("CELERYBEAT_SCHEDULE", {}) return config diff --git a/invenio/legacy/oaiharvest/daemon.py b/invenio/legacy/oaiharvest/daemon.py index e21412120..7976e3b15 100644 --- a/invenio/legacy/oaiharvest/daemon.py +++ b/invenio/legacy/oaiharvest/daemon.py @@ -1,1619 +1,464 @@ # -*- coding: utf-8 -*- ## ## This file is part of Invenio. ## Copyright (C) 2009, 2010, 2011 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. 
## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """ OAI Harvest daemon - harvest records from OAI repositories. If started via CLI with --verb parameters, starts a manual single-shot harvesting. Otherwise starts a BibSched task for periodical harvesting of repositories defined in the OAI Harvest admin interface """ - __revision__ = "$Id$" -import os import sys import getopt import getpass import re import time -import calendar -import shutil -import tempfile import urlparse -import random -from invenio.config import \ - CFG_BINDIR, \ - CFG_TMPDIR, \ - CFG_ETCDIR, \ - CFG_INSPIRE_SITE, \ - CFG_CERN_SITE, \ - CFG_PLOTEXTRACTOR_DOWNLOAD_TIMEOUT, \ - CFG_SITE_URL, \ - CFG_OAI_FAILED_HARVESTING_STOP_QUEUE, \ - CFG_OAI_FAILED_HARVESTING_EMAILS_ADMIN -from invenio.legacy.oaiharvest.config import InvenioOAIHarvestWarning -from invenio.legacy.dbquery import run_sql -from invenio.legacy.bibsched.bibtask import \ - task_get_task_param, \ - task_get_option, \ - task_set_option, \ - write_message, \ - task_init, \ - task_sleep_now_if_required, \ - task_update_progress, \ - task_low_level_submission -from invenio.legacy.bibrecord import record_extract_oai_id, create_records, \ - create_record, record_add_fields, \ - record_delete_fields, record_xml_output, \ - record_get_field_instances, \ - record_modify_subfield, \ - record_has_field, field_xml_output +from invenio.config import (CFG_OAI_FAILED_HARVESTING_STOP_QUEUE, + CFG_OAI_FAILED_HARVESTING_EMAILS_ADMIN, + CFG_SITE_SUPPORT_EMAIL + ) from . 
import getter as oai_harvest_getter -from invenio.base.helpers import with_app_context +from invenio.ext.email import send_email from invenio.ext.logging import register_exception -from invenio.utils.plotextractor.getter import harvest_single, make_single_directory -from invenio.utils.plotextractor.converter import untar -from invenio.utils.plotextractor.cli import process_single, get_defaults -from invenio.utils.shell import run_shell_command, Timeout -from invenio.utils.text import translate_latex2unicode -from invenio.legacy.bibedit.utils import record_find_matching_fields -from invenio.legacy.bibcatalog.api import bibcatalog_system +from invenio.modules.workflows.api import start +from invenio.modules.workflows.models import (BibWorkflowEngineLog, + BibWorkflowObjectLog) +from invenio.modules.workflows.utils import InvenioWorkflowError + +from invenio.legacy.webuser import email_valid_p +from invenio.legacy.bibsched.bibtask import (task_get_task_param, + task_get_option, + task_set_option, + write_message, + task_init + ) +from invenio.legacy.oaiharvest.config import InvenioOAIHarvestWarning +from invenio.legacy.oaiharvest.utils import (compare_timestamps_with_tolerance, + generate_harvest_report) from invenio.legacy import template oaiharvest_templates = template.load('oai_harvest') ## precompile some often-used regexp for speed reasons: -REGEXP_OAI_ID = re.compile("<identifier.*?>(.*?)<\/identifier>", re.DOTALL) -REGEXP_RECORD = re.compile("<record.*?>(.*?)</record>", re.DOTALL) +REGEXP_RECORD = re.compile("<record.*?>(.*?)</record>", re.DOTALL) REGEXP_REFS = re.compile("<record.*?>.*?<controlfield .*?>.*?</controlfield>(.*?)</record>", re.DOTALL) REGEXP_AUTHLIST = re.compile("<collaborationauthorlist.*?</collaborationauthorlist>", re.DOTALL) from invenio.legacy.bibconvert.registry import templates CFG_OAI_AUTHORLIST_POSTMODE_STYLESHEET = templates.get('authorlist2marcxml.xsl', '') def get_nb_records_in_file(filename): """ Return number of record in FILENAME that is either harvested or converted file. Useful for statistics. """ try: nb = open(filename, 'r').read().count("</record>") except IOError: nb = 0 # file not exists and such except: nb = -1 return nb def task_run_core(): - """Run the harvesting task. The row argument is the oaiharvest task - queue row, containing if, arguments, etc. - Return 1 in case of success and 0 in case of failure. - """ - reposlist = [] - datelist = [] - dateflag = 0 - filepath_prefix = "%s/oaiharvest_%s" % (CFG_TMPDIR, str(task_get_task_param("task_id"))) - ### go ahead: build up the reposlist - if task_get_option("repository") is not None: - ### user requests harvesting from selected repositories - write_message("harvesting from selected repositories") - for reposname in task_get_option("repository"): - row = get_row_from_reposname(reposname) - if row == []: - write_message("source name %s is not valid" % (reposname,)) - continue - else: - reposlist.append(get_row_from_reposname(reposname)) - else: - ### user requests harvesting from all repositories - write_message("harvesting from all repositories in the database") - reposlist = get_all_rows_from_db() + start_time = time.time() - ### go ahead: check if user requested from-until harvesting - if task_get_option("dates"): - ### for each repos simply perform a from-until date harvesting... 
- ### no need to update anything - dateflag = 1 - for element in task_get_option("dates"): - datelist.append(element) + try: - error_happened_p = 0 # 0: no error, 1: "recoverable" error (don't stop queue), 2: error (admin intervention needed) + workflow = start('generic_harvesting_workflow', data=[123], stop_on_error=True) - j = 0 - for repos in reposlist: - j += 1 - task_sleep_now_if_required() + except InvenioWorkflowError as e: - # Extract values from database row (in exact order): - # | id | baseurl | metadataprefix | arguments | comment - # | bibconvertcfgfile | name | lastrun | frequency - # | postprocess | setspecs | bibfilterprogram - source_id = repos[0][0] - baseurl = str(repos[0][1]) - metadataprefix = str(repos[0][2]) - bibconvert_cfgfile = str(repos[0][5]) - reponame = str(repos[0][6]) - lastrun = repos[0][7] - frequency = repos[0][8] - postmode = repos[0][9] - setspecs = str(repos[0][10]) - bibfilterprogram = str(repos[0][11]) + write_message("ERROR HAPPENED") + write_message("____________Workflow log output____________") - write_message("running in postmode %s" % (postmode,)) - downloaded_material_dict = {} - harvested_files_list = [] - # Harvest phase - harvestpath = "%s_%d_%s_" % (filepath_prefix, j, time.strftime("%Y%m%d%H%M%S")) - if dateflag == 1: - task_update_progress("Harvesting %s from %s to %s (%i/%i)" % \ - (reponame, \ - str(datelist[0]), - str(datelist[1]), - j, \ - len(reposlist))) - exit_code, file_list = oai_harvest_get(prefix=metadataprefix, - baseurl=baseurl, - harvestpath=harvestpath, - fro=str(datelist[0]), - until=str(datelist[1]), - setspecs=setspecs) - if exit_code == 1 : - write_message("source %s was harvested from %s to %s" % \ - (reponame, str(datelist[0]), str(datelist[1]))) - harvested_files_list = file_list - else: - write_message("an error occurred while harvesting from source %s for the dates chosen:\n%s\n" % \ - (reponame, file_list)) - if error_happened_p < 1: - error_happened_p = 1 - continue + workflowlog = BibWorkflowEngineLog.query.filter(BibWorkflowEngineLog.id_object == e.id_workflow).all() - elif dateflag != 1 and lastrun is None and frequency != 0: - write_message("source %s was never harvested before - harvesting whole repository" % \ - (reponame,)) - task_update_progress("Harvesting %s (%i/%i)" % \ - (reponame, - j, \ - len(reposlist))) - exit_code, file_list = oai_harvest_get(prefix=metadataprefix, - baseurl=baseurl, - harvestpath=harvestpath, - setspecs=setspecs) - if exit_code == 1 : - update_lastrun(source_id) - harvested_files_list = file_list - else : - write_message("an error occurred while harvesting from source %s:\n%s\n" % \ - (reponame, file_list)) - if error_happened_p < 1: - error_happened_p = 1 - continue + for log in workflowlog: + write_message(log.message) - elif dateflag != 1 and frequency != 0: - ### check that update is actually needed, - ### i.e. lastrun+frequency>today - timenow = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - lastrundate = re.sub(r'\.[0-9]+$', '', - str(lastrun)) # remove trailing .00 - timeinsec = int(frequency) * 60 * 60 - updatedue = add_timestamp_and_timelag(lastrundate, timeinsec) - proceed = compare_timestamps_with_tolerance(updatedue, timenow) - if proceed == 0 or proceed == -1 : #update needed! 
- write_message("source %s is going to be updated" % (reponame,)) - fromdate = str(lastrun) - fromdate = fromdate.split()[0] # get rid of time of the day for the moment - task_update_progress("Harvesting %s (%i/%i)" % \ - (reponame, - j, \ - len(reposlist))) - exit_code, file_list = oai_harvest_get(prefix=metadataprefix, - baseurl=baseurl, - harvestpath=harvestpath, - fro=fromdate, - setspecs=setspecs) - if exit_code == 1 : - update_lastrun(source_id) - harvested_files_list = file_list - else : - write_message("an error occurred while harvesting from source %s:\n%s\n" % \ - (reponame, file_list)) - if error_happened_p < 1: - error_happened_p = 1 - continue - else: - write_message("source %s does not need updating" % (reponame,)) - continue + write_message("ERROR HAPPEN") + write_message("____________Object log output____________") + objectlog = BibWorkflowObjectLog.query.filter(BibWorkflowObjectLog.id_object == e.id_object).all() + for log in objectlog: + write_message(log.message) + execution_time = round(time.time() - start_time, 2) - elif dateflag != 1 and frequency == 0: - write_message("source %s has frequency set to 'Never' so it will not be updated" % \ - (reponame,)) - continue + write_message("Execution time :" + str(execution_time)) - # Harvesting done, now convert/extract/filter/upload as requested - if len(harvested_files_list) < 1: - write_message("No records harvested for %s" % (reponame,)) - continue + raise e + # The workflow already waits for all its children to finish + # We just need to check that they have not failed - # Retrieve all OAI IDs and set active list - harvested_identifier_list = collect_identifiers(harvested_files_list) - active_files_list = harvested_files_list - if len(active_files_list) != len(harvested_identifier_list): - # Harvested files and its identifiers are 'out of sync', abort harvest - write_message("Harvested files miss identifiers for %s" % (reponame,)) - continue - write_message("post-harvest processes started") - # Convert phase - if 'c' in postmode: - updated_files_list = [] - i = 0 - write_message("conversion step started") - for active_file in active_files_list: - i += 1 - task_sleep_now_if_required() - task_update_progress("Converting material harvested from %s (%i/%i)" % \ - (reponame, \ - i, \ - len(active_files_list))) - updated_file = "%s.converted" % (active_file.split('.')[0],) - updated_files_list.append(updated_file) - (exitcode, err_msg) = call_bibconvert(config=bibconvert_cfgfile, - harvestpath=active_file, - convertpath=updated_file) - if exitcode == 0: - write_message("harvested file %s was successfully converted" % \ - (active_file,)) - else: - write_message("an error occurred while converting %s:\n%s" % \ - (active_file, err_msg)) - error_happened_p = 2 - continue - # print stats: - for updated_file in updated_files_list: - write_message("File %s contains %i records." % \ - (updated_file, - get_nb_records_in_file(updated_file))) - active_files_list = updated_files_list - write_message("conversion step ended") - # plotextract phase - if 'p' in postmode: - write_message("plotextraction step started") - # Download tarball for each harvested/converted record, then run plotextrator. 
- # Update converted xml files with generated xml or add it for upload - updated_files_list = [] - i = 0 - for active_file in active_files_list: - identifiers = harvested_identifier_list[i] - i += 1 - task_sleep_now_if_required() - task_update_progress("Extracting plots from harvested material from %s (%i/%i)" % \ - (reponame, i, len(active_files_list))) - updated_file = "%s.plotextracted" % (active_file.split('.')[0],) - updated_files_list.append(updated_file) - (exitcode, err_msg) = call_plotextractor(active_file, - updated_file, - identifiers, - downloaded_material_dict, - source_id) - if exitcode == 0: - if err_msg != "": - write_message("plots from %s was extracted, but with some errors:\n%s" % \ - (active_file, err_msg)) - else: - write_message("plots from %s was successfully extracted" % \ - (active_file,)) - else: - write_message("an error occurred while extracting plots from %s:\n%s" % \ - (active_file, err_msg)) - error_happened_p = 2 - continue - # print stats: - for updated_file in updated_files_list: - write_message("File %s contains %i records." % \ - (updated_file, - get_nb_records_in_file(updated_file))) - active_files_list = updated_files_list - write_message("plotextraction step ended") - # refextract phase - if 'r' in postmode: - updated_files_list = [] - i = 0 - write_message("refextraction step started") - for active_file in active_files_list: - identifiers = harvested_identifier_list[i] - i += 1 - task_sleep_now_if_required() - task_update_progress("Extracting references from material harvested from %s (%i/%i)" % \ - (reponame, i, len(active_files_list))) - updated_file = "%s.refextracted" % (active_file.split('.')[0],) - updated_files_list.append(updated_file) - (exitcode, err_msg) = call_refextract(active_file, - updated_file, - identifiers, - downloaded_material_dict, - source_id) - if exitcode == 0: - if err_msg != "": - write_message("references from %s was extracted, but with some errors:\n%s" % \ - (active_file, err_msg)) - else: - write_message("references from %s was successfully extracted" % \ - (active_file,)) - else: - write_message("an error occurred while extracting references from %s:\n%s" % \ - (active_file, err_msg)) - error_happened_p = 2 - continue - # print stats: - for updated_file in updated_files_list: - write_message("File %s contains %i records." 
% \ - (updated_file, - get_nb_records_in_file(updated_file))) - active_files_list = updated_files_list - write_message("refextraction step ended") - # authorlist phase - if 'a' in postmode: - write_message("authorlist extraction step started") - # Initialize BibCatalog connection as default user, if possible - if bibcatalog_system is not None: - bibcatalog_response = bibcatalog_system.check_system() - else: - bibcatalog_response = "No ticket system configured" - if bibcatalog_response != "": - write_message("BibCatalog error: %s\n" % (bibcatalog_response,)) - updated_files_list = [] - i = 0 - for active_file in active_files_list: - identifiers = harvested_identifier_list[i] - i += 1 - task_sleep_now_if_required() - task_update_progress("Extracting any authorlists from material harvested from %s (%i/%i)" % \ - (reponame, i, len(active_files_list))) - updated_file = "%s.authextracted" % (active_file.split('.')[0],) - updated_files_list.append(updated_file) - (exitcode, err_msg) = call_authorlist_extract(active_file, - updated_file, - identifiers, - downloaded_material_dict, - source_id) - if exitcode == 0: - if err_msg != "": - write_message("authorlists from %s was extracted, but with some errors:\n%s" % \ - (active_file, err_msg)) - else: - write_message("any authorlists from %s was successfully extracted" % \ - (active_file,)) - else: - write_message("an error occurred while extracting authorlists from %s:\n%s" % \ - (active_file, err_msg)) - error_happened_p = 2 - continue - # print stats: - for updated_file in updated_files_list: - write_message("File %s contains %i records." % \ - (updated_file, - get_nb_records_in_file(updated_file))) - active_files_list = updated_files_list - write_message("authorlist extraction step ended") - # fulltext phase - if 't' in postmode: - write_message("full-text attachment step started") - # Attaching fulltext - updated_files_list = [] - i = 0 - for active_file in active_files_list: - identifiers = harvested_identifier_list[i] - i += 1 - task_sleep_now_if_required() - task_update_progress("Attaching fulltext to records harvested from %s (%i/%i)" % \ - (reponame, i, len(active_files_list))) - updated_file = "%s.fulltext" % (active_file.split('.')[0],) - updated_files_list.append(updated_file) - (exitcode, err_msg) = call_fulltext(active_file, - updated_file, - identifiers, - downloaded_material_dict, - source_id) - if exitcode == 0: - write_message("fulltext from %s was successfully attached" % \ - (active_file,)) - else: - write_message("an error occurred while attaching fulltext to %s:\n%s" % \ - (active_file, err_msg)) - error_happened_p = 2 - continue - # print stats: - for updated_file in updated_files_list: - write_message("File %s contains %i records." 
% \ - (updated_file, - get_nb_records_in_file(updated_file))) - active_files_list = updated_files_list - write_message("full-text attachment step ended") - # Filter-phase - if 'f' in postmode: - write_message("filtering step started") - # first call bibfilter: - res = 0 - i = 0 - for active_file in active_files_list: - i += 1 - task_sleep_now_if_required() - task_update_progress("Filtering material harvested from %s (%i/%i)" % \ - (reponame, \ - i, \ - len(active_files_list))) - (exitcode, err_msg) = call_bibfilter(bibfilterprogram, active_file) + workflowlog = BibWorkflowEngineLog.query.filter(BibWorkflowEngineLog.id_object == workflow.uuid).all() - if exitcode == 0: - write_message("%s was successfully bibfiltered" % \ - (active_file,)) - else: - write_message("an error occurred while bibfiltering %s:\n%s" % \ - (active_file, err_msg)) - error_happened_p = 2 - continue - # print stats: - for active_file in active_files_list: - write_message("File %s contains %i records." % \ - (active_file + ".insert.xml", - get_nb_records_in_file(active_file + ".insert.xml"))) - write_message("File %s contains %i records." % \ - (active_file + ".correct.xml", - get_nb_records_in_file(active_file + ".correct.xml"))) - write_message("File %s contains %i records." % \ - (active_file + ".append.xml", - get_nb_records_in_file(active_file + ".append.xml"))) - write_message("File %s contains %i records." % \ - (active_file + ".holdingpen.xml", - get_nb_records_in_file(active_file + ".holdingpen.xml"))) - write_message("filtering step ended") - # Upload files - if "u" in postmode: - write_message("upload step started") - if 'f' in postmode: - upload_modes = [('.insert.xml', '-i'), - ('.correct.xml', '-c'), - ('.append.xml', '-a'), - ('.holdingpen.xml', '-o')] - else: - upload_modes = [('', '-ir')] + for log in workflowlog: + write_message(log.message) - i = 0 - last_upload_task_id = -1 - # Get a random sequence ID that will allow for the tasks to be - # run in order, regardless if parallel task execution is activated - sequence_id = random.randrange(1, 4294967296) - for active_file in active_files_list: - task_sleep_now_if_required() - i += 1 - task_update_progress("Uploading records harvested from %s (%i/%i)" % \ - (reponame, \ - i, \ - len(active_files_list))) - for suffix, mode in upload_modes: - upload_filename = active_file + suffix - if get_nb_records_in_file(upload_filename) == 0: - continue - last_upload_task_id = call_bibupload(upload_filename, \ - [mode], \ - source_id, \ - sequence_id) - if not last_upload_task_id: - error_happened_p = 2 - write_message("an error occurred while uploading %s from %s" % \ - (upload_filename, reponame)) - break - else: - write_message("material harvested from source %s was successfully uploaded" % \ - (reponame,)) - if len(active_files_list) > 0: - write_message("nothing to upload") - write_message("upload step ended") + execution_time = round(time.time() - start_time, 2) + + write_message("Execution time :" + str(execution_time)) - if CFG_INSPIRE_SITE: - # Launch BibIndex,Webcoll update task to show uploaded content quickly - bibindex_params = ['-w', 'reportnumber,collection', \ - '-P', '6', \ - '-I', str(sequence_id), \ - '--post-process', 'bst_run_bibtask[taskname="webcoll", user="oaiharvest", P="6", c="HEP"]'] - task_low_level_submission("bibindex", "oaiharvest", *tuple(bibindex_params)) + #For each File - write_message("post-harvest processes ended") + # 0: no error + # 1: "recoverable" error (don't stop queue) + # 2: error (admin intervention needed) + 
error_happened_p = 0 + + # Generate reports + ticket_queue = task_get_option("create-ticket-in") + notification_email = task_get_option("notify-email-to") + if ticket_queue or notification_email: + subject, text = generate_harvest_report(repository, + harvested_identifier_list, + uploaded_task_ids, + active_files_list, + task_specific_name=task_get_task_param("task_specific_name") or "", + current_task_id=task_get_task_param("task_id"), + manual_harvest=bool(identifiers), + error_happened=bool(error_happened_p)) + # Create ticket for finished harvest? + if ticket_queue: + ticketid = create_ticket(ticket_queue, subject=subject, text=text) + if ticketid: + write_message("Ticket %s submitted." % (str(ticketid),)) + + # Send e-mail for finished harvest? + if notification_email: + send_email(fromaddr=CFG_SITE_SUPPORT_EMAIL, + toaddr=notification_email, + subject=subject, + content=text) + # All records from all repositories harvested. Check for any errors. if error_happened_p: - if CFG_OAI_FAILED_HARVESTING_STOP_QUEUE == 0 or \ - not task_get_task_param("sleeptime") or \ - error_happened_p > 1: + if CFG_OAI_FAILED_HARVESTING_STOP_QUEUE == 0 or not task_get_task_param("sleeptime") or error_happened_p > 1: # Admin want BibSched to stop, or the task is not set to # run at a later date: we must stop the queue. write_message("An error occurred. Task is configured to stop") return False else: # An error happened, but it can be recovered at next run # (task is re-scheduled) and admin set BibSched to # continue even after failure. write_message("An error occurred, but task is configured to continue") if CFG_OAI_FAILED_HARVESTING_EMAILS_ADMIN: try: - raise InvenioOAIHarvestWarning("OAIHarvest (task #%s) failed at fully harvesting source(s) %s. BibSched has NOT been stopped, and OAIHarvest will try to recover at next run" % (task_get_task_param("task_id"), ", ".join([repo[0][6] for repo in reposlist]),)) - except InvenioOAIHarvestWarning, e: + + raise InvenioOAIHarvestWarning("OAIHarvest (task #%s) failed at fully harvesting source(s) %s." + " BibSched has NOT been stopped, and OAIHarvest will try to rec" + "over at next run" % (task_get_task_param("task_id"), + ", ".join([repo[0][6] for repo + in reposlist]),)) + except InvenioOAIHarvestWarning: register_exception(stream='warning', alert_admin=True) return True else: return True -def collect_identifiers(harvested_file_list): - """Collects all OAI PMH identifiers from each file in the list - and adds them to a list of identifiers per file. - - @param harvested_file_list: list of filepaths to harvested files - - @return list of lists, containing each files' identifier list""" - result = [] - for harvested_file in harvested_file_list: - try: - fd_active = open(harvested_file) - except IOError: - write_message("Error opening harvested file '%s'. Skipping.." % (harvested_file,)) - continue - data = fd_active.read() - fd_active.close() - result.append(REGEXP_OAI_ID.findall(data)) - return result - -def remove_duplicates(harvested_file_list): - """ - Go through a list of harvested files and remove any duplicate records. - """ - harvested_identifiers = [] - for harvested_file in harvested_file_list: - # Firstly, rename original file to temporary name - try: - os.rename(harvested_file, "%s~" % (harvested_file,)) - except OSError: - write_message("Error renaming harvested file '%s'. Skipping.." 
% (harvested_file,)) - continue - # Secondly, open files for writing and reading - try: - updated_harvested_file = open(harvested_file, 'w') - original_harvested_file = open("%s~" % (harvested_file,)) - except IOError: - write_message("Error opening harvested file '%s'. Skipping.." % (harvested_file,)) - continue - data = original_harvested_file.read() - original_harvested_file.close() - - # Get and write OAI-PMH XML header data to updated file - header_index_end = data.find("<ListRecords>") + len("<ListRecords>") - updated_harvested_file.write("%s\n" % (data[:header_index_end],)) - - # By checking the OAI ID we write all records not written previously (in any file) - harvested_records = REGEXP_RECORD.findall(data) - for record in harvested_records: - oai_identifier = REGEXP_OAI_ID.search(record) - if oai_identifier != None and oai_identifier.group(1) not in harvested_identifiers: - updated_harvested_file.write("<record>%s</record>\n" % (record,)) - harvested_identifiers.append(oai_identifier.group(1)) - updated_harvested_file.write("</ListRecords>\n</OAI-PMH>\n") - updated_harvested_file.close() - -def add_timestamp_and_timelag(timestamp, - timelag): - """ Adds a time lag in seconds to a given date (timestamp). - Returns the resulting date. """ - # remove any trailing .00 in timestamp: - timestamp = re.sub(r'\.[0-9]+$', '', timestamp) - # first convert timestamp to Unix epoch seconds: - timestamp_seconds = calendar.timegm(time.strptime(timestamp, - "%Y-%m-%d %H:%M:%S")) - # now add them: - result_seconds = timestamp_seconds + timelag - result = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(result_seconds)) - return result - -def update_lastrun(index): - """ A method that updates the lastrun of a repository - successfully harvested """ - try: - today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - sql = 'UPDATE oaiHARVEST SET lastrun=%s WHERE id=%s' - run_sql(sql, (today, index)) - return 1 - except StandardError, e: - return (0, e) - -def oai_harvest_get(prefix, baseurl, harvestpath, - fro=None, until=None, setspecs=None, - user=None, password=None, cert_file=None, - key_file=None, method="POST"): - """ - Retrieve OAI records from given repository, with given arguments - """ - try: - (addressing_scheme, network_location, path, dummy1, \ - dummy2, dummy3) = urlparse.urlparse(baseurl) - secure = (addressing_scheme == "https") - - http_param_dict = {'verb': "ListRecords", - 'metadataPrefix': prefix} - if fro: - http_param_dict['from'] = fro - if until: - http_param_dict['until'] = until - sets = None - if setspecs: - sets = [oai_set.strip() for oai_set in setspecs.split(' ')] - - harvested_files = oai_harvest_getter.harvest(network_location, path, http_param_dict, method, harvestpath, - sets, secure, user, password, cert_file, key_file) - remove_duplicates(harvested_files) - return (1, harvested_files) - except (StandardError, oai_harvest_getter.InvenioOAIRequestError), e: - return (0, e) - -def call_bibconvert(config, harvestpath, convertpath): - """ Call BibConvert to convert file given at 'harvestpath' with - conversion template 'config', and save the result in file at - 'convertpath'. 
- - Returns status exit code of the conversion, as well as error - messages, if any - """ - exitcode, dummy, cmd_stderr = \ - run_shell_command(cmd="%s/bibconvert -c %s < %s", \ - args=(CFG_BINDIR, config, harvestpath), filename_out=convertpath) - return (exitcode, cmd_stderr) - -def call_plotextractor(active_file, extracted_file, harvested_identifier_list, \ - downloaded_files, source_id): - """ - Function that generates proper MARCXML containing harvested plots for - each record. - - @param active_file: path to the currently processed file - @param extracted_file: path to the file where the final results will be saved - @param harvested_identifier_list: list of OAI identifiers for this active_file - @param downloaded_files: dict of identifier -> dict mappings for downloaded material. - @param source_id: the repository identifier - @type source_id: integer - @return: exitcode and any error messages as: (exitcode, err_msg) - """ - all_err_msg = [] - exitcode = 0 - # Read in active file - recs_fd = open(active_file, 'r') - records = recs_fd.read() - recs_fd.close() - - # Find all record - record_xmls = REGEXP_RECORD.findall(records) - updated_xml = ['<?xml version="1.0" encoding="UTF-8"?>'] - updated_xml.append('<collection>') - i = 0 - for record_xml in record_xmls: - current_exitcode = 0 - identifier = harvested_identifier_list[i] - i += 1 - if identifier not in downloaded_files: - downloaded_files[identifier] = {} - updated_xml.append("<record>") - updated_xml.append(record_xml) - if not oaiharvest_templates.tmpl_should_process_record_with_mode(record_xml, 'p', source_id): - # We skip this record - updated_xml.append("</record>") - continue - if "tarball" not in downloaded_files[identifier]: - current_exitcode, err_msg, tarball, dummy = \ - plotextractor_harvest(identifier, active_file, selection=["tarball"]) - if current_exitcode != 0: - exitcode = current_exitcode - all_err_msg.append(err_msg) - else: - downloaded_files[identifier]["tarball"] = tarball - if current_exitcode == 0: - plotextracted_xml_path = process_single(downloaded_files[identifier]["tarball"]) - if plotextracted_xml_path != None: - # We store the path to the directory the tarball contents live - downloaded_files[identifier]["tarball-extracted"] = os.path.split(plotextracted_xml_path)[0] - # Read and grab MARCXML from plotextractor run - plotsxml_fd = open(plotextracted_xml_path, 'r') - plotextracted_xml = plotsxml_fd.read() - plotsxml_fd.close() - re_list = REGEXP_RECORD.findall(plotextracted_xml) - if re_list != []: - updated_xml.append(re_list[0]) - updated_xml.append("</record>") - updated_xml.append('</collection>') - # Write to file - file_fd = open(extracted_file, 'w') - file_fd.write("\n".join(updated_xml)) - file_fd.close() - if len(all_err_msg) > 0: - return exitcode, "\n".join(all_err_msg) - return exitcode, "" - -def call_refextract(active_file, extracted_file, harvested_identifier_list, - downloaded_files, source_id): - """ - Function that calls refextractor to extract references and attach them to - harvested records. It will download the fulltext-pdf for each identifier - if necessary. - - @param active_file: path to the currently processed file - @param extracted_file: path to the file where the final results will be saved - @param harvested_identifier_list: list of OAI identifiers for this active_file - @param downloaded_files: dict of identifier -> dict mappings for downloaded material. 
- @param source_id: the repository identifier - @type source_id: integer - @return: exitcode and any error messages as: (exitcode, all_err_msg) - """ - all_err_msg = [] - exitcode = 0 - flag = "" - if CFG_INSPIRE_SITE == 1: - flag = "--inspire" - # Read in active file - recs_fd = open(active_file, 'r') - records = recs_fd.read() - recs_fd.close() - - # Find all record - record_xmls = REGEXP_RECORD.findall(records) - updated_xml = ['<?xml version="1.0" encoding="UTF-8"?>'] - updated_xml.append('<collection>') - i = 0 - for record_xml in record_xmls: - current_exitcode = 0 - identifier = harvested_identifier_list[i] - i += 1 - if identifier not in downloaded_files: - downloaded_files[identifier] = {} - updated_xml.append("<record>") - updated_xml.append(record_xml) - if not oaiharvest_templates.tmpl_should_process_record_with_mode(record_xml, 'p', source_id): - # We skip this record - updated_xml.append("</record>") - continue - if "pdf" not in downloaded_files[identifier]: - current_exitcode, err_msg, dummy, pdf = \ - plotextractor_harvest(identifier, active_file, selection=["pdf"]) - if current_exitcode != 0: - exitcode = current_exitcode - all_err_msg.append(err_msg) - else: - downloaded_files[identifier]["pdf"] = pdf - if current_exitcode == 0: - current_exitcode, cmd_stdout, err_msg = run_shell_command(cmd="%s/refextract %s -f '%s'" % \ - (CFG_BINDIR, flag, downloaded_files[identifier]["pdf"])) - if err_msg != "" or current_exitcode != 0: - exitcode = current_exitcode - all_err_msg.append("Error extracting references from id: %s\nError:%s" % \ - (identifier, err_msg)) - else: - references_xml = REGEXP_REFS.search(cmd_stdout) - if references_xml: - updated_xml.append(references_xml.group(1)) - updated_xml.append("</record>") - updated_xml.append('</collection>') - # Write to file - file_fd = open(extracted_file, 'w') - file_fd.write("\n".join(updated_xml)) - file_fd.close() - if len(all_err_msg) > 0: - return exitcode, "\n".join(all_err_msg) - return exitcode, "" - -def call_authorlist_extract(active_file, extracted_file, harvested_identifier_list, - downloaded_files, source_id): - """ - Function that will look in harvested tarball for any authorlists. If found - it will extract and convert the authors using a XSLT stylesheet. - - @param active_file: path to the currently processed file - @type active_file: string - - @param extracted_file: path to the file where the final results will be saved - @type extracted_file: string - - @param harvested_identifier_list: list of OAI identifiers for this active_file - @type harvested_identifier_list: list - - @param downloaded_files: dict of identifier -> dict mappings for downloaded material. 
- @type downloaded_files: dict - - @param source_id: the repository identifier - @type source_id: integer - - @return: exitcode and any error messages as: (exitcode, all_err_msg) - @rtype: tuple - """ - all_err_msg = [] - exitcode = 0 - - # Read in active file - recs_fd = open(active_file, 'r') - records = recs_fd.read() - recs_fd.close() - - # Find all records - record_xmls = REGEXP_RECORD.findall(records) - updated_xml = ['<?xml version="1.0" encoding="UTF-8"?>'] - updated_xml.append('<collection>') - i = 0 - for record_xml in record_xmls: - current_exitcode = 0 - identifier = harvested_identifier_list[i] - i += 1 - if not oaiharvest_templates.tmpl_should_process_record_with_mode(record_xml, 'p', source_id): - # We skip this record - updated_xml.append("<record>") - updated_xml.append(record_xml) - updated_xml.append("</record>") - continue - - # Grab BibRec instance of current record for later amending - existing_record, status_code, dummy1 = create_record("<record>%s</record>" % (record_xml,)) - if status_code == 0: - all_err_msg.append("Error parsing record, skipping authorlist extraction of: %s\n" % \ - (identifier,)) - updated_xml.append("<record>%s</record>" % (record_xml,)) - continue - if identifier not in downloaded_files: - downloaded_files[identifier] = {} - if "tarball" not in downloaded_files[identifier]: - current_exitcode, err_msg, tarball, dummy = \ - plotextractor_harvest(identifier, active_file, selection=["tarball"]) - if current_exitcode != 0: - exitcode = current_exitcode - all_err_msg.append(err_msg) - else: - downloaded_files[identifier]["tarball"] = tarball - if current_exitcode == 0: - current_exitcode, err_msg, authorlist_xml_path = authorlist_extract(downloaded_files[identifier]["tarball"], \ - identifier, downloaded_files) - if current_exitcode != 0: - exitcode = current_exitcode - all_err_msg.append("Error extracting authors from id: %s\nError:%s" % \ - (identifier, err_msg)) - elif authorlist_xml_path is not None: - ## Authorlist found - # Read and create BibRec - xml_fd = open(authorlist_xml_path, 'r') - author_xml = xml_fd.read() - xml_fd.close() - authorlist_record = create_records(author_xml) - if len(authorlist_record) == 1: - if authorlist_record[0][0] == None: - all_err_msg.append("Error parsing authorlist record for id: %s" % \ - (identifier,)) - continue - authorlist_record = authorlist_record[0][0] - # Convert any LaTeX symbols in authornames - translate_fieldvalues_from_latex(authorlist_record, '100', code='a') - translate_fieldvalues_from_latex(authorlist_record, '700', code='a') - # Look for any UNDEFINED fields in authorlist - key = "UNDEFINED" - matching_fields = record_find_matching_fields(key, authorlist_record, tag='100') \ - + record_find_matching_fields(key, authorlist_record, tag='700') - if len(matching_fields) > 0 and bibcatalog_system != None: - # UNDEFINED found. 
Create ticket in author queue - ticketid = create_authorlist_ticket(matching_fields, identifier) - if ticketid: - write_message("authorlist RT ticket %d submitted for %s" % (ticketid, identifier)) - else: - all_err_msg.append("Error while submitting RT ticket for %s" % (identifier,)) - # Replace 100,700 fields of original record with extracted fields - record_delete_fields(existing_record, '100') - record_delete_fields(existing_record, '700') - first_author = record_get_field_instances(authorlist_record, '100') - additional_authors = record_get_field_instances(authorlist_record, '700') - record_add_fields(existing_record, '100', first_author) - record_add_fields(existing_record, '700', additional_authors) - updated_xml.append(record_xml_output(existing_record)) - updated_xml.append('</collection>') - # Write to file - file_fd = open(extracted_file, 'w') - file_fd.write("\n".join(updated_xml)) - file_fd.close() - - if len(all_err_msg) > 0: - return exitcode, all_err_msg - return exitcode, "" - -def call_fulltext(active_file, extracted_file, harvested_identifier_list, - downloaded_files, source_id): - """ - Function that calls attach FFT tag for full-text pdf to harvested records. - It will download the fulltext-pdf for each identifier if necessary. - - @param active_file: path to the currently processed file - @param extracted_file: path to the file where the final results will be saved - @param harvested_identifier_list: list of OAI identifiers for this active_file - @param downloaded_files: dict of identifier -> dict mappings for downloaded material. - - @return: exitcode and any error messages as: (exitcode, err_msg) - """ - all_err_msg = [] - exitcode = 0 - # Read in active file - recs_fd = open(active_file, 'r') - records = recs_fd.read() - recs_fd.close() - - # Set doctype FIXME: Remove when parameters are introduced to post-process steps - if CFG_INSPIRE_SITE == 1: - doctype = "arXiv" - elif CFG_CERN_SITE == 1: - doctype = "" - else: - doctype = "" - - # Find all records - record_xmls = REGEXP_RECORD.findall(records) - updated_xml = ['<?xml version="1.0" encoding="UTF-8"?>'] - updated_xml.append('<collection>') - i = 0 - for record_xml in record_xmls: - current_exitcode = 0 - identifier = harvested_identifier_list[i] - i += 1 - if identifier not in downloaded_files: - downloaded_files[identifier] = {} - updated_xml.append("<record>") - updated_xml.append(record_xml) - if not oaiharvest_templates.tmpl_should_process_record_with_mode(record_xml, 'p', source_id): - # We skip this record - updated_xml.append("</record>") - continue - if "pdf" not in downloaded_files[identifier]: - current_exitcode, err_msg, dummy, pdf = \ - plotextractor_harvest(identifier, active_file, selection=["pdf"]) - if current_exitcode != 0: - exitcode = current_exitcode - all_err_msg.append(err_msg) - else: - downloaded_files[identifier]["pdf"] = pdf - if current_exitcode == 0: - fulltext_xml = """ <datafield tag="FFT" ind1=" " ind2=" "> - <subfield code="a">%(url)s</subfield> - <subfield code="t">%(doctype)s</subfield> - </datafield>""" % {'url': downloaded_files[identifier]["pdf"], - 'doctype': doctype} - updated_xml.append(fulltext_xml) - updated_xml.append("</record>") - updated_xml.append('</collection>') - # Write to file - file_fd = open(extracted_file, 'w') - file_fd.write("\n".join(updated_xml)) - file_fd.close() - - if len(all_err_msg) > 0: - return exitcode, "\n".join(all_err_msg) - return exitcode, "" - - -def authorlist_extract(tarball_path, identifier, downloaded_files): - """ - Try to extract the 
tarball given, if not already extracted, and look for - any XML files that could be authorlists. If any is found, use a XSLT stylesheet - to transform the authorlist into MARCXML author-fields, and return the full path - of resulting conversion. - - @param tarball_path: path to the tarball to check - @type tarball_path: string - - @param identifier: OAI Identifier to the current record - @type identifier: string - - @param downloaded_files: dict of identifier -> dict mappings for downloaded material. - @type downloaded_files: dict - - @return: path to converted authorlist together with exitcode and any error messages as: - (exitcode, err_msg, authorlist_path) - @rtype: tuple - """ - all_err_msg = [] - exitcode = 0 - if "tarball-extracted" not in downloaded_files[identifier]: - # tarball has not been extracted - tar_dir, dummy = get_defaults(tarball=tarball_path, sdir=CFG_TMPDIR, refno_url="") - try: - dummy = untar(tarball_path, tar_dir) - except Timeout: - all_err_msg.append("Timeout during tarball extraction of %s" % (tarball_path,)) - exitcode = 1 - return exitcode, "\n".join(all_err_msg), None - downloaded_files[identifier]["tarball-extracted"] = tar_dir - # tarball is now surely extracted, so we try to fetch all XML in the folder - xml_files_list = find_matching_files(downloaded_files[identifier]["tarball-extracted"], \ - ["xml"]) - # Try to convert authorlist candidates, returning on first success - for xml_file in xml_files_list: - xml_file_fd = open(xml_file, "r") - xml_content = xml_file_fd.read() - xml_file_fd.close() - match = REGEXP_AUTHLIST.findall(xml_content) - if match != []: - tempfile_fd, temp_authorlist_path = tempfile.mkstemp(suffix=".xml", prefix="authorlist_temp", dir=CFG_TMPDIR) - os.write(tempfile_fd, match[0]) - os.close(tempfile_fd) - # Generate file to store conversion results - newfile_fd, authorlist_resultxml_path = tempfile.mkstemp(suffix=".xml", prefix="authorlist_MARCXML", \ - dir=downloaded_files[identifier]["tarball-extracted"]) - os.close(newfile_fd) - exitcode, cmd_stderr = call_bibconvert(config=CFG_OAI_AUTHORLIST_POSTMODE_STYLESHEET, \ - harvestpath=temp_authorlist_path, \ - convertpath=authorlist_resultxml_path) - if cmd_stderr == "" and exitcode == 0: - # Success! - return 0, "", authorlist_resultxml_path - # No valid authorlist found - return 0, "", None - -def plotextractor_harvest(identifier, active_file, selection=["pdf", "tarball"]): - """ - Function that calls plotextractor library to download selected material, - i.e. tarball or pdf, for passed identifier. Returns paths to respective files. 
- - @param identifier: OAI identifier of the record to harvest - @param active_file: path to the currently processed file - @param selection: list of materials to harvest - - @return: exitcode, errormessages and paths to harvested tarball and fulltexts - (exitcode, err_msg, tarball, pdf) - """ - all_err_msg = [] - exitcode = 0 - active_dir, active_name = os.path.split(active_file) - # turn oaiharvest_23_1_20110214161632_converted -> oaiharvest_23_1_material - # to let harvested material in same folder structure - active_name = "_".join(active_name.split('_')[:-2]) + "_material" - extract_path = make_single_directory(active_dir, active_name) - tarball, pdf = harvest_single(identifier, extract_path, selection) - time.sleep(CFG_PLOTEXTRACTOR_DOWNLOAD_TIMEOUT) - if tarball == None and "tarball" in selection: - all_err_msg.append("Error harvesting tarball from id: %s %s" % \ - (identifier, extract_path)) - exitcode = 1 - if pdf == None and "pdf" in selection: - all_err_msg.append("Error harvesting full-text from id: %s %s" % \ - (identifier, extract_path)) - exitcode = 1 - return exitcode, "\n".join(all_err_msg), tarball, pdf - -def find_matching_files(basedir, filetypes): - """ - This functions tries to find all files matching given filetypes by looking at - all the files and filenames in the given directory, including subdirectories. - - @param basedir: full path to base directory to search in - @type basedir: string - - @param filetypes: list of filetypes, extensions - @type filetypes: list - - @return: exitcode and any error messages as: (exitcode, err_msg) - @rtype: tuple - """ - files_list = [] - for dirpath, dummy0, filenames in os.walk(basedir): - for filename in filenames: - full_path = os.path.join(dirpath, filename) - dummy1, cmd_out, dummy2 = run_shell_command('file %s', (full_path,)) - for filetype in filetypes: - if cmd_out.lower().find(filetype) > -1: - files_list.append(full_path) - elif filename.split('.')[-1].lower() == filetype: - files_list.append(full_path) - return files_list - -def translate_fieldvalues_from_latex(record, tag, code='', encoding='utf-8'): - """ - Given a record and field tag, this function will modify the record by - translating the subfield values of found fields from LaTeX to chosen - encoding for all the subfields with given code (or all if no code is given). - - @param record: record to modify, in BibRec style structure - @type record: dict - - @param tag: tag of fields to modify - @type tag: string - - @param code: restrict the translation to a given subfield code - @type code: string - - @param encoding: scharacter encoding for the new value. Defaults to UTF-8. - @type encoding: string - """ - field_list = record_get_field_instances(record, tag) - for field in field_list: - subfields = field[0] - subfield_index = 0 - for subfield_code, subfield_value in subfields: - if code == '' or subfield_code == code: - newvalue = translate_latex2unicode(subfield_value).encode(encoding) - record_modify_subfield(record, tag, subfield_code, newvalue, \ - subfield_index, field_position_global=field[4]) - subfield_index += 1 - -def create_authorlist_ticket(matching_fields, identifier): - """ - This function will submit a ticket generated by UNDEFINED affiliations - in extracted authors from collaboration authorlists. 
- - @param matching_fields: list of (tag, field_instances) for UNDEFINED nodes - @type matching_fields: list - - @param identifier: OAI identifier of record - @type identifier: string - - @return: return the ID of the created ticket, or None on failure - @rtype: int or None - """ - if bibcatalog_system is None: - return None - subject = "[OAI Harvest] UNDEFINED affiliations for record %s" % (identifier,) - text = """ -Harvested record with identifier %(ident)s has had its authorlist extracted and contains some UNDEFINED affiliations. - -To see the record, go here: %(baseurl)s/search?p=%(ident)s - -If the record is not there yet, try again later. It may take some time for it to load into the system. - -List of unidentified fields: -%(fields)s - """ % { - 'ident' : identifier, - 'baseurl' : CFG_SITE_URL, - 'fields' : "\n".join([field_xml_output(field, tag) for tag, field_instances in matching_fields \ - for field in field_instances]) - } - queue = "Authors" - ticketid = bibcatalog_system.ticket_submit(subject=subject, queue=queue) - if bibcatalog_system.ticket_comment(None, ticketid, text) == None: - write_message("Error: commenting on ticket %s failed." % (str(ticketid),)) - return ticketid def create_oaiharvest_log(task_id, oai_src_id, marcxmlfile): """ Function which creates the harvesting logs @param task_id bibupload task id """ file_fd = open(marcxmlfile, "r") xml_content = file_fd.read(-1) file_fd.close() create_oaiharvest_log_str(task_id, oai_src_id, xml_content) -def create_oaiharvest_log_str(task_id, oai_src_id, xml_content): - """ - Function which creates the harvesting logs - @param task_id bibupload task id - """ - try: - records = create_records(xml_content) - for record in records: - oai_id = record_extract_oai_id(record[0]) - query = "INSERT INTO oaiHARVESTLOG (id_oaiHARVEST, oai_id, date_harvested, bibupload_task_id) VALUES (%s, %s, NOW(), %s)" - run_sql(query, (str(oai_src_id), str(oai_id), str(task_id))) - except Exception, msg: - print "Logging exception : %s " % (str(msg),) -def call_bibupload(marcxmlfile, mode=None, oai_src_id= -1, sequence_id=None): - """ - Creates a bibupload task for the task scheduler in given mode - on given file. Returns the generated task id and logs the event - in oaiHARVESTLOGS, also adding any given oai source identifier. - - @param marcxmlfile: base-marcxmlfilename to upload - @param mode: mode to upload in - @param oai_src_id: id of current source config - @param sequence_id: sequence-number, if relevant - - @return: task_id if successful, otherwise None. - """ - if mode is None: - mode = ["-r", "-i"] - if os.path.exists(marcxmlfile): - try: - args = mode - # Add job with priority 6 (above normal bibedit tasks) and file to upload to arguments - #FIXME: allow per-harvest arguments - args.extend(["-P", "6", marcxmlfile]) - if sequence_id: - args.extend(['-I', str(sequence_id)]) - task_id = task_low_level_submission("bibupload", "oaiharvest", *tuple(args)) - create_oaiharvest_log(task_id, oai_src_id, marcxmlfile) - except Exception, msg: - write_message("An exception during submitting oaiharvest task occured : %s " % (str(msg))) - return None - return task_id - else: - write_message("marcxmlfile %s does not exist" % (marcxmlfile,)) - return None - -def call_bibfilter(bibfilterprogram, marcxmlfile): - """ - Call bibfilter program BIBFILTERPROGRAM on MARCXMLFILE, which is usually - run before uploading records. 
- - The bibfilter should produce up to four files called MARCXMLFILE.insert.xml, - MARCXMLFILE.correct.xml, MARCXMLFILE.append.xml and MARCXMLFILE.holdingpen.xml. - The first file contains parts of MARCXML to be uploaded in insert mode, - the second file is uploaded in correct mode, third in append mode and the last file - contains MARCXML to be uploaded into the holding pen. - - @param bibfilterprogram: path to bibfilter script to run - @param marcxmlfile: base-marcxmlfilename - - @return: exitcode and any error messages as: (exitcode, err_msg) - """ - all_err_msg = [] - exitcode = 0 - if bibfilterprogram: - if not os.path.isfile(bibfilterprogram): - all_err_msg.append("bibfilterprogram %s is not a file" % - (bibfilterprogram,)) - exitcode = 1 - elif not os.path.isfile(marcxmlfile): - all_err_msg.append("marcxmlfile %s is not a file" % (marcxmlfile,)) - exitcode = 1 - else: - exitcode, dummy, cmd_stderr = run_shell_command(cmd="%s '%s'", \ - args=(bibfilterprogram, \ - marcxmlfile)) - if exitcode != 0 or cmd_stderr != "": - all_err_msg.append("Error while running filtering script on %s\nError:%s" % \ - (marcxmlfile, cmd_stderr)) - else: - try: - all_err_msg.append("no bibfilterprogram defined, copying %s only" % - (marcxmlfile,)) - shutil.copy(marcxmlfile, marcxmlfile + ".insert.xml") - except: - all_err_msg.append("cannot copy %s into %s.insert.xml" % (marcxmlfile, marcxmlfile)) - exitcode = 1 - return exitcode, "\n".join(all_err_msg) - -def get_row_from_reposname(reposname): - """ Returns all information about a row (OAI source) - from the source name """ - try: - sql = """SELECT id, baseurl, metadataprefix, arguments, - comment, bibconvertcfgfile, name, lastrun, - frequency, postprocess, setspecs, - bibfilterprogram - FROM oaiHARVEST WHERE name=%s""" - res = run_sql(sql, (reposname,)) - reposdata = [] - for element in res: - reposdata.append(element) - return reposdata - except StandardError, e: - return (0, e) - -def get_all_rows_from_db(): - """ This method retrieves the full database of repositories and returns - a list containing (in exact order): - | id | baseurl | metadataprefix | arguments | comment - | bibconvertcfgfile | name | lastrun | frequency - | postprocess | setspecs | bibfilterprogram - """ - try: - reposlist = [] - sql = """SELECT id FROM oaiHARVEST""" - idlist = run_sql(sql) - for index in idlist: - sql = """SELECT id, baseurl, metadataprefix, arguments, - comment, bibconvertcfgfile, name, lastrun, - frequency, postprocess, setspecs, - bibfilterprogram - FROM oaiHARVEST WHERE id=%s""" % index - - reposelements = run_sql(sql) - repos = [] - for element in reposelements: - repos.append(element) - reposlist.append(repos) - return reposlist - except StandardError, e: - return (0, e) - -def compare_timestamps_with_tolerance(timestamp1, - timestamp2, - tolerance=0): - """Compare two timestamps TIMESTAMP1 and TIMESTAMP2, of the form - '2005-03-31 17:37:26'. Optionally receives a TOLERANCE argument - (in seconds). Return -1 if TIMESTAMP1 is less than TIMESTAMP2 - minus TOLERANCE, 0 if they are equal within TOLERANCE limit, - and 1 if TIMESTAMP1 is greater than TIMESTAMP2 plus TOLERANCE. 
- """ - # remove any trailing .00 in timestamps: - timestamp1 = re.sub(r'\.[0-9]+$', '', timestamp1) - timestamp2 = re.sub(r'\.[0-9]+$', '', timestamp2) - # first convert timestamps to Unix epoch seconds: - timestamp1_seconds = calendar.timegm(time.strptime(timestamp1, - "%Y-%m-%d %H:%M:%S")) - timestamp2_seconds = calendar.timegm(time.strptime(timestamp2, - "%Y-%m-%d %H:%M:%S")) - # now compare them: - if timestamp1_seconds < timestamp2_seconds - tolerance: - return -1 - elif timestamp1_seconds > timestamp2_seconds + tolerance: - return 1 - else: - return 0 def get_dates(dates): """ A method to validate and process the dates input by the user at the command line """ twodates = [] if dates: datestring = dates.split(":") if len(datestring) == 2: for date in datestring: ### perform some checks on the date format datechunks = date.split("-") if len(datechunks) == 3: try: if int(datechunks[0]) and int(datechunks[1]) and \ int(datechunks[2]): twodates.append(date) except StandardError: write_message("Dates have invalid format, not " - "'yyyy-mm-dd:yyyy-mm-dd'") + "'yyyy-mm-dd:yyyy-mm-dd'") twodates = None return twodates else: write_message("Dates have invalid format, not " - "'yyyy-mm-dd:yyyy-mm-dd'") + "'yyyy-mm-dd:yyyy-mm-dd'") twodates = None return twodates - ## final check.. date1 must me smaller than date2 + ## final check.. date1 must me smaller than date2 date1 = str(twodates[0]) + " 01:00:00" date2 = str(twodates[1]) + " 01:00:00" if compare_timestamps_with_tolerance(date1, date2) != -1: write_message("First date must be before second date.") twodates = None return twodates else: write_message("Dates have invalid format, not " - "'yyyy-mm-dd:yyyy-mm-dd'") + "'yyyy-mm-dd:yyyy-mm-dd'") twodates = None else: twodates = None return twodates def get_repository_names(repositories): """ A method to validate and process the repository names input by the user at the command line """ repository_names = [] if repositories: names = repositories.split(",") for name in names: ### take into account both single word names and multiple word ### names (which get wrapped around "" or '') name = name.strip() if name.startswith("'"): name = name.strip("'") elif name.startswith('"'): name = name.strip('"') repository_names.append(name) else: repository_names = None return repository_names def usage(exitcode=0, msg=""): - "Print out info. Only used when run in 'manual' harvesting mode" + """Print out info. Only used when run in 'manual' harvesting mode""" sys.stderr.write("*Manual single-shot harvesting mode*\n") if msg: sys.stderr.write(msg + "\n") sys.exit(exitcode) def main(): """Starts the tool. If the command line arguments are those of the 'manual' mode, then starts a manual one-time harvesting. Else trigger a BibSched task for automated harvesting based on the OAIHarvest admin settings. 
""" - # Let's try to parse the arguments as used in manual harvesting: try: opts, args = getopt.getopt(sys.argv[1:], "o:v:m:p:i:s:f:u:r:x:c:k:w:l:", ["output=", "verb=", "method=", "metadataPrefix=", "identifier=", "set=", "from=", "until=", "resumptionToken=", "certificate=", "key=", "user=", "password="] - ) + ) # So everything went smoothly: start harvesting in manual mode if len([opt for opt, opt_value in opts if opt in ['-v', '--verb']]) > 0: # verb parameter is given http_param_dict = {} method = "POST" output = "" user = None password = None cert_file = None key_file = None sets = [] # get options and arguments for opt, opt_value in opts: - if opt in ["-v", "--verb"]: + if opt in ["-v", "--verb"]: http_param_dict['verb'] = opt_value elif opt in ["-m", '--method']: if opt_value == "GET" or opt_value == "POST": method = opt_value elif opt in ["-p", "--metadataPrefix"]: http_param_dict['metadataPrefix'] = opt_value elif opt in ["-i", "--identifier"]: http_param_dict['identifier'] = opt_value elif opt in ["-s", "--set"]: sets = opt_value.split() elif opt in ["-f", "--from"]: http_param_dict['from'] = opt_value elif opt in ["-u", "--until"]: http_param_dict['until'] = opt_value elif opt in ["-r", "--resumptionToken"]: http_param_dict['resumptionToken'] = opt_value elif opt in ["-o", "--output"]: output = opt_value elif opt in ["-c", "--certificate"]: cert_file = opt_value elif opt in ["-k", "--key"]: key_file = opt_value elif opt in ["-l", "--user"]: user = opt_value elif opt in ["-w", "--password"]: password = opt_value elif opt in ["-V", "--version"]: print __revision__ sys.exit(0) else: usage(1, "Option %s is not allowed" % opt) if len(args) > 0: base_url = args[-1] if not base_url.lower().startswith('http'): base_url = 'http://' + base_url - (addressing_scheme, network_location, path, dummy1, \ + (addressing_scheme, network_location, path, dummy1, dummy2, dummy3) = urlparse.urlparse(base_url) secure = (addressing_scheme == "https") if (cert_file and not key_file) or \ - (key_file and not cert_file): + (key_file and not cert_file): # Both are needed if one specified usage(1, "You must specify both certificate and key files") if password and not user: # User must be specified when password is given usage(1, "You must specify a username") elif user and not password: if not secure: sys.stderr.write("*WARNING* Your password will be sent in clear!\n") try: password = getpass.getpass() except KeyboardInterrupt, error: sys.stderr.write("\n%s\n" % (error,)) sys.exit(0) oai_harvest_getter.harvest(network_location, path, http_param_dict, method, output, sets, secure, user, password, cert_file, key_file) sys.stderr.write("Harvesting completed at: %s\n\n" % - time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) + time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) return else: usage(1, "You must specify the URL to harvest") else: # verb is not given. We will continue with periodic # harvesting. But first check if URL parameter is given: # if it is, then warn directly now - if len(args) > 1 or \ - (len(args) == 1 and not args[0].isdigit()): + + if len([opt for opt, opt_value in opts if opt in ['-i', '--identifier']]) == 0 \ + and len(args) > 1 or \ + (len(args) == 1 and not args[0].isdigit()): usage(1, "You must specify the --verb parameter") except getopt.error, e: # So could it be that we are using different arguments? 
Try to # start the BibSched task (automated harvesting) and see if it # validates pass # BibSched mode - periodical harvesting # Note that the 'help' is common to both manual and automated # mode. task_set_option("repository", None) task_set_option("dates", None) task_init(authorization_action='runoaiharvest', authorization_msg="oaiharvest Task Submission", description=""" Harvest records from OAI sources. Manual vs automatic harvesting: - Manual harvesting retrieves records from the specified URL, with the specified OAI arguments. Harvested records are displayed on the standard output or saved to a file, but are not integrated into the repository. This mode is useful to 'play' with OAI repositories or to build special harvesting scripts. - Automatic harvesting relies on the settings defined in the OAI Harvest admin interface to periodically retrieve the repositories and sets to harvest. It also take care of harvesting only new or modified records. Records harvested using this mode are converted and integrated into the repository, according to the settings defined in the OAI Harvest admin interface. Examples: Manual (single-shot) harvesting mode: Save to /tmp/z.xml records from CDS added/modified between 2004-04-01 and 2004-04-02, in MARCXML: $ oaiharvest -vListRecords -f2004-04-01 -u2004-04-02 -pmarcxml -o/tmp/z.xml http://cds.cern.ch/oai2d Automatic (periodical) harvesting mode: Schedule daily harvesting of all repositories defined in OAIHarvest admin: $ oaiharvest -s 24h Schedule daily harvesting of repository 'arxiv', defined in OAIHarvest admin: $ oaiharvest -r arxiv -s 24h Harvest in 10 minutes from 'pubmed' repository records added/modified between 2005-05-05 and 2005-05-10: $ oaiharvest -r pubmed -d 2005-05-05:2005-05-10 -t 10m """, - help_specific_usage='Manual single-shot harvesting mode:\n' - ' -o, --output specify output file\n' - ' -v, --verb OAI verb to be executed\n' - ' -m, --method http method (default POST)\n' - ' -p, --metadataPrefix metadata format\n' - ' -i, --identifier OAI identifier\n' - ' -s, --set OAI set(s). Whitespace-separated list\n' - ' -r, --resuptionToken Resume previous harvest\n' - ' -f, --from from date (datestamp)\n' - ' -u, --until until date (datestamp)\n' - ' -c, --certificate path to public certificate (in case of certificate-based harvesting)\n' - ' -k, --key path to private key (in case of certificate-based harvesting)\n' - ' -l, --user username (in case of password-protected harvesting)\n' - ' -w, --password password (in case of password-protected harvesting)\n' - 'Automatic periodical harvesting mode:\n' - ' -r, --repository="repo A"[,"repo B"] \t which repositories to harvest (default=all)\n' - ' -d, --dates=yyyy-mm-dd:yyyy-mm-dd \t reharvest given dates only\n', - version=__revision__, - specific_params=("r:d:", ["repository=", "dates=", ]), - task_submit_elaborate_specific_parameter_fnc= - task_submit_elaborate_specific_parameter, - task_run_fnc=task_run_core) + + help_specific_usage='Manual single-shot harvesting mode:\n' + ' -o, --output specify output file\n' + ' -v, --verb OAI verb to be executed\n' + ' -m, --method http method (default POST)\n' + ' -p, --metadataPrefix metadata format\n' + ' -i, --identifier OAI identifier\n' + ' -s, --set OAI set(s). 
Whitespace-separated list\n' + ' -r, --resuptionToken Resume previous harvest\n' + ' -f, --from from date (datestamp)\n' + ' -u, --until until date (datestamp)\n' + ' -c, --certificate path to public certificate (in case of certificate-based harvesting)\n' + ' -k, --key path to private key (in case of certificate-based harvesting)\n' + ' -l, --user username (in case of password-protected harvesting)\n' + ' -w, --password password (in case of password-protected harvesting)\n' + 'Deamon mode (periodical or one-shot harvesting mode):\n' + ' -r, --repository="repo A"[,"repo B"] \t which repositories to harvest (default=all)\n' + ' -d, --dates=yyyy-mm-dd:yyyy-mm-dd \t reharvest given dates only\n' + ' -i, --identifier OAI identifier if wished to run in as a task.\n' + ' --notify-email-to Receive notifications on given email on successful upload and/or finished harvest.\n' + ' --create-ticket-in Provide desired ticketing queue to create a ticket in it on upload and/or finished harvest.\n' + ' Requires a configured ticketing system (BibCatalog).\n', + version=__revision__, + specific_params=( + "r:i:d:", ["repository=", "idenfifier=", "dates=", "notify-email-to=", "create-ticket-in="]), + task_submit_elaborate_specific_parameter_fnc=task_submit_elaborate_specific_parameter, + task_run_fnc=task_run_core) + + def task_submit_elaborate_specific_parameter(key, value, opts, args): """Elaborate specific cli parameters for oaiharvest.""" if key in ("-r", "--repository"): task_set_option('repository', get_repository_names(value)) elif key in ("-d", "--dates"): task_set_option('dates', get_dates(value)) if value is not None and task_get_option("dates") is None: - raise StandardError, "Date format not valid." + raise StandardError("Date format not valid.") + elif key in ("--notify-email-to",): + if email_valid_p(value): + task_set_option('notify-email-to', value) + else: + raise StandardError("E-mail format not valid.") + elif key in ("--create-ticket-in",): + task_set_option('create-ticket-in', value) else: return False return True diff --git a/invenio/legacy/oaiharvest/dblayer.py b/invenio/legacy/oaiharvest/dblayer.py index 776e3605d..8173eae77 100644 --- a/invenio/legacy/oaiharvest/dblayer.py +++ b/invenio/legacy/oaiharvest/dblayer.py @@ -1,231 +1,384 @@ ## This file is part of Invenio. ## Copyright (C) 2009, 2010, 2011 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
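# Illustrative usage of the daemon options added above; the repository name,
# e-mail address and ticketing queue are invented, and --create-ticket-in
# assumes a configured BibCatalog system:
#
#   $ oaiharvest -r arxiv -s 24h \
#       --notify-email-to admin@example.org \
#       --create-ticket-in Harvests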
from invenio.legacy.dbquery import run_sql class HistoryEntry: date_harvested = None date_inserted = None oai_id = "" record_id = 0 bibupload_task_id = "" inserted_to_db = "" oai_src_id = 0 def __init__(self, date_harvested, date_inserted, oai_src_id, oai_id, record_id, inserted_to_db, bibupload_task_id): self.date_harvested = date_harvested self.date_inserted = date_inserted self.record_id = record_id self.oai_id = oai_id self.bibupload_task_id = bibupload_task_id self.oai_src_id = oai_src_id self.inserted_to_db = inserted_to_db def __repr__(self): return str(self) def __str__(self): return "HistoryEntry(" + \ "date_harvested: " + str(self.date_harvested) + ', ' + \ "date_inserted: " + str(self.date_inserted) + ', ' + \ "oai_id: " + str(self.oai_id) + ', ' + \ "record_id: " + str(self.record_id) + ', ' + \ "bibupload_task_id: " + str(self.bibupload_task_id) + ', ' + \ "inserted_to_db: " + str(self.inserted_to_db) + ', ' + \ "oai_src_id: " + str(self.oai_src_id) + ', ' + ")" def get_history_entries_raw(query_suffix, sqlparameters): """ Internally used function which obtains sql query suffix ( starting from WHERE) and """ query_prefix = "SELECT date_harvested, date_inserted, id_oaiHARVEST, oai_id, id_bibrec, inserted_to_db, bibupload_task_id FROM oaiHARVESTLOG " query = query_prefix + query_suffix res = run_sql(query, sqlparameters) result = [] for entry in res: result.append(HistoryEntry(entry[0], entry[1], \ int(entry[2]), str(entry[3]), int(entry[4]),\ str(entry[5]), int(entry[6]))) return result def get_history_entries(oai_src_id, monthdate, method = "harvested"): sql_column = "date_harvested" if method == "inserted": sql_column = "date_inserted" query_suffix = "WHERE id_oaiHARVEST = %s AND MONTH(" + sql_column + ") = %s AND YEAR(" + sql_column + ") = %s ORDER BY " + sql_column return get_history_entries_raw(query_suffix,(str(oai_src_id), str(monthdate.month), str(monthdate.year))) def get_history_entries_for_day(oai_src_id, date, limit = -1, start = 0, method = "harvested"): """ Returns harvesting history entries for a given day @param oai_src_id: harvesting source identifier @param date: Date designing the deserved day @param limit: How many records (at most) do we want to get @param start: From which index do we want to start ? @param method: method of getting data (two possible values "harvested" and "inserted") Describes if the harvesting or inserting data should be used """ sql_column = "date_harvested" if method == "inserted": sql_column = "date_inserted" query_suffix = "WHERE id_oaiHARVEST = %s AND MONTH(" + sql_column + ") = %s AND YEAR(" + sql_column + ") = %s AND DAY(" + sql_column + ") = %s ORDER BY " + sql_column if limit > 0: query_suffix += " LIMIT " + str(start) + "," + str(limit) return get_history_entries_raw(query_suffix, (str(oai_src_id), str(date.month), str(date.year), str(date.day))) def get_entry_history(oai_id, start = 0, limit = -1 , method = "harvested"): """ Returns harvesting history entries for a given OAI identifier ( Show results from multiple sources ) @limit - How many records (at most) do we want to get @start - From which index do we want to start ? 
@method - method of getting data (two possible values "harvested" and "inserted") Describes if the harvesting or inserting data should be used """ sql_column = "date_harvested" if method == "inserted": sql_column = "date_inserted" query_suffix = "WHERE oai_id = %s ORDER BY " + sql_column if limit > 0: query_suffix += " LIMIT " + str(start) + "," + str(limit) return get_history_entries_raw(query_suffix, (str(oai_id),)) def get_month_logs_size(oai_src_id, date, method = "harvested"): """ Function which returns number of inserts which took place in given month (splited into days) @param oai_src_id: harvesting source identifier @return: Dictionary of harvesting statistics - keys describe days. values - numbers of inserted recordds """ sql_column = "date_harvested" if method == "inserted": sql_column = "date_inserted" query = "SELECT DAY(" + sql_column + "), COUNT(*) FROM oaiHARVESTLOG WHERE id_oaiHARVEST = %s AND MONTH(" + sql_column + ") = %s AND YEAR(" + sql_column + ")= %s GROUP BY DAY(" + sql_column+ ")" query_result = run_sql(query, (str(oai_src_id), str(date.month), str(date.year))) result = {} for entry in query_result: if int(entry[0]) != 0: result[int(entry[0])] = int(entry[1]) return result def get_day_logs_size(oai_src_id, date, method = "harvested"): """ Function which returns number of inserts which took place in given day @param oai_src_id: harvesting source identifier @return: Number of inserts during the given day """ sql_column = "date_harvested" if method == "inserted": sql_column = "date_inserted" query = "SELECT COUNT(*) FROM oaiHARVESTLOG WHERE id_oaiHARVEST = %s AND MONTH(" + sql_column + ") = %s AND YEAR(" + sql_column+ ")= %s AND DAY(" + sql_column + ") = %s" query_result = run_sql(query, (str(oai_src_id), str(date.month), str(date.year), str(date.day))) for entry in query_result: return int(entry[0]) return 0 def get_entry_logs_size(oai_id): """ Function which returns number of inserts which took place in given day @param oai_src_id: harvesting source identifier @return: Number of inserts during the given day """ query = "SELECT COUNT(*) FROM oaiHARVESTLOG WHERE oai_id = %s" query_result = run_sql(query, (str(oai_id),)) for entry in query_result: return int(entry[0]) return 0 +################################################################## +### Here the functions to retrieve, modify, delete and add sources +################################################################## + +def get_oai_src_by_id(oai_src_id): + """ + Returns a list of dictionaries with source parameters for a given id. + """ + return get_oai_src({'id': oai_src_id}) + + +def get_oai_src_by_name(oai_src_name): + """ + Returns a list of dictionaries with source parameters for a source name. + """ + return get_oai_src({'name': oai_src_name}) + + +def get_all_oai_src(): + """ + Returns a list of dictionaries with source parameters for a given id. + """ + return get_oai_src() + + +def get_oai_src(params={}): + """ + Returns a list of dictionaries each representing a DB row for a OAI source. 
+ """ + sql = """SELECT id, baseurl, metadataprefix, arguments, + comment, name, lastrun, + frequency, postprocess, setspecs + FROM oaiHARVEST""" + sql_params = [] + if params: + for key, value in params.items(): + if "WHERE" not in sql: + sql += " WHERE" + else: + sql += " AND" + sql += " " + key + "=%s" + sql_params.append(value) + + new_res = [] + res = run_sql(sql, sql_params, with_dict=True) + if res: + for result in res: + for key, value in result.iteritems(): + if value is None: + if key == "arguments": + value = {} + else: + value = "" + result[key] = value + new_res.append(result) + return new_res + + +def modify_oai_src(oai_src_id, oai_src_name, oai_src_baseurl, oai_src_prefix, + oai_src_frequency, oai_src_post, oai_src_comment, + oai_src_sets=None, oai_src_args=None): + """Modifies a row's parameters""" + if oai_src_sets is None: + oai_src_sets = [] + if oai_src_post is None: + oai_src_post = [] + if oai_src_args is None: + oai_src_args = {} + sql = """UPDATE oaiHARVEST + SET baseurl=%s, metadataprefix=%s, arguments=%s, comment=%s, + name=%s, frequency=%s, postprocess=%s, setspecs=%s + WHERE id=%s""" + try: + run_sql(sql, (oai_src_baseurl, + oai_src_prefix, + serialize_via_marshal(oai_src_args), + oai_src_comment, + oai_src_name, + oai_src_frequency, + '-'.join(oai_src_post), + ' '.join(oai_src_sets), + oai_src_id)) + return (1, "") + except StandardError, e: + return (0, e) + +def add_oai_src(oai_src_name, oai_src_baseurl, oai_src_prefix, oai_src_frequency, + oai_src_lastrun, oai_src_post, oai_src_comment, + oai_src_sets=None, oai_src_args=None): + """Adds a new row to the database with the given parameters""" + if oai_src_sets is None: + oai_src_sets = [] + if oai_src_args is None: + oai_src_args = {} + #return (0, str(serialize_via_marshal(oai_src_args))) + try: + if oai_src_lastrun in [0, "0"]: lastrun_mode = 'NULL' + else: + lastrun_mode = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + # lastrun_mode = "'"+lastrun_mode+"'" + run_sql("INSERT INTO oaiHARVEST " + "(baseurl, metadataprefix, arguments, comment, name, lastrun, " + "frequency, postprocess, setspecs) VALUES " + "(%s, %s, %s, %s, %s, %s, %s, %s, %s)", + (oai_src_baseurl, oai_src_prefix, serialize_via_marshal(oai_src_args), \ + oai_src_comment, oai_src_name, lastrun_mode, oai_src_frequency, \ + "-".join(oai_src_post), " ".join(oai_src_sets))) + return (1, "") + except StandardError, e: + return (0, e) + +def delete_oai_src(oai_src_id): + """Deletes a row from the database according to its id""" + try: + res = run_sql("DELETE FROM oaiHARVEST WHERE id=%s" % oai_src_id) + return (1, "") + except StandardError, e: + return (0, e) + +def get_tot_oai_src(): + """Returns number of rows in the database""" + try: + sql = "SELECT COUNT(*) FROM oaiHARVEST" + res = run_sql(sql) + return res[0][0] + except StandardError, e: + return "" + +def get_update_status(): + """Returns a table showing a list of all rows and their LastUpdate status""" + try: + sql = "SELECT name,lastrun FROM oaiHARVEST ORDER BY lastrun desc" + res = run_sql(sql) + return res + except StandardError, e: + return "" + +def get_next_schedule(): + """Returns the next scheduled oaiharvestrun tasks""" + try: + sql = "SELECT runtime,status FROM schTASK WHERE proc='oaiharvest' AND runtime > now() ORDER by runtime LIMIT 1" + res = run_sql(sql) + if len(res) > 0: + return res[0] + else: + return ("", "") + except StandardError, e: + return ("", "") + +################################################################## +###### Here the functions related to 
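# Illustrative sketch of the dict-based accessors defined above; the repository
# id is invented, the keys follow the SELECT in get_oai_src().
from invenio.legacy.oaiharvest.dblayer import get_oai_src_by_id

for repository in get_oai_src_by_id(1):
    print repository["name"], repository["baseurl"], repository["lastrun"]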
Holding Pen operations ###### +################################################################## + def get_holdingpen_entries(start = 0, limit = 0): query = "SELECT oai_id, changeset_date, update_id FROM bibHOLDINGPEN ORDER BY changeset_date" if limit > 0 or start > 0: query += " LIMIT " + str(start) + "," + str(limit) return run_sql(query) def get_holdingpen_entry(oai_id, date_inserted): query = "SELECT changeset_xml FROM bibHOLDINGPEN WHERE changeset_date = %s AND oai_id = %s" return run_sql(query, (str(date_inserted), str(oai_id)))[0][0] def delete_holdingpen_entry(hpupdate_id): query = "DELETE FROM bibHOLDINGPEN WHERE changeset_id=%s" run_sql(query, (hpupdate_id, )) def get_holdingpen_day_fragment(year, month, day, limit, start, filter_key): """ returning the entries form the a particular day """ filterSql = "" if filter_key != "": filterSql = " and oai_id like '%%%s%%' " % (filter_key, ) query = "SELECT oai_id, changeset_date, changeset_id FROM bibHOLDINGPEN WHERE changeset_date > '%i-%i-%i 00:00:00' and changeset_date <= '%i-%i-%i 23:59:59' %s ORDER BY changeset_date LIMIT %i, %i" % (year, month, day, year, month, day, filterSql, start, limit) query_results = run_sql(query) return query_results def get_holdingpen_day_size(year, month, day, filter_key): """ returning the entries form the a particular day """ filterSql = "" if filter_key != "": filterSql = " and oai_id like '%%%s%%' " % (filter_key, ) query = "SELECT count(*) FROM bibHOLDINGPEN WHERE year(changeset_date) = '%i' and month(changeset_date) = '%i' and day(changeset_date) = '%i' %s" % (year, month, day, filterSql) query_results = run_sql(query) return int(query_results[0][0]) def get_holdingpen_month(year, month, filter_key): """ Returning the statistics about the entries form a particular month """ filterSql = "" if filter_key != "": filterSql = " and oai_id like '%%%s%%' " % (filter_key, ) query = "select day(changeset_date), count(*) from bibHOLDINGPEN where year(changeset_date) = '%i' and month(changeset_date) = '%i' %s group by day(changeset_date)" % (year, month, filterSql) return run_sql(query) def get_holdingpen_year(year, filter_key): """ Returning the statistics about the entries from a particular year """ filterSql = "" if filter_key != "": filterSql = " and oai_id like '%%%s%%' " % (filter_key, ) query = "select month(changeset_date), count(*) from bibHOLDINGPEN where year(changeset_date) = '%i' %s group by month(changeset_date)" % (year, filterSql) return run_sql(query) def get_holdingpen_years(filter_key): """ Returning the particular years of records present in the holding pen """ filterSql = "" if filter_key != "": filterSql = " where oai_id like '%%%s%%' " % (filter_key, ) query = "select year(changeset_date), count(*) changeset_date from bibHOLDINGPEN %s group by year(changeset_date)" % (filterSql,) results = run_sql(query) return results def get_holdingpen_entry_details(hpupdate_id): """ Returning the detials of the Holding Pen entry, the result of this function is a tuple: (oai_id, record_id, date_inserted, content) """ query = "SELECT oai_id, id_bibrec, changeset_date, changeset_xml FROM bibHOLDINGPEN WHERE changeset_id=%s" return run_sql(query, (hpupdate_id,))[0] diff --git a/invenio/legacy/oaiharvest/utils.py b/invenio/legacy/oaiharvest/utils.py new file mode 100644 index 000000000..84df6a235 --- /dev/null +++ b/invenio/legacy/oaiharvest/utils.py @@ -0,0 +1,631 @@ +# -*- coding: utf-8 -*- +## +## This file is part of Invenio. +## Copyright (C) 2009, 2010, 2011 CERN. 
+## +## Invenio is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License as +## published by the Free Software Foundation; either version 2 of the +## License, or (at your option) any later version. +## +## Invenio is distributed in the hope that it will be useful, but +## WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +## General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Invenio; if not, write to the Free Software Foundation, Inc., +## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. + +""" +OAI Harvest utility functions. +""" + +__revision__ = "$Id$" + + +import os +import re +import time +import urlparse +import calendar +from invenio.errorlib import register_exception +from invenio import oai_harvest_getter + +from invenio.config import (CFG_ETCDIR, CFG_SITE_URL, + CFG_SITE_ADMIN_EMAIL + ) +from invenio.legacy.bibrecord import (record_get_field_instances, + record_modify_subfield, + field_xml_output + ) +from invenio.shellutils import run_shell_command +from invenio.utils.text import translate_latex2unicode +from invenio.oai_harvest_dblayer import update_lastrun +from invenio.legacy.bibcatalog.api import bibcatalog_system +from invenio.legacy.bibsched.bibtask import write_message + +## precompile some often-used regexp for speed reasons: +REGEXP_OAI_ID = re.compile("<identifier.*?>(.*?)<\/identifier>", re.DOTALL) +REGEXP_RECORD = re.compile("<record.*?>(.*?)</record>", re.DOTALL) +REGEXP_REFS = re.compile("<record.*?>.*?<controlfield .*?>.*?</controlfield>(.*?)</record>", re.DOTALL) +REGEXP_AUTHLIST = re.compile("<collaborationauthorlist.*?</collaborationauthorlist>", re.DOTALL) +CFG_OAI_AUTHORLIST_POSTMODE_STYLESHEET = "%s/bibconvert/config/%s" % (CFG_ETCDIR, "authorlist2marcxml.xsl") + + +def get_nb_records_in_file(filename): + """ + Return number of record in FILENAME that is either harvested or converted + file. Useful for statistics. + """ + try: + nb = open(filename, 'r').read().count("</record>") + except IOError: + nb = 0 # file not exists and such + except: + nb = -1 + return nb + + +def get_nb_records_in_string(string): + """ + Return number of record in FILENAME that is either harvested or converted + file. Useful for statistics. + """ + nb = string.count("</record>") + return nb + + +def collect_identifiers(harvested_file_list): + """Collects all OAI PMH identifiers from each file in the list + and adds them to a list of identifiers per file. + + @param harvested_file_list: list of filepaths to harvested files + + @return list of lists, containing each files' identifier list""" + result = [] + for harvested_file in harvested_file_list: + try: + fd_active = open(harvested_file) + except IOError as e: + raise e + data = fd_active.read() + fd_active.close() + result.append(REGEXP_OAI_ID.findall(data)) + return result + + +def remove_duplicates(harvested_file_list): + """ + Go through a list of harvested files and remove any duplicate records. + Usually happens when records are cross-listed across OAI sets. 
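# Illustrative sketch for the helpers defined above; the file paths are invented.
from invenio.legacy.oaiharvest.utils import (collect_identifiers,
                                             get_nb_records_in_file)

harvested = ["/tmp/oaiharvest.1.xml", "/tmp/oaiharvest.2.xml"]
identifiers_per_file = collect_identifiers(harvested)   # one list of OAI ids per file
total_records = sum(get_nb_records_in_file(name) for name in harvested)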
+ + Saves a backup of original harvested file in: filename~ + """ + harvested_identifiers = [] + for harvested_file in harvested_file_list: + # Firstly, rename original file to temporary name + try: + os.rename(harvested_file, "%s~" % (harvested_file,)) + except IOError: + continue + # Secondly, open files for writing and reading + original_harvested_file = None + try: + try: + original_harvested_file = open("%s~" % (harvested_file,)) + data = original_harvested_file.read() + except IOError: + continue + finally: + if original_harvested_file: + original_harvested_file.close() + + if '<ListRecords>' not in data: + # We do not need to de-duplicate in non-ListRecords requests + continue + + updated_file_content = [] + # Get and write OAI-PMH XML header data to updated file + header_index_end = data.find("<ListRecords>") + len("<ListRecords>") + updated_file_content.append("%s" % (data[:header_index_end],)) + + # By checking the OAI ID we write all records not written previously (in any file) + harvested_records = REGEXP_RECORD.findall(data) + for record in harvested_records: + oai_identifier = REGEXP_OAI_ID.search(record) + if oai_identifier and oai_identifier.group(1) not in harvested_identifiers: + updated_file_content.append("<record>%s</record>" % (record,)) + harvested_identifiers.append(oai_identifier.group(1)) + updated_file_content.append("</ListRecords>\n</OAI-PMH>") + updated_harvested_file = None + try: + try: + updated_harvested_file = open(harvested_file, 'w') + updated_harvested_file.write("\n".join(updated_file_content)) + except IOError: + continue + finally: + if updated_harvested_file: + updated_harvested_file.close() + + +def add_timestamp_and_timelag(timestamp, + timelag): + """ Adds a time lag in seconds to a given date (timestamp). + Returns the resulting date. """ + # remove any trailing .00 in timestamp: + timestamp = re.sub(r'\.[0-9]+$', '', timestamp) + # first convert timestamp to Unix epoch seconds: + timestamp_seconds = calendar.timegm(time.strptime(timestamp, + "%Y-%m-%d %H:%M:%S")) + # now add them: + result_seconds = timestamp_seconds + timelag + result = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(result_seconds)) + return result + + +def find_matching_files(basedir, filetypes): + """ + This functions tries to find all files matching given filetypes by looking at + all the files and filenames in the given directory, including subdirectories. + + @param basedir: full path to base directory to search in + @type basedir: string + + @param filetypes: list of filetypes, extensions + @type filetypes: list + + @return: exitcode and any error messages as: (exitcode, err_msg) + @rtype: tuple + """ + files_list = [] + for dirpath, dummy0, filenames in os.walk(basedir): + for filename in filenames: + full_path = os.path.join(dirpath, filename) + dummy1, cmd_out, dummy2 = run_shell_command('file %s', (full_path,)) + for filetype in filetypes: + if cmd_out.lower().find(filetype) > -1: + files_list.append(full_path) + elif filename.split('.')[-1].lower() == filetype: + files_list.append(full_path) + return files_list + + +def translate_fieldvalues_from_latex(record, tag, code='', encoding='utf-8'): + """ + Given a record and field tag, this function will modify the record by + translating the subfield values of found fields from LaTeX to chosen + encoding for all the subfields with given code (or all if no code is given). 
+ + @param record: record to modify, in BibRec style structure + @type record: dict + + @param tag: tag of fields to modify + @type tag: string + + @param code: restrict the translation to a given subfield code + @type code: string + + @param encoding: scharacter encoding for the new value. Defaults to UTF-8. + @type encoding: string + """ + field_list = record_get_field_instances(record, tag) + for field in field_list: + subfields = field[0] + subfield_index = 0 + for subfield_code, subfield_value in subfields: + if code == '' or subfield_code == code: + newvalue = translate_latex2unicode(subfield_value).encode(encoding) + record_modify_subfield(record, tag, subfield_code, newvalue, + subfield_index, field_position_global=field[4]) + subfield_index += 1 + + +def compare_timestamps_with_tolerance(timestamp1, + timestamp2, + tolerance=0): + """Compare two timestamps TIMESTAMP1 and TIMESTAMP2, of the form + '2005-03-31 17:37:26'. Optionally receives a TOLERANCE argument + (in seconds). Return -1 if TIMESTAMP1 is less than TIMESTAMP2 + minus TOLERANCE, 0 if they are equal within TOLERANCE limit, + and 1 if TIMESTAMP1 is greater than TIMESTAMP2 plus TOLERANCE. + """ + # remove any trailing .00 in timestamps: + timestamp1 = re.sub(r'\.[0-9]+$', '', timestamp1) + timestamp2 = re.sub(r'\.[0-9]+$', '', timestamp2) + # first convert timestamps to Unix epoch seconds: + timestamp1_seconds = calendar.timegm(time.strptime(timestamp1, + "%Y-%m-%d %H:%M:%S")) + timestamp2_seconds = calendar.timegm(time.strptime(timestamp2, + "%Y-%m-%d %H:%M:%S")) + # now compare them: + if timestamp1_seconds < timestamp2_seconds - tolerance: + return -1 + elif timestamp1_seconds > timestamp2_seconds + tolerance: + return 1 + else: + return 0 + + +def generate_harvest_report(repository, harvested_identifier_list, + uploaded_task_ids=[], active_files_list=[], + task_specific_name="", current_task_id=-1, + manual_harvest=False, error_happened=False): + """ + Returns an applicable subject-line + text to send via e-mail or add to + a ticket about the harvesting results. + """ + # Post-harvest reporting + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + if task_specific_name: + fullname = repository.name + task_specific_name + else: + fullname = repository.name + + if manual_harvest: + # One-shot manual harvest + harvesting_prefix = "Manual harvest" + else: + # Automatic + harvesting_prefix = "Periodical harvesting" + + subject = "%s of '%s' finished %s" % (harvesting_prefix, fullname, current_time) + if error_happened: + subject += " with errors" + text = \ + """ + %(harvesting)s completed *with errors* from source named '%(name)s' (%(sourceurl)s) at %(ctime)s. + In total %(total)d record(s) were harvested. + + See harvest task log here for more information on the problems: + %(harvesttasklink)s + + Please forward this mail to administrators. 
<%(admin_mail)s> + + ---------- + Extra Info + ---------- + + Harvest history for this source: + %(siteurl)s/admin/oaiharvest/oaiharvestadmin.py/viewhistory?ln=no&oai_src_id=%(oai_src_id)s + + See state of uploaded records: + %(uploadtasklinks)s + + List of OAI IDs harvested: + %(ids)s + + Records ready to upload are located here: + %(files)s + """ \ + % { + 'harvesting': harvesting_prefix, + 'admin_mail': CFG_SITE_ADMIN_EMAIL, + 'name': fullname, + 'sourceurl': repository.baseurl, + 'ctime': current_time, + 'total': sum([len(ids) for ids in harvested_identifier_list]), + 'files': '\n'.join(active_files_list), + 'ids': '\n'.join([oaiid for ids in harvested_identifier_list for oaiid in ids]), + 'siteurl': CFG_SITE_URL, + 'oai_src_id': repository.id, + 'harvesttasklink': "%s/admin/oaiharvest/oaiharvestadmin.py/viewtasklogs?ln=no&task_id=%s" + % (CFG_SITE_URL, current_task_id), + 'uploadtasklinks': '\n'.join(["%s/admin/oaiharvest/oaiharvestadmin.py/viewtasklogs?ln=no&task_id=%s" + % (CFG_SITE_URL, task_id) for task_id in uploaded_task_ids]) or "None", + } + else: + text = \ + """ + %(harvesting)s completed successfully from source named '%(name)s' (%(sourceurl)s) at %(ctime)s. + In total %(total)d record(s) were harvested. + + See harvest history here: + %(siteurl)s/admin/oaiharvest/oaiharvestadmin.py/viewhistory?ln=no&oai_src_id=%(oai_src_id)s + + See state of uploaded records: + %(uploadtasklinks)s + + List of OAI IDs harvested: + %(ids)s + + Records ready to upload are located here: + %(files)s + """ \ + % { + 'harvesting': harvesting_prefix, + 'name': fullname, + 'sourceurl': repository['baseurl'], + 'ctime': current_time, + 'total': sum([len(ids) for ids in harvested_identifier_list]), + 'files': '\n'.join(active_files_list), + 'ids': '\n'.join([oaiid for ids in harvested_identifier_list for oaiid in ids]), + 'siteurl': CFG_SITE_URL, + 'oai_src_id': repository['id'], + 'uploadtasklinks': '\n'.join(["%s/admin/oaiharvest/oaiharvestadmin.py/viewtasklogs?ln=no&task_id=%s" \ + % (CFG_SITE_URL, task_id) for task_id in uploaded_task_ids]) or "None",\ + } + if not manual_harvest: + text += "Categories harvested from: \n%s\n" % (repository.setspecs or "None",) + return subject, text + + +def record_extraction_from_file(path): + """ + get an harvested file, and transform each record as if it was another independant + harvested document. 
+ @param path: is the path of the file harvested + @return : return a table of records encapsulated with markup of the document + designed by path + """ + + #Will contains all the records + list_of_records = [] + + #will contains the header of the file ie: all lines before the first record + header = "" + + file = open(path,'r+') + + #Exctraction of the header + temporary_string = file.readline() + while not temporary_string.startswith("<record>"): + header += temporary_string + temporary_string = file.readline() + + #Exctraction of the records + + temporary_record = temporary_string + temporary_string = file.readline() + + while not temporary_string.startswith("</ListRecords>"): + if temporary_string.startswith("<record>"): + list_of_records.append(temporary_record) + temporary_record = temporary_string + else: + temporary_record = temporary_record + temporary_string + temporary_string = file.readline() + + list_of_records.append(temporary_record) + + #will contains the footer of the file ie: all lines after the last record + + #Exctraction of the footer + + footer = temporary_string + + temporary_string = file.readline() + + while not temporary_string == "": + footer = footer + temporary_string + temporary_string = file.readline() + + file.close() + + #Reassembling of the records and the footer and header + + for i in range(0, len(list_of_records)): + list_of_records[i] = header+list_of_records[i]+footer + + return list_of_records + + +def record_extraction(xml_string): + """this function return only record + removing header and footer. Work only + in the case there is only one record + into the collection. + """ + xml_string = xml_string[xml_string.index("<record>")+8:] + + return xml_string[:xml_string.rindex("</record>")] + + +def harvest_step(repository, harvestpath, identifiers, dates): + """ + Performs the entire harvesting step. + Returns a tuple of (file_list, error_code) + """ + harvested_files_list = None + + if identifiers: + # Harvesting is done per identifier instead of server-updates + harvested_files_list = harvest_by_identifiers(repository, identifiers, harvestpath) + elif dates: + # Dates are given so we harvest "from" -> "to" dates + harvested_files_list = harvest_by_dates(repository, + harvestpath, + str(dates[0]), + str(dates[1])) + elif not dates and (repository.lastrun is None or repository.lastrun == '') and repository.frequency != 0: + # First time we harvest from this repository + harvested_files_list = harvest_by_dates(repository, harvestpath) + update_lastrun(repository.id) + + elif not dates and repository.frequency != 0: + # Just a regular update from last time it ran + ### check that update is actually needed, + ### i.e. lastrun+frequency>today + timenow = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + lastrundate = re.sub(r'\.[0-9]+$', '', + str(repository.lastrun)) # remove trailing .00 + timeinsec = int(repository.frequency) * 60 * 60 + updatedue = add_timestamp_and_timelag(lastrundate, timeinsec) + proceed = compare_timestamps_with_tolerance(updatedue, timenow) + if proceed != 1: + # update needed! + fromdate = str(repository.lastrun) + # get rid of time of the day for the moment + fromdate = fromdate.split()[0] + harvested_files_list = harvest_by_dates(repository, harvestpath, + fromdate=fromdate) + update_lastrun(repository.id) + else: + return [] # No actual error here. + + elif not dates and repository.frequency == 0: + return [] # No actual error here. 
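# Illustrative sketch of the timestamp helpers defined earlier in this file;
# the timestamp values are invented.
from invenio.legacy.oaiharvest.utils import (add_timestamp_and_timelag,
                                             compare_timestamps_with_tolerance)

print add_timestamp_and_timelag("2005-03-31 17:37:26", 3600)
# -> '2005-03-31 18:37:26'
print compare_timestamps_with_tolerance("2005-03-31 17:37:26",
                                        "2005-03-31 17:37:28", tolerance=5)
# -> 0, i.e. equal within the 5 second tolerance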
+ return harvested_files_list + + +def harvest_by_identifiers(repository, identifiers, harvestpath): + """ + Harvest an OAI repository by identifiers. + + Given a repository "object" (dict from DB) and a list of OAI identifiers + of records in the repository perform a OAI harvest using GetRecord for each. + + The records will be harvested into the specified filepath. + """ + harvested_files_list = [] + count = 0 + for oai_identifier in identifiers: + count += 1 + harvested_files_list.extend(oai_harvest_get(prefix=repository.metadataprefix, + baseurl=repository.baseurl, + harvestpath=harvestpath, + verb="GetRecord", + identifier=oai_identifier)) + return harvested_files_list + + +def harvest_by_dates(repository, harvestpath, fromdate=None, todate=None): + """ + Harvest an OAI repository by dates. + + Given a repository "object" (dict from DB) and from/to dates, this function will + perform an OAI harvest request for records updated between the given dates. + + If no dates are given, the repository is harvested from the beginning. + + If you set fromdate == last-run and todate == None, then the repository + will be harvested since last time (most common type). + + The records will be harvested into the specified filepath. + """ + if fromdate and todate: + dates = "from %s to %s" % (fromdate, todate) + elif fromdate: + dates = "from %s" % (fromdate,) + else: + dates = "" + + try: + file_list = oai_harvest_get(prefix=repository.metadataprefix, + baseurl=repository.baseurl, + harvestpath=harvestpath, + fro=fromdate, + until=todate, + setspecs=repository.setspecs) + except StandardError as e: + # exception already dealt with, just noting the error. + raise e + + return file_list + + +def oai_harvest_get(prefix, baseurl, harvestpath, + fro=None, until=None, setspecs=None, + user=None, password=None, cert_file=None, + key_file=None, method="POST", verb="ListRecords", + identifier=""): + """ + Retrieve OAI records from given repository, with given arguments + """ + try: + (addressing_scheme, network_location, path, dummy1, + dummy2, dummy3) = urlparse.urlparse(baseurl) + secure = (addressing_scheme == "https") + + http_param_dict = {'verb': verb, + 'metadataPrefix': prefix} + if identifier: + http_param_dict['identifier'] = identifier + if fro: + http_param_dict['from'] = fro + if until: + http_param_dict['until'] = until + + sets = None + if setspecs: + sets = [oai_set.strip() for oai_set in setspecs.split(' ')] + + harvested_files = oai_harvest_getter.harvest(network_location, path, http_param_dict, method, harvestpath, + sets, secure, user, password, cert_file, key_file) + if verb == "ListRecords": + remove_duplicates(harvested_files) + return harvested_files + except (StandardError, oai_harvest_getter.InvenioOAIRequestError) as exce: + register_exception() + raise Exception("An error occurred while harvesting from %s: %s\n" + % (baseurl, str(exce))) + + +def create_authorlist_ticket(matching_fields, identifier, queue): + """ + This function will submit a ticket generated by UNDEFINED affiliations + in extracted authors from collaboration authorlists. 
+ + @param matching_fields: list of (tag, field_instances) for UNDEFINED nodes + @type matching_fields: list + + @param identifier: OAI identifier of record + @type identifier: string + + @param queue: the RT queue to send a ticket to + @type queue: string + + @return: return the ID of the created ticket, or None on failure + @rtype: int or None + """ + subject = "[OAI Harvest] UNDEFINED affiliations for record %s" % (identifier,) + text = """ +Harvested record with identifier %(ident)s has had its authorlist extracted and contains some UNDEFINED affiliations. + +To see the record, go here: %(baseurl)s/search?p=%(ident)s + +If the record is not there yet, try again later. It may take some time for it to load into the system. + +List of unidentified fields: +%(fields)s + """ % { + 'ident': identifier, + 'baseurl': CFG_SITE_URL, + 'fields': "\n".join([field_xml_output(field, tag) for tag, field_instances in matching_fields + for field in field_instances]) + } + return create_ticket(queue, subject, text) + + +def create_ticket(queue, subject, text=""): + """ + This function will submit a ticket using the configured BibCatalog system. + + @param queue: the ticketing queue to send a ticket to + @type queue: string + + @param subject: subject of the ticket + @type subject: string + + @param text: the main text or body of the ticket. Optional. + @type text: string + + @return: return the ID of the created ticket, or None on failure + @rtype: int or None + """ + # Initialize BibCatalog connection as default user, if possible + if bibcatalog_system is not None: + bibcatalog_response = bibcatalog_system.check_system() + else: + bibcatalog_response = "No ticket system configured" + if bibcatalog_response != "": + write_message("BibCatalog error: %s\n" % (bibcatalog_response,)) + return None + + ticketid = bibcatalog_system.ticket_submit(subject=subject, queue=queue) + if text: + comment = bibcatalog_system.ticket_comment(None, ticketid, text) + if comment is None: + write_message("Error: commenting on ticket %s failed." % (str(ticketid),)) + return ticketid \ No newline at end of file diff --git a/invenio/modules/oai_harvest/models.py b/invenio/modules/oai_harvest/models.py index 50cbdcc97..139a823ba 100644 --- a/invenio/modules/oai_harvest/models.py +++ b/invenio/modules/oai_harvest/models.py @@ -1,105 +1,116 @@ # -*- coding: utf-8 -*- # ## This file is part of Invenio. ## Copyright (C) 2011, 2012, 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02D111-1307, USA. """ Oai harvest database models. """ # General imports. from invenio.ext.sqlalchemy import db # Create your models here. 
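# Illustrative call of the ticketing helper defined in
# invenio/legacy/oaiharvest/utils.py above; the queue name, subject and text are
# invented, and a configured BibCatalog system is assumed (otherwise None is
# returned).
from invenio.legacy.oaiharvest.utils import create_ticket

ticket_id = create_ticket("Harvests",
                          subject="[OAI Harvest] UNDEFINED affiliations",
                          text="See the harvested record for details.")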
#from websearch_model import Collection from invenio.modules.record_editor.models import Bibrec from invenio.modules.scheduler.models import SchTASK + class OaiHARVEST(db.Model): """Represents a OaiHARVEST record.""" + __tablename__ = 'oaiHARVEST' + id = db.Column(db.MediumInteger(9, unsigned=True), nullable=False, - primary_key=True, autoincrement=True) + primary_key=True, autoincrement=True) baseurl = db.Column(db.String(255), nullable=False, server_default='') metadataprefix = db.Column(db.String(255), nullable=False, - server_default='oai_dc') + server_default='oai_dc') arguments = db.Column(db.LargeBinary, nullable=True) comment = db.Column(db.Text, nullable=True) name = db.Column(db.String(255), nullable=False) lastrun = db.Column(db.DateTime, nullable=True) frequency = db.Column(db.MediumInteger(12), nullable=False, - server_default='0') + server_default='0') postprocess = db.Column(db.String(20), nullable=False, - server_default='h') + server_default='h') setspecs = db.Column(db.Text, nullable=False) + @classmethod + def get(cls, *criteria, **filters): + """ A wrapper for the filter and filter_by functions of sqlalchemy. + Define a dict with which columns should be filtered by which values. + + look up also sqalchemy BaseQuery's filter and filter_by documentation + """ + return cls.query.filter(*criteria).filter_by(**filters) + class OaiREPOSITORY(db.Model): """Represents a OaiREPOSITORY record.""" __tablename__ = 'oaiREPOSITORY' id = db.Column(db.MediumInteger(9, unsigned=True), nullable=False, - primary_key=True, autoincrement=True) + primary_key=True, autoincrement=True) setName = db.Column(db.String(255), nullable=False, - server_default='') + server_default='') setSpec = db.Column(db.String(255), nullable=False, - server_default='') + server_default='') setCollection = db.Column(db.String(255), nullable=False, - server_default='') + server_default='') setDescription = db.Column(db.Text, nullable=False) setDefinition = db.Column(db.Text, nullable=False) setRecList = db.Column(db.iLargeBinary, nullable=True) last_updated = db.Column(db.DateTime, nullable=False, server_default='1970-01-01 00:00:00') p1 = db.Column(db.Text, nullable=False) f1 = db.Column(db.Text, nullable=False) m1 = db.Column(db.Text, nullable=False) p2 = db.Column(db.Text, nullable=False) f2 = db.Column(db.Text, nullable=False) m2 = db.Column(db.Text, nullable=False) p3 = db.Column(db.Text, nullable=False) f3 = db.Column(db.Text, nullable=False) m3 = db.Column(db.Text, nullable=False) class OaiHARVESTLOG(db.Model): """Represents a OaiHARVESTLOG record.""" __tablename__ = 'oaiHARVESTLOG' id_oaiHARVEST = db.Column(db.MediumInteger(9, unsigned=True), db.ForeignKey(OaiHARVEST.id), nullable=False) id_bibrec = db.Column(db.MediumInteger(8, unsigned=True), db.ForeignKey(Bibrec.id), nullable=False, server_default='0') bibupload_task_id = db.Column(db.Integer(11), db.ForeignKey(SchTASK.id), - nullable=False, server_default='0', - primary_key=True) + nullable=False, server_default='0', + primary_key=True) oai_id = db.Column(db.String(40), nullable=False, server_default='', - primary_key=True) + primary_key=True) date_harvested = db.Column(db.DateTime, nullable=False, - server_default='1900-01-01 00:00:00', - primary_key=True) + server_default='1900-01-01 00:00:00', + primary_key=True) date_inserted = db.Column(db.DateTime, nullable=False, - server_default='1900-01-01 00:00:00') + server_default='1900-01-01 00:00:00') inserted_to_db = db.Column(db.Char(1), nullable=False, - server_default='P') + server_default='P') bibrec 
= db.relationship(Bibrec, backref='harvestlogs') schtask = db.relationship(SchTASK) - __all__ = ['OaiHARVEST', 'OaiREPOSITORY', 'OaiHARVESTLOG'] diff --git a/invenio/modules/workflows/api.py b/invenio/modules/workflows/api.py index 694c35a36..e02bb8362 100644 --- a/invenio/modules/workflows/api.py +++ b/invenio/modules/workflows/api.py @@ -1,280 +1,304 @@ # -*- coding: utf-8 -*- ## This file is part of Invenio. ## Copyright (C) 2012, 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """ BibWorkflow API - functions to run workflows """ -from werkzeug import cached_property -from werkzeug.utils import import_string + + +from werkzeug.utils import (import_string, + cached_property) from invenio.base.globals import cfg +from invenio.base.config import CFG_BIBWORKFLOW_WORKER + + + +from invenio.bibworkflow_utils import BibWorkflowObjectIdContainer +from invenio.modules.workflows.models import BibWorkflowObject + class InvenioBibWorkflowWorkerUnavailable(Exception): pass class WorkerBackend(object): @cached_property def worker(self): try: return import_string('invenio.modules.workflows.workers.%s:%s' % ( cfg['CFG_BIBWORKFLOW_WORKER'], cfg['CFG_BIBWORKFLOW_WORKER'])) except: from invenio.ext.logging import register_exception ## Let's report about broken plugins register_exception(alert_admin=True) def __call__(self, *args, **kwargs): if not self.worker: raise InvenioBibWorkflowWorkerUnavailable('No worker configured') return self.worker(*args, **kwargs) WORKER = WorkerBackend() + def start(workflow_name, data, **kwargs): """ Starts a workflow by given name for specified data *immediately* in the current process. The name of the workflow to start is considered unique and it is equal to the name of a file containing the workflow definition. The data passed should be a list of object(s) to run through the workflow. For example: a list of dict, JSON string, BibWorkflowObjects etc. Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a taskid from BibSched, the current user etc. The workflow engine object generated is returned upon completion. @param workflow_name: the workflow name to run. Ex: "my_workflow" @type workflow_name: str @param data: the workflow name to run. Ex: "my_workflow" @type data: list of objects/dicts @return: BibWorkflowEngine that ran the workflow. """ from .worker_engine import run_worker return run_worker(workflow_name, data, **kwargs) def start_delayed(workflow_name, data, **kwargs): """ Starts a *delayed* workflow by using one of the defined workers available. For example, enqueueing the execution of the workflow in a task queue such as Celery (http://celeryproject.org). Otherwise, see documentation of start(). @param workflow_name: the workflow name to run. 
Ex: "my_workflow" @type workflow_name: str @param data: the workflow name to run. Ex: "my_workflow" @type data: list of objects/dicts @return: BibWorkflowEngine that ran the workflow. """ + if not CFG_BIBWORKFLOW_WORKER: + raise InvenioBibWorkflowWorkerUnavailable('No worker configured') + + #The goal of this part is to avoid a SQLalchemy decoherence in case + #some one try to send a Bibworkflow object. To avoid to send the + #complete object and get SQLAlchemy error of mapping, we save the id + #into our Id container, In the celery process the object is reloaded + #from the database ! + if isinstance(data, list): + for i in range(0, len(data)): + if isinstance(data[i], BibWorkflowObject): + data[i] = BibWorkflowObjectIdContainer(data[i]) + else: + if isinstance(data, BibWorkflowObject): + data = BibWorkflowObjectIdContainer(data) return WORKER().run_worker(workflow_name, data, **kwargs) def start_by_wid(wid, **kwargs): """ Will re-start given workflow, by workflow uuid (wid), from the beginning with the original data given. Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a taskid from BibSched, the current user etc. @param wid: the workflow uuid. Ex: "550e8400-e29b-41d4-a716-446655440000" @type wid: string @return: BibWorkflowEngine that ran the workflow. """ from .worker_engine import restart_worker return restart_worker(wid, **kwargs) def start_by_wid_delayed(wid, **kwargs): """ Will re-start given workflow, by workflow uuid (wid), from the beginning with the original data given. Starts the workflow *delayed* by using one of the defined workers available. For example, enqueueing the execution of the workflow in a task queue such as Celery (http://celeryproject.org). Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a taskid from BibSched, the current user etc. @param wid: the workflow uuid. Ex: "550e8400-e29b-41d4-a716-446655440000" @type wid: string @return: BibWorkflowEngine that ran the workflow. """ return WORKER().restart_worker(wid, **kwargs) def start_by_oids(workflow_name, oids, **kwargs): """ Will start given workflow, by name, using the given list of BibWorkflowObject ids (oids) from beginning. Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a taskid from BibSched, the current user etc. @param workflow_name: the workflow name to run. Ex: "my_workflow" @type workflow_name: str @param oids: list of BibWorkflowObject id's to run. @type oids: list of strings/integers @return: BibWorkflowEngine that ran the workflow. """ from .models import BibWorkflowObject objects = BibWorkflowObject.query.filter(BibWorkflowObject.id.in_(list(oids))).all() - return start(workflow_name, objects, **kwargs) def start_by_oids_delayed(workflow_name, oids, **kwargs): """ Will start given workflow, by name, using the given list of BibWorkflowObject ids (oids) from beginning. Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a taskid from BibSched, the current user etc. Starts the workflow *delayed* by using one of the defined workers available. For example, enqueueing the execution of the workflow in a task queue such as Celery (http://celeryproject.org). 
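# Illustrative sketch of the delayed API defined above; the workflow name and
# the data are invented, and a configured worker (CFG_BIBWORKFLOW_WORKER, e.g.
# Celery) is assumed. Any BibWorkflowObject in the payload is replaced by a
# BibWorkflowObjectIdContainer so that only its id crosses the process boundary
# and the object is reloaded from the database inside the worker.
from invenio.modules.workflows.api import start_delayed

start_delayed("my_workflow", data=[{"title": "Sample record"}])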
@param workflow_name: the workflow name to run. Ex: "my_workflow" @type workflow_name: str @param oids: list of BibWorkflowObject id's to run. @type oids: list of strings/integers @return: BibWorkflowEngine that ran the workflow. """ from .models import BibWorkflowObject objects = BibWorkflowObject.query.filter(BibWorkflowObject.id.in_(list(oids))).all() return start_delayed(workflow_name, objects, **kwargs) def continue_oid(oid, start_point="continue_next", **kwargs): """ Continue workflow asociated with object given by object id (oid). It can start from previous, current or next task. Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a taskid from BibSched, the current user etc. Starts the workflow *delayed* by using one of the defined workers available. For example, enqueueing the execution of the workflow in a task queue such as Celery (http://celeryproject.org). @param oid: id of BibWorkflowObject to run. @type oid: string @param start_point: where should the workflow start from? One of: * restart_prev: will restart from the previous task * continue_next: will continue to the next task * restart_task: will restart the current task @type start_point: string @return: BibWorkflowEngine that ran the workflow """ from .worker_engine import continue_worker return continue_worker(oid, start_point, **kwargs) def continue_oid_delayed(oid, start_point="continue_next", **kwargs): """ Continue workflow associated with object given by object id (oid). It can start from previous, current or next task. Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a taskid from BibSched, the current user etc. Starts the workflow *delayed* by using one of the defined workers available. For example, enqueueing the execution of the workflow in a task queue such as Celery (http://celeryproject.org). @param oid: id of BibWorkflowObject to run. @type oid: string @param start_point: where should the workflow start from? One of: * restart_prev: will restart from the previous task * continue_next: will continue to the next task * restart_task: will restart the current task @type start_point: string @return: BibWorkflowEngine that ran the workflow """ return WORKER().continue_worker(oid, start_point, **kwargs) def resume_objects_in_workflow(id_workflow, start_point="continue_next", **kwargs): """ Resume workflow for any halted or failed objects from given workflow. This is a generator function and will yield every workflow created per object which needs to be resumed. To identify the original workflow containing the halted objects, the ID (or UUID) of the workflow is required. The starting point to resume the objects from can optionally be given. By default, the objects resume with their next task in the workflow. @param id_workflow: id of Workflow with objects to resume. @type id_workflow: string @param start_point: where should the workflow start from? 
One of: * restart_prev: will restart from the previous task * continue_next: will continue to the next task * restart_task: will restart the current task @type start_point: string @yield: BibWorkflowEngine that ran the workflow """ from .models import BibWorkflowObject from .config import CFG_OBJECT_VERSION # Resume workflow if there are objects to resume objects = BibWorkflowObject.query.filter( BibWorkflowObject.id_workflow == id_workflow, BibWorkflowObject.version == CFG_OBJECT_VERSION.HALTED ).all() for obj in objects: yield continue_oid(oid=obj.id, start_point=start_point, **kwargs) diff --git a/invenio/modules/workflows/client.py b/invenio/modules/workflows/client.py index 3b2ad55c1..393bcb94f 100644 --- a/invenio/modules/workflows/client.py +++ b/invenio/modules/workflows/client.py @@ -1,132 +1,136 @@ # -*- coding: utf-8 -*- ## This file is part of Invenio. ## Copyright (C) 2012, 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. import traceback from workflow.engine import HaltProcessing from .config import CFG_OBJECT_VERSION from .config import CFG_WORKFLOW_STATUS +from .utils import InvenioWorkflowError def run_workflow(wfe, data, stop_on_halt=False, stop_on_error=False, **kwargs): """ Main function running the workflow. """ - initial_run = True + initial_run = True while True: try: if initial_run: initial_run = False wfe.process(data) # We processed the workflow. We're done. break else: wfe._unpickled = True wfe.restart('current', 'current') # We processed the restarted workflow. We're done. break except HaltProcessing as e: # Processing was halted. Lets save current object and continue. wfe.log.error("Processing halted!" + str(e)) wfe._objects[wfe.getCurrObjId()].save(CFG_OBJECT_VERSION.HALTED, wfe.getCurrTaskId(), id_workflow=wfe.uuid) wfe.save(CFG_WORKFLOW_STATUS.HALTED) wfe.setPosition(wfe.getCurrObjId() + 1, [0, 0]) if stop_on_halt: break + except InvenioWorkflowError as e: + raise e except Exception as e: # Processing generated an exception. # We print the stacktrace, save the object and continue wfe.log.error("Processing error! %r\n%s" % (e, traceback.format_exc())) # Changing counter should be moved to wfe object # together with default exception handling wfe.increase_counter_error() wfe._objects[wfe.getCurrObjId()].save(CFG_OBJECT_VERSION.HALTED, wfe.getCurrTaskId(), id_workflow=wfe.uuid) wfe.save(CFG_WORKFLOW_STATUS.ERROR) wfe.setPosition(wfe.getCurrObjId() + 1, [0, 0]) if stop_on_halt or stop_on_error: + e = InvenioWorkflowError(str(e),wfe.uuid,wfe.getCurrObjId()) raise e def continue_execution(wfe, data, restart_point="restart_task", stop_on_halt=False, stop_on_error=False, **kwargs): """ Continue execution of workflow for given object (wfe) from "restart_point". 
restart_point can be one of: * restart_prev: will restart from the previous task * continue_next: will continue to the next task * restart_task: will restart the current task You can use stop_on_error to raise exception's and stop the processing. Use stop_on_halt to stop processing the workflow if HaltProcessing is raised. """ wfe.log.info("Continue execution from: " + str(restart_point)) pos = data[0].get_current_task() if restart_point == "restart_prev": pos[-1] = pos[-1] - 1 wfe.setPosition(wfe.db_obj.current_object, pos) elif restart_point == "continue_next": pos[-1] = pos[-1] + 1 wfe.setPosition(wfe.db_obj.current_object, pos) else: # restart_task wfe.setPosition(wfe.db_obj.current_object, pos) wfe._unpickled = True initial_run = True wfe._objects = data while True: try: if initial_run: initial_run = False wfe.restart('current', 'current') # We processed the workflow. We're done. break else: wfe._unpickled = True wfe.restart('current', 'current') # We processed the restarted workflow. We're done. break except HaltProcessing as e: # Processing was halted. Lets save current object and continue. wfe.log.error("Processing halted!" + str(e)) wfe._objects[wfe.getCurrObjId()].save(2, wfe.getCurrTaskId()) wfe.save(CFG_WORKFLOW_STATUS.HALTED) wfe.setPosition(wfe.getCurrObjId() + 1, [0, 0]) if stop_on_halt: break except Exception as e: # Processing generated an exception. We print the stacktrace, # save the object and continue wfe.log.error("Processing error! %r\n%s" % (e, traceback.format_exc())) # Changing counter should be moved to wfe object together # with default exception handling wfe.increase_counter_error() wfe._objects[wfe.getCurrObjId()].save(2, wfe.getCurrTaskId()) wfe.save(CFG_WORKFLOW_STATUS.ERROR) wfe.setPosition(wfe.getCurrObjId() + 1, [0, 0]) if stop_on_halt or stop_on_error: raise e diff --git a/invenio/modules/workflows/engine.py b/invenio/modules/workflows/engine.py index af4139d6c..1dfb855aa 100644 --- a/invenio/modules/workflows/engine.py +++ b/invenio/modules/workflows/engine.py @@ -1,409 +1,430 @@ # -*- coding: utf-8 -*- ## This file is part of Invenio. ## Copyright (C) 2012, 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
import cPickle + import sys + from datetime import datetime from uuid import uuid1 as new_uuid + + +import base64 + from workflow.engine import (GenericWorkflowEngine, ContinueNextToken, HaltProcessing, StopProcessing, JumpTokenBack, JumpTokenForward, WorkflowError) from invenio.ext.sqlalchemy import db from invenio.config import CFG_DEVEL_SITE from .models import (Workflow, BibWorkflowObject, BibWorkflowEngineLog) from .utils import dictproperty, get_workflow_definition from .config import (CFG_WORKFLOW_STATUS, CFG_OBJECT_VERSION) from .logger import (get_logger, BibWorkflowLogHandler) + + DEBUG = CFG_DEVEL_SITE > 0 class BibWorkflowEngine(GenericWorkflowEngine): def __init__(self, name=None, uuid=None, curr_obj=0, workflow_object=None, id_user=0, module_name="Unknown", **kwargs): super(BibWorkflowEngine, self).__init__() self.db_obj = None if isinstance(workflow_object, Workflow): self.db_obj = workflow_object else: # If uuid is defined we try to get the db object from DB. if uuid is not None: self.db_obj = \ Workflow.get(Workflow.uuid == uuid).first() else: uuid = new_uuid() - if self.db_obj is None: self.db_obj = Workflow(name=name, id_user=id_user, current_object=curr_obj, module_name=module_name, uuid=uuid) self._create_db_obj() db_handler_obj = BibWorkflowLogHandler(BibWorkflowEngineLog, "uuid") self.log = get_logger(logger_name="workflow.%s" % self.db_obj.uuid, db_handler_obj=db_handler_obj, obj=self) self.set_workflow_by_name(self.name) self.set_extra_data_params(**kwargs) + def get_extra_data(self): + """ + Main method to retrieve data saved to the object. + """ + return cPickle.loads(base64.b64decode(self.db_obj._extra_data)) + + def set_extra_data(self, value): + """ + Main method to update data saved to the object. + """ + self.db_obj._extra_data = base64.b64encode(cPickle.dumps(value)) + def extra_data_get(self, key): - if key not in self.db_obj.extra_data.keys(): - raise KeyError - return self.db_obj.extra_data[key] + if key not in self.db_obj.get_extra_data(): + raise KeyError("%s not in extra_data" % (key,)) + return self.db_obj.get_extra_data()[key] def extra_data_set(self, key, value): - self.db_obj.extra_data[key] = value + tmp = self.db_obj.get_extra_data() + tmp[key] = value + self.db_obj.set_extra_data(tmp) extra_data = dictproperty(fget=extra_data_get, fset=extra_data_set, doc="Sets up property") del extra_data_get, extra_data_set @property def counter_object(self): return self.db_obj.counter_object @property def uuid(self): return self.db_obj.uuid @property def id_user(self): return self.db_obj.id_user @property def module_name(self): return self.db_obj.module_name @property def name(self): return self.db_obj.name @property def status(self): return self.db_obj.status def __getstate__(self): if not self._picklable_safe: raise cPickle.PickleError("The instance of the workflow engine " "cannot be serialized, " "because it was constructed with " "custom, user-supplied callbacks. 
" "Either use PickableWorkflowEngine or " "provide your own __getstate__ method.") state = self.__dict__.copy() del state['log'] return state def __setstate__(self, state): if len(self._objects) < self._i[0]: raise cPickle.PickleError("The workflow instance " "inconsistent state, " "too few objects") db_handler_obj = BibWorkflowLogHandler(BibWorkflowEngineLog, "uuid") state['log'] = get_logger(logger_name="workflow.%s" % state['uuid'], db_handler_obj=db_handler_obj, obj=self) self.__dict__ = state def __repr__(self): return "<BibWorkflow_engine(%s)>" % (self.db_obj.name,) def __str__(self, log=False): return """------------------------------- BibWorkflowEngine ------------------------------- %s ------------------------------- """ % (self.db_obj.__str__(),) @staticmethod def before_processing(objects, self): """ Executed before processing the workflow. """ # 1. Save workflow (ourselves). if not self.db_obj.uuid: self.save() self.set_counter_initial(len(objects)) self.log.info("Workflow has been started") # 2. We want to save all the objects as version 0. for obj in objects: same_workflow = \ obj.id_workflow and \ obj.id_workflow == self.db_obj.uuid if obj.id and same_workflow: # If object exists and we are running the same workflow, # do nothing obj.log.info("object saving process : was already existing") continue # Set the current workflow id in the object if obj.version == CFG_OBJECT_VERSION.INITIAL \ and obj.id_workflow is not None: obj.log.info("object saving process : was already existing") pass else: obj.id_workflow = self.uuid obj.save(obj.version) GenericWorkflowEngine.before_processing(objects, self) @staticmethod def after_processing(objects, self): self._i = [-1, [0]] if self.has_completed(): self.save(CFG_WORKFLOW_STATUS.COMPLETED) else: self.save(CFG_WORKFLOW_STATUS.FINISHED) def _create_db_obj(self): db.session.add(self.db_obj) db.session.commit() self.log.info("Workflow saved to db as new object.") def _update_db(self): db.session.commit() self.log.info("Workflow saved to db.") def has_completed(self): """ Returns True of workflow is fully completed meaning that all associated objects are in FINAL state. """ number_of_objects = BibWorkflowObject.query.filter( BibWorkflowObject.id_workflow == self.uuid, BibWorkflowObject.version.in_([CFG_OBJECT_VERSION.HALTED, CFG_OBJECT_VERSION.RUNNING]) ).count() return number_of_objects == 0 def save(self, status=CFG_WORKFLOW_STATUS.NEW): """ Save the workflow instance to database. Just storing the necessary data. No serialization (!). Status: 0 - new, 1 - running, 2 - halted, 3 - error, 4 - finished """ if not self.db_obj.uuid: # We do not have an ID, # so we need to add ourselves (first execution). self._create_db_obj() else: # This workflow continues a previous execution. 
if status in (CFG_WORKFLOW_STATUS.FINISHED, CFG_WORKFLOW_STATUS.HALTED): self.db_obj.current_object = 0 self.db_obj.modified = datetime.now() self.db_obj.status = status self._update_db() def process(self, objects): super(BibWorkflowEngine, self).process(objects) def restart(self, obj, task): """Restart the workflow engine after it was deserialized - """ self.log.info("Restarting workflow from %s object and %s task" % (str(obj), str(task),)) # set the point from which to start processing if obj == 'prev': # start with the previous object self._i[0] -= 2 #TODO: check if there is any object there elif obj == 'current': # continue with the current object self._i[0] -= 1 elif obj == 'next': pass else: raise Exception('Unknown start point for object: %s' % obj) # set the task that will be executed first if task == 'prev': # the previous self._i[1][-1] -= 1 elif task == 'current': # restart the task again self._i[1][-1] -= 0 elif task == 'next': # continue with the next task self._i[1][-1] += 1 else: raise Exception('Unknown start point for task: %s' % task) self.process(self._objects) self._unpickled = False @staticmethod def processing_factory(objects, self): """Default processing factory, extended with saving objects before successful processing. Will process objects in order. @var objects: list of objects (passed in by self.process()) @keyword cls: the engine object itself; because this method may be implemented by a standalone function, we also pass self as the cls argument. As the WFE proceeds, it increments the internal counter: the first position is the index of the current element, and this pointer increases before the object is taken. The 2nd position is reserved for the array that points to the task position. The number there points to the task that is currently executed; when an error happens, it stays there unchanged. The pointer is updated after the task has finished running.
""" self.before_processing(objects, self) i = self._i # negative index not allowed, -1 is special - while i[0] < len(objects) - 1 and i[0] >= -1: + while len(objects) - 1 > i[0] >= -1: i[0] += 1 obj = objects[i[0]] obj.log.info("Object is selected for processing") callbacks = self.callback_chooser(obj, self) if callbacks: try: self.run_callbacks(callbacks, objects, obj) i[1] = [0] # reset the callbacks pointer except StopProcessing: if DEBUG: self.log.debug("Processing was stopped: '%s' " "(object: %s)" % (str(callbacks), repr(obj))) obj.log.debug("Processing has stopped") break except JumpTokenBack, step: if step.args[0] > 0: raise WorkflowError("JumpTokenBack cannot" " be positive number") if DEBUG: self.log.debug('Warning, we go back [%s] objects' % step.args[0]) obj.log.debug("Object preempted") i[0] = max(-1, i[0] - 1 + step.args[0]) i[1] = [0] # reset the callbacks pointer except JumpTokenForward, step: if step.args[0] < 0: raise WorkflowError("JumpTokenForward cannot" " be negative number") if DEBUG: self.log.debug('We skip [%s] objects' % step.args[0]) obj.log.debug("Object preempted") i[0] = min(len(objects), i[0] - 1 + step.args[0]) i[1] = [0] # reset the callbacks pointer except ContinueNextToken: if DEBUG: self.log.debug('Stop processing for this object, ' 'continue with next') obj.log.debug("Object preempted") i[1] = [0] # reset the callbacks pointer continue except HaltProcessing: self.increase_counter_halted() extra_data = obj.get_extra_data() extra_data['redis_search']['halt_processing'] = self.getCurrTaskName() obj.set_extra_data(extra_data) if DEBUG: self.log.info('Processing was halted at step: %s' % i) # reraise the exception, #this is the only case when a WFE can be completely # stopped obj.log.info("Object proccesing is halted") raise - except Exception: self.log.info("Unexpected error: %s", sys.exc_info()[0]) obj.log.error("Something terribly wrong" " happend to this object") extra_data = obj.get_extra_data() extra_data['redis_search']['error'] = self.getCurrTaskName() obj.set_extra_data(extra_data) raise # We save the object once it is fully run through obj.save(CFG_OBJECT_VERSION.FINAL) obj.log.info("Object proccesing is finished") self.increase_counter_finished() self.log.info("Done saving object: %i" % (obj.id, )) self.after_processing(objects, self) def getCurrTaskName(self): return self._callbacks['*'][0][self.getCurrTaskId()[-1]].func_name def execute_callback(self, callback, obj): """Executes the callback - override this method to implement logging""" obj.data = obj.get_data() obj.extra_data = obj.get_extra_data() + self.extra_data = self.get_extra_data() try: callback(obj, self) finally: + self.set_extra_data(self.extra_data) obj.set_data(obj.data) obj.set_extra_data(obj.extra_data) def halt(self, msg): """Halt the workflow (stop also any parent wfe)""" self.log.debug("Processing halted at task %s with message: %s" % (self.getCurrTaskId(), msg, )) raise HaltProcessing("Processing halted at task %s with message: %s" % (self.getCurrTaskId(), msg, )) def set_counter_initial(self, obj_count): self.db_obj.counter_initial = obj_count self.db_obj.counter_halted = 0 self.db_obj.counter_error = 0 self.db_obj.counter_finished = 0 def increase_counter_halted(self): self.db_obj.counter_halted += 1 def increase_counter_error(self): self.db_obj.counter_error += 1 def increase_counter_finished(self): self.db_obj.counter_finished += 1 def set_workflow_by_name(self, workflow_name): workflow = get_workflow_definition(workflow_name) self.workflow_definition = workflow 
self.setWorkflow(self.workflow_definition.workflow) def set_extra_data_params(self, **kwargs): for key, value in kwargs.iteritems(): self.extra_data[key] = value diff --git a/invenio/modules/workflows/models.py b/invenio/modules/workflows/models.py index c580131b4..d2504d2c4 100644 --- a/invenio/modules/workflows/models.py +++ b/invenio/modules/workflows/models.py @@ -1,517 +1,507 @@ # -*- coding: utf-8 -*- ## This file is part of Invenio. ## Copyright (C) 2012, 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. import os import tempfile import cPickle import base64 import logging from datetime import datetime from sqlalchemy import desc from sqlalchemy.orm.exc import NoResultFound from invenio.ext.sqlalchemy import db from invenio.base.globals import cfg from .config import CFG_OBJECT_VERSION from .utils import redis_create_search_entry + from .logger import (get_logger, BibWorkflowLogHandler) def get_default_data(): """ Returns the base64 representation of the data default value """ data_default = {} return base64.b64encode(cPickle.dumps(data_default)) def get_default_extra_data(): """ Returns the base64 representation of the extra_data default value """ extra_data_default = {"tasks_results": {}, "owner": {}, "task_counter": {}, "error_msg": "", "last_task_name": "", "latest_object": -1, "widget": None, "redis_search": {}} return base64.b64encode(cPickle.dumps(extra_data_default)) class Workflow(db.Model): + __tablename__ = "bwlWORKFLOW" + uuid = db.Column(db.String(36), primary_key=True, nullable=False) + name = db.Column(db.String(255), default="Default workflow", nullable=False) created = db.Column(db.DateTime, default=datetime.now, nullable=False) modified = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now, nullable=False) id_user = db.Column(db.Integer, default=0, nullable=False) - extra_data = db.Column(db.MutableDict.as_mutable(db.PickleType), - nullable=False, default={}) + _extra_data = db.Column(db.LargeBinary, nullable=False, default=get_default_extra_data()) status = db.Column(db.Integer, default=0, nullable=False) current_object = db.Column(db.Integer, default="0", nullable=False) objects = db.relationship("BibWorkflowObject", backref="bwlWORKFLOW") counter_initial = db.Column(db.Integer, default=0, nullable=False) counter_halted = db.Column(db.Integer, default=0, nullable=False) counter_error = db.Column(db.Integer, default=0, nullable=False) counter_finished = db.Column(db.Integer, default=0, nullable=False) module_name = db.Column(db.String(64), nullable=False) + def __repr__(self): return "<Workflow(name: %s, module: %s, cre: %s, mod: %s," \ "id_user: %s, status: %s)>" % \ (str(self.name), str(self.module_name), str(self.created), str(self.modified), str(self.id_user), str(self.status)) def __str__(self): return """Workflow: Uuid: %s Name: %s User id: %s Module name: %s Created: %s Modified: %s Status: %s 
Current object: %s Counters: initial=%s, halted=%s, error=%s, finished=%s Extra data: %s""" % (str(self.uuid), str(self.name), str(self.id_user), str(self.module_name), str(self.created), str(self.modified), str(self.status), str(self.current_object), str(self.counter_initial), str(self.counter_halted), str(self.counter_error), str(self.counter_finished), str(self._extra_data),) @classmethod def get(cls, *criteria, **filters): """ A wrapper for the filter and filter_by functions of sqlalchemy. Define a dict with which columns should be filtered by which values. e.g. Workflow.get(uuid=uuid) Workflow.get(Workflow.uuid != uuid) The function supports also "hybrid" arguments. e.g. Workflow.get(Workflow.module_name != 'i_hate_this_module', user_id=user_id) look up also sqalchemy BaseQuery's filter and filter_by documentation """ return cls.query.filter(*criteria).filter_by(**filters) @classmethod def get_status(cls, uuid=None): """ Returns the status of the workflow """ return cls.get(Workflow.uuid == uuid).one().status @classmethod def get_most_recent(cls, *criteria, **filters): """ Returns the most recently modified workflow. """ most_recent = cls.get(*criteria, **filters).\ order_by(desc(Workflow.modified)).first() if most_recent is None: raise NoResultFound else: return most_recent @classmethod def get_objects(cls, uuid=None): """ Returns the objects of the workflow """ return cls.get(Workflow.uuid == uuid).one().objects - @classmethod - def get_extra_data(cls, user_id=0, uuid=None, key=None, getter=None): + def get_extra_data(self, user_id=0, uuid=None, key=None, getter=None): """Returns a json of the column extra_data or if any of the other arguments are defined, a specific value. You can define either the key or the getter function. @param key: the key to access the desirable value @param getter: a callable that takes a dict as param and returns a value """ - extra_data = cls.get(Workflow.id_user == user_id, - Workflow.uuid == uuid).one().extra_data + extra_data = Workflow.get(Workflow.id_user == self.id_user, + Workflow.uuid == self.uuid).one()._extra_data + extra_data = cPickle.loads(base64.b64decode(extra_data)) if key is not None: return extra_data[key] elif callable(getter): return getter(extra_data) - @classmethod - def set_extra_data(cls, user_id=0, uuid=None, + def set_extra_data(self, user_id=0, uuid=None, key=None, value=None, setter=None): """Modifies the json of the column extra_data or if any of the other arguments are defined, a specific value. You can define either the key, value or the setter function. @param key: the key to access the desirable value @param value: the new value @param setter: a callable that takes a dict as param and modifies it """ - extra_data = cls.get(Workflow.id_user == user_id, - Workflow.uuid == uuid).one().extra_data - + extra_data = Workflow.get(Workflow.id_user == user_id, + Workflow.uuid == uuid).one()._extra_data + extra_data = cPickle.loads(base64.b64decode(extra_data)) if key is not None and value is not None: extra_data[key] = value elif callable(setter): setter(extra_data) - cls.get(Workflow.uuid == uuid).update({'extra_data': extra_data}) + Workflow.get(Workflow.uuid == self.uuid).update({'_extra_data': base64.b64encode(cPickle.dumps(extra_data))}) @classmethod def delete(cls, uuid=None): cls.get(Workflow.uuid == uuid).delete() db.session.commit() class BibWorkflowObject(db.Model): # db table definition __tablename__ = "bwlOBJECT" + id = db.Column(db.Integer, primary_key=True) # Our internal data column. Default is encoded dict. 
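# Both _data and _extra_data hold base64-encoded cPickle payloads; they should be accessed through get_data()/set_data() and get_extra_data()/set_extra_data() rather than read directly.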
_data = db.Column(db.LargeBinary, nullable=False, default=get_default_data()) _extra_data = db.Column(db.LargeBinary, nullable=False, default=get_default_extra_data()) id_workflow = db.Column(db.String(36), db.ForeignKey("bwlWORKFLOW.uuid"), nullable=True) version = db.Column(db.Integer(3), default=CFG_OBJECT_VERSION.RUNNING, nullable=False) id_parent = db.Column(db.Integer, db.ForeignKey("bwlOBJECT.id"), default=None) child_objects = db.relationship("BibWorkflowObject", remote_side=[id_parent]) created = db.Column(db.DateTime, default=datetime.now, nullable=False) modified = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now, nullable=False) status = db.Column(db.String(255), default="", nullable=False) - data_type = db.Column(db.String(100), nullable=True) + + data_type = db.Column(db.String(150), default="", + nullable=True) + uri = db.Column(db.String(500), default="") id_user = db.Column(db.Integer, default=0, nullable=False) child_logs = db.relationship("BibWorkflowObjectLog") workflow = db.relationship( Workflow, foreign_keys=[id_workflow], remote_side=Workflow.uuid ) _log = None @property def log(self): if not self._log: db_handler_obj = BibWorkflowLogHandler(BibWorkflowObjectLog, "id") self._log = get_logger(logger_name="object.%s_%s" % (self.id_workflow, self.id), db_handler_obj=db_handler_obj, loglevel=logging.DEBUG, obj=self) return self._log def get_data(self): """ Main method to retrieve data saved to the object. """ + return cPickle.loads(base64.b64decode(self._data)) def set_data(self, value): """ Main method to update data saved to the object. """ self._data = base64.b64encode(cPickle.dumps(value)) def get_extra_data(self): """ Main method to retrieve data saved to the object. """ return cPickle.loads(base64.b64decode(self._extra_data)) def set_extra_data(self, value): """ Main method to update data saved to the object. 
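The value is pickled and base64-encoded before being stored in the _extra_data column.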
""" self._extra_data = base64.b64encode(cPickle.dumps(value)) def _create_db_obj(self): db.session.add(self) db.session.commit() def __repr__(self): return "<BibWorkflowObject(id = %s, data = %s, id_workflow = %s, " \ "version = %s, id_parent = %s, created = %s, extra_data = %s)" \ % (str(self.id), str(self.get_data()), str(self.id_workflow), str(self.version), str(self.id_parent), str(self.created), - str(self._extra_data)) + str(self.get_extra_data())) def __str__(self, log=False): return """ ------------------------------- BibWorkflowObject ------------------------------- Extra object class: Self status: %s ------------------------------- BibWorkflowObject: Id: %s Parent id: %s Workflow id: %s Created: %s Modified: %s Version: %s DB_obj status: %s Data type: %s URI: %s Data: %s Extra data: %s ------------------------------- """ % (str(self.status), str(self.id), str(self.id_parent), str(self.id_workflow), str(self.created), str(self.modified), str(self.version), str(self.status), str(self.data_type), str(self.uri), str(self.get_data()), - str(self._extra_data),) - # str(self.extra_object_class), + str(self.get_extra_data),) def __eq__(self, other): if isinstance(other, BibWorkflowObject): if self._data == other._data and \ self._extra_data == other._extra_data and \ self.id_workflow == other.id_workflow and \ self.version == other.version and \ self.id_parent == other.id_parent and \ isinstance(self.created, datetime) and \ isinstance(self.modified, datetime): return True else: return False return NotImplemented def __ne__(self, other): return not self.__eq__(other) def add_task_result(self, task_name, result): extra_data = self.get_extra_data() extra_data["tasks_results"][task_name] = result self.set_extra_data(extra_data) def change_status(self, message): self.status = message def get_current_task(self): return self.get_extra_data()["task_counter"] def _create_version_obj(self, id_workflow, version, id_parent=None, no_update=False): + obj = BibWorkflowObject(_data=self._data, id_workflow=id_workflow, version=version, id_parent=id_parent, _extra_data=self._extra_data, status=self.status, data_type=self.data_type) + db.session.add(obj) db.session.commit() if version is CFG_OBJECT_VERSION.INITIAL and not no_update: self.id_parent = obj.id db.session.commit() return obj.id def _update_db(self): db.session.add(self) db.session.commit() def save(self, version=None, task_counter=[0], id_workflow=None): """ Saved object """ if not self.id: db.session.add(self) db.session.commit() extra_data = self.get_extra_data() extra_data["task_counter"] = task_counter self.set_extra_data(extra_data) if not id_workflow: id_workflow = self.id_workflow if version: self.version = version if version in (CFG_OBJECT_VERSION.FINAL, CFG_OBJECT_VERSION.HALTED): redis_create_search_entry(self) self._update_db() def save_to_file(self, directory=None, prefix="workflow_object_data_", suffix=".obj"): """ Saves the contents of self.data['data'] to file. Returns path to saved file. Warning: Currently assumes non-binary content. 
""" if directory is None: directory = cfg['CFG_TMPSHAREDIR'] tmp_fd, filename = tempfile.mkstemp(dir=directory, prefix=prefix, suffix=suffix) os.write(tmp_fd, self._data) os.close(tmp_fd) return filename def __getstate__(self): - return {"_data": self._data, - "id_workflow": self.id_workflow, - "version": self.version, - "id_parent": self.id_parent, - "created": self.created, - "modified": self.modified, - "status": self.status, - "data_type": self.data_type, - "uri": self.uri, - "_extra_data": self._extra_data} + return self.__dict__ def __setstate__(self, state): - self._data = state["_data"] - self.id_workflow = state["id_workflow"] - self.version = state["version"] - self.id_parent = state["id_parent"] - self.created = state["created"] - self.modified = state["modified"] - self._extra_data = state["_extra_data"] - self.status = state["status"] - self.data_type = state["data_type"] - self.uri = state["uri"] + self.__dict__ = state def copy(self, other): """Copies data and metadata except id and id_workflow""" self._data = other._data self._extra_data = other._extra_data self.version = other.version self.id_parent = other.id_parent self.created = other.created - self.modified = datetime.now() - self.owner = other.owner + self.modified = other.modified self.status = other.status self.data_type = other.data_type self.uri = other.uri class BibWorkflowObjectLog(db.Model): """ This class represent a record of a log emit by an object into the database the object must be saved before using this class. Indeed it needs the id of the object into the database. """ __tablename__ = 'bwlOBJECTLOGGING' id = db.Column(db.Integer, primary_key=True) id_object = db.Column(db.Integer(255), db.ForeignKey('bwlOBJECT.id'), nullable=False) log_type = db.Column(db.Integer, default=0, nullable=False) created = db.Column(db.DateTime, default=datetime.now) message = db.Column(db.TEXT, default="", nullable=False) def __repr__(self): return "<BibWorkflowObjectLog(%i, %s, %s, %s)>" % \ (self.id, self.id_object, self.message, self.created) @classmethod def get(cls, *criteria, **filters): """ A wrapper for the filter and filter_by functions of sqlalchemy. Define a dict with which columns should be filtered by which values. look up also sqalchemy BaseQuery's filter and filter_by documentation """ return cls.query.filter(*criteria).filter_by(**filters) @classmethod def get_most_recent(cls, *criteria, **filters): """ Returns the most recently created log. """ most_recent = cls.get(*criteria, **filters).\ order_by(desc(BibWorkflowObjectLog.created)).first() if most_recent is None: raise NoResultFound else: return most_recent @classmethod def delete(cls, id=None): cls.get(BibWorkflowObjectLog.id == id).delete() db.session.commit() class BibWorkflowEngineLog(db.Model): __tablename__ = "bwlWORKFLOWLOGGING" id = db.Column(db.Integer, primary_key=True) id_object = db.Column(db.String(255), nullable=False) log_type = db.Column(db.Integer, default=0, nullable=False) created = db.Column(db.DateTime, default=datetime.now) message = db.Column(db.TEXT, default="", nullable=False) def __repr__(self): return "<BibWorkflowEngineLog(%i, %s, %s, %s)>" % \ (self.id, self.id_object, self.message, self.created) @classmethod def get(cls, *criteria, **filters): """ A wrapper for the filter and filter_by functions of sqlalchemy. Define a dict with which columns should be filtered by which values. 
look up also sqalchemy BaseQuery's filter and filter_by documentation """ return cls.query.filter(*criteria).filter_by(**filters) @classmethod def get_most_recent(cls, *criteria, **filters): """ Returns the most recently created log. """ most_recent = cls.get(*criteria, **filters).\ order_by(desc(BibWorkflowEngineLog.created)).first() if most_recent is None: raise NoResultFound else: return most_recent @classmethod def delete(cls, uuid=None): cls.get(BibWorkflowEngineLog.id == uuid).delete() db.session.commit() __all__ = ['Workflow', 'BibWorkflowObject', 'BibWorkflowObjectLog', 'BibWorkflowEngineLog'] diff --git a/invenio/modules/workflows/tasks/bibsched_tasks.py b/invenio/modules/workflows/tasks/bibsched_tasks.py new file mode 100644 index 000000000..f1c00ca34 --- /dev/null +++ b/invenio/modules/workflows/tasks/bibsched_tasks.py @@ -0,0 +1,75 @@ +## This file is part of Invenio. +## Copyright (C) 2013 CERN. +## +## Invenio is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License as +## published by the Free Software Foundation; either version 2 of the +## License, or (at your option) any later version. +## +## Invenio is distributed in the hope that it will be useful, but +## WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +## General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Invenio; if not, write to the Free Software Foundation, Inc., +## 59 Temple Place, Suite 330, Boston, MA 021111307, USA. + +from invenio.bibtask import (write_message, + task_get_task_param + ) + + +def write_something_bibsched(messagea="This is the default message"): + """ + This function allows to send a message to bibsched... + This messages will be store into log. + """ + def _write_something_bibsched(obj, eng): + + if isinstance(messagea, basestring): + write_message(messagea) + return None + + if not isinstance(messagea, list): + if callable(messagea): + + write_message(messagea(obj, eng)) + return None + + if len(messagea) > 0: + temp = "" + for I in messagea: + if callable(I): + temp += str(I(obj, eng)) + elif isinstance(I, basestring): + temp += I + write_message(temp) + return None + + return _write_something_bibsched + + +def get_and_save_task_parameter(obj, eng): + eng.log.error("trying to retrieve param") + eng.log.error(str(task_get_task_param(None))) + eng.log.error("END OF RETRIEVING") + +#def task_update_progress(msg): +# def _task_update_progress(obj, eng): +# """Updates progress information in the BibSched task table.""" +# write_message("Updating task progress to %s." % msg, verbose=9) +# if "task_id" in _TASK_PARAMS: +# return run_sql("UPDATE schTASK SET progress=%s where id=%s", +# (msg, _TASK_PARAMS["task_id"])) +# +# +#def task_update_status(val): +# def _task_update_status(obj, eng): +# """Updates status information in the BibSched task table.""" +# write_message("Updating task status to %s." 
% val, verbose=9) +# if "task_id" in _TASK_PARAMS: +# return run_sql("UPDATE schTASK SET status=%s where id=%s", +# (val, _TASK_PARAMS["task_id"])) +# + diff --git a/invenio/modules/workflows/tasks/logic_tasks.py b/invenio/modules/workflows/tasks/logic_tasks.py new file mode 100644 index 000000000..a67191823 --- /dev/null +++ b/invenio/modules/workflows/tasks/logic_tasks.py @@ -0,0 +1,48 @@ + +def foreach(get_list_function=None, savename=None): + def _foreach(obj, eng): + + step = str(eng.getCurrTaskId()) + try: + if "Iterators" not in eng.extra_data: + eng.extra_data["Iterators"] = {} + except KeyError: + eng.extra_data["Iterators"] = {} + + if step not in eng.extra_data["Iterators"]: + eng.extra_data["Iterators"].update({step: 0}) + if callable(get_list_function): + my_list_to_process = get_list_function(obj, eng) + else: + my_list_to_process = [] + + if eng.extra_data["Iterators"][step] < len(my_list_to_process): + + obj.data = my_list_to_process[eng.extra_data["Iterators"][step]] + if savename is not None: + obj.extra_data[savename] = obj.data + + eng.extra_data["Iterators"][step] += 1 + else: + eng.extra_data["Iterators"][step] = 0 + coordonatex = len(eng.getCurrTaskId()) - 1 + coordonatey = eng.getCurrTaskId()[coordonatex] + new_vector = eng.getCurrTaskId() + new_vector[coordonatex] = coordonatey + 2 + eng.setPosition(eng.getCurrObjId(), new_vector) + + return _foreach + + +def endforeach(obj, eng): + coordonatex = len(eng.getCurrTaskId()) - 1 + coordonatey = eng.getCurrTaskId()[coordonatex] + new_vector = eng.getCurrTaskId() + new_vector[coordonatex] = coordonatey - 3 + eng.setPosition(eng.getCurrObjId(), new_vector) + + +def get_obj_data(obj, eng): + eng.log.info("last task name: get_obj_data") + return obj.data + diff --git a/invenio/modules/workflows/tasks/marcxml_tasks.py b/invenio/modules/workflows/tasks/marcxml_tasks.py index a5cabd52f..80424e999 100644 --- a/invenio/modules/workflows/tasks/marcxml_tasks.py +++ b/invenio/modules/workflows/tasks/marcxml_tasks.py @@ -1,142 +1,652 @@ ## This file is part of Invenio. ## Copyright (C) 2012, 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
+import os +import random +import time +import glob +import re + + +from invenio.bibupload import (find_record_from_recid, + find_record_from_sysno, + find_records_from_extoaiid, + find_record_from_oaiid, + find_record_from_doi + ) +from invenio.legacy.oaiharvest.dblayer import create_oaiharvest_log_str + + +from invenio.base.config import (CFG_TMPSHAREDDIR, + CFG_PLOTEXTRACTOR_DOWNLOAD_TIMEOUT, + CFG_TMPDIR, + CFG_INSPIRE_SITE) +from invenio.oai_harvest_utils import (record_extraction_from_file, + collect_identifiers, + harvest_step, + translate_fieldvalues_from_latex, + find_matching_files, + ) +from invenio.bibtask import (task_sleep_now_if_required, + task_get_option, + task_low_level_submission + ) +from invenio.modules.oai_harvest.models import OaiHARVEST +from invenio.bibfield_jsonreader import JsonReader +from invenio.bibworkflow_utils import InvenioWorkflowError +from invenio.refextract_api import extract_references_from_file_xml +from invenio.legacy.bibrecord import (create_records, + record_xml_output + ) +from invenio.plotextractor_output_utils import (create_MARC, + create_contextfiles, + prepare_image_data, + write_message, + remove_dups + ) +from invenio.plotextractor_getter import (harvest_single, + make_single_directory + ) + +from invenio.plotextractor import (get_defaults, + extract_captions, + extract_context + ) +from invenio.shellutils import (run_shell_command, + Timeout + ) +import invenio.template +from invenio.plotextractor_converter import (untar, + convert_images + ) + +oaiharvest_templates = invenio.template.load('oai_harvest') + +REGEXP_RECORD = re.compile("<record.*?>(.*?)</>record>", re.DOTALL) +REGEXP_REFS = re.compile("<record.*?>.*?<controlfield .*?>.*?</controlfield>(.*?)</record>", re.DOTALL) +REGEXP_AUTHLIST = re.compile("<collaborationauthorlist.*?</collaborationauthorlist>", re.DOTALL) + def add_metadata_to_extra_data(obj, eng): """ Creates bibrecord from object data and populates extra_data with metadata """ from invenio.legacy.bibrecord import create_record, record_get_field_value record = create_record(obj.data) obj.extra_data['redis_search']['category'] =\ record_get_field_value(record[0], '037', code='c') obj.extra_data['redis_search']['title'] =\ record_get_field_value(record[0], '245', code='a') obj.extra_data['redis_search']['source'] =\ record_get_field_value(record[0], '035', code='9') add_metadata_to_extra_data.__title__ = "Metadata Extraction" add_metadata_to_extra_data.__description__ = "Populates object's extra_data with metadata" def approve_record(obj, eng): """ Will add the approval widget to the record """ + obj.extra_data["last_task_name"] = 'Record Approval' + eng.log.info("last task name: approve_record") try: obj.extra_data['message'] = 'Record needs approval. Click on widget to resolve.' + eng.log.info("Adding the approval widget to %s" % obj.id) obj.extra_data['widget'] = 'approval_widget' eng.halt("Record needs approval") except KeyError: # Log the error obj.extra_data["error_msg"] = 'Could not assign widget' + approve_record.__title__ = "Record Approval" approve_record.__description__ = "This task assigns the approval widget to a record." 
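# Illustrative usage sketch (not part of this patch): task callables such as
# add_metadata_to_extra_data and approve_record are meant to be composed into a
# workflow definition and run through the workflows API. The workflow name
# "my_harvesting_workflow" below is hypothetical; the import path follows the
# one used elsewhere in this patch (invenio.bibworkflow_api).
#
#     from invenio.bibworkflow_api import start
#
#     class my_harvesting_workflow(object):
#         workflow = [add_metadata_to_extra_data, approve_record]
#
#     engine = start("my_harvesting_workflow", data=["<record>...</record>"])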
-def convert_record(stylesheet="oaiarxiv2marcxml.xsl"): +def convert_record_to_bibfield(obj, eng): + """ + Convert the record in obj.data into a BibField 'dictionary' + representation. + """ + from invenio.bibfield import create_record + + eng.log.info("last task name: convert_record_to_bibfield") + obj.data = create_record(obj.data).rec_json + eng.log.info("Conversion succeeded") + + +def init_harvesting(obj, eng): + """ + This function gets all the options linked to the task and stores them in the + object for later use. + """ + + eng.log.info("last task name: init_harvesting") + obj.extra_data["options"] = task_get_option(None) + obj.log.error(str(task_get_option(None))) + eng.log.info("end of init_harvesting") + + +def get_repositories_list(repositories): + """ + Here we retrieve the oaiharvest configuration for the task. + It will allow the correct operations to be performed later on. + """ + def _get_repositories_list(obj, eng): + + eng.log.info("last task name: _get_repositories_list") + + reposlist_temp = None + + if repositories: + for reposname in repositories: + reposlist_temp = OaiHARVEST.get(OaiHARVEST.name == reposname).all() + else: + + reposlist_temp = OaiHARVEST.get(OaiHARVEST.name != "").all() + + return reposlist_temp + + return _get_repositories_list + + +def harvest_records(obj, eng): + """ + Run the harvesting task. obj.data is the oaiharvest configuration + row, containing id, arguments, etc. + Raises InvenioWorkflowError in case of failure. + """ + eng.log.info("last task name: harvest_records") + obj.extra_data["last_task_name"] = 'harvest_records' + harvested_identifier_list = [] + + harvestpath = "%s_%d_%s_" % ("%s/oaiharvest_%s" % (CFG_TMPSHAREDDIR, eng.uuid), + 1, time.strftime("%Y%m%d%H%M%S")) + # ## go ahead: check if user requested from-until harvesting + + try: + if "dates" not in obj.extra_data["options"]: + obj.extra_data["options"]["dates"] = {} + if "identifiers" not in obj.extra_data["options"]: + obj.extra_data["options"]["identifiers"] = {} + + except TypeError: + + obj.extra_data["options"] = {"dates": {}, "identifiers": {}} + + task_sleep_now_if_required() + + if obj.data.arguments: + eng.log.info(str(obj.data.arguments)) + # obj.data.arguments = deserialize_via_marshal(obj.data.arguments) + eng.log.info("running with post-processes: %s" % (obj.data.arguments,)) + + # Harvest phase + try: + harvested_files_list = harvest_step(obj.data, harvestpath, obj.extra_data["options"]["identifiers"], + obj.extra_data["options"]["dates"]) + except Exception: + eng.log.error("Error while harvesting %s. Skipping." % (obj.data,)) + + raise InvenioWorkflowError(str("Error while harvesting %s. Skipping." % (obj.data,)), eng.uuid) + + if len(harvested_files_list) == 0: + eng.log.error("No records harvested for %s" % (obj.data,)) + return None + # Retrieve all OAI IDs and set active list + + harvested_identifier_list.append(collect_identifiers(harvested_files_list)) + + if len(harvested_files_list) != len(harvested_identifier_list[0]): + # Harvested files and their identifiers are 'out of sync', abort harvest + eng.log.info("Harvested files miss identifiers for %s" % (obj.data.arguments,)) + raise InvenioWorkflowError(str("Harvested files miss identifiers ...
failure !"), eng.uuid) + eng.log.info(str(len(harvested_files_list)) + " files harvested and processed") + eng.log.info("End harvest records task") + + +harvest_records.__id__ = "h" + + +def get_records_from_file(path=None): + def _get_records_from_file(obj, eng): + + eng.log.info("last task name: _get_records_from_file") + + if not "LoopData" in eng.extra_data: + eng.extra_data["LoopData"] = {} + + if "get_records_from_file" not in eng.extra_data["LoopData"]: + if path: + eng.extra_data["LoopData"].update({"get_records_from_file": record_extraction_from_file(path)}) + else: + eng.extra_data["LoopData"].update({"get_records_from_file": record_extraction_from_file(obj.data)}) + return eng.extra_data["LoopData"]["get_records_from_file"] + return _get_records_from_file + + +def get_eng_uuid_harvested(obj, eng): + """ + Simple function which allows to retrieve the uuid of the eng in the workflow + for printing by example + """ + eng.log.info("last task name: get_eng_uuid_harvested") + return "*" + str(eng.uuid) + "*.harvested" + + +def get_files_list(path, parameter): + def _get_files_list(obj, eng): + eng.log.info("last task name: get_files_list") + if callable(parameter): + unknown = parameter(obj, eng) + else: + unknown = parameter + result = glob.glob1(path, unknown) + for i in range(0, len(result)): + result[i] = path + os.sep + result[i] + return result + + return _get_files_list + + +def convert_record(stylesheet="oaidc2marcxml.xsl"): def _convert_record(obj, eng): """ Will convert the object data, if XML, using the given stylesheet """ + eng.log.info("last task name: convert_record") from invenio.legacy.bibconvert.xslt_engine import convert obj.extra_data["last_task_name"] = 'Convert Record' + eng.log.info("Starting conversion using %s stylesheet" % + (stylesheet,)) + try: obj.data = convert(obj.data, stylesheet) except: obj.extra_data["error_msg"] = 'Could not convert record' - raise + eng.log.error("Error: %s" % (obj.extra_data["error_msg"],)) + raise InvenioWorkflowError(str("Error: %s" % (obj.extra_data["error_msg"])), eng.uuid) - _convert_record.__title__ = "Convert Record" - _convert_record.__description__ = "This task converts a XML record." return _convert_record -def download_fulltext(obj, eng): +def fulltext_download(obj, eng): """ - Will download the fulltext document + Performs the fulltext download step. + Only for arXiv """ - from invenio.legacy.bibdocfile.api import download_url - obj.extra_data["last_task_name"] = 'Download Fulltext' - try: - eng.log_info("Starting download of %s" % (obj.data['url'])) - url = download_url(obj.data['url']) - obj.extra_data['tasks_results']['fulltext_url'] = url - except KeyError: - # Log the error - obj.extra_data["error_msg"] = 'Record does not include url' - eng.log.error("Error: %s" % (obj.extra_data["error_msg"],)) + eng.log.info("full-text attachment step started") + task_sleep_now_if_required() + + eng.log.info(str(obj.extra_data["repository"])) + + if "pdf" not in obj.extra_data["options"]["identifiers"]: -download_fulltext.__title__ = "Fulltext Download" -download_fulltext.__description__ = "This task downloads fulltext." 
+ extract_path = make_single_directory(CFG_TMPSHAREDDIR, eng.uuid) + tarball, pdf = harvest_single(obj.data["system_control_number"]["value"], extract_path, ["pdf"]) + time.sleep(CFG_PLOTEXTRACTOR_DOWNLOAD_TIMEOUT) + if not obj.extra_data["repository"].arguments['t_doctype'] == '': + doctype = obj.extra_data["repository"].arguments['t_doctype'] + else: + doctype = 'arXiv' + if pdf: + obj.extra_data["options"]["identifiers"]["pdf"] = pdf + fulltext_xml = """ <datafield tag="FFT" ind1=" " ind2=" "> + <subfield code="a">%(url)s</subfield> + <subfield code="t">%(doctype)s</subfield> + </datafield>""" % {'url': obj.extra_data["options"]["identifiers"]["pdf"], + 'doctype': doctype} -def match_record(obj, eng): + updated_xml = '<?xml version="1.0" encoding="UTF-8"?>\n<collection>\n<record>\n' + fulltext_xml + \ + '</record>\n</collection>' + from invenio.bibfield import create_record + + new_dict_representation = create_record(updated_xml).rec_json + try: + obj.data['fft'].append(new_dict_representation["fft"]) + except: + obj.data['fft'] = [new_dict_representation['fft']] + + +def quick_match_record(obj, eng): """ - Will try to find matches in stored records + Retrieve the record Id from a record by using tag 001 or SYSNO or OAI ID or DOI + tag. opt_mod is the desired mode. + + 001 fields even in the insert mode """ from invenio.legacy.bibrecord import create_record from invenio.legacy.bibmatch.engine import match_records + eng.log.info("last task name: quick_match_record") + obj.extra_data["last_task_name"] = 'Quick Match Record' - obj.extra_data["last_task_name"] = 'Bibmatch Record' - rec = create_record(obj.data) - matches = match_records(records=[rec], - qrystrs=[("title", "[245__a]")]) - obj.extra_data['tasks_results']['match_record'] = matches - if matches[2] or matches[3]: - # we have ambiguous or fuzzy results - # render holding pen corresponding template - eng.halt("Match resolution needed") - elif matches[0]: - pass - else: - results = matches[1][0][1] - obj.extra_data['widget'] = 'bibmatch_widget' + function_dictionnary = {'recid': find_record_from_recid, 'system_number': find_record_from_sysno, + 'oaiid': find_record_from_oaiid, 'system_control_number': find_records_from_extoaiid, + 'doi': find_record_from_doi} -match_record.__title__ = "Bibmatch Record" -match_record.__description__ = "This task matches a XML record." 
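+    # Build a BibField view of the record and try each persistent identifier in turn until one resolves to an existing record id.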
+ my_json_reader = JsonReader() + my_json_reader.rec_json = obj.data + identifiers = my_json_reader.get_persistent_identifiers() -def print_record(obj, eng): - eng.log_info(obj.get_data()) - -print_record.__title__ = "Print Record" -print_record.__description__ = "Prints the record data to engine log" + if not "recid" in identifiers: + for identifier in identifiers: + recid = function_dictionnary[identifier](identifiers[identifier]["value"]) + if recid: + obj.data['recid']['value'] = recid + return True + return False + else: + return True def upload_record(mode="ir"): def _upload_record(obj, eng): + + eng.log.info("last task name: upload_record") from invenio.legacy.bibsched.bibtask import task_low_level_submission obj.extra_data["last_task_name"] = 'Upload Record' eng.log_info("Saving data to temporary file for upload") filename = obj.save_to_file() params = ["-%s" % (mode,), filename] task_id = task_low_level_submission("bibupload", "bibworkflow", *tuple(params)) eng.log_info("Submitted task #%s" % (task_id,)) _upload_record.__title__ = "Upload Record" _upload_record.__description__ = "Uploads the record using BibUpload" return _upload_record + + +upload_record.__id__ = "u" + + +def plot_extract(plotextractor_types): + def _plot_extract(obj, eng): + """ + Performs the plotextraction step. + """ + + eng.log.info("last task name: plot_extract") + obj.extra_data["last_task_name"] = 'plotextraction' + eng.log.info("plotextraction step started") + # Download tarball for each harvested/converted record, then run plotextrator. + # Update converted xml files with generated xml or add it for upload + task_sleep_now_if_required() + + if 'latex' in plotextractor_types: + # Run LaTeX plotextractor + if "tarball" not in obj.extra_data["options"]["identifiers"]: + # turn oaiharvest_23_1_20110214161632_converted -> oaiharvest_23_1_material + # to let harvested material in same folder structure + extract_path = make_single_directory(CFG_TMPSHAREDDIR, eng.uuid) + tarball, pdf = harvest_single(obj.data["system_control_number"]["value"], extract_path, ["tarball"]) + tarball = str(tarball) + time.sleep(CFG_PLOTEXTRACTOR_DOWNLOAD_TIMEOUT) + + if tarball is None: + raise InvenioWorkflowError(str("Error harvesting tarball from id: %s %s" % + (obj.data["system_control_number"]["value"], extract_path)), + eng.uuid) + obj.extra_data["options"]["identifiers"]["tarball"] = tarball + else: + tarball = obj.extra_data["options"]["identifiers"]["tarball"] + + sub_dir, refno = get_defaults(tarball, CFG_TMPDIR, "") + + tex_files = None + image_list = None + try: + extracted_files_list, image_list, tex_files = untar(tarball, sub_dir) + except Timeout: + eng.log.error('Timeout during tarball extraction on %s' % (tarball,)) + + converted_image_list = convert_images(image_list) + eng.log.info('converted %d of %d images found for %s' % (len(converted_image_list), + len(image_list), + os.path.basename(tarball))) + extracted_image_data = [] + if tex_files == [] or tex_files is None: + eng.log.error('%s is not a tarball' % (os.path.split(tarball)[-1],)) + run_shell_command('rm -r %s', (sub_dir,)) + else: + for tex_file in tex_files: + # Extract images, captions and labels + partly_extracted_image_data = extract_captions(tex_file, sub_dir, + converted_image_list) + if partly_extracted_image_data: + # Add proper filepaths and do various cleaning + cleaned_image_data = prepare_image_data(partly_extracted_image_data, + tex_file, converted_image_list) + # Using prev. 
extracted info, get contexts for each image found + extracted_image_data.extend((extract_context(tex_file, cleaned_image_data))) + + if extracted_image_data: + extracted_image_data = remove_dups(extracted_image_data) + create_contextfiles(extracted_image_data) + marc_xml = '<?xml version="1.0" encoding="UTF-8"?>\n<collection>\n' + marc_xml = marc_xml + create_MARC(extracted_image_data, tarball, None) + marc_xml += "\n</collection>" + + if marc_xml: + from invenio.bibfield import create_record + # We store the path to the directory the tarball contents live + # Read and grab MARCXML from plotextractor run + new_dict_representation = create_record(marc_xml).rec_json + try: + obj.data['fft'].append(new_dict_representation["fft"]) + except KeyError: + obj.data['fft'] = [new_dict_representation['fft']] + + return _plot_extract + + +def refextract(obj, eng): + """ + Performs the reference extraction step. + """ + eng.log.info("refextraction step started") + + task_sleep_now_if_required() + + if "pdf" not in obj.extra_data["options"]["identifiers"]: + extract_path = make_single_directory(CFG_TMPSHAREDDIR, eng.uuid) + tarball, pdf = harvest_single(obj.data["system_control_number"]["value"], extract_path, ["pdf"]) + time.sleep(CFG_PLOTEXTRACTOR_DOWNLOAD_TIMEOUT) + if pdf is not None: + obj.extra_data["options"]["identifiers"]["pdf"] = pdf + + cmd_stdout = extract_references_from_file_xml(obj.extra_data["options"]["identifiers"]["pdf"]) + references_xml = REGEXP_REFS.search(cmd_stdout) + + if references_xml: + updated_xml = '<?xml version="1.0" encoding="UTF-8"?>\n<collection>\n<record>' + references_xml.group(1) + \ + "</record>\n</collection>" + + from invenio.bibfield import create_record + + new_dict_representation = create_record(updated_xml).rec_json + try: + obj.data['reference'].append(new_dict_representation["reference"]) + except KeyError: + if 'reference' in new_dict_representation: + obj.data['reference'] = [new_dict_representation['reference']] + + +def author_list(obj, eng): + """ + Performs the special authorlist extraction step (Mostly INSPIRE/CERN related). 
+ """ + eng.log.info("last task name: author_list") + eng.log.info("authorlist extraction step started") + + identifiers = obj.data["system_control_number"]["value"] + task_sleep_now_if_required() + + if "tarball" not in obj.extra_data["options"]["identifiers"]: + extract_path = make_single_directory(CFG_TMPSHAREDDIR, eng.uuid) + tarball, pdf = harvest_single(obj.data["system_control_number"]["value"], extract_path, ["tarball"]) + tarball = str(tarball) + time.sleep(CFG_PLOTEXTRACTOR_DOWNLOAD_TIMEOUT) + if tarball is None: + raise InvenioWorkflowError(str("Error harvesting tarball from id: %s %s" % (identifiers, extract_path)), + eng.uuid) + obj.extra_data["options"]["identifiers"]["tarball"] = tarball + + sub_dir, dummy = get_defaults(obj.extra_data["options"]["identifiers"]["tarball"], CFG_TMPDIR, "") + + try: + untar(obj.extra_data["options"]["identifiers"]["tarball"], sub_dir) + except Timeout: + eng.log.error('Timeout during tarball extraction on %s' % (obj.extra_data["options"]["identifiers"]["tarball"])) + + xml_files_list = find_matching_files(sub_dir, ["xml"]) + + authors = "" + + for xml_file in xml_files_list: + xml_file_fd = open(xml_file, "r") + xml_content = xml_file_fd.read() + xml_file_fd.close() + + match = REGEXP_AUTHLIST.findall(xml_content) + if not match == []: + authors += match[0] + # Generate file to store conversion results + if authors is not '': + from invenio.bibconvert_xslt_engine import convert + + authors = convert(authors, "authorlist2marcxml.xsl") + authorlist_record = create_records(authors) + if len(authorlist_record) == 1: + if authorlist_record[0][0] is None: + eng.log.error("Error parsing authorlist record for id: %s" % (identifiers,)) + authorlist_record = authorlist_record[0][0] + # Convert any LaTeX symbols in authornames + translate_fieldvalues_from_latex(authorlist_record, '100', code='a') + translate_fieldvalues_from_latex(authorlist_record, '700', code='a') + # Look for any UNDEFINED fields in authorlist + #key = "UNDEFINED" + #matching_fields = record_find_matching_fields(key, authorlist_record, tag='100') +\ + # record_find_matching_fields(key, authorlist_record, tag='700') + + #if len(matching_fields) > 0: + + # UNDEFINED found. Create ticket in author queue + # ticketid = create_authorlist_ticket(matching_fields, \ + # identifiers, arguments.get('a_rt-queue')) + # if ticketid: + # eng.log.info("authorlist RT ticket %d submitted for %s" % (ticketid, identifiers)) + # else: + # eng.log.error("Error while submitting RT ticket for %s" % (identifiers,)) + updated_xml = '<?xml version="1.0" encoding="UTF-8"?>\n<collection>\n' + record_xml_output(authorlist_record) \ + + '</collection>' + if not None == updated_xml: + from invenio.bibfield import create_record + # We store the path to the directory the tarball contents live + # Read and grab MARCXML from plotextractor run + new_dict_representation = create_record(updated_xml).rec_json + obj.data['authors'] = new_dict_representation["authors"] + obj.data['number_of_authors'] = new_dict_representation["number_of_authors"] + + +author_list.__id__ = "u" + + +def upload_step(obj, eng): + """ + Perform the upload step. 
+ """ + eng.log.info("upload step started") + uploaded_task_ids = [] + #Work comment: + # + #Prepare in case of filtering the files to up, + #no filtering, no other things to do + + new_dict_representation = JsonReader() + new_dict_representation.rec_json = obj.data + marcxml_value = new_dict_representation.legacy_export_as_marc() + + task_id = None + # Get a random sequence ID that will allow for the tasks to be + # run in order, regardless if parallel task execution is activated + sequence_id = random.randrange(1, 4294967296) + task_sleep_now_if_required() + extract_path = make_single_directory(CFG_TMPSHAREDDIR, eng.uuid) + # Now we launch BibUpload tasks for the final MARCXML files + filepath = extract_path + os.sep + str(obj.id) + file_fd = open(filepath, 'w') + file_fd.write(marcxml_value) + file_fd.close() + mode = ["-r", "-i"] + + if os.path.exists(filepath): + try: + args = mode + if sequence_id: + args.extend(['-I', str(sequence_id)]) + if obj.extra_data["repository"].arguments.get('u_name', ""): + args.extend(['-N', obj.extra_data["repository"].arguments.get('u_name', "")]) + if obj.extra_data["repository"].arguments.get('u_priority', 5): + args.extend(['-P', str(obj.extra_data["repository"].arguments.get('u_priority', 5))]) + args.append(filepath) + task_id = task_low_level_submission("bibupload", "oaiharvest", *tuple(args)) + create_oaiharvest_log(task_id, obj.extra_data["repository"].id, filepath) + except Exception, msg: + write_message("An exception during submitting oaiharvest task occured : %s " % (str(msg))) + return None + else: + eng.log.error("marcxmlfile %s does not exist" % (filepath,)) + if task_id is None: + eng.log.error("an error occurred while uploading %s from %s" % + (filepath, obj.extra_data["repository"].name)) + else: + uploaded_task_ids.append(task_id) + eng.log.info("material harvested from source %s was successfully uploaded" % + (obj.extra_data["repository"].name,)) + + if CFG_INSPIRE_SITE: + # Launch BibIndex,Webcoll update task to show uploaded content quickly + bibindex_params = ['-w', 'collection,reportnumber,global', + '-P', '6', + '-I', str(sequence_id), + '--post-process', + 'bst_run_bibtask[taskname="webcoll", user="oaiharvest", P="6", c="HEP"]'] + task_low_level_submission("bibindex", "oaiharvest", *tuple(bibindex_params)) + write_message("upload step ended") + eng.log.info("end of upload") + + +def create_oaiharvest_log(task_id, oai_src_id, marcxmlfile): + """ + Function which creates the harvesting logs + @param task_id bibupload task id + """ + file_fd = open(marcxmlfile, "r") + xml_content = file_fd.read(-1) + file_fd.close() + create_oaiharvest_log_str(task_id, oai_src_id, xml_content) diff --git a/invenio/modules/workflows/tasks/workflows_tasks.py b/invenio/modules/workflows/tasks/workflows_tasks.py new file mode 100644 index 000000000..0f8c15f74 --- /dev/null +++ b/invenio/modules/workflows/tasks/workflows_tasks.py @@ -0,0 +1,142 @@ + +from invenio.modules.workflows.models import (BibWorkflowObject, + BibWorkflowEngineLog) +from invenio.bibworkflow_api import (start_delayed) + +from invenio.bibworkflow_utils import InvenioWorkflowError + + +def get_nb_workflow_created(obj, eng): + eng.log.info("last task name: get_nb_workflow_created") + return eng.extra_data["nb_workflow"] + + +def start_workflow(workflow_to_run="default", data=None, copy=True, **kwargs): + """ + This function allow you to run a new asynchronous workflow, this + will be run on the celery node configurer into invenio + configuration. 
+ + The first argument is the name of the workflow. + + The second one is the data to use for this workflow. + + The copy parameter allows you to pass the workflow a copy + of the object at the moment of the call. + + **kwargs allows you to add key:value pairs to the extra data of + the object. + """ + + def _start_workflow(obj, eng): + + eng.log.info("last task name: start_workflow") + + eng.log.info("Workflow object in creation") + myobject = BibWorkflowObject() + + if copy is True: + myobject.copy(obj) + if data is not None: + myobject.data = data + eng.log.info("Workflow object ready") + + myobject.save() + workflow_id = start_delayed(workflow_to_run, data=[myobject], **kwargs) + + eng.log.info("Workflow launched") + try: + eng.extra_data["workflow_ids"].append(workflow_id) + except KeyError: + eng.extra_data["workflow_ids"] = [workflow_id] + + try: + eng.extra_data["nb_workflow"] += 1 + except KeyError: + eng.extra_data["nb_workflow"] = 1 + + if "nb_workflow_failed" not in eng.extra_data: + eng.extra_data["nb_workflow_failed"] = 0 + + return _start_workflow + + +def wait_for_workflows_to_complete(obj, eng): + """ + This function waits for all the asynchronous workflows launched + so far. It acts like a barrier. + """ + eng.log.info("last task name: wait_for_workflows_to_complete") + + if 'workflow_ids' in eng.extra_data: + for workflow_id in eng.extra_data['workflow_ids']: + try: + workflow_id.get() + + except InvenioWorkflowError as e: + + eng.log.error("___________________\n_______ALERT_______\n___________________\n_______WORKFLOW " + + e.id_workflow + " FAILED_______\n_______ERROR MESSAGE IS :_______\n" + repr(e)) + + workflowlog = BibWorkflowEngineLog.query.filter(BibWorkflowEngineLog.id_workflow == e.id_workflow).all() + + for log in workflowlog: + eng.log.error(log.message) + + eng.extra_data["nb_workflow_failed"] += 1 + except Exception as e: + eng.log.error("_______ALERT_______") + eng.log.error(str(e)) + eng.extra_data["nb_workflow_failed"] += 1 + else: + eng.extra_data["nb_workflow"] = 0 + eng.extra_data["nb_workflow_failed"] = 0 + + +def wait_for_workflow_to_complete(obj, eng): + """ + This function waits for the single asynchronous workflow whose + AsyncResult is stored in obj.data. + It acts like a barrier. + """ + eng.log.info("last task name: wait_for_workflow_to_complete") + try: + obj.data.get() + except Exception as e: + eng.log.error(str(e)) + eng.extra_data["nb_workflow_failed"] += 1 + + +def get_list_of_workflows_to_wait(obj, eng): + """ + Return the list of AsyncResult objects corresponding to the + running asynchronous workflows. + """ + eng.log.info("last task name: get_list_of_workflows_to_wait") + return eng.extra_data["workflow_ids"] + + +def get_status_async_result_obj_data(obj, eng): + eng.log.info("last task name: get_status_async_result_obj_data") + return obj.data.state + + +def workflows_reviews(obj, eng): + """ + This function writes a short report about the asynchronous + workflows launched from this main workflow and raises an + exception if any of them failed. + """ + eng.log.info("last task name: workflows_reviews") + eng.log.info("%s / %s failed" % (eng.extra_data["nb_workflow_failed"], eng.extra_data["nb_workflow"])) + + if eng.extra_data["nb_workflow_failed"]: + raise Exception("%s / %s failed" % (eng.extra_data["nb_workflow_failed"], eng.extra_data["nb_workflow"])) + + +def log_info(message): + def _log_info(obj, eng): + eng.log.info(message) + + return _log_info diff --git a/invenio/modules/workflows/utils.py b/invenio/modules/workflows/utils.py
index 098b4ea8c..2424ef965 100644 --- a/invenio/modules/workflows/utils.py +++ b/invenio/modules/workflows/utils.py @@ -1,230 +1,274 @@ ## -*- coding: utf-8 -*- ## This file is part of Invenio. ## Copyright (C) 2012, 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. import re import redis from invenio.legacy.bibrecord import create_record + from invenio.ext.logging import register_exception -from invenio.ext.sqlalchemy import db + + REGEXP_RECORD = re.compile("<record.*?>(.*?)</record>", re.DOTALL) +class BibWorkflowObjectIdContainer(object): + """ + This class is only made to be able to store a workflow ID and + to retrieve easily the workflow from this ID. It is used maily + to overide some problem with SQLAlchemy when we change of + execution thread ( for example Celery ) + """ + + def __init__(self, bibworkflowobject=None): + if bibworkflowobject is not None: + self.id = bibworkflowobject.id + else: + self.id = None + + def get_object(self): + from invenio.modules.workflows.models import BibWorkflowObject + + if self.id is not None: + my_object = BibWorkflowObject.query.filter(BibWorkflowObject.id == self.id).one() + import ast + temp = my_object.get_extra_data() + temp["repository"].arguments = ast.literal_eval(my_object.get_extra_data()["repository"].arguments) + my_object.set_extra_data(temp) + return my_object + else: + return None + + def __str__(self): + return "BibWorkflowObject" + str(self.id) + + +class InvenioWorkflowError(Exception): + + def __init__(self, error, id_workflow, id_object=0, message=""): + self.id_workflow = id_workflow + self.id_object = id_object + self.error = error + self.message = message + Exception.__init__(self, error, id_workflow, id_object, message) + super(InvenioWorkflowError, self).__init__(error,id_workflow, id_object, message) + + def __str__(self): + return str(self.id_workflow) + " " + str(self.id_object) + " " + str(self.error) + " " + str(self.message) + + class InvenioWorkflowDefinitionError(Exception): pass def get_workflow_definition(name): from .loader import workflows if name in workflows: return workflows[name] else: - raise InvenioWorkflowDefinitionError("Cannot find workflow %s" - % (name,)) + raise InvenioWorkflowDefinitionError("Cannot find workflow %s" % (name,)) def determineDataType(data): # If data is a dictionary and contains type key, # we can directly derive the data_type if isinstance(data, dict): if 'type' in data: data_type = data['type'] else: data_type = 'dict' else: # If data is not a dictionary, we try to guess MIME type # by using magic library try: from magic import Magic mime_checker = Magic(mime=True) data_type = mime_checker.from_buffer(data) # noqa except: register_exception(stream="warning", prefix= "BibWorkflowObject.determineDataType:" + " Impossible to resolve data type.") data_type = "" return data_type ## TODO special thanks to 
http://code.activestate.com/recipes/440514-dictproperty-properties-for-dictionary-attributes/ class dictproperty(object): class _proxy(object): def __init__(self, obj, fget, fset, fdel): self._obj = obj self._fget = fget self._fset = fset self._fdel = fdel def __getitem__(self, key): try: return self._fget(self._obj, key) except TypeError: print "can't read item" def __setitem__(self, key, value): try: self._fset(self._obj, key, value) except TypeError: print "can't set item %s: %s" % (str(key), str(value),) def __delitem__(self, key): try: self._fdel(self._obj, key) except TypeError: print "can't delete item" def __init__(self, fget=None, fset=None, fdel=None, doc=None): self._fget = fget self._fset = fset self._fdel = fdel self.__doc__ = doc def __get__(self, obj, objtype=None): if obj is None: return self return self._proxy(obj, self._fget, self._fset, self._fdel) def redis_create_search_entry(bwobject): redis_server = set_up_redis() extra_data = bwobject.get_extra_data() #creates database entries to not loose key value pairs in redis for key, value in extra_data["redis_search"].iteritems(): redis_server.sadd("holdingpen_sort", str(key)) redis_server.sadd("holdingpen_sort:%s" % (str(key),), str(value)) redis_server.sadd("holdingpen_sort:%s:%s" % (str(key), str(value),), bwobject.id) redis_server.sadd("holdingpen_sort", "owner") redis_server.sadd("holdingpen_sort:owner", extra_data['owner']) redis_server.sadd("holdingpen_sort:owner:%s" % (extra_data['owner'],), bwobject.id) redis_server.sadd("holdingpen_sort:last_task_name:%s" % (extra_data['last_task_name'],), bwobject.id) def filter_holdingpen_results(key, *args): """Function filters holdingpen entries by given key: value pair. It returns list of IDs.""" redis_server = set_up_redis() new_args = [] for a in args: new_args.append("holdingpen_sort:"+a) return redis_server.sinter("holdingpen_sort:"+key, *new_args) def get_redis_keys(key=None): redis_server = set_up_redis() if key: return list(redis_server.smembers("holdingpen_sort:%s" % (str(key),))) else: return list(redis_server.smembers("holdingpen_sort")) def get_redis_values(key): redis_server = set_up_redis() return redis_server.smembers("holdingpen_sort:%s" % (str(key),)) def set_up_redis(): """ Sets up the redis server for the saving of the HPContainers - @type url: string - @param url: address to setup the Redis server @return: Redis server object. 
""" from flask import current_app redis_server = redis.Redis.from_url( current_app.config.get('CACHE_REDIS_URL', 'redis://localhost:6379') ) return redis_server def empty_redis(): redis_server = set_up_redis() redis_server.flushall() def sort_bwolist(bwolist, iSortCol_0, sSortDir_0): if iSortCol_0 == 0: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.id, reverse=True) else: bwolist.sort(key=lambda x: x.id, reverse=False) elif iSortCol_0 == 1: pass # if sSortDir_0 == 'desc': # bwolist.sort(key=lambda x: x.id_user, reverse=True) # else: # bwolist.sort(key=lambda x: x.id_user, reverse=False) elif iSortCol_0 == 2: pass # if sSortDir_0 == 'desc': # bwolist.sort(key=lambda x: x.id_user, reverse=True) # else: # bwolist.sort(key=lambda x: x.id_user, reverse=False) elif iSortCol_0 == 3: pass # if sSortDir_0 == 'desc': # bwolist.sort(key=lambda x: x.id_user, reverse=True) # else: # bwolist.sort(key=lambda x: x.id_user, reverse=False) elif iSortCol_0 == 4: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.id_workflow, reverse=True) else: bwolist.sort(key=lambda x: x.id_workflow, reverse=False) elif iSortCol_0 == 5: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.id_user, reverse=True) else: bwolist.sort(key=lambda x: x.id_user, reverse=False) elif iSortCol_0 == 6: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.created, reverse=True) else: bwolist.sort(key=lambda x: x.created, reverse=False) elif iSortCol_0 == 7: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.version, reverse=True) else: bwolist.sort(key=lambda x: x.version, reverse=False) elif iSortCol_0 == 8: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.version, reverse=True) else: bwolist.sort(key=lambda x: x.version, reverse=False) elif iSortCol_0 == 9: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.version, reverse=True) else: bwolist.sort(key=lambda x: x.version, reverse=False) return bwolist def parse_bwids(bwolist): import ast return list(ast.literal_eval(bwolist)) diff --git a/invenio/modules/workflows/worker_engine.py b/invenio/modules/workflows/worker_engine.py index 7b9eb5d84..fe29ffee6 100644 --- a/invenio/modules/workflows/worker_engine.py +++ b/invenio/modules/workflows/worker_engine.py @@ -1,187 +1,185 @@ # -*- coding: utf-8 -*- ## This file is part of Invenio. ## Copyright (C) 2012, 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. from invenio.ext.sqlalchemy import db from .client import run_workflow, continue_execution from .engine import BibWorkflowEngine from .models import BibWorkflowObject, Workflow from .config import CFG_OBJECT_VERSION class InvenioBibWorkflowValueError(Exception): pass def run_worker(wname, data, **kwargs): """ Runs workflow with given name and given data. Data can be specified as list of objects or single id of WfeObject/BibWorkflowObjects. 
""" wfe = BibWorkflowEngine(wname, **kwargs) wfe.save() - objects = prepare_objects(data, wfe) run_workflow(wfe=wfe, data=objects, **kwargs) + return wfe def restart_worker(wid, **kwargs): """ Restarts workflow with given id (wid) and given data. If data are not specified then it will load all initial data for workflow. Data can be specified as list of objects or single id of WfeObject/BibWorkflowObjects. """ data = BibWorkflowObject.query.filter(BibWorkflowObject.id_workflow == wid, BibWorkflowObject.version == CFG_OBJECT_VERSION.INITIAL).all() workflow = Workflow.query.filter(Workflow.uuid == wid).first() wfe = BibWorkflowEngine(workflow.name, **kwargs) wfe.save() objects = prepare_objects(data, wfe) run_workflow(wfe=wfe, data=objects, **kwargs) return wfe def continue_worker(oid, restart_point="continue_next", **kwargs): """ Restarts workflow with given id (wid) at given point. restart_point can be one of: * restart_prev: will restart from the previous task * continue_next: will continue to the next task * restart_task: will restart the current task """ data = [BibWorkflowObject.query.filter(BibWorkflowObject.id == oid).first()] workflow = Workflow.query.filter(Workflow.uuid == data[0].id_workflow).first() wfe = BibWorkflowEngine(workflow.name, uuid=None, id_user=0, workflow_object=workflow, **kwargs) wfe.save() continue_execution(wfe, data, restart_point, **kwargs) return wfe def prepare_objects(data, workflow_object): objects = [] for obj in data: if isinstance(obj, BibWorkflowObject): if obj.id: obj.log.debug("Object found for process") objects.append(_prepare_objects_helper(obj, workflow_object)) else: objects.append(obj) else: # First we create an initial object for each data item new_initial = \ BibWorkflowObject(id_workflow=workflow_object.uuid, version=CFG_OBJECT_VERSION.INITIAL ) new_initial.set_data(obj) new_initial._update_db() # Then we create another object to actually work on current_obj = BibWorkflowObject(id_workflow=workflow_object.uuid, version=CFG_OBJECT_VERSION.RUNNING, id_parent=new_initial.id) current_obj.set_data(obj) objects.append(current_obj) - return objects - def _prepare_objects_helper(obj, workflow_object): assert obj if obj.version == CFG_OBJECT_VERSION.INITIAL: obj.log.debug("State: Initial") new_id = obj._create_version_obj(id_workflow=workflow_object.uuid, version=CFG_OBJECT_VERSION.RUNNING, id_parent=obj.id, no_update=True) return BibWorkflowObject.query.filter(BibWorkflowObject.id == new_id).first() elif obj.version in (CFG_OBJECT_VERSION.HALTED, CFG_OBJECT_VERSION.FINAL): obj.log.debug("State: Halted or Final") # creating INITIAL object # for FINAL version: maybe it should set # id_parent to the previous final object new_initial = obj._create_version_obj(id_workflow=workflow_object.uuid, version=CFG_OBJECT_VERSION.INITIAL, no_update=True) new_id = obj._create_version_obj(id_workflow=workflow_object.uuid, version=CFG_OBJECT_VERSION.RUNNING, id_parent=new_initial, no_update=True) return BibWorkflowObject.query.filter(BibWorkflowObject.id == new_id).first() elif obj.version == CFG_OBJECT_VERSION.RUNNING: # object shuld be deleted restart from INITIAL obj.log.debug("State: Running") if obj.id_workflow is not None: obj.log.info("""WARNING! You want to restart from temporary object. We can't guarantee that data object is not corrupted. 
Workflow will start from associated INITIAL object and RUNNING object will be deleted.""") parent_obj = BibWorkflowObject.query.filter( BibWorkflowObject.id == obj.id_parent).first() new_initial = parent_obj._create_version_obj( id_workflow=workflow_object.uuid, version=CFG_OBJECT_VERSION.INITIAL, no_update=True) new_id = parent_obj._create_version_obj( id_workflow=workflow_object.uuid, version=CFG_OBJECT_VERSION.RUNNING, id_parent=new_initial, no_update=True) db.session.delete(obj) return BibWorkflowObject.query.filter(BibWorkflowObject.id == new_id).first() else: obj.log.info("""You are running workflow on a object created manualy outside of the workflow. Workflow will execute on THIS object (it will change its state and/or data) but it would also create INITIAL version of the object to - keep its oryginal state.""") + keep its original state.""") # We assume that there is no parent object, so we create a new # INITIAL object, which will become a parent. new_parent = obj._create_version_obj( id_workflow=workflow_object.uuid, version=CFG_OBJECT_VERSION.INITIAL, no_update=True) # We add an id_workflow to our object obj.id_workflow = workflow_object.uuid obj.id_parent = new_parent obj._update_db() return obj else: raise InvenioBibWorkflowValueError("Object version is unknown: %s" % (obj.version,)) diff --git a/invenio/modules/workflows/workers/worker_celery.py b/invenio/modules/workflows/workers/worker_celery.py index 9193f7023..d750e3e4c 100644 --- a/invenio/modules/workflows/workers/worker_celery.py +++ b/invenio/modules/workflows/workers/worker_celery.py @@ -1,83 +1,126 @@ ## This file is part of Invenio. ## Copyright (C) 2012, 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
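# Note on the data handling in the Celery tasks below: task arguments go through a pickle
# round trip, so live SQLAlchemy-backed objects do not survive the hop to the worker.
# A BibWorkflowObject can instead be wrapped in a BibWorkflowObjectIdContainer (added to
# invenio/modules/workflows/utils.py in this patch); the task resolves the id back into a
# fresh object on the worker side and re-merges the stored repository object into the
# database session when it finds SQLAlchemy models in the extra data. A minimal sketch of
# that round trip from the calling side, assuming an existing BibWorkflowObject named
# my_object (the variable name is illustrative only):
#
#     from invenio.modules.workflows.utils import BibWorkflowObjectIdContainer
#     from invenio.modules.workflows.workers.worker_celery import worker_celery
#
#     container = BibWorkflowObjectIdContainer(my_object)   # stores only my_object.id
#     async_result = worker_celery().run_worker("full_doc_process", [container])
#     async_result.get()   # blocks until the delayed workflow run has finished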
+ +from invenio.bibworkflow_worker_engine import (run_worker, + restart_worker, + continue_worker) from invenio.celery import celery +from invenio.bibworkflow_utils import BibWorkflowObjectIdContainer +from invenio.ext.sqlalchemy import db + -@celery.task(name='invenio.bibworkflow_workers.worker_celery.run_worker') +@celery.task(name='invenio.modules.workflows.workers.worker_celery.run_worker') def celery_run(workflow_name, data, **kwargs): """ Runs the workflow with Celery """ + from ..worker_engine import run_worker + + if isinstance(data, list): + for i in range(0, len(data)): + if isinstance(data[i], BibWorkflowObjectIdContainer): + data[i] = data[i].get_object() + stack = data[i].get_extra_data().items() + while stack: + k, v = stack.pop() + if isinstance(v, dict): + stack.extend(v.iteritems()) + elif isinstance(v, db.Model): + # try except pass to maintain compatibility in case SQLAlchemy is fixed + try: + db.session.merge(data[i].extra_data["repository"]) + db.session.add(data[i].extra_data["repository"]) + db.session.commit() + except: + print "Celery : SQLAlchemy decoherence data object" + else: + if isinstance(data, BibWorkflowObjectIdContainer): + data = data.get_object() + stack = data.get_extra_data().items() + while stack: + k, v = stack.pop() + if isinstance(v, dict): + stack.extend(v.iteritems()) + elif isinstance(v, db.Model): + # try except pass to maintain compatibility in case SQLAlchemy is fixed + try: + db.session.merge(data.extra_data["repository"]) + db.session.add(data.extra_data["repository"]) + db.session.commit() + except: + print "Celery : SQLAlchemy decoherence data object" + run_worker(workflow_name, data, **kwargs) -@celery.task(name='invenio.bibworkflow_workers.worker_celery.restart_worker') +@celery.task(name='invenio.modules.workflows.workers.worker_celery.restart_worker') def celery_restart(wid, **kwargs): """ Restarts the workflow with Celery """ from ..worker_engine import restart_worker restart_worker(wid, **kwargs) -@celery.task(name='invenio.bibworkflow_workers.worker_celery.continue_worker') +@celery.task(name='invenio.modules.workflows.workers.worker_celery.continue_worker') def celery_continue(oid, restart_point, **kwargs): """ Restarts the workflow with Celery """ from ..worker_engine import continue_worker continue_worker(oid, restart_point, **kwargs) class worker_celery(object): def run_worker(self, workflow_name, data, **kwargs): """ Helper function to get celery task decorators to worker_celery @param workflow_name: name of the workflow to be run @type workflow_name: string @param data: list of objects for the workflow @type data: list """ return celery_run.delay(workflow_name, data, **kwargs) def restart_worker(self, wid, **kwargs): """ Helper function to get celery task decorators to worker_celery @param wid: uuid of the workflow to be run @type wid: string """ return celery_restart.delay(wid, **kwargs) def continue_worker(self, oid, restart_point, **kwargs): """ Helper function to get celery task decorators to worker_celery @param oid: uuid of the object to be started @type oid: string @param restart_point: sets the start point @type restart_point: string """ return celery_continue.delay(oid, restart_point, **kwargs) diff --git a/invenio/modules/workflows/workflows/full_doc_process.py b/invenio/modules/workflows/workflows/full_doc_process.py new file mode 100644 index 000000000..d6a577ecd --- /dev/null +++ b/invenio/modules/workflows/workflows/full_doc_process.py @@ -0,0 +1,42 @@ +## This file is part of Invenio. 
+## Copyright (C) 2012 CERN. +## +## Invenio is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License as +## published by the Free Software Foundation; either version 2 of the +## License, or (at your option) any later version. +## +## Invenio is distributed in the hope that it will be useful, but +## WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +## General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Invenio; if not, write to the Free Software Foundation, Inc., +## 59 Temple Place, Suite 330, Boston, MA 02111 1307, USA. + + +from ..tasks.marcxml_tasks import (convert_record, + plot_extract, + convert_record_to_bibfield, + fulltext_download, + refextract, + author_list, + upload_step, + quick_match_record + ) + +from ..tasks.workflows_tasks import log_info + +from workflow.patterns import IF_ELSE + + +class full_doc_process(object): + workflow = [convert_record("oaiarxiv2marcxml.xsl"), convert_record_to_bibfield, + IF_ELSE(quick_match_record, [log_info("branch two")], [log_info("branch one"), plot_extract(["latex"]), + fulltext_download, refextract, author_list, + upload_step]), + log_info("branch end") + ] + #workflow =[convert_record("oaiarxiv2marcxml.xsl"), convert_record_to_bibfield, author_list, upload_step] + diff --git a/invenio/modules/workflows/workflows/generic_harvesting_workflow.py b/invenio/modules/workflows/workflows/generic_harvesting_workflow.py new file mode 100644 index 000000000..92e61a716 --- /dev/null +++ b/invenio/modules/workflows/workflows/generic_harvesting_workflow.py @@ -0,0 +1,70 @@ +## This file is part of Invenio. +## Copyright (C) 2012 CERN. +## +## Invenio is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License as +## published by the Free Software Foundation; either version 2 of the +## License, or (at your option) any later version. +## +## Invenio is distributed in the hope that it will be useful, but +## WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +## General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Invenio; if not, write to the Free Software Foundation, Inc., +## 59 Temple Place, Suite 330, Boston, MA 02111 1307, USA. 
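# The workflow defined below is the dispatching side of the fan-out pattern introduced in
# this patch: nested foreach/endforeach loops walk the harvested repositories, files and
# records, each record is sent off as a delayed "full_doc_process" run through
# start_workflow(..., stop_on_error=True), and wait_for_workflows_to_complete plus
# workflows_reviews act as the final barrier and failure report.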
+ +"""Implements an example of a typical ingestion workflow for MARCXML records""" + +from ..tasks.marcxml_tasks import (get_repositories_list, + init_harvesting, + harvest_records, + get_files_list, + get_eng_uuid_harvested, + get_records_from_file + ) + +from ..tasks.workflows_tasks import (start_workflow, + wait_for_workflows_to_complete, + workflows_reviews, + get_nb_workflow_created + ) + +from ..tasks.logic_tasks import (foreach, + endforeach + ) + +from ..tasks.bibsched_tasks import (write_something_bibsched, + get_and_save_task_parameter + ) + +from invenio.base.config import CFG_TMPSHAREDDIR + + +class generic_harvesting_workflow(object): + workflow = [init_harvesting, + write_something_bibsched("starting"), + get_and_save_task_parameter, + foreach(get_repositories_list(['arxiv']), "repository"), + [ + write_something_bibsched("harvesting"), + harvest_records, + foreach(get_files_list(CFG_TMPSHAREDDIR, get_eng_uuid_harvested)), + [ + foreach(get_records_from_file()), + [ + start_workflow("full_doc_process", None, stop_on_error=True), + write_something_bibsched(["Workflow started : ", get_nb_workflow_created, " "]), + ], + endforeach + ], + endforeach + ], + endforeach, + write_something_bibsched("waiting workflows"), + wait_for_workflows_to_complete, + write_something_bibsched("the end"), + workflows_reviews + ] + diff --git a/invenio/testsuite/test_celery.py b/invenio/testsuite/test_celery.py index a837f9aae..59d194448 100644 --- a/invenio/testsuite/test_celery.py +++ b/invenio/testsuite/test_celery.py @@ -1,95 +1,97 @@ # -*- coding: utf-8 -*- ## ## This file is part of Invenio. ## Copyright (C) 2013 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """ Invenio Celery unit tests """ from __future__ import absolute_import # Invenio runs tests as standalone modules, instead of from their package # hierarchy. This means that this test module is executed as if you ran # 'python celery_unit_tests.py'. This causes the directory of this file to be added # to the sys.path. This causes the module invenio.celery to be importable # as just 'celery' since modules/miscutil/lib is now in sys.path. Thus, the real # celery package is no longer importable, causing an import error in # invenio.celery. Solutions could be to rename invenio.celery, however then you # cannot start celery workers with 'celeryd -A invenio' and must use something # like 'celeryd -A invenio.<new name>.celery' instead. # # Below lines removes the directory of this file from the sys.path so modules # are imported from their proper location. 
import sys import os from distutils.sysconfig import get_python_lib EXCLUDE = [ '', os.path.dirname(os.path.abspath(__file__)), os.path.join(get_python_lib(), 'invenio'), ] sys.path = filter( lambda x: x not in EXCLUDE, sys.path ) from invenio.testsuite import make_test_suite, run_test_suite, InvenioTestCase from invenio.celery import celery class CeleryTest(InvenioTestCase): def setUp(self): # Execute tasks synchronously celery.conf.CELERY_ALWAYS_EAGER = True # Set in-memory result backend celery.conf.CELERY_RESULT_BACKEND = 'cache' celery.conf.CELERY_CACHE_BACKEND = 'memory' # Don't silence exceptions in tasks. celery.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True # Trigger task registering (this is done by the worker, to register all tasks) celery.loader.import_default_modules() def test_loader(self): """ Test if `workers.py` files are correctly registered. """ self.assertTrue('invenio.celery.tasks.invenio_version' in celery.tasks) def test_task_invenio_version(self): """ Test calling of tasks """ - from invenio.config import CFG_VERSION - from invenio.celery.tasks import invenio_version + + from invenio.base.config import CFG_VERSION + from invenio.celery.workers import invenio_version + # Call task function without celery self.assertEqual(invenio_version(), CFG_VERSION) # Call task via Celery machinery self.assertEqual(invenio_version.delay().get(), CFG_VERSION) def test_task_invenio_db_test(self): """ Test Flask request context in tasks """ from invenio.celery.tasks import invenio_db_test # Call task via Celery machinery self.assertEqual(invenio_db_test.delay(1).get(), 1) self.assertEqual(invenio_db_test.delay(2).get(), 2) self.assertEqual(invenio_db_test.delay(3).get(), 3) # Call task without Celery machinery. with celery.loader.flask_app.test_request_context(): self.assertEqual(invenio_db_test(1), 1) TEST_SUITE = make_test_suite(CeleryTest) if __name__ == "__main__": run_test_suite(TEST_SUITE)
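The tasks added in workflows_tasks.py are meant to be composed inside such workflow definitions: start_workflow() fans out delayed sub-workflows and collects their AsyncResult handles in eng.extra_data["workflow_ids"], wait_for_workflows_to_complete() is the barrier that joins them, and workflows_reviews() raises if any of them failed. A minimal sketch of that composition, following the conventions of the workflow classes above (the class name and the sub-workflow name "my_sub_workflow" are placeholders):

    from invenio.modules.workflows.tasks.workflows_tasks import (start_workflow,
                                                                 wait_for_workflows_to_complete,
                                                                 workflows_reviews,
                                                                 log_info)


    class fan_out_example(object):
        workflow = [log_info("dispatching sub-workflows"),
                    start_workflow("my_sub_workflow", None),   # queued via start_delayed
                    start_workflow("my_sub_workflow", None),
                    wait_for_workflows_to_complete,            # .get() on every collected AsyncResult
                    workflows_reviews                          # raises if nb_workflow_failed > 0
                    ]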