diff --git a/modules/bibupload/lib/bibupload.py b/modules/bibupload/lib/bibupload.py index d49aa9d04..4c6d4a2d4 100644 --- a/modules/bibupload/lib/bibupload.py +++ b/modules/bibupload/lib/bibupload.py @@ -1,2001 +1,2007 @@ # -*- coding: utf-8 -*- ## ## This file is part of CDS Invenio. ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008 CERN. ## ## CDS Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## CDS Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDS Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """ BibUpload: Receive MARC XML file and update the appropriate database tables according to options. Usage: bibupload [options] input.xml Examples: $ bibupload -i input.xml Options: -a, --append new fields are appended to the existing record -c, --correct fields are replaced by the new ones in the existing record -f, --format takes only the FMT fields into account. 
Does not update -i, --insert insert the new record in the database -r, --replace the existing record is entirely replaced by the new one -d, --delete specified fields are deleted if existing -z, --reference update references (update only 999 fields) -s, --stage=STAGE stage to start from in the algorithm (0: always done; 1: FMT tags; 2: FFT tags; 3: BibFmt; 4: Metadata update; 5: time update) -n, --notimechange do not change record last modification date when updating -o, --holdingpen Makes bibupload insert into holding pen instead the normal database Scheduling options: -u, --user=USER user name to store task, password needed General options: -h, --help print this help and exit -v, --verbose=LEVEL verbose level (from 0 to 9, default 1) -V --version print the script version """ __revision__ = "$Id$" import os import re import sys import time from zlib import compress import urllib2 import socket import marshal import copy from invenio.config import CFG_OAI_ID_FIELD, \ CFG_BIBUPLOAD_REFERENCE_TAG, \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG, \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG, \ CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG, \ CFG_BIBUPLOAD_STRONG_TAGS, \ CFG_BIBUPLOAD_CONTROLLED_PROVENANCE_TAGS, \ CFG_BIBUPLOAD_SERIALIZE_RECORD_STRUCTURE from invenio.bibupload_config import CFG_BIBUPLOAD_CONTROLFIELD_TAGS, \ CFG_BIBUPLOAD_SPECIAL_TAGS from invenio.dbquery import run_sql, \ Error from invenio.bibrecord import create_records, \ record_add_field, \ record_delete_field, \ record_xml_output, \ record_get_field_instances, \ record_get_field_values, \ field_get_subfield_values, \ field_get_subfield_instances, \ record_extract_oai_id, \ record_modify_subfield, \ record_delete_subfield_from, \ record_delete_fields, \ record_add_subfield_into, \ record_find_field from invenio.search_engine import get_record from invenio.dateutils import convert_datestruct_to_datetext from invenio.errorlib import register_exception from invenio.intbitset import intbitset from invenio.config import 
CFG_WEBSUBMIT_FILEDIR from invenio.bibtask import task_init, write_message, \ task_set_option, task_get_option, task_get_task_param, task_update_status, \ task_update_progress, task_sleep_now_if_required, fix_argv_paths from invenio.bibdocfile import BibRecDocs, file_strip_ext, normalize_format, \ get_docname_from_url, get_format_from_url, check_valid_url, download_url, \ KEEP_OLD_VALUE, decompose_bibdocfile_url, InvenioWebSubmitFileError, \ bibdocfile_url_p #Statistic variables stat = {} stat['nb_records_to_upload'] = 0 stat['nb_records_updated'] = 0 stat['nb_records_inserted'] = 0 stat['nb_errors'] = 0 stat['nb_holdingpen'] = 0 stat['exectime'] = time.localtime() ## Let's set a reasonable timeout for URL request (e.g. FFT) socket.setdefaulttimeout(40) _re_find_001 = re.compile('\\s*(\\d*)\\s*', re.S) def bibupload_pending_recids(): """This function embed a bit of A.I. and is more a hack than an elegant algorithm. It should be updated in case bibupload/bibsched are modified in incompatible ways. This function return the intbitset of all the records that are being (or are scheduled to be) touched by other bibuploads. """ options = run_sql("""SELECT arguments FROM schTASK WHERE status<>'DONE' AND proc='bibupload' AND (status='RUNNING' OR status='CONTINUING' OR status='WAITING' OR status='SCHEDULED' OR status='ABOUT TO STOP' OR status='ABOUT TO SLEEP')""") ret = intbitset() xmls = [] if options: for arguments in options: arguments = marshal.loads(arguments[0]) for argument in arguments[1:]: if argument.startswith('/'): # XMLs files are recognizable because they're absolute # files... 
xmls.append(argument) for xmlfile in xmls: # Let's grep for the 001 try: xml = open(xmlfile).read() ret += [int(group[1]) for group in _re_find_001.findall(xml)] except: continue return ret ### bibupload engine functions: def bibupload(record, opt_tag=None, opt_mode=None, opt_stage_to_start_from=1, opt_notimechange=0, oai_rec_id = ""): """Main function: process a record and fit it in the tables bibfmt, bibrec, bibrec_bibxxx, bibxxx with proper record metadata. Return (error_code, recID) of the processed record. """ assert(opt_mode in ('insert', 'replace', 'replace_or_insert', 'reference', 'correct', 'append', 'format', 'holdingpen', 'delete')) error = None # If there are special tags to proceed check if it exists in the record if opt_tag is not None and not(record.has_key(opt_tag)): write_message(" Failed: Tag not found, enter a valid tag to update.", verbose=1, stream=sys.stderr) return (1, -1) # Extraction of the Record Id from 001, SYSNO or OAIID tags: rec_id = retrieve_rec_id(record, opt_mode) if rec_id == -1: return (1, -1) elif rec_id > 0: write_message(" -Retrieve record ID (found %s): DONE." 
% rec_id, verbose=2) if not record.has_key('001'): # Found record ID by means of SYSNO or OAIID, and the # input MARCXML buffer does not have this 001 tag, so we # should add it now: error = record_add_field(record, '001', controlfield_value=rec_id) if error is None: write_message(" Failed: " \ "Error during adding the 001 controlfield " \ "to the record", verbose=1, stream=sys.stderr) return (1, int(rec_id)) else: error = None write_message(" -Added tag 001: DONE.", verbose=2) write_message(" -Check if the xml marc file is already in the database: DONE" , verbose=2) # Reference mode check if there are reference tag if opt_mode == 'reference': error = extract_tag_from_record(record, CFG_BIBUPLOAD_REFERENCE_TAG) if error is None: write_message(" Failed: No reference tags has been found...", verbose=1, stream=sys.stderr) return (1, -1) else: error = None write_message(" -Check if reference tags exist: DONE", verbose=2) record_deleted_p = False if opt_mode == 'insert' or \ (opt_mode == 'replace_or_insert' and rec_id is None): insert_mode_p = True # Insert the record into the bibrec databases to have a recordId rec_id = create_new_record() write_message(" -Creation of a new record id (%d): DONE" % rec_id, verbose=2) # we add the record Id control field to the record error = record_add_field(record, '001', controlfield_value=rec_id) if error is None: write_message(" Failed: " \ "Error during adding the 001 controlfield " \ "to the record", verbose=1, stream=sys.stderr) return (1, int(rec_id)) else: error = None elif opt_mode != 'insert' and opt_mode != 'format' and \ opt_stage_to_start_from != 5: insert_mode_p = False # Update Mode # Retrieve the old record to update rec_old = get_record(rec_id) # Also save a copy to restore previous situation in case of errors original_record = get_record(rec_id) if rec_old is None: write_message(" Failed during the creation of the old record!", verbose=1, stream=sys.stderr) return (1, int(rec_id)) else: write_message(" -Retrieve the 
old record to update: DONE", verbose=2) # In Replace mode, take over old strong tags if applicable: if opt_mode == 'replace' or \ opt_mode == 'replace_or_insert': copy_strong_tags_from_old_record(record, rec_old) # Delete tags to correct in the record if opt_mode == 'correct' or opt_mode == 'reference': delete_tags_to_correct(record, rec_old, opt_tag) write_message(" -Delete the old tags to correct in the old record: DONE", verbose=2) # Delete tags specified if in delete mode if opt_mode == 'delete': record = delete_tags(record, rec_old) write_message(" -Delete specified tags in the old record: DONE", verbose=2) # Append new tag to the old record and update the new record with the old_record modified if opt_mode == 'append' or opt_mode == 'correct' or \ opt_mode == 'reference': record = append_new_tag_to_old_record(record, rec_old, opt_tag, opt_mode) write_message(" -Append new tags to the old record: DONE", verbose=2) # now we clear all the rows from bibrec_bibxxx from the old # record (they will be populated later (if needed) during # stage 4 below): delete_bibrec_bibxxx(rec_old, rec_id) record_deleted_p = True write_message(" -Clean bibrec_bibxxx: DONE", verbose=2) write_message(" -Stage COMPLETED", verbose=2) try: # Have a look if we have FMT tags write_message("Stage 1: Start (Insert of FMT tags if exist).", verbose=2) if opt_stage_to_start_from <= 1 and \ extract_tag_from_record(record, 'FMT') is not None: record = insert_fmt_tags(record, rec_id, opt_mode) if record is None: write_message(" Stage 1 failed: Error while inserting FMT tags", verbose=1, stream=sys.stderr) return (1, int(rec_id)) elif record == 0: # Mode format finished stat['nb_records_updated'] += 1 return (0, int(rec_id)) write_message(" -Stage COMPLETED", verbose=2) else: write_message(" -Stage NOT NEEDED", verbose=2) # Have a look if we have FFT tags write_message("Stage 2: Start (Process FFT tags if exist).", verbose=2) record_had_FFT = False if opt_stage_to_start_from <= 2 and \ 
extract_tag_from_record(record, 'FFT') is not None: record_had_FFT = True if not writing_rights_p(): write_message(" Stage 2 failed: Error no rights to write fulltext files", verbose=1, stream=sys.stderr) task_update_status("ERROR") sys.exit(1) try: record = elaborate_fft_tags(record, rec_id, opt_mode) except Exception, e: register_exception() write_message(" Stage 2 failed: Error while elaborating FFT tags: %s" % e, verbose=1, stream=sys.stderr) return (1, int(rec_id)) if record is None: write_message(" Stage 2 failed: Error while elaborating FFT tags", verbose=1, stream=sys.stderr) return (1, int(rec_id)) write_message(" -Stage COMPLETED", verbose=2) else: write_message(" -Stage NOT NEEDED", verbose=2) # Have a look if we have FFT tags write_message("Stage 2B: Start (Synchronize 8564 tags).", verbose=2) has_bibdocs = run_sql("SELECT count(id_bibdoc) FROM bibrec_bibdoc JOIN bibdoc ON id_bibdoc=id WHERE id_bibrec=%s AND status<>'DELETED'", (rec_id, ))[0][0] > 0 if opt_stage_to_start_from <= 2 and (has_bibdocs or record_had_FFT or extract_tag_from_record(record, '856') is not None): try: record = synchronize_8564(rec_id, record, record_had_FFT) except Exception, e: register_exception(alert_admin=True) write_message(" Stage 2B failed: Error while synchronizing 8564 tags: %s" % e, verbose=1, stream=sys.stderr) return (1, int(rec_id)) if record is None: write_message(" Stage 2B failed: Error while synchronizing 8564 tags", verbose=1, stream=sys.stderr) return (1, int(rec_id)) write_message(" -Stage COMPLETED", verbose=2) else: write_message(" -Stage NOT NEEDED", verbose=2) # Update of the BibFmt write_message("Stage 3: Start (Update bibfmt).", verbose=2) if opt_stage_to_start_from <= 3: # format the single record as xml rec_xml_new = record_xml_output(record) # Update bibfmt with the format xm of this record if opt_mode != 'format': error = update_bibfmt_format(rec_id, rec_xml_new, 'xm') if error == 1: write_message(" Failed: error during update_bibfmt_format 'xm'", 
verbose=1, stream=sys.stderr) return (1, int(rec_id)) if CFG_BIBUPLOAD_SERIALIZE_RECORD_STRUCTURE: error = update_bibfmt_format(rec_id, marshal.dumps(record), 'recstruct') if error == 1: write_message(" Failed: error during update_bibfmt_format 'recstruct'", verbose=1, stream=sys.stderr) return (1, int(rec_id)) # archive MARCXML format of this record for version history purposes: error = archive_marcxml_for_history(rec_id) if error == 1: write_message(" Failed to archive MARCXML for history", verbose=1, stream=sys.stderr) return (1, int(rec_id)) else: write_message(" -Archived MARCXML for history : DONE", verbose=2) write_message(" -Stage COMPLETED", verbose=2) # Update the database MetaData write_message("Stage 4: Start (Update the database with the metadata).", verbose=2) if opt_stage_to_start_from <= 4: if opt_mode in ('insert', 'replace', 'replace_or_insert', 'append', 'correct', 'reference', 'delete'): update_database_with_metadata(record, rec_id, oai_rec_id) record_deleted_p = False else: write_message(" -Stage NOT NEEDED in mode %s" % opt_mode, verbose=2) write_message(" -Stage COMPLETED", verbose=2) else: write_message(" -Stage NOT NEEDED", verbose=2) # Finally we update the bibrec table with the current date write_message("Stage 5: Start (Update bibrec table with current date).", verbose=2) if opt_stage_to_start_from <= 5 and \ opt_notimechange == 0 and \ not insert_mode_p: now = convert_datestruct_to_datetext(time.localtime()) write_message(" -Retrieved current localtime: DONE", verbose=2) update_bibrec_modif_date(now, rec_id) write_message(" -Stage COMPLETED", verbose=2) else: write_message(" -Stage NOT NEEDED", verbose=2) # Increase statistics if insert_mode_p: stat['nb_records_inserted'] += 1 else: stat['nb_records_updated'] += 1 # Upload of this record finish write_message("Record "+str(rec_id)+" DONE", verbose=1) return (0, int(rec_id)) finally: if record_deleted_p: ## BibUpload has failed living the record deleted. 
def insert_record_into_holding_pen(record, oai_id):
    """Store RECORD in the oaiHOLDINGPEN table instead of the main tables.

    The record is serialised with record_xml_output() and inserted
    together with OAI_ID and the database's current time.  The upload
    is then logged and the 'nb_holdingpen' statistics counter is
    incremented.
    """
    query = "INSERT INTO oaiHOLDINGPEN (oai_id, date_inserted, record_XML) VALUES (%s, NOW(), %s)"
    xml_record = record_xml_output(record)
    run_sql(query, (oai_id, xml_record))
    # record_id is logged as 0! (We are not inserting into the main database)
    log_record_uploading(oai_id, task_get_task_param('task_id', 0), 0, 'H')
    stat['nb_holdingpen'] += 1


def print_out_bibupload_statistics():
    """Print the statistics of the process.

    The counters are read from the module-level `stat' dictionary; the
    elapsed time is measured from stat['exectime'] up to now.
    """
    values = {
        'nb_input': stat['nb_records_to_upload'],
        'nb_updated': stat['nb_records_updated'],
        'nb_inserted': stat['nb_records_inserted'],
        'nb_errors': stat['nb_errors'],
        'nb_holdingpen': stat['nb_holdingpen'],
        'nb_sec': time.time() - time.mktime(stat['exectime']),
    }
    out = ("Task stats: %(nb_input)d input records, %(nb_updated)d updated, "
           "%(nb_inserted)d inserted, %(nb_errors)d errors, %(nb_holdingpen)d inserted to holding pen. "
           "Time %(nb_sec).2f sec.") % values
    write_message(out)


def open_marc_file(path):
    """Open the file at PATH and return its whole content.

    On IOError the error is reported, the task status is set to ERROR
    and the process exits with status 1.
    """
    try:
        # open the file containing the marc document
        marc_file = open(path, 'r')
        try:
            marc = marc_file.read()
        finally:
            # Close the handle even when read() fails, so that a failed
            # read does not leak the descriptor.
            marc_file.close()
    except IOError:
        # sys.exc_info() keeps this handler valid on old and new
        # interpreters alike.
        erro = sys.exc_info()[1]
        write_message("Error: %s" % erro, verbose=1, stream=sys.stderr)
        write_message("Exiting.", sys.stderr)
        task_update_status("ERROR")
        sys.exit(1)
    return marc
def find_record_format(rec_id, format):
    """Count how many times record REC_ID is stored in FORMAT in bibfmt.

    Return the number of matching bibfmt rows: 0 if the record is not
    formatted in FORMAT, 1 if it is (more than 1 should never occur).
    If the query fails, the error is logged and 0 is returned.
    """
    out = 0
    query = """SELECT COUNT(id) FROM bibfmt WHERE id_bibrec=%s AND format=%s"""
    try:
        out = run_sql(query, (rec_id, format))[0][0]
    except Error:
        error = sys.exc_info()[1]
        write_message(" Error during find_record_format() : %s " % error,
                      verbose=1, stream=sys.stderr)
    return out


def find_record_from_recid(rec_id):
    """
    Try to find record in the database from the REC_ID number.
    Return record ID if found, None otherwise.

    A database Error is logged and then re-raised: previously it was
    masked by an UnboundLocalError on the unset result variable.
    """
    try:
        res = run_sql("SELECT id FROM bibrec WHERE id=%s", (rec_id,))
    except Error:
        error = sys.exc_info()[1]
        write_message(" Error during find_record_bibrec() : %s " % error,
                      verbose=1, stream=sys.stderr)
        raise
    if res:
        return res[0][0]
    return None


def _find_first_recid_by_tag_value(tag, value, caller):
    """Return the first record ID having VALUE stored under TAG, or None.

    TAG is a full MARC tag; its first two characters select the
    bibXXx / bibrec_bibXXx table pair to query.  CALLER is the name of
    the public function, used only in the error message.  A database
    Error is logged and re-raised.
    """
    bibxxx = 'bib' + tag[0:2] + 'x'
    bibrec_bibxxx = 'bibrec_' + bibxxx
    query = """SELECT bb.id_bibrec
                 FROM %(bibrec_bibxxx)s AS bb, %(bibxxx)s AS b
                WHERE b.tag=%%s AND b.value=%%s
                  AND bb.id_bibxxx=b.id""" % \
            {'bibxxx': bibxxx, 'bibrec_bibxxx': bibrec_bibxxx}
    try:
        res = run_sql(query, (tag, value))
    except Error:
        error = sys.exc_info()[1]
        write_message(" Error during %s(): %s " % (caller, error),
                      verbose=1, stream=sys.stderr)
        raise
    if res:
        return res[0][0]
    return None


def find_record_from_sysno(sysno):
    """
    Try to find record in the database from the external SYSNO number.
    Return record ID if found, None otherwise.
    A database Error is logged and re-raised.
    """
    return _find_first_recid_by_tag_value(CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG,
                                          sysno, 'find_record_from_sysno')


def find_records_from_extoaiid(extoaiid, extoaisrc=None):
    """
    Try to find records in the database from the external EXTOAIID number.

    EXTOAISRC is the provenance given by the input record (None when it
    gives none).  Return an intbitset of the matching record IDs, which
    is empty when nothing matches.

    Raise Error when a stored record and the input record disagree on
    whether a provenance is specified at all; any Error is logged before
    being re-raised.
    """
    assert(CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[:5] == CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[:5])
    bibxxx = 'bib' + CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:2] + 'x'
    bibrec_bibxxx = 'bibrec_' + bibxxx
    try:
        write_message(' Looking for extoaiid="%s" with extoaisrc="%s"' % (extoaiid, extoaisrc), verbose=9)
        id_bibrecs = intbitset(run_sql("""SELECT bb.id_bibrec
                                            FROM %(bibrec_bibxxx)s AS bb, %(bibxxx)s AS b
                                           WHERE b.tag=%%s AND b.value=%%s
                                             AND bb.id_bibxxx=b.id""" % \
                                       {'bibxxx': bibxxx, 'bibrec_bibxxx': bibrec_bibxxx},
                                       (CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG, extoaiid,)))
        write_message(' Partially found %s for extoaiid="%s"' % (id_bibrecs, extoaiid), verbose=9)
        ret = intbitset()
        for id_bibrec in id_bibrecs:
            record = get_record(id_bibrec)
            instances = record_get_field_instances(record,
                                                   CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:3],
                                                   CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3],
                                                   CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4])
            write_message(' recid %s -> instances "%s"' % (id_bibrec, instances), verbose=9)
            for instance in instances:
                provenance = field_get_subfield_values(instance, CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[5])
                write_message(' recid %s -> provenance "%s"' % (id_bibrec, provenance), verbose=9)
                # First provenance value, or None when absent/empty.
                provenance = provenance and provenance[0] or None
                if provenance is None:
                    if extoaisrc is None:
                        write_message('Found recid %s for extoaiid="%s"' % (id_bibrec, extoaiid), verbose=9)
                        ret.add(id_bibrec)
                        break
                    else:
                        raise Error('Found recid %s for extoaiid="%s" that doesn\'t specify any provenance, while input record does.' % (id_bibrec, extoaiid))
                else:
                    # The stored record has a provenance, so what has to
                    # be checked here is the *input provenance*
                    # (extoaisrc), not the identifier itself.
                    if extoaisrc is None:
                        raise Error('Found recid %s for extoaiid="%s" that specify as provenance "%s", while input record does not specify any provenance.' % (id_bibrec, extoaiid, provenance))
                    elif provenance == extoaisrc:
                        write_message('Found recid %s for extoaiid="%s" with provenance="%s"' % (id_bibrec, extoaiid, extoaisrc), verbose=9)
                        ret.add(id_bibrec)
                        break
        return ret
    except Error:
        error = sys.exc_info()[1]
        write_message(" Error during find_records_from_extoaiid(): %s " % error,
                      verbose=1, stream=sys.stderr)
        raise


def find_record_from_oaiid(oaiid):
    """
    Try to find record in the database from the OAI ID number and OAI SRC.
    Return record ID if found, None otherwise.
    A database Error is logged and re-raised.
    """
    return _find_first_recid_by_tag_value(CFG_OAI_ID_FIELD, oaiid,
                                          'find_record_from_oaiid')


def extract_tag_from_record(record, tag_number):
    """Return the value stored under TAG_NUMBER in RECORD.

    Return None when RECORD is empty/None or does not contain the tag.
    """
    if not record:
        return None
    return record.get(tag_number, None)
(-h for help)", verbose=1, stream=sys.stderr) return -1 else: # we found the rec id and we are not in insert mode => continue # we try to match rec_id against the database: if find_record_from_recid(rec_id) is not None: # okay, 001 corresponds to some known record return int(rec_id) else: # The record doesn't exist yet. We shall have try to check # the SYSNO or OAI id later. write_message(" -Tag 001 value not found in database.", verbose=9) rec_id = None else: write_message(" -Tag 001 not found in the xml marc file.", verbose=9) if rec_id is None: # 2nd step we look for the SYSNO sysnos = record_get_field_values(record, CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[0:3], CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] or "", CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] or "", CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[5:6]) if sysnos: sysno = sysnos[0] # there should be only one external SYSNO write_message(" -Checking if SYSNO " + sysno + \ " exists in the database", verbose=9) # try to find the corresponding rec id from the database rec_id = find_record_from_sysno(sysno) if rec_id is not None: # rec_id found pass else: # The record doesn't exist yet. We will try to check # external and internal OAI ids later. 
write_message(" -Tag SYSNO value not found in database.", verbose=9) rec_id = None else: write_message(" -Tag SYSNO not found in the xml marc file.", verbose=9) if rec_id is None: # 2nd step we look for the external OAIID extoai_fields = record_get_field_instances(record, CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:3], CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] or "", CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] or "") if extoai_fields: for field in extoai_fields: extoaiid = field_get_subfield_values(field, CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[5:6]) extoaisrc = field_get_subfield_values(field, CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[5:6]) if extoaiid: extoaiid = extoaiid[0] if extoaisrc: extoaisrc = extoaisrc[0] else: extoaisrc = None write_message(" -Checking if EXTOAIID %s (%s) exists in the database" % (extoaiid, extoaisrc), verbose=9) # try to find the corresponding rec id from the database try: rec_ids = find_records_from_extoaiid(extoaiid, extoaisrc) except Error, e: write_message(e, verbose=1, stream=sys.stderr) return -1 if rec_ids: # rec_id found rec_id = rec_ids.pop() break else: # The record doesn't exist yet. We will try to check # OAI id later. 
write_message(" -Tag EXTOAIID value not found in database.", verbose=9) rec_id = None else: write_message(" -Tag EXTOAIID not found in the xml marc file.", verbose=9) if rec_id is None: # 4th step we look for the OAI ID oaiidvalues = record_get_field_values(record, CFG_OAI_ID_FIELD[0:3], CFG_OAI_ID_FIELD[3:4] != "_" and \ CFG_OAI_ID_FIELD[3:4] or "", CFG_OAI_ID_FIELD[4:5] != "_" and \ CFG_OAI_ID_FIELD[4:5] or "", CFG_OAI_ID_FIELD[5:6]) if oaiidvalues: oaiid = oaiidvalues[0] # there should be only one OAI ID write_message(" -Check if local OAI ID " + oaiid + \ " exist in the database", verbose=9) # try to find the corresponding rec id from the database rec_id = find_record_from_oaiid(oaiid) if rec_id is not None: # rec_id found pass else: write_message(" -Tag OAI ID value not found in database.", verbose=9) rec_id = None else: write_message(" -Tag SYSNO not found in the xml marc file.", verbose=9) # Now we should have detected rec_id from SYSNO or OAIID # tags. (None otherwise.) if rec_id: if opt_mode == 'insert': write_message(" Failed : Record found in the database," \ " you should use the option replace," \ " correct or append to replace an existing" \ " record. 
### Insert functions

def create_new_record():
    """Create new record in the database.

    Return what run_sql() returns for the INSERT into bibrec (used by
    the caller as the new record ID), or None when the insertion fails;
    the failure is logged.
    """
    now = convert_datestruct_to_datetext(time.localtime())
    query = """INSERT INTO bibrec (creation_date, modification_date)
                VALUES (%s, %s)"""
    try:
        return run_sql(query, (now, now))
    except Error:
        error = sys.exc_info()[1]
        write_message(" Error during the creation_new_record function : %s " % error,
                      verbose=1, stream=sys.stderr)
    return None


def insert_bibfmt(id_bibrec, marc, format, modification_date='1970-01-01 00:00:00'):
    """Insert the format in the table bibfmt.

    MARC is stored zlib-compressed.  A MODIFICATION_DATE that does not
    parse as 'YYYY-MM-DD HH:MM:SS' is replaced by the epoch default.
    Return what run_sql() returns for the INSERT, or None when the
    insertion fails; the failure is logged.
    """
    # compress the marc value
    compressed_marc = compress(marc)
    try:
        time.strptime(modification_date, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        modification_date = '1970-01-01 00:00:00'
    query = """INSERT INTO bibfmt (id_bibrec, format, last_updated, value)
                VALUES (%s, %s, %s, %s)"""
    try:
        return run_sql(query, (id_bibrec, format, modification_date, compressed_marc))
    except Error:
        error = sys.exc_info()[1]
        write_message(" Error during the insert_bibfmt function : %s " % error,
                      verbose=1, stream=sys.stderr)
    return None


def insert_record_bibxxx(tag, value):
    """Insert the (TAG, VALUE) pair into its bibxxx table if missing.

    Return (table_name, row_id) of the existing or newly inserted row.
    A database Error is logged and re-raised, so that a failed query can
    never be turned into a bogus row ID.
    """
    # determine into which table one should insert the record
    table_name = 'bib' + tag[0:2] + 'x'
    params = (tag, value)

    # check if the tag, value combination exists in the table
    query = """SELECT id,value FROM %s """ % table_name
    query += """ WHERE tag=%s AND value=%s"""
    try:
        res = run_sql(query, params)
    except Error:
        error = sys.exc_info()[1]
        write_message(" Error during the insert_record_bibxxx function : %s " % error,
                      verbose=1, stream=sys.stderr)
        raise

    # Note: compare now the found values one by one and look for
    # string binary equality (e.g. to respect lowercase/uppercase
    # match), regardless of the charset etc settings.  Ideally we
    # could use a BINARY operator in the above SELECT statement, but
    # we would have to check compatibility on various MySQLdb versions
    # etc; this approach checks all matched values in Python, not in
    # MySQL, which is less cool, but more conservative, so it should
    # work better on most setups.
    for found_id, found_value in [(row[0], row[1]) for row in res]:
        if found_value == value:
            return (table_name, found_id)

    # We got here only when the tag,value combination was not found,
    # so it is now necessary to insert the tag,value combination into
    # bibxxx table as new.
    query = """INSERT INTO %s """ % table_name
    query += """ (tag, value) values (%s , %s)"""
    try:
        new_id = run_sql(query, params)
    except Error:
        error = sys.exc_info()[1]
        write_message(" Error during the insert_record_bibxxx function : %s " % error,
                      verbose=1, stream=sys.stderr)
        raise
    return (table_name, new_id)


def insert_record_bibrec_bibxxx(table_name, id_bibxxx, field_number, id_bibrec):
    """Insert the record into bibrec_bibxxx.

    TABLE_NAME is the bibxxx table name; the row is written to the
    matching 'bibrec_' table.  Return what run_sql() returns for the
    INSERT.  A database Error is logged and re-raised.
    """
    # determine into which table one should insert the record
    full_table_name = 'bibrec_' + table_name
    # insert the proper row into the table
    query = """INSERT INTO %s """ % full_table_name
    query += """(id_bibrec,id_bibxxx, field_number) values (%s , %s, %s)"""
    try:
        return run_sql(query, (id_bibrec, id_bibxxx, field_number))
    except Error:
        error = sys.exc_info()[1]
        write_message(" Error during the insert_record_bibrec_bibxxx"
                      " function 2nd query : %s " % error,
                      verbose=1, stream=sys.stderr)
        raise
@param record: the record structure as created by bibrecord.create_record @type record_had_FFT: boolean @param record_had_FFT: True if the incoming bibuploaded-record used FFT @return: the manipulated record (which is also modified as a side effect) """ def merge_marc_into_bibdocfile(field): """ Internal function that reads a single field and store its content in BibDocFile tables. @param field: the 8564_ field containing a BibDocFile URL. """ write_message('Merging field: %s' % (field, ), verbose=9) url = field_get_subfield_values(field, 'u')[:1] or field_get_subfield_values(field, 'q')[:1] description = field_get_subfield_values(field, 'y')[:1] comment = field_get_subfield_values(field, 'z')[:1] if url: recid, docname, format = decompose_bibdocfile_url(url[0]) if recid != rec_id: write_message("INFO: URL %s is not pointing to a fulltext owned by this record (%s)" % (url, recid), stream=sys.stderr) else: try: bibdoc = BibRecDocs(recid).get_bibdoc(docname) if description: bibdoc.set_description(description[0], format) if comment: bibdoc.set_comment(comment[0], format) except InvenioWebSubmitFileError: ## Apparently the referenced docname doesn't exist anymore. ## Too bad. Let's skip it. write_message("WARNING: docname %s doesn't exist for record %s. Has it been renamed outside FFT?" % (docname, recid), stream=sys.stderr) def merge_bibdocfile_into_marc(field, subfields): """ Internal function that reads BibDocFile table entries referenced by the URL in the given 8564_ field and integrate the given information directly with the provided subfields. @param field: the 8564_ field containing a BibDocFile URL. @param subfields: the subfields corresponding to the BibDocFile URL generated after BibDocFile tables. 
""" write_message('Merging subfields %s into field %s' % (subfields, field), verbose=9) subfields = dict(subfields) ## We make a copy not to have side-effects subfield_to_delete = [] for subfield_index, (code, value) in enumerate(field_get_subfield_instances(field)): ## For each subfield instance already existing... if code in subfields: ## ...We substitute it with what is in BibDocFile tables record_modify_subfield(record, '856', field[4], code, subfields[code], subfield_index) del subfields[code] else: ## ...We delete it otherwise subfield_to_delete.append(subfield_index) subfield_to_delete.sort() for counter, position in enumerate(subfield_to_delete): ## FIXME: Very hackish algorithm. Since deleting a subfield ## will alterate the position of following subfields, we ## are taking note of this and adjusting further position ## by using a counter. record_delete_subfield_from(record, '856', field[4], position - counter) subfields = subfields.items() subfields.sort() for code, value in subfields: ## Let's add non-previously existing subfields record_add_subfield_into(record, '856', field[4], code, value) def get_bibdocfile_managed_info(): """ Internal function to eturns a dictionary of BibDocFile URL -> wanna-be subfields. 
@rtype: mapping @return: BibDocFile URL -> wanna-be subfields dictionary """ ret = {} bibrecdocs = BibRecDocs(rec_id) latest_files = bibrecdocs.list_latest_files() for afile in latest_files: url = afile.get_url() ret[url] = {'u' : url} description = afile.get_description() comment = afile.get_comment() if description: ret[url]['y'] = description if comment: ret[url]['z'] = comment for bibdoc in bibrecdocs.list_bibdocs(): icon = bibdoc.get_icon() if icon: icon = icon.list_all_files() if icon: url = icon[0].get_url() ret[url] = {'q' : url, 'x' : 'icon'} return ret write_message("Synchronizing MARC of recid '%s' with:\n%s" % (rec_id, record), verbose=9) tags8564s = record_get_field_instances(record, '856', '4', ' ') write_message("Original 8564_ instances: %s" % tags8564s, verbose=9) tags8564s_to_add = get_bibdocfile_managed_info() write_message("BibDocFile instances: %s" % tags8564s_to_add, verbose=9) positions_tags8564s_to_remove = [] for local_position, field in enumerate(tags8564s): for url in field_get_subfield_values(field, 'u') + field_get_subfield_values(field, 'q'): if url in tags8564s_to_add: if record_had_FFT: merge_bibdocfile_into_marc(field, tags8564s_to_add[url]) else: merge_marc_into_bibdocfile(field) del tags8564s_to_add[url] break elif bibdocfile_url_p(url) and decompose_bibdocfile_url(url)[0] == rec_id: positions_tags8564s_to_remove.append(local_position) break record_delete_fields(record, '856', positions_tags8564s_to_remove) tags8564s_to_add = tags8564s_to_add.values() tags8564s_to_add.sort() for subfields in tags8564s_to_add: subfields = subfields.items() subfields.sort() record_add_field(record, '856', '4', ' ', subfields=subfields) write_message('Final record: %s' % record, verbose=9) return record def elaborate_fft_tags(record, rec_id, mode): """ Process FFT tags that should contain $a with file pathes or URLs to get the fulltext from. 
This function enriches record with proper 8564 URL tags, downloads fulltext files and stores them into var/data structure where appropriate. CFG_BIBUPLOAD_WGET_SLEEP_TIME defines time to sleep in seconds in between URL downloads. Note: if an FFT tag contains multiple $a subfields, we upload them into different 856 URL tags in the metadata. See regression test case test_multiple_fft_insert_via_http(). """ # Let's define some handy sub procedure. def _add_new_format(bibdoc, url, format, docname, doctype, newname, description, comment): """Adds a new format for a given bibdoc. Returns True when everything's fine.""" write_message('Add new format to %s url: %s, format: %s, docname: %s, doctype: %s, newname: %s, description: %s, comment: %s' % (repr(bibdoc), url, format, docname, doctype, newname, description, comment), verbose=9) try: if not url: # Not requesting a new url. Just updating comment & description return _update_description_and_comment(bibdoc, docname, format, description, comment) tmpurl = download_url(url, format) try: try: bibdoc.add_file_new_format(tmpurl, description=description, comment=comment) except StandardError, e: write_message("('%s', '%s', '%s', '%s', '%s', '%s', '%s') not inserted because format already exists (%s)." % (url, format, docname, doctype, newname, description, comment, e), stream=sys.stderr) raise finally: os.remove(tmpurl) except Exception, e: write_message("Error in downloading '%s' because of: %s" % (url, e), stream=sys.stderr) raise return True def _add_new_version(bibdoc, url, format, docname, doctype, newname, description, comment): """Adds a new version for a given bibdoc. 
Returns True when everything's fine.""" write_message('Add new version to %s url: %s, format: %s, docname: %s, doctype: %s, newname: %s, description: %s, comment: %s' % (repr(bibdoc), url, format, docname, doctype, newname, description, comment)) try: if not url: return _update_description_and_comment(bibdoc, docname, format, description, comment) tmpurl = download_url(url, format) try: try: bibdoc.add_file_new_version(tmpurl, description=description, comment=comment) except StandardError, e: write_message("('%s', '%s', '%s', '%s', '%s', '%s', '%s') not inserted because '%s'." % (url, format, docname, doctype, newname, description, comment, e), stream=sys.stderr) raise finally: os.remove(tmpurl) except Exception, e: write_message("Error in downloading '%s' because of: %s" % (url, e), stream=sys.stderr) raise return True def _update_description_and_comment(bibdoc, docname, format, description, comment): """Directly update comments and descriptions.""" write_message('Just updating description and comment for %s with format %s with description %s and comment %s' % (docname, format, description, comment), verbose=9) try: bibdoc.set_description(description, format) bibdoc.set_comment(comment, format) except StandardError, e: write_message("('%s', '%s', '%s', '%s') description and comment not updated because '%s'." % (docname, format, description, comment, e)) raise return True def _add_new_icon(bibdoc, url, restriction): """Adds a new icon to an existing bibdoc, replacing the previous one if it exists. 
If url is empty, just remove the current icon.""" if not url: bibdoc.delete_icon() else: try: path = urllib2.urlparse.urlsplit(url)[2] filename = os.path.split(path)[-1] format = filename[len(file_strip_ext(filename)):] tmpurl = download_url(url, format) try: try: icondoc = bibdoc.add_icon(tmpurl, 'icon-%s' % bibdoc.get_docname()) if restriction and restriction != KEEP_OLD_VALUE: icondoc.set_status(restriction) except StandardError, e: write_message("('%s', '%s') icon not added because '%s'." % (url, format, e), stream=sys.stderr) raise finally: os.remove(tmpurl) except Exception, e: write_message("Error in downloading '%s' because of: %s" % (url, e), stream=sys.stderr) raise return True if mode == 'delete': raise StandardError('FFT tag specified but bibupload executed in --delete mode') tuple_list = extract_tag_from_record(record, 'FFT') if tuple_list: # FFT Tags analysis write_message("FFTs: "+str(tuple_list), verbose=9) docs = {} # docnames and their data for fft in record_get_field_instances(record, 'FFT', ' ', ' '): # Let's discover the type of the document # This is a legacy field and will not be enforced any particular # check on it. doctype = field_get_subfield_values(fft, 't') if doctype: doctype = doctype[0] else: # Default is Main doctype = 'Main' # Let's discover the url. url = field_get_subfield_values(fft, 'a') if url: url = url[0] try: check_valid_url(url) except StandardError, e: raise StandardError, "fft '%s' specify an url ('%s') with problems: %s" % (fft, url, e) else: url = '' # Let's discover the description description = field_get_subfield_values(fft, 'd') if description != []: description = description[0] else: if mode == 'correct' and doctype != 'FIX-MARC': ## If the user require to correct, and do not specify ## a description this means she really want to ## modify the description. 
description = '' else: description = KEEP_OLD_VALUE # Let's discover the desired docname to be created/altered name = field_get_subfield_values(fft, 'n') if name: name = file_strip_ext(name[0]) else: if url: name = get_docname_from_url(url) else: write_message("Warning: fft '%s' doesn't specifies neither a url nor a name" % str(fft), stream=sys.stderr) continue # Let's discover the desired new docname in case we want to change it newname = field_get_subfield_values(fft, 'm') if newname: newname = file_strip_ext(newname[0]) else: newname = name # Let's discover the desired format format = field_get_subfield_values(fft, 'f') if format: format = format[0] else: if url: format = get_format_from_url(url) else: format = '' format = normalize_format(format) # Let's discover the icon icon = field_get_subfield_values(fft, 'x') if icon != []: icon = icon[0] if icon != KEEP_OLD_VALUE: try: check_valid_url(icon) except StandardError, e: raise StandardError, "fft '%s' specify an icon ('%s') with problems: %s" % (fft, icon, e) else: if mode == 'correct' and doctype != 'FIX-MARC': ## See comment on description icon = '' else: icon = KEEP_OLD_VALUE # Let's discover the comment comment = field_get_subfield_values(fft, 'z') if comment != []: comment = comment[0] else: if mode == 'correct' and doctype != 'FIX-MARC': ## See comment on description comment = '' else: comment = KEEP_OLD_VALUE # Let's discover the restriction restriction = field_get_subfield_values(fft, 'r') if restriction != []: restriction = restriction[0] else: if mode == 'correct' and doctype != 'FIX-MARC': ## See comment on description restriction = '' else: restriction = KEEP_OLD_VALUE version = field_get_subfield_values(fft, 'v') if version: version = version[0] else: version = '' if docs.has_key(name): # new format considered (doctype2, newname2, restriction2, icon2, version2, urls) = docs[name] if doctype2 != doctype: raise StandardError, "fft '%s' specifies a different doctype from previous fft with docname 
'%s'" % (str(fft), name) if newname2 != newname: raise StandardError, "fft '%s' specifies a different newname from previous fft with docname '%s'" % (str(fft), name) if restriction2 != restriction: raise StandardError, "fft '%s' specifies a different restriction from previous fft with docname '%s'" % (str(fft), name) if icon2 != icon: raise StandardError, "fft '%x' specifies a different icon than the previous fft with docname '%s'" % (str(fft), name) if version2 != version: raise StandardError, "fft '%x' specifies a different version than the previous fft with docname '%s'" % (str(fft), name) for (url2, format2, description2, comment2) in urls: if format == format2: raise StandardError, "fft '%s' specifies a second file '%s' with the same format '%s' from previous fft with docname '%s'" % (str(fft), url, format, name) if url or format: urls.append((url, format, description, comment)) else: if url or format: docs[name] = (doctype, newname, restriction, icon, version, [(url, format, description, comment)]) else: docs[name] = (doctype, newname, restriction, icon, version, []) write_message('Result of FFT analysis:\n\tDocs: %s' % (docs,), verbose=9) # Let's remove all FFT tags record_delete_field(record, 'FFT', ' ', ' ') # Preprocessed data elaboration bibrecdocs = BibRecDocs(rec_id) if mode == 'replace': # First we erase previous bibdocs for bibdoc in bibrecdocs.list_bibdocs(): bibdoc.delete() bibrecdocs.build_bibdoc_list() for docname, (doctype, newname, restriction, icon, version, urls) in docs.iteritems(): write_message("Elaborating olddocname: '%s', newdocname: '%s', doctype: '%s', restriction: '%s', icon: '%s', urls: '%s', mode: '%s'" % (docname, newname, doctype, restriction, icon, urls, mode), verbose=9) if mode in ('insert', 'replace'): # new bibdocs, new docnames, new marc if newname in bibrecdocs.get_bibdoc_names(): write_message("('%s', '%s') not inserted because docname already exists." 
% (newname, urls), stream=sys.stderr) raise StandardError try: bibdoc = bibrecdocs.add_bibdoc(doctype, newname) bibdoc.set_status(restriction) except Exception, e: write_message("('%s', '%s', '%s') not inserted because: '%s'." % (doctype, newname, urls, e), stream=sys.stderr) raise StandardError for (url, format, description, comment) in urls: assert(_add_new_format(bibdoc, url, format, docname, doctype, newname, description, comment)) if icon and not icon == KEEP_OLD_VALUE: assert(_add_new_icon(bibdoc, icon, restriction)) elif mode == 'replace_or_insert': # to be thought as correct_or_insert for bibdoc in bibrecdocs.list_bibdocs(): if bibdoc.get_docname() == docname: if doctype not in ('PURGE', 'DELETE', 'EXPUNGE', 'REVERT', 'FIX-ALL', 'FIX-MARC', 'DELETE-FILE'): if newname != docname: try: bibdoc.change_name(newname) icon = bibdoc.get_icon() if icon: icon.change_name('icon-%s' % newname) except StandardError, e: write_message(e, stream=sys.stderr) raise found_bibdoc = False for bibdoc in bibrecdocs.list_bibdocs(): if bibdoc.get_docname() == newname: found_bibdoc = True if doctype == 'PURGE': bibdoc.purge() elif doctype == 'DELETE': bibdoc.delete() elif doctype == 'EXPUNGE': bibdoc.expunge() elif doctype == 'FIX-ALL': bibrecdocs.fix(docname) elif doctype == 'FIX-MARC': pass elif doctype == 'DELETE-FILE': if urls: for (url, format, description, comment) in urls: bibdoc.delete_file(format, version) elif doctype == 'REVERT': try: bibdoc.revert(version) except Exception, e: write_message('(%s, %s) not correctly reverted: %s' % (newname, version, e), stream=sys.stderr) raise else: if restriction != KEEP_OLD_VALUE: bibdoc.set_status(restriction) # Since the docname already existed we have to first # bump the version by pushing the first new file # then pushing the other files. 
if urls: (first_url, first_format, first_description, first_comment) = urls[0] other_urls = urls[1:] assert(_add_new_version(bibdoc, first_url, first_format, docname, doctype, newname, first_description, first_comment)) for (url, format, description, comment) in other_urls: assert(_add_new_format(bibdoc, url, format, docname, doctype, newname, description, comment)) if icon != KEEP_OLD_VALUE: assert(_add_new_icon(bibdoc, icon, restriction)) if not found_bibdoc: bibdoc = bibrecdocs.add_bibdoc(doctype, newname) for (url, format, description, comment) in urls: assert(_add_new_format(bibdoc, url, format, docname, doctype, newname, description, comment)) if icon and not icon == KEEP_OLD_VALUE: assert(_add_new_icon(bibdoc, icon, restriction)) elif mode == 'correct': for bibdoc in bibrecdocs.list_bibdocs(): if bibdoc.get_docname() == docname: if doctype not in ('PURGE', 'DELETE', 'EXPUNGE', 'REVERT', 'FIX-ALL', 'FIX-MARC', 'DELETE-FILE'): if newname != docname: try: bibdoc.change_name(newname) icon = bibdoc.get_icon() if icon: icon.change_name('icon-%s' % newname) except StandardError, e: write_message('Error in renaming %s to %s: %s' % (docname, newname, e), stream=sys.stderr) raise found_bibdoc = False for bibdoc in bibrecdocs.list_bibdocs(): if bibdoc.get_docname() == newname: found_bibdoc = True if doctype == 'PURGE': bibdoc.purge() elif doctype == 'DELETE': bibdoc.delete() elif doctype == 'EXPUNGE': bibdoc.expunge() elif doctype == 'FIX-ALL': bibrecdocs.fix(newname) elif doctype == 'FIX-MARC': pass elif doctype == 'DELETE-FILE': if urls: for (url, format, description, comment) in urls: bibdoc.delete_file(format, version) elif doctype == 'REVERT': try: bibdoc.revert(version) except Exception, e: write_message('(%s, %s) not correctly reverted: %s' % (newname, version, e), stream=sys.stderr) raise else: if restriction != KEEP_OLD_VALUE: bibdoc.set_status(restriction) if urls: (first_url, first_format, first_description, first_comment) = urls[0] other_urls = urls[1:] 
assert(_add_new_version(bibdoc, first_url, first_format, docname, doctype, newname, first_description, first_comment)) for (url, format, description, comment) in other_urls: assert(_add_new_format(bibdoc, url, format, docname, description, doctype, newname, description, comment)) if icon != KEEP_OLD_VALUE: _add_new_icon(bibdoc, icon, restriction) if not found_bibdoc: if doctype in ('PURGE', 'DELETE', 'EXPUNGE', 'FIX-ALL', 'FIX-MARC', 'DELETE-FILE', 'REVERT'): write_message("('%s', '%s', '%s') not performed because '%s' docname didn't existed." % (doctype, newname, urls, docname), stream=sys.stderr) raise StandardError else: bibdoc = bibrecdocs.add_bibdoc(doctype, newname) for (url, format, description, comment) in urls: assert(_add_new_format(bibdoc, url, format, docname, doctype, newname, description, comment)) if icon and not icon == KEEP_OLD_VALUE: assert(_add_new_icon(bibdoc, icon, restriction)) elif mode == 'append': try: found_bibdoc = False for bibdoc in bibrecdocs.list_bibdocs(): if bibdoc.get_docname() == docname: found_bibdoc = True for (url, format, description, comment) in urls: assert(_add_new_format(bibdoc, url, format, docname, doctype, newname, description, comment)) if icon not in ('', KEEP_OLD_VALUE): assert(_add_new_icon(bibdoc, icon, restriction)) if not found_bibdoc: try: bibdoc = bibrecdocs.add_bibdoc(doctype, docname) bibdoc.set_status(restriction) for (url, format, description, comment) in urls: assert(_add_new_format(bibdoc, url, format, docname, doctype, newname, description, comment)) if icon and not icon == KEEP_OLD_VALUE: assert(_add_new_icon(bibdoc, icon, restriction)) except Exception, e: register_exception() write_message("('%s', '%s', '%s') not appended because: '%s'." 
% (doctype, newname, urls, e), stream=sys.stderr) raise except: register_exception() raise return record def insert_fmt_tags(record, rec_id, opt_mode): """Process and insert FMT tags""" fmt_fields = record_get_field_instances(record, 'FMT') if fmt_fields: for fmt_field in fmt_fields: # Get the d, f, g subfields of the FMT tag try: d_value = field_get_subfield_values(fmt_field, "d")[0] except IndexError: d_value = "" try: f_value = field_get_subfield_values(fmt_field, "f")[0] except IndexError: f_value = "" try: g_value = field_get_subfield_values(fmt_field, "g")[0] except IndexError: g_value = "" # Update the format res = update_bibfmt_format(rec_id, g_value, f_value, d_value) if res == 1: write_message(" Failed: Error during update_bibfmt", verbose=1, stream=sys.stderr) # If we are in format mode, we only care about the FMT tag if opt_mode == 'format': return 0 # We delete the FMT Tag of the record record_delete_field(record, 'FMT') write_message(" -Delete field FMT from record : DONE", verbose=2) return record elif opt_mode == 'format': write_message(" Failed: Format updated failed : No tag FMT found", verbose=1, stream=sys.stderr) return None else: return record ### Update functions def update_bibrec_modif_date(now, bibrec_id): """Update the date of the record in bibrec table """ query = """UPDATE bibrec SET modification_date=%s WHERE id=%s""" params = (now, bibrec_id) try: run_sql(query, params) write_message(" -Update record modification date : DONE" , verbose=2) except Error, error: write_message(" Error during update_bibrec_modif_date function : %s" % error, verbose=1, stream=sys.stderr) def update_bibfmt_format(id_bibrec, format_value, format_name, modification_date=None): """Update the format in the table bibfmt""" if modification_date is None: modification_date = time.strftime('%Y-%m-%d %H:%M:%S') else: try: time.strptime(modification_date, "%Y-%m-%d %H:%M:%S") except ValueError: modification_date = '1970-01-01 00:00:00' # We check if the format is 
already in bibFmt nb_found = find_record_format(id_bibrec, format_name) if nb_found == 1: # we are going to update the format # compress the format_value value pickled_format_value = compress(format_value) # update the format: query = """UPDATE bibfmt SET last_updated=%s, value=%s WHERE id_bibrec=%s AND format=%s""" params = (modification_date, pickled_format_value, id_bibrec, format_name) try: row_id = run_sql(query, params) if row_id is None: write_message(" Failed: Error during update_bibfmt_format function", verbose=1, stream=sys.stderr) return 1 else: write_message(" -Update the format %s in bibfmt : DONE" % format_name , verbose=2) return 0 except Error, error: write_message(" Error during the update_bibfmt_format function : %s " % error, verbose=1, stream=sys.stderr) elif nb_found > 1: write_message(" Failed: Same format %s found several time in bibfmt for the same record." % format_name, verbose=1, stream=sys.stderr) return 1 else: # Insert the format information in BibFMT res = insert_bibfmt(id_bibrec, format_value, format_name, modification_date) if res is None: write_message(" Failed: Error during insert_bibfmt", verbose=1, stream=sys.stderr) return 1 else: write_message(" -Insert the format %s in bibfmt : DONE" % format_name , verbose=2) return 0 def archive_marcxml_for_history(recID): """ Archive current MARCXML format of record RECID from BIBFMT table into hstRECORD table. Useful to keep MARCXML history of records. Return 0 if everything went fine. Return 1 otherwise. 
def update_database_with_metadata(record, rec_id, oai_rec_id = "oai"):
    """
    Store the fields of a record into the bibxxx tables and link them
    to the record through the bibrec_bibxxx tables.

    Tags listed in CFG_BIBUPLOAD_SPECIAL_TAGS are skipped.  Control
    fields other than 001 are stored under tag + indicators, data
    fields once per subfield under tag + indicators + subfield code;
    empty or blank indicators are written as '_'.  The upload is
    finally logged via log_record_uploading().

    @param record: the record structure to store.
    @param rec_id: the identifier of the record.
    @param oai_rec_id: the OAI identifier handed to the upload log.
    """
    def _indicator(ind):
        """Indicator as used in a full tag: '_' when empty or blank."""
        if ind == '' or ind == ' ':
            return '_'
        return ind

    def _store_value(full_tag, value, datafield_number):
        """Insert one value in bibxxx and connect it to the record."""
        write_message(" insertion of the tag "+full_tag+" with the value "+value, verbose=9)
        # insert the tag and value into into bibxxx
        (table_name, bibxxx_row_id) = insert_record_bibxxx(full_tag, value)
        if table_name is None or bibxxx_row_id is None:
            write_message(" Failed : during insert_record_bibxxx", verbose=1, stream=sys.stderr)
            # Nothing to link: do not go on with undefined values.
            return
        # connect bibxxx and bibrec with the table bibrec_bibxxx
        res = insert_record_bibrec_bibxxx(table_name, bibxxx_row_id, datafield_number, rec_id)
        if res is None:
            write_message(" Failed : during insert_record_bibrec_bibxxx", verbose=1, stream=sys.stderr)

    for tag in record.keys():
        # nothing to do for special tags (FFT, FMT)
        if tag in CFG_BIBUPLOAD_SPECIAL_TAGS:
            continue
        is_controlfield = tag in CFG_BIBUPLOAD_CONTROLFIELD_TAGS and tag != "001"
        # for each tag there is a list of tuples representing datafields
        for single_tuple in record[tag]:
            subfield_list = single_tuple[0]
            field_tag = tag + _indicator(single_tuple[1]) + _indicator(single_tuple[2])
            datafield_number = single_tuple[4]
            if is_controlfield:
                _store_value(field_tag, single_tuple[3], datafield_number)
            else:
                # get the tag and value from the content of each subfield
                for subfield in subfield_list:
                    _store_value(field_tag + subfield[0], subfield[1], datafield_number)

    write_message(" -Update the database with metadata : DONE", verbose=2)

    log_record_uploading(oai_rec_id, task_get_task_param('task_id', 0), rec_id, 'P')
def append_new_tag_to_old_record(record, rec_old, opt_tag, opt_mode):
    """
    Append the fields of a record to an old record.

    In 'reference' mode only the fields of CFG_BIBUPLOAD_REFERENCE_TAG
    are appended.  Otherwise control fields (except 001) and data
    fields are appended; a data field holding an external identifier
    (CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG, CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG)
    is not appended when the old record already contains it.

    @param record: the record structure providing the new fields.
    @param rec_old: the record structure to extend (modified in place).
    @param opt_tag: a single tag to append, or None to append every
        tag of record.
    @param opt_mode: the bibupload mode.
    @return: rec_old.
    """
    external_id_tags = (CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[:5],
                        CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[:5])

    def _indicator(ind):
        """Indicator as written in a configured tag: '_' when blank."""
        if ind == ' ':
            return '_'
        return ind

    def _report_if_failed(newfield_number, tag):
        """Complain when record_add_field() did not add the field."""
        if newfield_number is None:
            write_message(" Error when adding the field"+tag, verbose=1, stream=sys.stderr)

    def _append_tag(tag):
        """Append all the fields of record with the given tag."""
        # Reference mode append only reference tag
        if opt_mode == 'reference':
            if tag == CFG_BIBUPLOAD_REFERENCE_TAG:
                for single_tuple in record[tag]:
                    # We retrieve the information of the tag
                    subfield_list = single_tuple[0]
                    ind1 = single_tuple[1]
                    ind2 = single_tuple[2]
                    # We add the datafield to the old record
                    write_message(" Adding tag: %s ind1=%s ind2=%s code=%s" % (tag, ind1, ind2, subfield_list), verbose=9)
                    _report_if_failed(record_add_field(rec_old, tag, ind1, ind2, subfields=subfield_list), tag)
        elif tag in CFG_BIBUPLOAD_CONTROLFIELD_TAGS:
            # The record identifier is never appended.
            if tag != '001':
                # if it is a controlfield, just access the value
                for single_tuple in record[tag]:
                    controlfield_value = single_tuple[3]
                    # add the field to the old record
                    _report_if_failed(record_add_field(rec_old, tag, controlfield_value=controlfield_value), tag)
        else:
            # For each tag there is a list of tuples representing datafields
            for single_tuple in record[tag]:
                # We retrieve the information of the tag
                subfield_list = single_tuple[0]
                ind1 = single_tuple[1]
                ind2 = single_tuple[2]
                if '%s%s%s' % (tag, _indicator(ind1), _indicator(ind2)) in external_id_tags:
                    ## We don't want to append the external identifier
                    ## if it is already existing.  record_find_field()
                    ## returns a pair of positions, which is always
                    ## true as a whole: test the position itself.
                    if record_find_field(rec_old, tag, single_tuple)[1] is not None:
                        write_message(" Not adding tag: %s ind1=%s ind2=%s subfields=%s: it's already there" % (tag, ind1, ind2, subfield_list), verbose=9)
                        continue
                # We add the datafield to the old record
                write_message(" Adding tag: %s ind1=%s ind2=%s subfields=%s" % (tag, ind1, ind2, subfield_list), verbose=9)
                _report_if_failed(record_add_field(rec_old, tag, ind1, ind2, subfields=subfield_list), tag)

    if opt_tag is not None:
        _append_tag(opt_tag)
    else:
        # Go through each tag in the appended record
        for tag in record:
            _append_tag(tag)
    return rec_old
def copy_strong_tags_from_old_record(record, rec_old):
    """
    Preserve the strong tags of an old record.

    For every tag of CFG_BIBUPLOAD_STRONG_TAGS which has no field in
    RECORD, the fields REC_OLD holds for that tag are added to RECORD.
    RECORD is modified on the spot; nothing is returned.
    """
    for strong_tag in CFG_BIBUPLOAD_STRONG_TAGS:
        if record_get_field_instances(record, strong_tag):
            # RECORD brings its own values: they win.
            continue
        old_instances = record_get_field_instances(rec_old, strong_tag)
        if not old_instances:
            continue
        for old_instance in old_instances:
            subfields, ind1, ind2, controlfield, dummy_position = old_instance
            record_add_field(record, strong_tag, ind1, ind2, controlfield, subfields)
    return
### Delete functions

def delete_tags(record, rec_old):
    """
    Returns a record structure with all the fields in rec_old minus the
    fields in record.  rec_old itself is left untouched: the work is
    done on a deep copy.  Tag 001 is never deleted.

    @param record: The record containing tags to delete.
    @type record: record structure

    @param rec_old: The original record.
    @type rec_old: record structure

    @return: The modified record.
    @rtype: record structure
    """
    remaining = copy.deepcopy(rec_old)
    for tag, doomed_fields in record.iteritems():
        if tag in ('001', ):
            continue
        for doomed_field in doomed_fields:
            position = record_find_field(remaining, tag, doomed_field)[1]
            if position is None:
                # The old record does not hold this field.
                continue
            record_delete_field(remaining, tag, field_position_local=position)
    return remaining


def delete_tags_to_correct(record, rec_old, opt_tag):
    """
    Delete tags from REC_OLD which are also existing in RECORD.  When
    deleting, pay attention not only to tags, but also to indicators,
    so that fields with the same tags but different indicators are not
    deleted.

    Fields of the tags listed in CFG_BIBUPLOAD_CONTROLLED_PROVENANCE_TAGS
    are kept when their provenance is not among those RECORD updates.

    @param record: the record structure holding the corrected fields.
    @param rec_old: the record structure to clean (modified in place).
    @param opt_tag: the only tag to consider, or None for all tags.
    """
    def _provenance_of(subfields, code):
        """Value of the first subfield with the given code; the special
        empty provenance '' when there is no such subfield."""
        for subfield_code, subfield_value in subfields:
            if subfield_code == code:
                return subfield_value
        return ''

    ## Some fields are controlled via provenance information.
    ## We should re-add saved fields at the end.
    saved_fields = {}
    for controlled_tag in CFG_BIBUPLOAD_CONTROLLED_PROVENANCE_TAGS:
        field_tag = controlled_tag[:3]
        if field_tag not in record:
            continue
        provenance_code = controlled_tag[5]

        ## Let's discover the provenance that will be updated
        updated_provenances = []
        for instance in record_get_field_instances(record, field_tag, controlled_tag[3], controlled_tag[4]):
            provenance = _provenance_of(instance[0], provenance_code)
            if provenance not in updated_provenances:
                updated_provenances.append(provenance)

        ## Let's save apart all the old fields of this tag whose
        ## provenance is not mentioned in record: they should be
        ## preserved.
        candidates = record_get_field_instances(rec_old, field_tag, controlled_tag[3], controlled_tag[4])
        preserved = []
        for sf_vals, dummy_ind1, dummy_ind2, dummy_cf, dummy_line in candidates:
            if _provenance_of(sf_vals, provenance_code) not in updated_provenances:
                preserved.append(sf_vals)
        saved_fields[controlled_tag] = preserved

    # browse through all the tags from the MARCXML file:
    for tag in record:
        # do we have to delete only a special tag or any tag?
        if opt_tag is not None and opt_tag != tag:
            continue
        # check if the tag exists in the old record too:
        if tag == '001' or tag not in rec_old:
            continue
        # the tag does exist, so delete all record's tag+ind1+ind2 combinations from rec_old
        for dummy_sf_vals, ind1, ind2, dummy_cf, dummy_field_number in record[tag]:
            write_message(" Delete tag: " + tag + " ind1=" + ind1 + " ind2=" + ind2, verbose=9)
            record_delete_field(rec_old, tag, ind1, ind2)

    ## Ok, we readd necessary fields!
    for controlled_tag, preserved in saved_fields.iteritems():
        for sf_vals in preserved:
            write_message(" Adding tag: " + controlled_tag[:3] + " ind1=" + controlled_tag[3] + " ind2=" + controlled_tag[4] + " code=" + str(sf_vals), verbose=9)
            record_add_field(rec_old, controlled_tag[:3], controlled_tag[3], controlled_tag[4], subfields=sf_vals)
for tag, fields in fields_to_readd.iteritems(): for sf_vals in fields: write_message(" Adding tag: " + tag[:3] + " ind1=" + tag[3] + " ind2=" + tag[4] + " code=" + str(sf_vals), verbose=9) record_add_field(rec_old, tag[:3], tag[3], tag[4], subfields=sf_vals) def delete_bibrec_bibxxx(record, id_bibrec): """Delete the database record from the table bibxxx given in parameters""" # we clear all the rows from bibrec_bibxxx from the old record for tag in record.keys(): if tag not in CFG_BIBUPLOAD_SPECIAL_TAGS: # for each name construct the bibrec_bibxxx table name table_name = 'bibrec_bib'+tag[0:2]+'x' # delete all the records with proper id_bibrec query = """DELETE FROM `%s` where id_bibrec = %s""" params = (table_name, id_bibrec) try: run_sql(query % params) except Error, error: write_message(" Error during the delete_bibrec_bibxxx function : %s " % error, verbose=1, stream=sys.stderr) def wipe_out_record_from_all_tables(recid): """ Wipe out completely the record and all its traces of RECID from the database (bibrec, bibrec_bibxxx, bibxxx, bibfmt). Useful for the time being for test cases. 
""" # delete all the linked bibdocs for bibdoc in BibRecDocs(recid).list_bibdocs(): bibdoc.expunge() # delete from bibrec: run_sql("DELETE FROM bibrec WHERE id=%s", (recid,)) # delete from bibrec_bibxxx: for i in range(0, 10): for j in range(0, 10): run_sql("DELETE FROM %(bibrec_bibxxx)s WHERE id_bibrec=%%s" % \ {'bibrec_bibxxx': "bibrec_bib%i%ix" % (i, j)}, (recid,)) # delete all unused bibxxx values: for i in range(0, 10): for j in range(0, 10): run_sql("DELETE %(bibxxx)s FROM %(bibxxx)s " \ " LEFT JOIN %(bibrec_bibxxx)s " \ " ON %(bibxxx)s.id=%(bibrec_bibxxx)s.id_bibxxx " \ " WHERE %(bibrec_bibxxx)s.id_bibrec IS NULL" % \ {'bibxxx': "bib%i%ix" % (i, j), 'bibrec_bibxxx': "bibrec_bib%i%ix" % (i, j)}) # delete from bibfmt: run_sql("DELETE FROM bibfmt WHERE id_bibrec=%s", (recid,)) # delete from bibrec_bibdoc: run_sql("DELETE FROM bibrec_bibdoc WHERE id_bibrec=%s", (recid,)) return def delete_bibdoc(id_bibrec): """Delete document from bibdoc which correspond to the bibrec id given in parameter""" query = """UPDATE bibdoc SET status='DELETED' WHERE id IN (SELECT id_bibdoc FROM bibrec_bibdoc WHERE id_bibrec=%s)""" params = (id_bibrec,) try: run_sql(query, params) except Error, error: write_message(" Error during the delete_bibdoc function : %s " % error, verbose=1, stream=sys.stderr) def delete_bibrec_bibdoc(id_bibrec): """Delete the bibrec record from the table bibrec_bibdoc given in parameter""" # delete all the records with proper id_bibrec query = """DELETE FROM bibrec_bibdoc WHERE id_bibrec=%s""" params = (id_bibrec,) try: run_sql(query, params) except Error, error: write_message(" Error during the delete_bibrec_bibdoc function : %s " % error, verbose=1, stream=sys.stderr) def main(): """Main that construct all the bibtask.""" task_init(authorization_action='runbibupload', authorization_msg="BibUpload Task Submission", description="""Receive MARC XML file and update appropriate database tables according to options. 
Examples: $ bibupload -i input.xml """, help_specific_usage=""" -a, --append\t\tnew fields are appended to the existing record -c, --correct\t\tfields are replaced by the new ones in the existing record -f, --format\t\ttakes only the FMT fields into account. Does not update -i, --insert\t\tinsert the new record in the database -r, --replace\t\tthe existing record is entirely replaced by the new one -z, --reference\tupdate references (update only 999 fields) -d, --delete\t\tspecified fields are deleted in existing record -S, --stage=STAGE\tstage to start from in the algorithm (0: always done; 1: FMT tags; \t\t\t2: FFT tags; 3: BibFmt; 4: Metadata update; 5: time update) -n, --notimechange\tdo not change record last modification date when updating -o, --holdingpen\t\tInsert record into holding pen instead of the normal database """, version=__revision__, specific_params=("ircazdS:fno", [ "insert", "replace", "correct", "append", "reference", "delete", "stage=", "format", "notimechange", "holdingpen", ]), task_submit_elaborate_specific_parameter_fnc=task_submit_elaborate_specific_parameter, task_run_fnc=task_run_core) def task_submit_elaborate_specific_parameter(key, value, opts, args): """ Given the string key it checks it's meaning, eventually using the value. Usually it fills some key in the options dict. It must return True if it has elaborated the key, False, if it doesn't know that key. 
eg: if key in ['-n', '--number']: task_get_option(\1) = value return True return False """ # No time change option if key in ("-n", "--notimechange"): task_set_option('notimechange', 1) # Insert mode option elif key in ("-i", "--insert"): if task_get_option('mode') == 'replace': # if also replace found, then set to replace_or_insert task_set_option('mode', 'replace_or_insert') else: task_set_option('mode', 'insert') fix_argv_paths([args[0]]) task_set_option('file_path', os.path.abspath(args[0])) # Replace mode option elif key in ("-r", "--replace"): if task_get_option('mode') == 'insert': # if also insert found, then set to replace_or_insert task_set_option('mode', 'replace_or_insert') else: task_set_option('mode', 'replace') fix_argv_paths([args[0]]) task_set_option('file_path', os.path.abspath(args[0])) # Holding pen mode option elif key in ("-o", "--holdingpen"): write_message("Holding pen mode", verbose=3) task_set_option('mode', 'holdingpen') fix_argv_paths([args[0]]) task_set_option('file_path', os.path.abspath(args[0])) # Correct mode option elif key in ("-c", "--correct"): task_set_option('mode', 'correct') fix_argv_paths([args[0]]) task_set_option('file_path', os.path.abspath(args[0])) # Append mode option elif key in ("-a", "--append"): task_set_option('mode', 'append') fix_argv_paths([args[0]]) task_set_option('file_path', os.path.abspath(args[0])) # Reference mode option elif key in ("-z", "--reference"): task_set_option('mode', 'reference') fix_argv_paths([args[0]]) task_set_option('file_path', os.path.abspath(args[0])) elif key in ("-d", "--delete"): task_set_option('mode', 'delete') fix_argv_paths([args[0]]) task_set_option('file_path', os.path.abspath(args[0])) # Format mode option elif key in ("-f", "--format"): task_set_option('mode', 'format') fix_argv_paths([args[0]]) task_set_option('file_path', os.path.abspath(args[0])) # Stage elif key in ("-S", "--stage"): try: value = int(value) except ValueError: print >> sys.stderr, """The value specified 
for --stage must be a valid integer, not %s""" % value return False if not (0 <= value <= 5): print >> sys.stderr, """The value specified for --stage must be comprised between 0 and 5""" return False task_set_option('stage_to_start_from', value) else: return False return True def task_submit_check_options(): """ Reimplement this method for having the possibility to check options before submitting the task, in order for example to provide default values. It must return False if there are errors in the options. """ if task_get_option('mode') is None: write_message("Please specify at least one update/insert mode!") return False if task_get_option('file_path') is None: write_message("Missing filename! -h for help.") return False return True def writing_rights_p(): """Return True in case bibupload has the proper rights to write in the fulltext file folder.""" filename = os.path.join(CFG_WEBSUBMIT_FILEDIR, 'test.txt') try: if not os.path.exists(CFG_WEBSUBMIT_FILEDIR): os.makedirs(CFG_WEBSUBMIT_FILEDIR) open(filename, 'w').write('TEST') assert(open(filename).read() == 'TEST') os.remove(filename) except: register_exception() return False return True def task_run_core(): """ Reimplement to add the body of the task.""" error = 0 write_message("Input file '%s', input mode '%s'." 
% (task_get_option('file_path'), task_get_option('mode'))) write_message("STAGE 0:", verbose=2) if task_get_option('file_path') is not None: write_message("start preocessing", verbose=3) task_update_progress("Reading XML input") recs = xml_marc_to_records(open_marc_file(task_get_option('file_path'))) stat['nb_records_to_upload'] = len(recs) write_message(" -Open XML marc: DONE", verbose=2) task_sleep_now_if_required(can_stop_too=True) write_message("Entering records loop", verbose=3) if recs is not None: # We proceed each record by record for record in recs: record_id = record_extract_oai_id(record) task_sleep_now_if_required(can_stop_too=True) if task_get_option("mode") == "holdingpen": #inserting into the holding pen write_message("Inserting into holding pen", verbose=3) insert_record_into_holding_pen(record, record_id) else: write_message("Inserting into main database", verbose=3) error = bibupload( record, opt_tag=task_get_option('tag'), opt_mode=task_get_option('mode'), opt_stage_to_start_from=task_get_option('stage_to_start_from'), opt_notimechange=task_get_option('notimechange'), oai_rec_id = record_id) if error[0] == 1: if record: write_message(record_xml_output(record), stream=sys.stderr) else: write_message("Record could not have been parsed", stream=sys.stderr) stat['nb_errors'] += 1 elif error[0] == 2: if record: write_message(record_xml_output(record), stream=sys.stderr) else: write_message("Record could not have been parsed", stream=sys.stderr) task_update_progress("Done %d out of %d." 
% \ (stat['nb_records_inserted'] + \ stat['nb_records_updated'], stat['nb_records_to_upload'])) else: write_message(" Error bibupload failed: No record found", verbose=1, stream=sys.stderr) if task_get_task_param('verbose') >= 1: # Print out the statistics print_out_bibupload_statistics() # Check if they were errors return not stat['nb_errors'] >= 1 def log_record_uploading(oai_rec_id, task_id, bibrec_id, insertion_db): if oai_rec_id != "" and oai_rec_id != None: query = """UPDATE oaiHARVESTLOG SET date_inserted=NOW(), inserted_to_db=%s, id_bibrec=%s WHERE oai_id = %s AND bibupload_task_id = %s ORDER BY date_harvested LIMIT 1""" try: run_sql(query, (str(insertion_db), str(bibrec_id), str(oai_rec_id), str(task_id), )) except Error, error: write_message(" Error during the log_record_uploading function : %s " % error, verbose=1, stream=sys.stderr) if __name__ == "__main__": main() diff --git a/modules/bibupload/lib/bibupload_regression_tests.py b/modules/bibupload/lib/bibupload_regression_tests.py index c43f2fa7f..dbee38390 100644 --- a/modules/bibupload/lib/bibupload_regression_tests.py +++ b/modules/bibupload/lib/bibupload_regression_tests.py @@ -1,3427 +1,3438 @@ # -*- coding: utf-8 -*- ## ## This file is part of CDS Invenio. ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008 CERN. ## ## CDS Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## CDS Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. 
# helper functions:

def remove_tag_001_from_xmbuffer(xmbuffer):
    """Remove tag 001 from MARCXML buffer.  Useful for testing two
       MARCXML buffers without paying attention to recIDs attributed
       during the bibupload.

       @param xmbuffer: MARCXML text
       @return: XMBUFFER without its 001 controlfield element(s)
    """
    # NOTE(review): the markup inside this pattern was lost in the
    # copy under review (it read r'.*', which blanks every line);
    # rebuilt from the docstring -- confirm against the repository.
    # Non-greedy, so that a following controlfield on the same line
    # is left alone.
    return re.sub(r'<controlfield tag="001">.*?</controlfield>', '', xmbuffer)


def compare_xmbuffers(xmbuffer1, xmbuffer2):
    """Compare two XM (XML MARC) buffers by removing whitespaces
       before testing.

       @return: '' when the buffers are equal once newlines and
           blanks are removed, otherwise a text showing both
           stripped buffers
    """

    def _squeeze(xmbuffer):
        """Remove \\n and blanks from XMBUFFER."""
        return xmbuffer.replace("\n", "").replace(" ", "")

    squeezed1 = _squeeze(xmbuffer1)
    squeezed2 = _squeeze(xmbuffer2)

    if squeezed1 == squeezed2:
        return ''
    return "\n=" + squeezed1 + "=\n" + '!=' + "\n=" + squeezed2 + "=\n"


def remove_tag_001_from_hmbuffer(hmbuffer):
    """Remove tag 001 from HTML MARC buffer.  Useful for testing two
       HTML MARC buffers without paying attention to recIDs attributed
       during the bibupload.

       @param hmbuffer: HTML MARC text
       @return: HMBUFFER without its 001 line; the newline that
           surrounds the line (and a leading <pre>, if any) goes
           with it
    """
    # NOTE(review): the optional group was empty and split over two
    # lines in the copy under review; <pre> is what the comments of
    # compare_hmbuffers() refer to -- confirm against the repository.
    return re.sub(r'(^|\n)(<pre>)?[0-9]{9}\s001__\s\d+($|\n)', '', hmbuffer)
 
 def compare_hmbuffers(hmbuffer1, hmbuffer2):
     """Compare two HM (HTML MARC) buffers by removing whitespaces
        before testing.
     """
 
     # remove eventual 
...
formatting: hmbuffer1 = re.sub(r'^
', '', hmbuffer1)
     hmbuffer2 = re.sub(r'^
', '', hmbuffer2)
     hmbuffer1 = re.sub(r'
$', '', hmbuffer1) hmbuffer2 = re.sub(r'
$', '', hmbuffer2) # remove leading recid, leaving only field values: hmbuffer1 = re.sub(r'(^|\n)[0-9]{9}\s', '', hmbuffer1) hmbuffer2 = re.sub(r'(^|\n)[0-9]{9}\s', '', hmbuffer2) # remove leading whitespace: hmbuffer1 = re.sub(r'(^|\n)\s+', '', hmbuffer1) hmbuffer2 = re.sub(r'(^|\n)\s+', '', hmbuffer2) compare_hmbuffers = hmbuffer1 == hmbuffer2 if not compare_hmbuffers: return "\n=" + hmbuffer1 + "=\n" + '!=' + "\n=" + hmbuffer2 + "=\n" return '' def try_url_download(url): """Try to download a given URL""" try: open_url = urlopen(url) open_url.read() except Exception, e: raise StandardError, "Downloading %s is impossible because of %s" \ % (url, str(e)) return True class BibUploadInsertModeTest(unittest.TestCase): """Testing insert mode.""" def setUp(self): # pylint: disable-msg=C0103 """Initialise the MARCXML variable""" self.test = """ something Tester, J Y MIT Tester, K J CERN2 Tester, G CERN3 test11 test31 test12 test32 test13 test33 test21 test41 test22 test42 test14 test51 test52 Tester, T CERN """ self.test_hm = """ 100__ $$aTester, T$$uCERN 111__ $$atest11$$ctest31 111__ $$atest12$$ctest32 111__ $$atest13$$ctest33 111__ $$btest21$$dtest41 111__ $$btest22$$dtest42 111__ $$atest14 111__ $$etest51 111__ $$etest52 245__ $$asomething 700__ $$aTester, J Y$$uMIT 700__ $$aTester, K J$$uCERN2 700__ $$aTester, G$$uCERN3 """ def test_create_record_id(self): """bibupload - insert mode, trying to create a new record ID in the database""" rec_id = bibupload.create_new_record() self.assertNotEqual(-1, rec_id) def test_no_retrieve_record_id(self): """bibupload - insert mode, detection of record ID in the input file""" # We create create the record out of the xml marc recs = bibupload.xml_marc_to_records(self.test) # We call the function which should retrieve the record id rec_id = bibupload.retrieve_rec_id(recs[0], 'insert') # We compare the value found with None self.assertEqual(None, rec_id) def test_insert_complete_xmlmarc(self): """bibupload - insert mode, trying to 
insert complete MARCXML file""" # Initialize the global variable task_set_task_param('verbose', 0) # We create create the record out of the xml marc recs = bibupload.xml_marc_to_records(self.test) # We call the main function with the record as a parameter err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # We retrieve the inserted xml inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') # Compare if the two MARCXML are the same self.assertEqual(compare_xmbuffers(remove_tag_001_from_xmbuffer(inserted_xm), self.test), '') self.assertEqual(compare_hmbuffers(remove_tag_001_from_hmbuffer(inserted_hm), self.test_hm), '') class BibUploadAppendModeTest(unittest.TestCase): """Testing append mode.""" def setUp(self): # pylint: disable-msg=C0103 """Initialize the MARCXML variable""" self.test_existing = """ 123456789 Tester, T DESY + + 0003719PHOPHO + """ self.test_to_append = """ 123456789 Tester, U CERN + + 0003719PHOPHO + """ self.test_expected_xm = """ 123456789 Tester, T DESY Tester, U CERN + + 0003719PHOPHO + """ self.test_expected_hm = """ 001__ 123456789 100__ $$aTester, T$$uDESY 100__ $$aTester, U$$uCERN + 970__ $$a0003719PHOPHO """ # insert test record: test_to_upload = self.test_existing.replace('123456789', '') recs = bibupload.xml_marc_to_records(test_to_upload) + task_set_task_param('verbose', 0) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') self.test_recid = recid # replace test buffers with real recid of inserted test record: self.test_existing = self.test_existing.replace('123456789', str(self.test_recid)) self.test_to_append = self.test_to_append.replace('123456789', str(self.test_recid)) self.test_expected_xm = self.test_expected_xm.replace('123456789', str(self.test_recid)) self.test_expected_hm = self.test_expected_hm.replace('123456789', str(self.test_recid)) def test_retrieve_record_id(self): """bibupload - append mode, the input file should contain a record ID""" task_set_task_param('verbose', 0) # 
We create create the record out of the xml marc recs = bibupload.xml_marc_to_records(self.test_to_append) # We call the function which should retrieve the record id rec_id = bibupload.retrieve_rec_id(recs[0], 'append') # We compare the value found with None self.assertEqual(self.test_recid, rec_id) # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(self.test_recid) return def test_update_modification_record_date(self): """bibupload - append mode, checking the update of the modification date""" # Initialize the global variable task_set_task_param('verbose', 0) # We create create the record out of the xml marc recs = bibupload.xml_marc_to_records(self.test_existing) # We call the function which should retrieve the record id rec_id = bibupload.retrieve_rec_id(recs[0], opt_mode='append') # Retrieve current localtime now = time.localtime() # We update the modification date bibupload.update_bibrec_modif_date(convert_datestruct_to_datetext(now), rec_id) # We retrieve the modification date from the database query = """SELECT DATE_FORMAT(modification_date,'%%Y-%%m-%%d %%H:%%i:%%s') FROM bibrec where id = %s""" res = run_sql(query % rec_id) # We compare the two results self.assertEqual(res[0][0], convert_datestruct_to_datetext(now)) # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(self.test_recid) return def test_append_complete_xml_marc(self): """bibupload - append mode, appending complete MARCXML file""" # Now we append a datafield # We create create the record out of the xml marc recs = bibupload.xml_marc_to_records(self.test_to_append) # We call the main function with the record as a parameter err, recid = bibupload.bibupload(recs[0], opt_mode='append') # We retrieve the inserted xm after_append_xm = print_record(recid, 'xm') after_append_hm = print_record(recid, 'hm') # Compare if the two MARCXML are the same self.assertEqual(compare_xmbuffers(after_append_xm, self.test_expected_xm), '') 
self.assertEqual(compare_hmbuffers(after_append_hm, self.test_expected_hm), '') # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(self.test_recid) return class BibUploadCorrectModeTest(unittest.TestCase): """ Testing correcting a record containing similar tags (identical tag, different indicators). Currently CDS Invenio replaces only those tags that have matching indicators too, unlike ALEPH500 that does not pay attention to indicators, it corrects all fields with the same tag, regardless of the indicator values. """ def setUp(self): """Initialize the MARCXML test record.""" self.testrec1_xm = """ 123456789 SzGeCERN Test, Jane Test Institute Test, John Test University Cool Test, Jim Test Laboratory """ self.testrec1_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, Jane$$uTest Institute 10047 $$aTest, John$$uTest University 10048 $$aCool 10047 $$aTest, Jim$$uTest Laboratory """ self.testrec1_xm_to_correct = """ 123456789 Test, Joseph Test Academy Test2, Joseph Test2 Academy """ self.testrec1_corrected_xm = """ 123456789 SzGeCERN Test, Jane Test Institute Cool Test, Joseph Test Academy Test2, Joseph Test2 Academy """ self.testrec1_corrected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, Jane$$uTest Institute 10048 $$aCool 10047 $$aTest, Joseph$$uTest Academy 10047 $$aTest2, Joseph$$uTest2 Academy """ # insert test record: task_set_task_param('verbose', 0) test_record_xm = self.testrec1_xm.replace('123456789', '') recs = bibupload.xml_marc_to_records(test_record_xm) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recID: self.testrec1_xm = self.testrec1_xm.replace('123456789', str(recid)) self.testrec1_hm = self.testrec1_hm.replace('123456789', str(recid)) self.testrec1_xm_to_correct = self.testrec1_xm_to_correct.replace('123456789', str(recid)) self.testrec1_corrected_xm = self.testrec1_corrected_xm.replace('123456789', str(recid)) self.testrec1_corrected_hm = 
self.testrec1_corrected_hm.replace('123456789', str(recid)) # test of the inserted record: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, self.testrec1_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.testrec1_hm), '') def test_record_correction(self): """bibupload - correct mode, similar MARCXML tags/indicators""" # correct some tags: recs = bibupload.xml_marc_to_records(self.testrec1_xm_to_correct) err, recid = bibupload.bibupload(recs[0], opt_mode='correct') corrected_xm = print_record(recid, 'xm') corrected_hm = print_record(recid, 'hm') # did it work? self.assertEqual(compare_xmbuffers(corrected_xm, self.testrec1_corrected_xm), '') self.assertEqual(compare_hmbuffers(corrected_hm, self.testrec1_corrected_hm), '') # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(recid) return class BibUploadDeleteModeTest(unittest.TestCase): """ Testing deleting specific tags from a record while keeping anything else untouched. Currently CDS Invenio deletes only those tags that have matching indicators too, unlike ALEPH500 that does not pay attention to indicators, it corrects all fields with the same tag, regardless of the indicator values. 
""" def setUp(self): """Initialize the MARCXML test record.""" self.testrec1_xm = """ 123456789 SzGeCERN Test, Jane Test Institute Test, John Test University Cool Test, Jim Test Laboratory dumb text """ self.testrec1_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, Jane$$uTest Institute 10047 $$aTest, John$$uTest University 10048 $$aCool 10047 $$aTest, Jim$$uTest Laboratory 888__ $$adumb text """ self.testrec1_xm_to_delete = """ 123456789 Test, Jane Test Institute Test, Johnson Test University Cool dumb text """ self.testrec1_corrected_xm = """ 123456789 SzGeCERN Test, John Test University Test, Jim Test Laboratory """ self.testrec1_corrected_hm = """ 001__ 123456789 003__ SzGeCERN 10047 $$aTest, John$$uTest University 10047 $$aTest, Jim$$uTest Laboratory """ # insert test record: task_set_task_param('verbose', 0) test_record_xm = self.testrec1_xm.replace('123456789', '') recs = bibupload.xml_marc_to_records(test_record_xm) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recID: self.testrec1_xm = self.testrec1_xm.replace('123456789', str(recid)) self.testrec1_hm = self.testrec1_hm.replace('123456789', str(recid)) self.testrec1_xm_to_delete = self.testrec1_xm_to_delete.replace('123456789', str(recid)) self.testrec1_corrected_xm = self.testrec1_corrected_xm.replace('123456789', str(recid)) self.testrec1_corrected_hm = self.testrec1_corrected_hm.replace('123456789', str(recid)) # test of the inserted record: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, self.testrec1_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.testrec1_hm), '') # Checking dumb text is in bibxxx self.failUnless(run_sql("SELECT * from bibrec_bib88x WHERE id_bibrec=%s", (recid, ))) def test_record_tags_deletion(self): """bibupload - delete mode, deleting specific tags""" # correct some tags: recs = bibupload.xml_marc_to_records(self.testrec1_xm_to_delete) 
err, recid = bibupload.bibupload(recs[0], opt_mode='delete') corrected_xm = print_record(recid, 'xm') corrected_hm = print_record(recid, 'hm') # did it work? self.assertEqual(compare_xmbuffers(corrected_xm, self.testrec1_corrected_xm), '') self.assertEqual(compare_hmbuffers(corrected_hm, self.testrec1_corrected_hm), '') # Checking dumb text is no more in bibxxx self.failIf(run_sql("SELECT * from bibrec_bib88x WHERE id_bibrec=%s", (recid, ))) # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(recid) return class BibUploadReplaceModeTest(unittest.TestCase): """Testing replace mode.""" def setUp(self): """Initialize the MARCXML test record.""" self.testrec1_xm = """ 123456789 SzGeCERN Test, Jane Test Institute Test, John Test University Cool Test, Jim Test Laboratory """ self.testrec1_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, Jane$$uTest Institute 10047 $$aTest, John$$uTest University 10048 $$aCool 10047 $$aTest, Jim$$uTest Laboratory """ self.testrec1_xm_to_replace = """ 123456789 Test, Joseph Test Academy Test2, Joseph Test2 Academy """ self.testrec1_replaced_xm = """ 123456789 Test, Joseph Test Academy Test2, Joseph Test2 Academy """ self.testrec1_replaced_hm = """ 001__ 123456789 10047 $$aTest, Joseph$$uTest Academy 10047 $$aTest2, Joseph$$uTest2 Academy """ # insert test record: task_set_task_param('verbose', 0) test_record_xm = self.testrec1_xm.replace('123456789', '') recs = bibupload.xml_marc_to_records(test_record_xm) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recID: self.testrec1_xm = self.testrec1_xm.replace('123456789', str(recid)) self.testrec1_hm = self.testrec1_hm.replace('123456789', str(recid)) self.testrec1_xm_to_replace = self.testrec1_xm_to_replace.replace('123456789', str(recid)) self.testrec1_replaced_xm = self.testrec1_replaced_xm.replace('123456789', str(recid)) self.testrec1_replaced_hm = self.testrec1_replaced_hm.replace('123456789', str(recid)) # test of the 
inserted record: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, self.testrec1_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.testrec1_hm), '') def test_record_replace(self): """bibupload - replace mode, similar MARCXML tags/indicators""" # replace some tags: recs = bibupload.xml_marc_to_records(self.testrec1_xm_to_replace) err, recid = bibupload.bibupload(recs[0], opt_mode='replace') replaced_xm = print_record(recid, 'xm') replaced_hm = print_record(recid, 'hm') # did it work? self.assertEqual(compare_xmbuffers(replaced_xm, self.testrec1_replaced_xm), '') self.assertEqual(compare_hmbuffers(replaced_hm, self.testrec1_replaced_hm), '') # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(recid) return class BibUploadReferencesModeTest(unittest.TestCase): """Testing references mode.""" def setUp(self): # pylint: disable-msg=C0103 """Initialize the MARCXML variable""" self.test_insert = """ 123456789 Tester, T CERN """ self.test_reference = """ 123456789 M. Lüscher and P. Weisz, String excitation energies in SU(N) gauge theories beyond the free-string approximation, J. High Energy Phys. 07 (2004) 014 """ self.test_reference_expected_xm = """ 123456789 Tester, T CERN M. Lüscher and P. Weisz, String excitation energies in SU(N) gauge theories beyond the free-string approximation, J. High Energy Phys. 07 (2004) 014 """ self.test_insert_hm = """ 001__ 123456789 100__ $$aTester, T$$uCERN """ self.test_reference_expected_hm = """ 001__ 123456789 100__ $$aTester, T$$uCERN %(reference_tag)sC5 $$mM. Lüscher and P. Weisz, String excitation energies in SU(N) gauge theories beyond the free-string approximation,$$sJ. High Energy Phys. 
07 (2004) 014 """ % {'reference_tag': bibupload.CFG_BIBUPLOAD_REFERENCE_TAG} # insert test record: task_set_task_param('verbose', 0) test_insert = self.test_insert.replace('123456789', '') recs = bibupload.xml_marc_to_records(test_insert) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recID: self.test_insert = self.test_insert.replace('123456789', str(recid)) self.test_insert_hm = self.test_insert_hm.replace('123456789', str(recid)) self.test_reference = self.test_reference.replace('123456789', str(recid)) self.test_reference_expected_xm = self.test_reference_expected_xm.replace('123456789', str(recid)) self.test_reference_expected_hm = self.test_reference_expected_hm.replace('123456789', str(recid)) # test of the inserted record: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, self.test_insert), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.test_insert_hm), '') self.test_recid = recid def test_reference_complete_xml_marc(self): """bibupload - reference mode, inserting references MARCXML file""" # We create create the record out of the xml marc recs = bibupload.xml_marc_to_records(self.test_reference) # We call the main function with the record as a parameter err, recid = bibupload.bibupload(recs[0], opt_mode='reference') # We retrieve the inserted xml reference_xm = print_record(recid, 'xm') reference_hm = print_record(recid, 'hm') # Compare if the two MARCXML are the same self.assertEqual(compare_xmbuffers(reference_xm, self.test_reference_expected_xm), '') self.assertEqual(compare_hmbuffers(reference_hm, self.test_reference_expected_hm), '') # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(self.test_recid) return class BibUploadFMTModeTest(unittest.TestCase): """Testing FMT mode.""" def setUp(self): # pylint: disable-msg=C0103 """Initialize the MARCXML variable""" self.new_xm_with_fmt = """ SzGeCERN HB 
Test. Okay. 2008-03-14 15:14:00 Bar, Baz Foo On the quux and huux """ self.expected_xm_after_inserting_new_xm_with_fmt = """ 123456789 SzGeCERN Bar, Baz Foo On the quux and huux """ self.expected_hm_after_inserting_new_xm_with_fmt = """ 001__ 123456789 003__ SzGeCERN 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux """ self.recid76_xm_before_all_the_tests = print_record(76, 'xm') self.recid76_hm_before_all_the_tests = print_record(76, 'hm') self.recid76_fmts = run_sql("""SELECT last_updated, value, format FROM bibfmt WHERE id_bibrec=76""") self.recid76_xm_with_fmt = """ 76 SzGeCERN HB Test. Here is some format value. Doe, John CERN On the foos and bars """ self.recid76_xm_with_fmt_only_first = """ 76 HB Test. Let us see if this gets inserted well. """ self.recid76_xm_with_fmt_only_second = """ 76 HB Test. Yet another test, to be run after the first one. HD Test. Let's see what will be stored in the detailed format field. """ def tearDown(self): """Helper function that restores recID 76 MARCXML, using the value saved before all the tests started to execute. (see self.recid76_xm_before_all_the_tests). Does not restore HB and HD formats. 
""" recs = bibupload.xml_marc_to_records(self.recid76_xm_before_all_the_tests) err, recid = bibupload.bibupload(recs[0], opt_mode='replace') for (last_updated, value, format) in self.recid76_fmts: run_sql("""UPDATE bibfmt SET last_updated=%s, value=%s WHERE id_bibrec=76 AND format=%s""", (last_updated, value, format)) inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, self.recid76_xm_before_all_the_tests), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.recid76_hm_before_all_the_tests), '') def test_inserting_new_record_containing_fmt_tag(self): """bibupload - FMT tag, inserting new record containing FMT tag""" recs = bibupload.xml_marc_to_records(self.new_xm_with_fmt) (dummy, new_recid) = bibupload.bibupload(recs[0], opt_mode='insert') xm_after = print_record(new_recid, 'xm') hm_after = print_record(new_recid, 'hm') hb_after = print_record(new_recid, 'hb') self.assertEqual(compare_xmbuffers(xm_after, self.expected_xm_after_inserting_new_xm_with_fmt.replace('123456789', str(new_recid))), '') self.assertEqual(compare_hmbuffers(hm_after, self.expected_hm_after_inserting_new_xm_with_fmt.replace('123456789', str(new_recid))), '') self.assertEqual(run_sql('SELECT last_updated from bibfmt WHERE id_bibrec=%s', (new_recid, ))[0][0], datetime.datetime(2008, 3, 14, 15, 14)) self.failUnless(hb_after.startswith("Test. Okay.")) def test_updating_existing_record_formats_in_format_mode(self): """bibupload - FMT tag, updating existing record via format mode""" xm_before = print_record(76, 'xm') hm_before = print_record(76, 'hm') # insert first format value: recs = bibupload.xml_marc_to_records(self.recid76_xm_with_fmt_only_first) bibupload.bibupload(recs[0], opt_mode='format') xm_after = print_record(76, 'xm') hm_after = print_record(76, 'hm') hb_after = print_record(76, 'hb') self.assertEqual(xm_after, xm_before) self.assertEqual(hm_after, hm_before) self.failUnless(hb_after.startswith("Test. 
Let us see if this gets inserted well.")) # now insert another format value and recheck: recs = bibupload.xml_marc_to_records(self.recid76_xm_with_fmt_only_second) bibupload.bibupload(recs[0], opt_mode='format') xm_after = print_record(76, 'xm') hm_after = print_record(76, 'hm') hb_after = print_record(76, 'hb') hd_after = print_record(76, 'hd') self.assertEqual(xm_after, xm_before) self.assertEqual(hm_after, hm_before) self.failUnless(hb_after.startswith("Test. Yet another test, to be run after the first one.")) self.failUnless(hd_after.startswith("Test. Let's see what will be stored in the detailed format field.")) def test_updating_existing_record_formats_in_correct_mode(self): """bibupload - FMT tag, updating existing record via correct mode""" xm_before = print_record(76, 'xm') hm_before = print_record(76, 'hm') # insert first format value: recs = bibupload.xml_marc_to_records(self.recid76_xm_with_fmt_only_first) bibupload.bibupload(recs[0], opt_mode='correct') xm_after = print_record(76, 'xm') hm_after = print_record(76, 'hm') hb_after = print_record(76, 'hb') self.assertEqual(xm_after, xm_before) self.assertEqual(hm_after, hm_before) self.failUnless(hb_after.startswith("Test. Let us see if this gets inserted well.")) # now insert another format value and recheck: recs = bibupload.xml_marc_to_records(self.recid76_xm_with_fmt_only_second) bibupload.bibupload(recs[0], opt_mode='correct') xm_after = print_record(76, 'xm') hm_after = print_record(76, 'hm') hb_after = print_record(76, 'hb') hd_after = print_record(76, 'hd') self.assertEqual(xm_after, xm_before) self.assertEqual(hm_after, hm_before) self.failUnless(hb_after.startswith("Test. Yet another test, to be run after the first one.")) self.failUnless(hd_after.startswith("Test. 
Let's see what will be stored in the detailed format field.")) def test_updating_existing_record_formats_in_replace_mode(self): """bibupload - FMT tag, updating existing record via replace mode""" # insert first format value: recs = bibupload.xml_marc_to_records(self.recid76_xm_with_fmt_only_first) bibupload.bibupload(recs[0], opt_mode='replace') xm_after = print_record(76, 'xm') hm_after = print_record(76, 'hm') hb_after = print_record(76, 'hb') self.assertEqual(compare_xmbuffers(xm_after, '76'), '') self.assertEqual(compare_hmbuffers(hm_after, '000000076 001__ 76'), '') self.failUnless(hb_after.startswith("Test. Let us see if this gets inserted well.")) # now insert another format value and recheck: recs = bibupload.xml_marc_to_records(self.recid76_xm_with_fmt_only_second) bibupload.bibupload(recs[0], opt_mode='replace') xm_after = print_record(76, 'xm') hm_after = print_record(76, 'hm') hb_after = print_record(76, 'hb') hd_after = print_record(76, 'hd') self.assertEqual(compare_xmbuffers(xm_after, """ 76 """), '') self.assertEqual(compare_hmbuffers(hm_after, '000000076 001__ 76'), '') self.failUnless(hb_after.startswith("Test. Yet another test, to be run after the first one.")) self.failUnless(hd_after.startswith("Test. Let's see what will be stored in the detailed format field.")) # final insertion and recheck: recs = bibupload.xml_marc_to_records(self.recid76_xm_with_fmt) bibupload.bibupload(recs[0], opt_mode='replace') xm_after = print_record(76, 'xm') hm_after = print_record(76, 'hm') hb_after = print_record(76, 'hb') hd_after = print_record(76, 'hd') self.assertEqual(compare_xmbuffers(xm_after, """ 76 SzGeCERN Doe, John CERN On the foos and bars """), '') self.assertEqual(compare_hmbuffers(hm_after, """ 001__ 76 003__ SzGeCERN 100__ $$aDoe, John$$uCERN 245__ $$aOn the foos and bars """), '') self.failUnless(hb_after.startswith("Test. Here is some format value.")) self.failUnless(hd_after.startswith("Test. 
Let's see what will be stored in the detailed format field.")) class BibUploadRecordsWithSYSNOTest(unittest.TestCase): """Testing uploading of records that have external SYSNO present.""" def setUp(self): # pylint: disable-msg=C0103 """Initialize the MARCXML test records.""" self.verbose = 0 # Note that SYSNO fields are repeated but with different # subfields, this is to test whether bibupload would not # mistakenly pick up wrong values. self.xm_testrec1 = """ 123456789 SzGeCERN Bar, Baz Foo On the quux and huux 1 sysno1 sysno2 """ % {'sysnotag': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[0:3], 'sysnoind1': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] or " ", 'sysnoind2': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] or " ", 'sysnosubfieldcode': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[5:6], } self.hm_testrec1 = """ 001__ 123456789 003__ SzGeCERN 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux 1 %(sysnotag)s%(sysnoind1)s%(sysnoind2)s $$%(sysnosubfieldcode)ssysno1 %(sysnotag)s%(sysnoind1)s%(sysnoind2)s $$0sysno2 """ % {'sysnotag': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[0:3], 'sysnoind1': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4], 'sysnoind2': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5], 'sysnosubfieldcode': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[5:6], } self.xm_testrec1_to_update = """ SzGeCERN Bar, Baz Foo On the quux and huux 1 Updated sysno1 sysno2 """ % {'sysnotag': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[0:3], 'sysnoind1': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] or " ", 'sysnoind2': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] or " ", 'sysnosubfieldcode': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[5:6], } self.xm_testrec1_updated = """ 123456789 SzGeCERN Bar, Baz Foo On the quux and huux 1 Updated sysno1 sysno2 """ % {'sysnotag': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[0:3], 'sysnoind1': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] != "_" and \ 
CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] or " ", 'sysnoind2': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] or " ", 'sysnosubfieldcode': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[5:6], } self.hm_testrec1_updated = """ 001__ 123456789 003__ SzGeCERN 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux 1 Updated %(sysnotag)s%(sysnoind1)s%(sysnoind2)s $$%(sysnosubfieldcode)ssysno1 %(sysnotag)s%(sysnoind1)s%(sysnoind2)s $$0sysno2 """ % {'sysnotag': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[0:3], 'sysnoind1': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4], 'sysnoind2': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5], 'sysnosubfieldcode': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[5:6], } self.xm_testrec2 = """ 987654321 SzGeCERN Bar, Baz Foo On the quux and huux 2 sysno2 sysno1 """ % {'sysnotag': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[0:3], 'sysnoind1': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4] or " ", 'sysnoind2': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5] or " ", 'sysnosubfieldcode': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[5:6], } self.hm_testrec2 = """ 001__ 987654321 003__ SzGeCERN 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux 2 %(sysnotag)s%(sysnoind1)s%(sysnoind2)s $$%(sysnosubfieldcode)ssysno2 %(sysnotag)s%(sysnoind1)s%(sysnoind2)s $$0sysno1 """ % {'sysnotag': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[0:3], 'sysnoind1': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[3:4], 'sysnoind2': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[4:5], 'sysnosubfieldcode': CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG[5:6], } def test_insert_the_same_sysno_record(self): """bibupload - SYSNO tag, refuse to insert the same SYSNO record""" # initialize bibupload mode: if self.verbose: print "test_insert_the_same_sysno_record() started" # insert record 1 first time: testrec_to_insert_first = self.xm_testrec1.replace('123456789', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) task_set_task_param('verbose', 0) err1, recid1 = 
bibupload.bibupload(recs[0], opt_mode='insert') inserted_xm = print_record(recid1, 'xm') inserted_hm = print_record(recid1, 'hm') # use real recID when comparing whether it worked: self.xm_testrec1 = self.xm_testrec1.replace('123456789', str(recid1)) self.hm_testrec1 = self.hm_testrec1.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1), '') # insert record 2 first time: testrec_to_insert_first = self.xm_testrec2.replace('987654321', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) task_set_task_param('verbose', 0) err2, recid2 = bibupload.bibupload(recs[0], opt_mode='insert') inserted_xm = print_record(recid2, 'xm') inserted_hm = print_record(recid2, 'hm') # use real recID when comparing whether it worked: self.xm_testrec2 = self.xm_testrec2.replace('987654321', str(recid2)) self.hm_testrec2 = self.hm_testrec2.replace('987654321', str(recid2)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec2), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec2), '') # try to insert updated record 1, it should fail: recs = bibupload.xml_marc_to_records(self.xm_testrec1_to_update) task_set_task_param('verbose', 0) err1_updated, recid1_updated = bibupload.bibupload(recs[0], opt_mode='insert') self.assertEqual(-1, recid1_updated) # delete test records bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid2) bibupload.wipe_out_record_from_all_tables(recid1_updated) if self.verbose: print "test_insert_the_same_sysno_record() finished" def test_insert_or_replace_the_same_sysno_record(self): """bibupload - SYSNO tag, allow to insert or replace the same SYSNO record""" # initialize bibupload mode: task_set_task_param('verbose', self.verbose) if self.verbose: print "test_insert_or_replace_the_same_sysno_record() started" # insert/replace record 1 first time: testrec_to_insert_first 
= self.xm_testrec1.replace('123456789', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err1, recid1 = bibupload.bibupload(recs[0], opt_mode='replace_or_insert') inserted_xm = print_record(recid1, 'xm') inserted_hm = print_record(recid1, 'hm') # use real recID in test buffers when comparing whether it worked: self.xm_testrec1 = self.xm_testrec1.replace('123456789', str(recid1)) self.hm_testrec1 = self.hm_testrec1.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1), '') # try to insert/replace updated record 1, it should be okay: task_set_task_param('verbose', self.verbose) recs = bibupload.xml_marc_to_records(self.xm_testrec1_to_update) err1_updated, recid1_updated = bibupload.bibupload(recs[0], opt_mode='replace_or_insert') inserted_xm = print_record(recid1_updated, 'xm') inserted_hm = print_record(recid1_updated, 'hm') self.assertEqual(recid1, recid1_updated) # use real recID in test buffers when comparing whether it worked: self.xm_testrec1_updated = self.xm_testrec1_updated.replace('123456789', str(recid1)) self.hm_testrec1_updated = self.hm_testrec1_updated.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1_updated), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1_updated), '') # delete test records bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid1_updated) if self.verbose: print "test_insert_or_replace_the_same_sysno_record() finished" def test_replace_nonexisting_sysno_record(self): """bibupload - SYSNO tag, refuse to replace non-existing SYSNO record""" # initialize bibupload mode: task_set_task_param('verbose', self.verbose) if self.verbose: print "test_replace_nonexisting_sysno_record() started" # insert record 1 first time: testrec_to_insert_first = self.xm_testrec1.replace('123456789', '') recs = 
bibupload.xml_marc_to_records(testrec_to_insert_first) err1, recid1 = bibupload.bibupload(recs[0], opt_mode='replace_or_insert') inserted_xm = print_record(recid1, 'xm') inserted_hm = print_record(recid1, 'hm') # use real recID in test buffers when comparing whether it worked: self.xm_testrec1 = self.xm_testrec1.replace('123456789', str(recid1)) self.hm_testrec1 = self.hm_testrec1.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1), '') # try to replace record 2 it should fail: testrec_to_insert_first = self.xm_testrec2.replace('987654321', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err2, recid2 = bibupload.bibupload(recs[0], opt_mode='replace') self.assertEqual(-1, recid2) # delete test records bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid2) if self.verbose: print "test_replace_nonexisting_sysno_record() finished" class BibUploadRecordsWithEXTOAIIDTest(unittest.TestCase): """Testing uploading of records that have external EXTOAIID present.""" def setUp(self): # pylint: disable-msg=C0103 """Initialize the MARCXML test records.""" self.verbose = 0 # Note that EXTOAIID fields are repeated but with different # subfields, this is to test whether bibupload would not # mistakenly pick up wrong values. 
self.xm_testrec1 = """ 123456789 SzGeCERN extoaiid1 extoaisrc1 extoaiid2 Bar, Baz Foo On the quux and huux 1 """ % {'extoaiidtag': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:3], 'extoaiidind1': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] or " ", 'extoaiidind2': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] or " ", 'extoaiidsubfieldcode': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[5:6], 'extoaisrcsubfieldcode' : CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[5:6], } self.hm_testrec1 = """ 001__ 123456789 003__ SzGeCERN %(extoaiidtag)s%(extoaiidind1)s%(extoaiidind2)s $$%(extoaisrcsubfieldcode)sextoaisrc1$$%(extoaiidsubfieldcode)sextoaiid1 %(extoaiidtag)s%(extoaiidind1)s%(extoaiidind2)s $$0extoaiid2 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux 1 """ % {'extoaiidtag': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:3], 'extoaiidind1': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4], 'extoaiidind2': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5], 'extoaiidsubfieldcode': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[5:6], 'extoaisrcsubfieldcode' : CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[5:6], } self.xm_testrec1_to_update = """ SzGeCERN extoaiid1 extoaisrc1 extoaiid2 Bar, Baz Foo On the quux and huux 1 Updated """ % {'extoaiidtag': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:3], 'extoaiidind1': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] or " ", 'extoaiidind2': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] or " ", 'extoaiidsubfieldcode': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[5:6], 'extoaisrcsubfieldcode' : CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[5:6], } self.xm_testrec1_updated = """ 123456789 SzGeCERN extoaiid1 extoaisrc1 extoaiid2 Bar, Baz Foo On the quux and huux 1 Updated """ % {'extoaiidtag': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:3], 'extoaiidind1': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] or " ", 
'extoaiidind2': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] or " ", 'extoaiidsubfieldcode': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[5:6], 'extoaisrcsubfieldcode' : CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[5:6], } self.hm_testrec1_updated = """ 001__ 123456789 003__ SzGeCERN %(extoaiidtag)s%(extoaiidind1)s%(extoaiidind2)s $$%(extoaisrcsubfieldcode)sextoaisrc1$$%(extoaiidsubfieldcode)sextoaiid1 %(extoaiidtag)s%(extoaiidind1)s%(extoaiidind2)s $$0extoaiid2 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux 1 Updated """ % {'extoaiidtag': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:3], 'extoaiidind1': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4], 'extoaiidind2': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5], 'extoaiidsubfieldcode': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[5:6], 'extoaisrcsubfieldcode' : CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[5:6], } self.xm_testrec2 = """ 987654321 SzGeCERN extoaiid2 extoaisrc1 extoaiid1 Bar, Baz Foo On the quux and huux 2 """ % {'extoaiidtag': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:3], 'extoaiidind1': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4] or " ", 'extoaiidind2': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] != "_" and \ CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5] or " ", 'extoaiidsubfieldcode': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[5:6], 'extoaisrcsubfieldcode' : CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[5:6], } self.hm_testrec2 = """ 001__ 987654321 003__ SzGeCERN %(extoaiidtag)s%(extoaiidind1)s%(extoaiidind2)s $$%(extoaisrcsubfieldcode)sextoaisrc1$$%(extoaiidsubfieldcode)sextoaiid2 %(extoaiidtag)s%(extoaiidind1)s%(extoaiidind2)s $$0extoaiid1 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux 2 """ % {'extoaiidtag': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[0:3], 'extoaiidind1': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[3:4], 'extoaiidind2': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[4:5], 'extoaiidsubfieldcode': CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG[5:6], 'extoaisrcsubfieldcode' : 
CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG[5:6], } def test_insert_the_same_extoaiid_record(self): """bibupload - EXTOAIID tag, refuse to insert the same EXTOAIID record""" # initialize bibupload mode: task_set_task_param('verbose', self.verbose) if self.verbose: print "test_insert_the_same_extoaiid_record() started" # insert record 1 first time: testrec_to_insert_first = self.xm_testrec1.replace('123456789', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err1, recid1 = bibupload.bibupload(recs[0], opt_mode='insert') inserted_xm = print_record(recid1, 'xm') inserted_hm = print_record(recid1, 'hm') # use real recID when comparing whether it worked: self.xm_testrec1 = self.xm_testrec1.replace('123456789', str(recid1)) self.hm_testrec1 = self.hm_testrec1.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1), '') # insert record 2 first time: testrec_to_insert_first = self.xm_testrec2.replace('987654321', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err2, recid2 = bibupload.bibupload(recs[0], opt_mode='insert') inserted_xm = print_record(recid2, 'xm') inserted_hm = print_record(recid2, 'hm') # use real recID when comparing whether it worked: self.xm_testrec2 = self.xm_testrec2.replace('987654321', str(recid2)) self.hm_testrec2 = self.hm_testrec2.replace('987654321', str(recid2)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec2), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec2), '') # try to insert updated record 1, it should fail: recs = bibupload.xml_marc_to_records(self.xm_testrec1_to_update) err1_updated, recid1_updated = bibupload.bibupload(recs[0], opt_mode='insert') self.assertEqual(-1, recid1_updated) # delete test records bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid2) 
bibupload.wipe_out_record_from_all_tables(recid1_updated) if self.verbose: print "test_insert_the_same_extoaiid_record() finished" def test_insert_or_replace_the_same_extoaiid_record(self): """bibupload - EXTOAIID tag, allow to insert or replace the same EXTOAIID record""" # initialize bibupload mode: task_set_task_param('verbose', self.verbose) if self.verbose: print "test_insert_or_replace_the_same_extoaiid_record() started" # insert/replace record 1 first time: testrec_to_insert_first = self.xm_testrec1.replace('123456789', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err1, recid1 = bibupload.bibupload(recs[0], opt_mode='replace_or_insert') inserted_xm = print_record(recid1, 'xm') inserted_hm = print_record(recid1, 'hm') # use real recID in test buffers when comparing whether it worked: self.xm_testrec1 = self.xm_testrec1.replace('123456789', str(recid1)) self.hm_testrec1 = self.hm_testrec1.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1), '') # try to insert/replace updated record 1, it should be okay: recs = bibupload.xml_marc_to_records(self.xm_testrec1_to_update) err1_updated, recid1_updated = bibupload.bibupload(recs[0], opt_mode='replace_or_insert') inserted_xm = print_record(recid1_updated, 'xm') inserted_hm = print_record(recid1_updated, 'hm') self.assertEqual(recid1, recid1_updated) # use real recID in test buffers when comparing whether it worked: self.xm_testrec1_updated = self.xm_testrec1_updated.replace('123456789', str(recid1)) self.hm_testrec1_updated = self.hm_testrec1_updated.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1_updated), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1_updated), '') # delete test records bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid1_updated) if 
self.verbose: print "test_insert_or_replace_the_same_extoaiid_record() finished" def test_replace_nonexisting_extoaiid_record(self): """bibupload - EXTOAIID tag, refuse to replace non-existing EXTOAIID record""" # initialize bibupload mode: task_set_task_param('verbose', self.verbose) if self.verbose: print "test_replace_nonexisting_extoaiid_record() started" # insert record 1 first time: testrec_to_insert_first = self.xm_testrec1.replace('123456789', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err1, recid1 = bibupload.bibupload(recs[0], opt_mode='replace_or_insert') inserted_xm = print_record(recid1, 'xm') inserted_hm = print_record(recid1, 'hm') # use real recID in test buffers when comparing whether it worked: self.xm_testrec1 = self.xm_testrec1.replace('123456789', str(recid1)) self.hm_testrec1 = self.hm_testrec1.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1), '') # try to replace record 2 it should fail: testrec_to_insert_first = self.xm_testrec2.replace('987654321', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err2, recid2 = bibupload.bibupload(recs[0], opt_mode='replace') self.assertEqual(-1, recid2) # delete test records bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid2) if self.verbose: print "test_replace_nonexisting_extoaiid_record() finished" class BibUploadRecordsWithOAIIDTest(unittest.TestCase): """Testing uploading of records that have OAI ID present.""" def setUp(self): # pylint: disable-msg=C0103 """Initialize the MARCXML test records.""" self.verbose = 0 # Note that OAI fields are repeated but with different # subfields, this is to test whether bibupload would not # mistakenly pick up wrong values. 
self.xm_testrec1 = """ 123456789 SzGeCERN Bar, Baz Foo On the quux and huux 1 oai:foo:1 oai:foo:2 """ % {'oaitag': CFG_OAI_ID_FIELD[0:3], 'oaiind1': CFG_OAI_ID_FIELD[3:4] != "_" and \ CFG_OAI_ID_FIELD[3:4] or " ", 'oaiind2': CFG_OAI_ID_FIELD[4:5] != "_" and \ CFG_OAI_ID_FIELD[4:5] or " ", 'oaisubfieldcode': CFG_OAI_ID_FIELD[5:6], } self.hm_testrec1 = """ 001__ 123456789 003__ SzGeCERN 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux 1 %(oaitag)s%(oaiind1)s%(oaiind2)s $$%(oaisubfieldcode)soai:foo:1 %(oaitag)s%(oaiind1)s%(oaiind2)s $$0oai:foo:2 """ % {'oaitag': CFG_OAI_ID_FIELD[0:3], 'oaiind1': CFG_OAI_ID_FIELD[3:4], 'oaiind2': CFG_OAI_ID_FIELD[4:5], 'oaisubfieldcode': CFG_OAI_ID_FIELD[5:6], } self.xm_testrec1_to_update = """ SzGeCERN Bar, Baz Foo On the quux and huux 1 Updated oai:foo:1 oai:foo:2 """ % {'oaitag': CFG_OAI_ID_FIELD[0:3], 'oaiind1': CFG_OAI_ID_FIELD[3:4] != "_" and \ CFG_OAI_ID_FIELD[3:4] or " ", 'oaiind2': CFG_OAI_ID_FIELD[4:5] != "_" and \ CFG_OAI_ID_FIELD[4:5] or " ", 'oaisubfieldcode': CFG_OAI_ID_FIELD[5:6], } self.xm_testrec1_updated = """ 123456789 SzGeCERN Bar, Baz Foo On the quux and huux 1 Updated oai:foo:1 oai:foo:2 """ % {'oaitag': CFG_OAI_ID_FIELD[0:3], 'oaiind1': CFG_OAI_ID_FIELD[3:4] != "_" and \ CFG_OAI_ID_FIELD[3:4] or " ", 'oaiind2': CFG_OAI_ID_FIELD[4:5] != "_" and \ CFG_OAI_ID_FIELD[4:5] or " ", 'oaisubfieldcode': CFG_OAI_ID_FIELD[5:6], } self.hm_testrec1_updated = """ 001__ 123456789 003__ SzGeCERN 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux 1 Updated %(oaitag)s%(oaiind1)s%(oaiind2)s $$%(oaisubfieldcode)soai:foo:1 %(oaitag)s%(oaiind1)s%(oaiind2)s $$0oai:foo:2 """ % {'oaitag': CFG_OAI_ID_FIELD[0:3], 'oaiind1': CFG_OAI_ID_FIELD[3:4], 'oaiind2': CFG_OAI_ID_FIELD[4:5], 'oaisubfieldcode': CFG_OAI_ID_FIELD[5:6], } self.xm_testrec2 = """ 987654321 SzGeCERN Bar, Baz Foo On the quux and huux 2 oai:foo:2 oai:foo:1 """ % {'oaitag': CFG_OAI_ID_FIELD[0:3], 'oaiind1': CFG_OAI_ID_FIELD[3:4] != "_" and \ CFG_OAI_ID_FIELD[3:4] or " ", 
'oaiind2': CFG_OAI_ID_FIELD[4:5] != "_" and \ CFG_OAI_ID_FIELD[4:5] or " ", 'oaisubfieldcode': CFG_OAI_ID_FIELD[5:6], } self.hm_testrec2 = """ 001__ 987654321 003__ SzGeCERN 100__ $$aBar, Baz$$uFoo 245__ $$aOn the quux and huux 2 %(oaitag)s%(oaiind1)s%(oaiind2)s $$%(oaisubfieldcode)soai:foo:2 %(oaitag)s%(oaiind1)s%(oaiind2)s $$0oai:foo:1 """ % {'oaitag': CFG_OAI_ID_FIELD[0:3], 'oaiind1': CFG_OAI_ID_FIELD[3:4], 'oaiind2': CFG_OAI_ID_FIELD[4:5], 'oaisubfieldcode': CFG_OAI_ID_FIELD[5:6], } def test_insert_the_same_oai_record(self): """bibupload - OAIID tag, refuse to insert the same OAI record""" task_set_task_param('verbose', self.verbose) # insert record 1 first time: testrec_to_insert_first = self.xm_testrec1.replace('123456789', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err1, recid1 = bibupload.bibupload(recs[0], opt_mode='insert') inserted_xm = print_record(recid1, 'xm') inserted_hm = print_record(recid1, 'hm') # use real recID when comparing whether it worked: self.xm_testrec1 = self.xm_testrec1.replace('123456789', str(recid1)) self.hm_testrec1 = self.hm_testrec1.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1), '') # insert record 2 first time: testrec_to_insert_first = self.xm_testrec2.replace('987654321', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err2, recid2 = bibupload.bibupload(recs[0], opt_mode='insert') inserted_xm = print_record(recid2, 'xm') inserted_hm = print_record(recid2, 'hm') # use real recID when comparing whether it worked: self.xm_testrec2 = self.xm_testrec2.replace('987654321', str(recid2)) self.hm_testrec2 = self.hm_testrec2.replace('987654321', str(recid2)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec2), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec2), '') # try to insert updated record 1, it should fail: recs = 
bibupload.xml_marc_to_records(self.xm_testrec1_to_update) err1_updated, recid1_updated = bibupload.bibupload(recs[0], opt_mode='insert') self.assertEqual(-1, recid1_updated) # delete test records bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid2) bibupload.wipe_out_record_from_all_tables(recid1_updated) def test_insert_or_replace_the_same_oai_record(self): """bibupload - OAIID tag, allow to insert or replace the same OAI record""" # initialize bibupload mode: task_set_task_param('verbose', self.verbose) # insert/replace record 1 first time: testrec_to_insert_first = self.xm_testrec1.replace('123456789', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err1, recid1 = bibupload.bibupload(recs[0], opt_mode='replace_or_insert') inserted_xm = print_record(recid1, 'xm') inserted_hm = print_record(recid1, 'hm') # use real recID in test buffers when comparing whether it worked: self.xm_testrec1 = self.xm_testrec1.replace('123456789', str(recid1)) self.hm_testrec1 = self.hm_testrec1.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1), '') # try to insert/replace updated record 1, it should be okay: recs = bibupload.xml_marc_to_records(self.xm_testrec1_to_update) err1_updated, recid1_updated = bibupload.bibupload(recs[0], opt_mode='replace_or_insert') inserted_xm = print_record(recid1_updated, 'xm') inserted_hm = print_record(recid1_updated, 'hm') self.assertEqual(recid1, recid1_updated) # use real recID in test buffers when comparing whether it worked: self.xm_testrec1_updated = self.xm_testrec1_updated.replace('123456789', str(recid1)) self.hm_testrec1_updated = self.hm_testrec1_updated.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1_updated), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1_updated), '') # delete test 
records bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid1_updated) def test_replace_nonexisting_oai_record(self): """bibupload - OAIID tag, refuse to replace non-existing OAI record""" task_set_task_param('verbose', self.verbose) # insert record 1 first time: testrec_to_insert_first = self.xm_testrec1.replace('123456789', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err1, recid1 = bibupload.bibupload(recs[0], opt_mode='replace_or_insert') inserted_xm = print_record(recid1, 'xm') inserted_hm = print_record(recid1, 'hm') # use real recID in test buffers when comparing whether it worked: self.xm_testrec1 = self.xm_testrec1.replace('123456789', str(recid1)) self.hm_testrec1 = self.hm_testrec1.replace('123456789', str(recid1)) self.assertEqual(compare_xmbuffers(inserted_xm, self.xm_testrec1), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.hm_testrec1), '') # try to replace record 2 it should fail: testrec_to_insert_first = self.xm_testrec2.replace('987654321', '') recs = bibupload.xml_marc_to_records(testrec_to_insert_first) err2, recid2 = bibupload.bibupload(recs[0], opt_mode='replace') self.assertEqual(-1, recid2) # delete test records bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid2) class BibUploadIndicatorsTest(unittest.TestCase): """ Testing uploading of a MARCXML record with indicators having either blank space (as per MARC schema) or empty string value (old behaviour). 
""" def setUp(self): """Initialize the MARCXML test record.""" self.testrec1_xm = """ SzGeCERN Test, John Test University """ self.testrec1_hm = """ 003__ SzGeCERN 100__ $$aTest, John$$uTest University """ self.testrec2_xm = """ SzGeCERN Test, John Test University """ self.testrec2_hm = """ 003__ SzGeCERN 100__ $$aTest, John$$uTest University """ def test_record_with_spaces_in_indicators(self): """bibupload - inserting MARCXML with spaces in indicators""" task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(self.testrec1_xm) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(remove_tag_001_from_xmbuffer(inserted_xm), self.testrec1_xm), '') self.assertEqual(compare_hmbuffers(remove_tag_001_from_hmbuffer(inserted_hm), self.testrec1_hm), '') bibupload.wipe_out_record_from_all_tables(recid) def test_record_with_no_spaces_in_indicators(self): """bibupload - inserting MARCXML with no spaces in indicators""" task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(self.testrec2_xm) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(remove_tag_001_from_xmbuffer(inserted_xm), self.testrec2_xm), '') self.assertEqual(compare_hmbuffers(remove_tag_001_from_hmbuffer(inserted_hm), self.testrec2_hm), '') bibupload.wipe_out_record_from_all_tables(recid) class BibUploadUpperLowerCaseTest(unittest.TestCase): """ Testing treatment of similar records with only upper and lower case value differences in the bibxxx table. 
""" def setUp(self): """Initialize the MARCXML test records.""" self.testrec1_xm = """ SzGeCERN Test, John Test University """ self.testrec1_hm = """ 003__ SzGeCERN 100__ $$aTest, John$$uTest University """ self.testrec2_xm = """ SzGeCERN TeSt, JoHn Test UniVeRsity """ self.testrec2_hm = """ 003__ SzGeCERN 100__ $$aTeSt, JoHn$$uTest UniVeRsity """ def test_record_with_upper_lower_case_letters(self): """bibupload - inserting similar MARCXML records with upper/lower case""" task_set_task_param('verbose', 0) # insert test record #1: recs = bibupload.xml_marc_to_records(self.testrec1_xm) err1, recid1 = bibupload.bibupload(recs[0], opt_mode='insert') recid1_inserted_xm = print_record(recid1, 'xm') recid1_inserted_hm = print_record(recid1, 'hm') # insert test record #2: recs = bibupload.xml_marc_to_records(self.testrec2_xm) err1, recid2 = bibupload.bibupload(recs[0], opt_mode='insert') recid2_inserted_xm = print_record(recid2, 'xm') recid2_inserted_hm = print_record(recid2, 'hm') # let us compare stuff now: self.assertEqual(compare_xmbuffers(remove_tag_001_from_xmbuffer(recid1_inserted_xm), self.testrec1_xm), '') self.assertEqual(compare_hmbuffers(remove_tag_001_from_hmbuffer(recid1_inserted_hm), self.testrec1_hm), '') self.assertEqual(compare_xmbuffers(remove_tag_001_from_xmbuffer(recid2_inserted_xm), self.testrec2_xm), '') self.assertEqual(compare_hmbuffers(remove_tag_001_from_hmbuffer(recid2_inserted_hm), self.testrec2_hm), '') # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(recid1) bibupload.wipe_out_record_from_all_tables(recid2) class BibUploadControlledProvenanceTest(unittest.TestCase): """Testing treatment of tags under controlled provenance in the correct mode.""" def setUp(self): """Initialize the MARCXML test record.""" self.testrec1_xm = """ 123456789 SzGeCERN Test, Jane Test Institute Test title blabla sam blublu sim human """ self.testrec1_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, Jane$$uTest Institute 245__ $$aTest title 
6531_ $$9sam$$ablabla 6531_ $$9sim$$ablublu 6531_ $$ahuman """ self.testrec1_xm_to_correct = """ 123456789 bleble sim bloblo som """ self.testrec1_corrected_xm = """ 123456789 SzGeCERN Test, Jane Test Institute Test title blabla sam human bleble sim bloblo som """ self.testrec1_corrected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, Jane$$uTest Institute 245__ $$aTest title 6531_ $$9sam$$ablabla 6531_ $$ahuman 6531_ $$9sim$$ableble 6531_ $$9som$$abloblo """ # insert test record: task_set_task_param('verbose', 0) test_record_xm = self.testrec1_xm.replace('123456789', '') recs = bibupload.xml_marc_to_records(test_record_xm) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recID: self.testrec1_xm = self.testrec1_xm.replace('123456789', str(recid)) self.testrec1_hm = self.testrec1_hm.replace('123456789', str(recid)) self.testrec1_xm_to_correct = self.testrec1_xm_to_correct.replace('123456789', str(recid)) self.testrec1_corrected_xm = self.testrec1_corrected_xm.replace('123456789', str(recid)) self.testrec1_corrected_hm = self.testrec1_corrected_hm.replace('123456789', str(recid)) # test of the inserted record: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, self.testrec1_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.testrec1_hm), '') def test_controlled_provenance_persistence(self): """bibupload - correct mode, tags with controlled provenance""" # correct metadata tags; will the protected tags be kept? task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(self.testrec1_xm_to_correct) err, recid = bibupload.bibupload(recs[0], opt_mode='correct') corrected_xm = print_record(recid, 'xm') corrected_hm = print_record(recid, 'hm') # did it work? 
self.assertEqual(compare_xmbuffers(corrected_xm, self.testrec1_corrected_xm), '') self.assertEqual(compare_hmbuffers(corrected_hm, self.testrec1_corrected_hm), '') # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(recid) class BibUploadStrongTagsTest(unittest.TestCase): """Testing treatment of strong tags and the replace mode.""" def setUp(self): """Initialize the MARCXML test record.""" self.testrec1_xm = """ 123456789 SzGeCERN Test, Jane Test Institute Test title A value Another value """ % {'strong_tag': bibupload.CFG_BIBUPLOAD_STRONG_TAGS[0]} self.testrec1_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, Jane$$uTest Institute 245__ $$aTest title %(strong_tag)s__ $$aA value$$bAnother value """ % {'strong_tag': bibupload.CFG_BIBUPLOAD_STRONG_TAGS[0]} self.testrec1_xm_to_replace = """ 123456789 Test, Joseph Test Academy """ self.testrec1_replaced_xm = """ 123456789 Test, Joseph Test Academy A value Another value """ % {'strong_tag': bibupload.CFG_BIBUPLOAD_STRONG_TAGS[0]} self.testrec1_replaced_hm = """ 001__ 123456789 100__ $$aTest, Joseph$$uTest Academy %(strong_tag)s__ $$aA value$$bAnother value """ % {'strong_tag': bibupload.CFG_BIBUPLOAD_STRONG_TAGS[0]} # insert test record: task_set_task_param('verbose', 0) test_record_xm = self.testrec1_xm.replace('123456789', '') recs = bibupload.xml_marc_to_records(test_record_xm) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recID: self.testrec1_xm = self.testrec1_xm.replace('123456789', str(recid)) self.testrec1_hm = self.testrec1_hm.replace('123456789', str(recid)) self.testrec1_xm_to_replace = self.testrec1_xm_to_replace.replace('123456789', str(recid)) self.testrec1_replaced_xm = self.testrec1_replaced_xm.replace('123456789', str(recid)) self.testrec1_replaced_hm = self.testrec1_replaced_hm.replace('123456789', str(recid)) # test of the inserted record: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') 
self.assertEqual(compare_xmbuffers(inserted_xm, self.testrec1_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, self.testrec1_hm), '') def test_strong_tags_persistence(self): """bibupload - strong tags, persistence in replace mode""" # replace all metadata tags; will the strong tags be kept? recs = bibupload.xml_marc_to_records(self.testrec1_xm_to_replace) err, recid = bibupload.bibupload(recs[0], opt_mode='replace') replaced_xm = print_record(recid, 'xm') replaced_hm = print_record(recid, 'hm') # did it work? self.assertEqual(compare_xmbuffers(replaced_xm, self.testrec1_replaced_xm), '') self.assertEqual(compare_hmbuffers(replaced_hm, self.testrec1_replaced_hm), '') # clean up after ourselves: bibupload.wipe_out_record_from_all_tables(recid) return class BibUploadFFTModeTest(unittest.TestCase): """ Testing treatment of fulltext file transfer import mode. """ def _test_bibdoc_status(self, recid, docname, status): res = run_sql('SELECT bd.status FROM bibrec_bibdoc as bb JOIN bibdoc as bd ON bb.id_bibdoc = bd.id WHERE bb.id_bibrec = %s AND bd.docname = %s', (recid, docname)) self.failUnless(res) self.assertEqual(status, res[0][0]) def test_writing_rights(self): """bibupload - FFT has writing rights""" self.failUnless(bibupload.writing_rights_p()) def test_simple_fft_insert(self): """bibupload - simple FFT insert""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/cds.gif """ % {'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/cds.gif """ % {'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/cds.gif" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, 
recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self.failUnless(try_url_download(testrec_expected_url)) bibupload.wipe_out_record_from_all_tables(recid) def test_exotic_format_fft_append(self): """bibupload - exotic format FFT append""" # define the test case: testfile = os.path.join(CFG_TMPDIR, 'test.ps.Z') open(testfile, 'w').write('TEST') test_to_upload = """ SzGeCERN Test, John Test University """ testrec_to_append = """ 123456789 %s """ % testfile testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/test.ps.Z """ % {'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/test.ps.Z """ % {'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/test.ps.Z" \ % {'siteurl': CFG_SITE_URL} testrec_expected_url2 = "%(siteurl)s/record/123456789/files/test?format=ps.Z" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_to_append = testrec_to_append.replace('123456789', str(recid)) testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', 
str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) testrec_expected_url2 = testrec_expected_url.replace('123456789', str(recid)) recs = bibupload.xml_marc_to_records(testrec_to_append) err, recid = bibupload.bibupload(recs[0], opt_mode='append') # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self.assertEqual(urlopen(testrec_expected_url).read(), 'TEST') self.assertEqual(urlopen(testrec_expected_url2).read(), 'TEST') bibupload.wipe_out_record_from_all_tables(recid) def test_fft_check_md5_through_bibrecdoc_str(self): """bibupload - simple FFT insert, check md5 through BibRecDocs.str()""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University %s/img/head.gif """ % CFG_SITE_URL # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') original_md5 = md5(urlopen('%s/img/head.gif' % CFG_SITE_URL).read()).hexdigest() bibrec_str = str(BibRecDocs(int(recid))) md5_found = False for row in bibrec_str.split('\n'): if 'checksum' in row: if original_md5 in row: md5_found = True self.failUnless(md5_found) bibupload.wipe_out_record_from_all_tables(recid) def test_detailed_fft_insert(self): """bibupload - detailed FFT insert""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif SuperMain This is a description This is a comment CIDIESSE http://cds.cern.ch/img/cds.gif SuperMain .jpeg This is a description This is a second comment CIDIESSE """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/CIDIESSE.gif This is a description This is a comment 
%(siteurl)s/record/123456789/files/CIDIESSE.jpeg This is a description This is a second comment """ % {'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/CIDIESSE.gif$$yThis is a description$$zThis is a comment 8564_ $$u%(siteurl)s/record/123456789/files/CIDIESSE.jpeg$$yThis is a description$$zThis is a second comment """ % {'siteurl': CFG_SITE_URL} testrec_expected_url1 = "%(siteurl)s/record/123456789/files/CIDIESSE.gif" % {'siteurl': CFG_SITE_URL} testrec_expected_url2 = "%(siteurl)s/record/123456789/files/CIDIESSE.jpeg" % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url1 = testrec_expected_url1.replace('123456789', str(recid)) testrec_expected_url2 = testrec_expected_url1.replace('123456789', str(recid)) # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self.failUnless(try_url_download(testrec_expected_url1)) self.failUnless(try_url_download(testrec_expected_url2)) bibupload.wipe_out_record_from_all_tables(recid) def test_simple_fft_insert_with_restriction(self): """bibupload - simple FFT insert with restriction""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif thesis http://cds.cern.ch/img/cds.gif """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University 
%(siteurl)s/record/123456789/files/cds.gif %(siteurl)s/record/123456789/files/icon-cds.gif icon """ % {'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/cds.gif 8564_ $$q%(siteurl)s/record/123456789/files/icon-cds.gif$$xicon """ % {'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/cds.gif" \ % {'siteurl': CFG_SITE_URL} testrec_expected_icon = "%(siteurl)s/record/123456789/files/icon-cds.gif" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) testrec_expected_icon = testrec_expected_icon.replace('123456789', str(recid)) # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') open_url = urlopen(testrec_expected_url) self.failUnless("This file is restricted" in open_url.read()) open_icon = urlopen(testrec_expected_icon) restricted_icon = urlopen("%s/img/restricted.gif" % CFG_SITE_URL) self.failUnless(open_icon.read() == restricted_icon.read()) bibupload.wipe_out_record_from_all_tables(recid) def test_simple_fft_insert_with_icon(self): """bibupload - simple FFT insert with icon""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif http://cds.cern.ch/img/cds.gif """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University 
%(siteurl)s/record/123456789/files/cds.gif %(siteurl)s/record/123456789/files/icon-cds.gif icon """ % {'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/cds.gif 8564_ $$q%(siteurl)s/record/123456789/files/icon-cds.gif$$xicon """ % {'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/cds.gif" \ % {'siteurl': CFG_SITE_URL} testrec_expected_icon = "%(siteurl)s/record/123456789/files/icon-cds.gif" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) testrec_expected_icon = testrec_expected_icon.replace('123456789', str(recid)) # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self.failUnless(try_url_download(testrec_expected_url)) self.failUnless(try_url_download(testrec_expected_icon)) bibupload.wipe_out_record_from_all_tables(recid) def test_multiple_fft_insert(self): """bibupload - multiple FFT insert""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif http://cdsweb.cern.ch/img/head.gif http://doc.cern.ch/archive/electronic/hep-th/0101/0101001.pdf %(prefix)s/var/tmp/demobibdata.xml """ % { 'prefix': CFG_PREFIX } testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/0101001.pdf 
%(siteurl)s/record/123456789/files/cds.gif %(siteurl)s/record/123456789/files/demobibdata.xml %(siteurl)s/record/123456789/files/head.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/0101001.pdf 8564_ $$u%(siteurl)s/record/123456789/files/cds.gif 8564_ $$u%(siteurl)s/record/123456789/files/demobibdata.xml 8564_ $$u%(siteurl)s/record/123456789/files/head.gif """ % { 'siteurl': CFG_SITE_URL} # insert test record: testrec_expected_urls = [] for files in ('cds.gif', 'head.gif', '0101001.pdf', 'demobibdata.xml'): testrec_expected_urls.append('%(siteurl)s/record/123456789/files/%(files)s' % {'siteurl' : CFG_SITE_URL, 'files' : files}) task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_urls = [] for files in ('cds.gif', 'head.gif', '0101001.pdf', 'demobibdata.xml'): testrec_expected_urls.append('%(siteurl)s/record/%(recid)s/files/%(files)s' % {'siteurl' : CFG_SITE_URL, 'files' : files, 'recid' : recid}) # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') # FIXME: Next test has been commented out since, appearently, the # returned xml can have non predictable row order (but still correct) # Using only html marc output is fine because a value is represented # by a single row, so a row to row comparison can be employed. 
self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') for url in testrec_expected_urls: self.failUnless(try_url_download(url)) self._test_bibdoc_status(recid, 'head', '') self._test_bibdoc_status(recid, '0101001', '') self._test_bibdoc_status(recid, 'cds', '') self._test_bibdoc_status(recid, 'demobibdata', '') bibupload.wipe_out_record_from_all_tables(recid) def test_simple_fft_correct(self): """bibupload - simple FFT correct""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif """ test_to_correct = """ 123456789 http://cds.cern.ch/img/cds.gif """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/cds.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/cds.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/cds.gif" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) test_to_correct = test_to_correct.replace('123456789', str(recid)) # correct test record with new FFT: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_correct) bibupload.bibupload(recs[0], opt_mode='correct') # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') 
self.failUnless(try_url_download(testrec_expected_url)) self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self._test_bibdoc_status(recid, 'cds', '') #print "\nRecid: " + str(recid) + "\n" #print testrec_expected_hm + "\n" #print print_record(recid, 'hm') + "\n" bibupload.wipe_out_record_from_all_tables(recid) def test_fft_vs_bibedit(self): """bibupload - FFT Vs. BibEdit compatibility""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif """ test_to_replace = """ 123456789 SzGeCERN Test, John Test University http://www.google.com/ BibEdit Comment %(siteurl)s/record/123456789/files/cds.gif BibEdit Description 01 http://cern.ch/ """ % { 'siteurl': CFG_SITE_URL} testrec_expected_xm = str(test_to_replace) testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$uhttp://www.google.com/ 8564_ $$u%(siteurl)s/record/123456789/files/cds.gif$$x01$$yBibEdit Description$$zBibEdit Comment 8564_ $$uhttp://cern.ch/ """ % { 'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/cds.gif" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) test_to_replace = test_to_replace.replace('123456789', str(recid)) # correct test record with new FFT: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_replace) bibupload.bibupload(recs[0], opt_mode='replace') # compare expected results: inserted_xm = 
print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.failUnless(try_url_download(testrec_expected_url)) self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self._test_bibdoc_status(recid, 'cds', '') bibrecdocs = BibRecDocs(recid) bibdoc = bibrecdocs.get_bibdoc('cds') self.assertEqual(bibdoc.get_description('.gif'), 'BibEdit Description') bibupload.wipe_out_record_from_all_tables(recid) def test_detailed_fft_correct(self): """bibupload - detailed FFT correct""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif Try Comment """ test_to_correct = """ 123456789 http://cdsweb.cern.ch/img/head.gif cds patata Next Try KEEP-OLD-VALUE """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/patata.gif Next Try Comment """ % { 'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/patata.gif$$yNext Try$$zComment """ % { 'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/patata.gif" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) test_to_correct = test_to_correct.replace('123456789', str(recid)) # correct test record with new FFT: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_correct) bibupload.bibupload(recs[0], opt_mode='correct') # compare 
expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.failUnless(try_url_download(testrec_expected_url)) self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self._test_bibdoc_status(recid, 'patata', '') #print "\nRecid: " + str(recid) + "\n" #print testrec_expected_hm + "\n" #print print_record(recid, 'hm') + "\n" bibupload.wipe_out_record_from_all_tables(recid) def test_no_url_fft_correct(self): """bibupload - no_url FFT correct""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif Try Comment """ test_to_correct = """ 123456789 cds patata .gif KEEP-OLD-VALUE Next Comment """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/patata.gif Try Next Comment """ % { 'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/patata.gif$$yTry$$zNext Comment """ % { 'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/patata.gif" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) test_to_correct = test_to_correct.replace('123456789', str(recid)) # correct test record with new FFT: recs = bibupload.xml_marc_to_records(test_to_correct) bibupload.bibupload(recs[0], opt_mode='correct') # compare expected results: inserted_xm = print_record(recid, 'xm') 
inserted_hm = print_record(recid, 'hm') self.failUnless(try_url_download(testrec_expected_url)) self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self._test_bibdoc_status(recid, 'patata', '') #print "\nRecid: " + str(recid) + "\n" #print testrec_expected_hm + "\n" #print print_record(recid, 'hm') + "\n" bibupload.wipe_out_record_from_all_tables(recid) def test_new_icon_fft_append(self): """bibupload - new icon FFT append""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University """ test_to_correct = """ 123456789 cds http://cds.cern.ch/img/cds.gif """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/icon-cds.gif icon """ % { 'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$q%(siteurl)s/record/123456789/files/icon-cds.gif$$xicon """ % { 'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/icon-cds.gif" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) test_to_correct = test_to_correct.replace('123456789', str(recid)) # correct test record with new FFT: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_correct) bibupload.bibupload(recs[0], opt_mode='append') # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.failUnless(try_url_download(testrec_expected_url)) 
self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self._test_bibdoc_status(recid, 'cds', '') #print "\nRecid: " + str(recid) + "\n" #print testrec_expected_hm + "\n" #print print_record(recid, 'hm') + "\n" bibupload.wipe_out_record_from_all_tables(recid) def test_multiple_fft_correct(self): """bibupload - multiple FFT correct""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif Try Comment Restricted http://cds.cern.ch/img/cds.gif .jpeg Try jpeg Comment jpeg Restricted """ test_to_correct = """ 123456789 http://cds.cern.ch/img/cds.gif patata .gif New restricted """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/patata.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/patata.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/patata.gif" \ % {'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) test_to_correct = test_to_correct.replace('123456789', str(recid)) # correct test record with new FFT: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_correct) bibupload.bibupload(recs[0], opt_mode='correct') # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') 
self.failUnless(try_url_download(testrec_expected_url)) self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self._test_bibdoc_status(recid, 'patata', 'New restricted') #print "\nRecid: " + str(recid) + "\n" #print testrec_expected_hm + "\n" #print print_record(recid, 'hm') + "\n" bibupload.wipe_out_record_from_all_tables(recid) def test_purge_fft_correct(self): """bibupload - purge FFT correct""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University http://cds.cern.ch/img/cds.gif http://cdsweb.cern.ch/img/head.gif """ test_to_correct = """ 123456789 http://cds.cern.ch/img/cds.gif """ test_to_purge = """ 123456789 http://cds.cern.ch/img/cds.gif PURGE """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/cds.gif %(siteurl)s/record/123456789/files/head.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/cds.gif 8564_ $$u%(siteurl)s/record/123456789/files/head.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/cds.gif" % { 'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) test_to_correct = test_to_correct.replace('123456789', str(recid)) test_to_purge = test_to_purge.replace('123456789', str(recid)) # correct test record with new FFT: task_set_task_param('verbose', 0) recs = 
bibupload.xml_marc_to_records(test_to_correct) bibupload.bibupload(recs[0], opt_mode='correct') # purge test record with new FFT: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_purge) bibupload.bibupload(recs[0], opt_mode='correct') # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.failUnless(try_url_download(testrec_expected_url)) self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self._test_bibdoc_status(recid, 'cds', '') self._test_bibdoc_status(recid, 'head', '') #print "\nRecid: " + str(recid) + "\n" #print testrec_expected_hm + "\n" #print print_record(recid, 'hm') + "\n" bibupload.wipe_out_record_from_all_tables(recid) def test_revert_fft_correct(self): """bibupload - revert FFT correct""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University %s/img/iconpen.gif cds """ % CFG_SITE_URL test_to_correct = """ 123456789 %s/img/head.gif cds """ % CFG_SITE_URL test_to_revert = """ 123456789 cds REVERT 1 """ testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/cds.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/cds.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/cds.gif" % { 'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = 
testrec_expected_url.replace('123456789', str(recid)) test_to_correct = test_to_correct.replace('123456789', str(recid)) test_to_revert = test_to_revert.replace('123456789', str(recid)) # correct test record with new FFT: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_correct) bibupload.bibupload(recs[0], opt_mode='correct') # revert test record with new FFT: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_revert) bibupload.bibupload(recs[0], opt_mode='correct') # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.failUnless(try_url_download(testrec_expected_url)) self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') self._test_bibdoc_status(recid, 'cds', '') expected_content_version1 = urlopen('%s/img/iconpen.gif' % CFG_SITE_URL).read() expected_content_version2 = urlopen('%s/img/head.gif' % CFG_SITE_URL).read() expected_content_version3 = expected_content_version1 content_version1 = urlopen('%s/record/%s/files/cds.gif?version=1' % (CFG_SITE_URL, recid)).read() content_version2 = urlopen('%s/record/%s/files/cds.gif?version=2' % (CFG_SITE_URL, recid)).read() content_version3 = urlopen('%s/record/%s/files/cds.gif?version=3' % (CFG_SITE_URL, recid)).read() self.assertEqual(expected_content_version1, content_version1) self.assertEqual(expected_content_version2, content_version2) self.assertEqual(expected_content_version3, content_version3) #print "\nRecid: " + str(recid) + "\n" #print testrec_expected_hm + "\n" #print print_record(recid, 'hm') + "\n" bibupload.wipe_out_record_from_all_tables(recid) def test_simple_fft_replace(self): """bibupload - simple FFT replace""" # define the test case: test_to_upload = """ SzGeCERN Test, John Test University %s/img/iconpen.gif cds """ % CFG_SITE_URL test_to_replace = """ 123456789 SzGeCERN Test, John Test 
University %s/img/head.gif """ % CFG_SITE_URL testrec_expected_xm = """ 123456789 SzGeCERN Test, John Test University %(siteurl)s/record/123456789/files/head.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_hm = """ 001__ 123456789 003__ SzGeCERN 100__ $$aTest, John$$uTest University 8564_ $$u%(siteurl)s/record/123456789/files/head.gif """ % { 'siteurl': CFG_SITE_URL} testrec_expected_url = "%(siteurl)s/record/123456789/files/head.gif" % { 'siteurl': CFG_SITE_URL} # insert test record: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_upload) err, recid = bibupload.bibupload(recs[0], opt_mode='insert') # replace test buffers with real recid of inserted test record: testrec_expected_xm = testrec_expected_xm.replace('123456789', str(recid)) testrec_expected_hm = testrec_expected_hm.replace('123456789', str(recid)) testrec_expected_url = testrec_expected_url.replace('123456789', str(recid)) test_to_replace = test_to_replace.replace('123456789', str(recid)) # replace test record with new FFT: task_set_task_param('verbose', 0) recs = bibupload.xml_marc_to_records(test_to_replace) bibupload.bibupload(recs[0], opt_mode='replace') # compare expected results: inserted_xm = print_record(recid, 'xm') inserted_hm = print_record(recid, 'hm') self.failUnless(try_url_download(testrec_expected_url)) self.assertEqual(compare_xmbuffers(inserted_xm, testrec_expected_xm), '') self.assertEqual(compare_hmbuffers(inserted_hm, testrec_expected_hm), '') expected_content_version = urlopen('%s/img/head.gif' % CFG_SITE_URL).read() content_version = urlopen('%s/record/%s/files/head.gif' % (CFG_SITE_URL, recid)).read() self.assertEqual(expected_content_version, content_version) #print "\nRecid: " + str(recid) + "\n" #print testrec_expected_hm + "\n" #print print_record(recid, 'hm') + "\n" bibupload.wipe_out_record_from_all_tables(recid) TEST_SUITE = make_test_suite(BibUploadInsertModeTest, BibUploadAppendModeTest, BibUploadCorrectModeTest, 
BibUploadDeleteModeTest, BibUploadReplaceModeTest, BibUploadReferencesModeTest, BibUploadRecordsWithSYSNOTest, BibUploadRecordsWithEXTOAIIDTest, BibUploadRecordsWithOAIIDTest, BibUploadFMTModeTest, BibUploadIndicatorsTest, BibUploadUpperLowerCaseTest, BibUploadControlledProvenanceTest, BibUploadStrongTagsTest, BibUploadFFTModeTest, ) if __name__ == "__main__": run_test_suite(TEST_SUITE, warn_user=True)