diff --git a/modules/bibindex/lib/bibindex_engine.py b/modules/bibindex/lib/bibindex_engine.py index dccab5140..e8676fe08 100644 --- a/modules/bibindex/lib/bibindex_engine.py +++ b/modules/bibindex/lib/bibindex_engine.py @@ -1,1595 +1,1601 @@ ## $Id$ ## BibIndxes bibliographic data, reference and fulltext indexing utility. ## This file is part of the CERN Document Server Software (CDSware). ## Copyright (C) 2002 CERN. ## ## The CDSware is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## The CDSware is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDSware; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. # -*- coding: utf-8 -*- ## $Id$ ## DO NOT EDIT THIS FILE! IT WAS AUTOMATICALLY GENERATED FROM CDSware WML SOURCES. """ BibIndex indexing engine implementation. See bibindex executable for entry point. """ from marshal import loads,dumps from zlib import compress,decompress from string import split,translate,lower,upper import getopt import getpass import string import os import sre import sys import time import MySQLdb import Numeric import urllib import signal import tempfile import unicodedata import traceback import cStringIO from config import * from bibindex_engine_config import * from search_engine_config import cfg_max_recID from search_engine import perform_request_search, strip_accents from dbquery import run_sql from access_control_engine import acc_authorize_action from bibindex_engine_stopwords import is_stopword from bibindex_engine_stemmer import stem ## import optional modules: try: import psyco psyco.bind(get_words_from_phrase) psyco.bind(merge_with_old_recIDs) psyco.bind(serialize_via_numeric_array) psyco.bind(serialize_via_marshal) psyco.bind(deserialize_via_numeric_array) psyco.bind(deserialize_via_marshal) except: pass ## override urllib's default password-asking behaviour: class MyFancyURLopener(urllib.FancyURLopener): def prompt_user_passwd(self, host, realm): # supply some dummy credentials by default return (cfg_urlopener_username, cfg_urlopener_password) def http_error_401(self, url, fp, errcode, errmsg, headers): # do not bother with protected pages raise IOError, (999, 'unauthorized access') return None urllib._urlopener = MyFancyURLopener() def write_message(msg, stream=sys.stdout): """Write message and flush output stream (may be sys.stdout or sys.stderr).""" if stream == sys.stdout or stream == sys.stderr: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) stream.write("%s\n" % msg) stream.flush() else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) return ## precompile some often-used regexp for speed reasons: sre_subfields = sre.compile('\$\$\w'); sre_html = sre.compile("(?s)<[^>]*>|&#?\w+;") sre_block_punctuation_begin = sre.compile(r"^"+cfg_chars_punctuation+"+") sre_block_punctuation_end = sre.compile(cfg_chars_punctuation+"+$") sre_punctuation = sre.compile(cfg_chars_punctuation) sre_separators = sre.compile(cfg_chars_alphanumericseparators) sre_datetime_shift = sre.compile("([-\+]{0,1})([\d]+)([dhms])") nb_char_in_line = 50 # for verbose pretty printing chunksize = 1000 # default size of chunks that the records will be treated by wordTables = [] task_id = -1 base_process_size = 4500 # process base size options = {} # will hold task options ## Dictionary merging functions def intersection(dict, dict2): "Returns intersection of the two dictionaries." int_dict = {} if len(dict1) > len(dict2): for e in dict2: if dict1.has_key(e): int_dict[e] = 1 else: for e in dict1: if dict2.has_key(e): int_dict[e] = 1 return int_dict def union(dict1, dict2): "Returns union of the two dictionaries." union_dict = {} for e in dict1.keys(): union_dict[e] = 1 for e in dict2.keys(): union_dict[e] = 1 return union_dict def diff(dict1, dict2): "Returns dict1 - dict2." diff_dict = {} for e in dict1.keys(): if not dict2.has_key(e): diff_dict[e] = 1 return diff_dict def list_union(list1, list2): "Returns union of the two lists." union_dict = {} for e in list1: union_dict[e] = 1 for e in list2: union_dict[e] = 1 return union_dict.keys() ## safety function for killing slow MySQL threads: def kill_sleepy_mysql_threads(max_threads=cfg_max_mysql_threads, thread_timeout=cfg_mysql_thread_timeout): """Check the number of MySQL threads and if there are more than MAX_THREADS of them, lill all threads that are in a sleeping state for more than THREAD_TIMEOUT seconds. (This is useful for working around the the max_connection problem that appears during indexation in some not-yet-understood cases.) If some threads are to be killed, write info into the log file. """ res = run_sql("SHOW FULL PROCESSLIST") if len(res) > max_threads: for row in res: r_id,r_user,r_host,r_db,r_command,r_time,r_state,r_info = row if r_command == "Sleep" and int(r_time) > thread_timeout: run_sql("KILL %s", (r_id,)) if options["verbose"] >= 1: write_message("WARNING: too many MySQL threads, killing thread %s" % r_id) return ## MARC-21 tag/field access functions def get_fieldvalues(recID, tag): """Returns list of values of the MARC-21 'tag' fields for the record 'recID'.""" out = [] bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = "SELECT value FROM %s AS b, %s AS bb WHERE bb.id_bibrec=%s AND bb.id_bibxxx=b.id AND tag LIKE '%s'" \ % (bibXXx, bibrec_bibXXx, recID, tag) res = run_sql(query) for row in res: out.append(row[0]) return out def get_field_tags(field): """Returns a list of MARC tags for the field code 'field'. Returns empty list in case of error. Example: field='author', output=['100__%','700__%'].""" out = [] query = """SELECT t.value FROM tag AS t, field_tag AS ft, field AS f WHERE f.code='%s' AND ft.id_field=f.id AND t.id=ft.id_tag ORDER BY ft.score DESC""" % field res = run_sql(query) for row in res: out.append(row[0]) return out ## Fulltext word extraction functions def get_fulltext_urls_from_html_page(htmlpagebody): """Parses htmlpagebody data looking for url_directs referring to probable fulltexts. Returns an array of (ext,url_direct) to fulltexts. Note: it looks for file format extensions as defined by global 'conv_programs'structure. """ out = [] for ext in conv_programs.keys(): expr = sre.compile( r"\"(http://[\w]+\.+[\w]+[^\"'><]*\.)(" + \ ext + r")\"") match = expr.search(htmlpagebody) if match: out.append([ext,match.group()]) else: # FIXME: workaround for getfile, should use bibdoc tables expr_getfile = sre.compile(r"\"(http://.*getfile\.py\?.*format=" + ext + r"&version=.*)\"") match = expr_getfile.search(htmlpagebody) if match: out.append([ext,match.group()]) return out def get_words_from_fulltext(url_indirect,separators="[^\w]",split=string.split): """Returns all the words contained in the fulltext whose url is contained in the document pointed to in phrase. Please note the double indirection. url_indirect returns a document that has to be parsed to get the actual urls.""" if cfg_fulltext_index_local_files_only and string.find(url_indirect, weburl) < 0: return [] if options["verbose"] >= 2: write_message("... reading fulltext files from %s started" % url_indirect) url_direct = None fulltext_urls = None # check for direct link in url url_indirect_ext = lower(split(url_indirect,".")[-1]) if url_indirect_ext in conv_programs.keys(): fulltext_urls = [(url_indirect_ext,url_indirect)] # Indirect url. Try to fetch the real fulltext(s) if not fulltext_urls: # read "setlink" data try: htmlpagebody = urllib.urlopen(url_indirect).read() except: sys.stderr.write("Error: Cannot read %s.\n" % url_indirect) return [] fulltext_urls = get_fulltext_urls_from_html_page(htmlpagebody) if options["verbose"] >= 9: write_message("... fulltext_urls = %s" % fulltext_urls) words = {} # process as many urls as they were found: for (ext,url_direct) in fulltext_urls: if options["verbose"] >= 2: write_message(".... processing %s from %s started" % (ext,url_direct)) # sanity check: if not url_direct: break; # read fulltext file: try: url = urllib.urlopen(url_direct[1:-1]) except: sys.stderr.write("Error: Cannot read %s.\n" % url_direct[1:-1]) break # try other fulltext files... tmp_name = tempfile.mktemp('cdsware.tmp') tmp_fd = open(tmp_name, "w") data_chunk = url.read(8*1024) while data_chunk: tmp_fd.write(data_chunk) data_chunk = url.read(8*1024) tmp_fd.close() # try all available conversion programs according to their order: bingo = 0 for conv_program in conv_programs[ext]: if os.path.exists(conv_program): # intelligence on how to run various conversion programs: cmd = "" # wil keep command to run bingo = 0 # had we success? if os.path.basename(conv_program) == "pdftotext": cmd = "%s %s %s.txt" % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "pstotext": if ext == "ps.gz": # is there gzip available? if os.path.exists(conv_programs_helpers["gz"]): cmd = "%s -cd %s | %s > %s.txt" \ % (conv_programs_helpers["gz"], tmp_name, conv_program, tmp_name) else: cmd = "%s %s > %s.txt" \ % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "ps2ascii": if ext == "ps.gz": # is there gzip available? if os.path.exists(conv_programs_helpers["gz"]): cmd = "%s -cd %s | %s > %s.txt"\ % (conv_programs_helpers["gz"], tmp_name, conv_program, tmp_name) else: cmd = "%s %s %s.txt" \ % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "antiword": cmd = "%s %s > %s.txt" % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "catdoc": cmd = "%s %s > %s.txt" % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "wvText": cmd = "%s %s %s.txt" % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "ppthtml": # is there html2text available? if os.path.exists(conv_programs_helpers["html"]): cmd = "%s %s | %s > %s.txt"\ % (conv_program, tmp_name, conv_programs_helpers["html"], tmp_name) else: cmd = "%s %s > %s.txt" \ % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "xlhtml": # is there html2text available? if os.path.exists(conv_programs_helpers["html"]): cmd = "%s %s | %s > %s.txt" % \ (conv_program, tmp_name, conv_programs_helpers["html"], tmp_name) else: cmd = "%s %s > %s.txt" % \ (conv_program, tmp_name, tmp_name) else: sys.stderr.write("Error: Do not know how to handle %s conversion program.\n" % conv_program) # try to run it: try: if options["verbose"] >= 9: write_message("..... launching %s" % cmd) errcode = os.system(cmd) if errcode == 0 and os.path.exists("%s.txt" % tmp_name): bingo = 1 break # bingo! else: write_message("Error while running %s for %s.\n" % (cmd, url_direct), sys.stderr) except: write_message("Error running %s for %s.\n" % (cmd, url_direct), sys.stderr) # were we successful? if bingo: tmp_name_txt_file = open("%s.txt" % tmp_name) for phrase in tmp_name_txt_file.xreadlines(): for word in get_words_from_phrase(phrase): if not words.has_key(word): words[word] = 1; tmp_name_txt_file.close() else: if options["verbose"]: write_message("No conversion success for %s.\n" % (url_direct), sys.stderr) # delete temp files (they might not exist): try: os.unlink(tmp_name) os.unlink(tmp_name + ".txt") except StandardError: write_message("Error: Could not delete file. It didn't exist", sys.stderr) if options["verbose"] >= 2: write_message(".... processing %s from %s ended" % (ext,url_direct)) if options["verbose"] >= 2: write_message("... reading fulltext files from %s ended" % url_indirect) return words.keys() # tagToFunctions mapping. It offers an indirection level necesary for # indexing fulltext. The default is get_words_from_phrase tagToWordsFunctions = {'8564_u':get_words_from_fulltext} def get_words_from_phrase(phrase, split=string.split): - "Returns list of words from phrase 'phrase'." + """Return list of words found in PHRASE. Note that the phrase is + split into groups depending on the alphanumeric characters and + punctuation characters definition present in the config file. + """ words = {} - # chars_punctuation and chars_alphanumericseparators from config - if cfg_remove_html_code and string.find(phrase, " -1: - #Most likely html, remove html code phrase = sre_html.sub(' ', phrase) - phrase = str.lower(phrase) # 1st split phrase into blocks according to whitespace for block in split(strip_accents(phrase)): # 2nd remove leading/trailing punctuation and add block: block = sre_block_punctuation_begin.sub("", block) block = sre_block_punctuation_end.sub("", block) if block: - block = wash_word(block) + block = apply_stemming_and_stopwords_and_length_check(block) if block: words[block] = 1 # 3rd break each block into subblocks according to punctuation and add subblocks: for subblock in sre_punctuation.split(block): - subblock = wash_word(subblock) + subblock = apply_stemming_and_stopwords_and_length_check(subblock) if subblock: words[subblock] = 1 # 4th break each subblock into alphanumeric groups and add groups: for alphanumeric_group in sre_separators.split(subblock): - alphanumeric_group = wash_word(alphanumeric_group) + alphanumeric_group = apply_stemming_and_stopwords_and_length_check(alphanumeric_group) if alphanumeric_group: - words[alphanumeric_group] = 1 - + words[alphanumeric_group] = 1 return words.keys() -def wash_word(word): - if not is_stopword(word): - stemmed = stem(word=word) - if len(stemmed) >= cfg_min_word_length: - return stemmed - return "" +def apply_stemming_and_stopwords_and_length_check(word): + """Return WORD after applying stemming and stopword and length checks. + See the config file in order to influence these. + """ + # stem word, when configured so: + if cfg_stemmer_default_language != None: + word = stem(word, cfg_stemmer_default_language) + # now check against stopwords: + if is_stopword(word): + return "" + # finally check the word length: + if len(word) < cfg_min_word_length: + return "" + return word def remove_subfields(s): "Removes subfields from string, e.g. 'foo $$c bar' becomes 'foo bar'." return sre_subfields.sub(' ', s) def get_index_id(indexname): """Returns the words/phrase index id for INDEXNAME. Returns empty string in case there is no words table for this index. Example: field='author', output=4.""" out = 0 query = """SELECT w.id FROM idxINDEX AS w WHERE w.name='%s' LIMIT 1""" % indexname res = run_sql(query, None, 1) if res: out = res[0][0] return out def get_index_tags(indexname): """Returns the list of tags that are indexed inside INDEXNAME. Returns empty list in case there are no tags indexed in this index. Note: uses get_field_tags() defined before. Example: field='author', output=['100__%', '700__%'].""" out = [] query = """SELECT f.code FROM idxINDEX AS w, idxINDEX_field AS wf, field AS f WHERE w.name='%s' AND w.id=wf.id_idxINDEX AND f.id=wf.id_field""" % indexname res = run_sql(query) for row in res: out.extend(get_field_tags(row[0])) return out def get_all_indexes(): """Returns the list of the names of all defined words indexes. Returns empty list in case there are no tags indexed in this index. Example: output=['global', 'author'].""" out = [] query = """SELECT name FROM idxINDEX""" res = run_sql(query) for row in res: out.append(row[0]) return out def usage(code, msg=''): "Prints usage for this module." if msg: sys.stderr.write("Error: %s.\n" % msg) print >> sys.stderr, \ """ Usage: %s [options] Examples: %s -a -i 234-250,293,300-500 -u admin@cdsware %s -a -w author,fulltext -M 8192 -v3 %s -d -m +4d -A on --flush=10000 Indexing options: -a, --add add or update words for selected records -d, --del delete words for selected records -i, --id=low[-high] select according to doc recID -m, --modified=from[,to] select according to modification date -c, --collection=c1[,c2] select according to collection Repairing options: -k, --check check consistency for all records in the table(s) -r, --repair try to repair all records in the table(s) Specific options: -w, --windex=w1[,w2] word/phrase indexes to consider (all) -M, --maxmem=XXX maximum memory usage in kB (no limit) -f, --flush=NNN full consistent table flush after NNN records (10000) Scheduling options: -u, --user=USER user name to store task, password needed -s, --sleeptime=SLEEP time after which to repeat tasks (no) e.g.: 1s, 30m, 24h, 7d -t, --time=TIME moment for the task to be active (now) e.g.: +15s, 5m, 3h , 2002-10-27 13:57:26 General options: -h, --help print this help and exit -V, --version print version and exit -v, --verbose=LEVEL verbose level (from 0 to 9, default 1) """ % ((sys.argv[0],) * 4) sys.exit(code) def authenticate(user, header="BibIndex Task Submission", action="runbibindex"): """Authenticate the user against the user database. Check for its password, if it exists. Check for action access rights. Return user name upon authorization success, do system exit upon authorization failure. """ print header print "=" * len(header) if user == "": print >> sys.stdout, "\rUsername: ", user = string.strip(string.lower(sys.stdin.readline())) else: print >> sys.stdout, "\rUsername: ", user ## first check user pw: res = run_sql("select id,password from user where email=%s", (user,), 1) if not res: print "Sorry, %s does not exist." % user sys.exit(1) else: (uid_db, password_db) = res[0] if password_db: password_entered = getpass.getpass() if password_db == password_entered: pass else: print "Sorry, wrong credentials for %s." % user sys.exit(1) ## secondly check authorization for the action: (auth_code, auth_message) = acc_authorize_action(uid_db, action) if auth_code != 0: print auth_message sys.exit(1) return user def split_ranges(parse_string): recIDs = [] ranges = string.split(parse_string, ",") for range in ranges: tmp_recIDs = string.split(range, "-") if len(tmp_recIDs)==1: recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[0])]) else: if int(tmp_recIDs[0]) > int(tmp_recIDs[1]): # sanity check tmp = tmp_recIDs[0] tmp_recIDs[0] = tmp_recIDs[1] tmp_recIDs[1] = tmp recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[1])]) return recIDs def get_word_tables(tables): wordTables = [] if tables: indexes = string.split(tables, ",") for index in indexes: index_id = get_index_id(index) if index_id: wordTables.append({"idxWORD%02dF" % index_id: \ get_index_tags(index)}) else: write_message("Error: There is no %s words table." % index, sys.stderr) else: for index in get_all_indexes(): index_id = get_index_id(index) wordTables.append({"idxWORD%02dF" % index_id: \ get_index_tags(index)}) return wordTables def get_date_range(var): "Returns the two dates contained as a low,high tuple" limits = string.split(var, ",") if len(limits)==1: low = get_datetime(limits[0]) return low,None if len(limits)==2: low = get_datetime(limits[0]) high = get_datetime(limits[1]) return low,high def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"): """Returns a date string according to the format string. It can handle normal date strings and shifts with respect to now.""" date = time.time() factors = {"d":24*3600, "h":3600, "m":60, "s":1} m = sre_datetime_shift.match(var) if m: sign = m.groups()[0] == "-" and -1 or 1 factor = factors[m.groups()[2]] value = float(m.groups()[1]) date = time.localtime(date + sign * factor * value) date = time.strftime(format_string, date) else: date = time.strptime(var, format_string) date = time.strftime(format_string, date) return date def create_range_list(res): """Creates a range list from a recID select query result contained in res. The result is expected to have ascending numerical order.""" if not res: return [] row = res[0] if not row: return [] else: range_list = [[row[0],row[0]]] for row in res[1:]: id = row[0] if id == range_list[-1][1] + 1: range_list[-1][1] = id else: range_list.append([id,id]) return range_list def beautify_range_list(range_list): """Returns a non overlapping, maximal range list""" ret_list = [] for new in range_list: found = 0 for old in ret_list: if new[0] <= old[0] <= new[1] + 1 or new[0] - 1 <= old[1] <= new[1]: old[0] = min(old[0], new[0]) old[1] = max(old[1], new[1]) found = 1 break if not found: ret_list.append(new) return ret_list def serialize_via_numeric_array(arr): """Serialize Numeric array into a compressed string.""" return compress(Numeric.dumps(arr)) def deserialize_via_numeric_array(string): """Decompress and deserialize string into a Numeric array.""" return Numeric.loads(decompress(string)) def serialize_via_marshal(obj): """Serialize Python object via marshal into a compressed string.""" return MySQLdb.escape_string(compress(dumps(obj))) def deserialize_via_marshal(string): """Decompress and deserialize string into a Python object via marshal.""" return loads(decompress(string)) class WordTable: "A class to hold the words table." def __init__(self, tablename, fields_to_index, separators="[^\s]"): "Creates words table instance." self.tablename = tablename self.recIDs_in_mem = [] self.fields_to_index = fields_to_index self.separators = separators self.value = {} def get_field(self, recID, tag): """Returns list of values of the MARC-21 'tag' fields for the record 'recID'.""" out = [] bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = """SELECT value FROM %s AS b, %s AS bb WHERE bb.id_bibrec=%s AND bb.id_bibxxx=b.id AND tag LIKE '%s'""" % (bibXXx, bibrec_bibXXx, recID, tag); res = run_sql(query) for row in res: out.append(row[0]) return out def clean(self): "Cleans the words table." self.value={} def put_into_db(self, mode="normal", split=string.split): """Updates the current words table in the corresponding MySQL's idxFOO table. Mode 'normal' means normal execution, mode 'emergency' means words index reverting to old state. """ if options["verbose"]: write_message("%s %s wordtable flush started" % (self.tablename,mode)) write_message('...updating %d words into %s started' % \ (len(self.value), self.tablename)) task_update_progress("%s flushed %d/%d words" % (self.tablename, 0, len(self.value))) self.recIDs_in_mem = beautify_range_list(self.recIDs_in_mem) if mode == "normal": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='TEMPORARY' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='CURRENT'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) nb_words_total = len(self.value) nb_words_report = int(nb_words_total/10) nb_words_done = 0 for word in self.value.keys(): self.put_word_into_db(word) nb_words_done += 1 if nb_words_report!=0 and ((nb_words_done % nb_words_report) == 0): if options["verbose"]: write_message('......processed %d/%d words' % (nb_words_done, nb_words_total)) task_update_progress("%s flushed %d/%d words" % (self.tablename, nb_words_done, nb_words_total)) if options["verbose"] >= 9: write_message('...updating %d words into %s ended' % \ (nb_words_total, self.tablename)) if options["verbose"]: write_message('...updating reverse table %sR started' % self.tablename[:-1]) if mode == "normal": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='CURRENT' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='FUTURE'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) query = """DELETE FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='TEMPORARY'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) if options["verbose"] >= 9: write_message('End of updating wordTable into %s' % self.tablename) elif mode == "emergency": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='CURRENT' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='TEMPORARY'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) query = """DELETE FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='FUTURE'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) if options["verbose"] >= 9: write_message('End of emergency flushing wordTable into %s' % self.tablename) if options["verbose"]: write_message('...updating reverse table %sR ended' % self.tablename[:-1]) self.clean() self.recIDs_in_mem = [] if options["verbose"]: write_message("%s %s wordtable flush ended" % (self.tablename, mode)) task_update_progress("%s flush ended" % (self.tablename)) def load_old_recIDs(self,word): """Load existing hitlist for the word from the database index files.""" query = "SELECT hitlist FROM %s WHERE term=%%s" % self.tablename res = run_sql(query, (word,)) if res: return deserialize_via_numeric_array(res[0][0]) else: return None def merge_with_old_recIDs(self,word,set): """Merge the system numbers stored in memory (hash of recIDs with value +1 or -1 according to whether to add/delete them) with those stored in the database index and received in set universe of recIDs for the given word. Return 0 in case no change was done to SET, return 1 in case SET was changed. """ set_changed_p = 0 for recID,sign in self.value[word].items(): if sign == -1 and set[recID]==1: # delete recID if existent in set and if marked as to be deleted set[recID] = 0 set_changed_p = 1 elif set[recID] == 0: # add recID if not existent in set and if marked as to be added set[recID] = 1 set_changed_p = 1 return set_changed_p def put_word_into_db(self, word, split=string.split): """Flush a single word to the database and delete it from memory""" set = self.load_old_recIDs(word) if set: # merge the word recIDs found in memory: if self.merge_with_old_recIDs(word,set) == 0: # nothing to update: if options["verbose"] >= 9: write_message("......... unchanged hitlist for ``%s''" % word) pass else: # yes there were some new words: if options["verbose"] >= 9: write_message("......... updating hitlist for ``%s''" % word) run_sql("UPDATE %s SET hitlist=%%s WHERE term=%%s" % self.tablename, (serialize_via_numeric_array(set), word)) else: # the word is new, will create new set: set = Numeric.zeros(cfg_max_recID+1, Numeric.Int0) Numeric.put(set, self.value[word].keys(), 1) if options["verbose"] >= 9: write_message("......... inserting hitlist for ``%s''" % word) run_sql("INSERT INTO %s (term, hitlist) VALUES (%%s, %%s)" % self.tablename, (word, serialize_via_numeric_array(set))) if not set: # never store empty words run_sql("DELETE from %s WHERE term=%%s" % self.tablename, (word,)) del self.value[word] def display(self): "Displays the word table." keys = self.value.keys() keys.sort() for k in keys: if options["verbose"]: write_message("%s: %s" % (k, self.value[k])) def count(self): "Returns the number of words in the table." return len(self.value) def info(self): "Prints some information on the words table." if options["verbose"]: write_message("The words table contains %d words." % self.count()) def lookup_words(self, word=""): "Lookup word from the words table." if not word: done = 0 while not done: try: word = raw_input("Enter word: ") done = 1 except (EOFError, KeyboardInterrupt): return if self.value.has_key(word): if options["verbose"]: write_message("The word '%s' is found %d times." \ % (word, len(self.value[word]))) else: if options["verbose"]: write_message("The word '%s' does not exist in the word file."\ % word) def update_last_updated(self, starting_time=None): """Update last_updated column of the index table in the database. Puts starting time there so that if the task was interrupted for record download, the records will be reindexed next time.""" if starting_time is None: return None if options["verbose"] >= 9: write_message("updating last_updated to %s...", starting_time) return run_sql("UPDATE idxINDEX SET last_updated=%s WHERE id=%s", (starting_time, self.tablename[len("idxWORD"):-1],)) def add_recIDs(self, recIDs): """Fetches records which id in the recIDs range list and adds them to the wordTable. The recIDs range list is of the form: [[i1_low,i1_high],[i2_low,i2_high], ..., [iN_low,iN_high]]. """ global chunksize flush_count = 0 records_done = 0 records_to_go = 0 for range in recIDs: records_to_go = records_to_go + range[1] - range[0] + 1 time_started = time.time() # will measure profile time for range in recIDs: i_low = range[0] chunksize_count = 0 while i_low <= range[1]: # calculate chunk group of recIDs and treat it: i_high = min(i_low+options["flush"]-flush_count-1,range[1]) i_high = min(i_low+chunksize-chunksize_count-1, i_high) try: self.chk_recID_range(i_low, i_high) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) if options["verbose"]: write_message("%s adding records #%d-#%d started" % \ (self.tablename, i_low, i_high)) if cfg_check_mysql_threads: kill_sleepy_mysql_threads() task_update_progress("%s adding recs %d-%d" % (self.tablename, i_low, i_high)) self.del_recID_range(i_low, i_high) just_processed = self.add_recID_range(i_low, i_high) flush_count = flush_count + i_high - i_low + 1 chunksize_count = chunksize_count + i_high - i_low + 1 records_done = records_done + just_processed if options["verbose"]: write_message("%s adding records #%d-#%d ended " % \ (self.tablename, i_low, i_high)) if chunksize_count >= chunksize: chunksize_count = 0 # flush if necessary: if flush_count >= options["flush"]: self.put_into_db() self.clean() if options["verbose"]: write_message("%s backing up" % (self.tablename)) flush_count = 0 self.log_progress(time_started,records_done,records_to_go) # iterate: i_low = i_high + 1 if flush_count > 0: self.put_into_db() self.log_progress(time_started,records_done,records_to_go) def add_date(self, date): # If date is not set, then retrieve it from the database. # Reindex all formats newer than the modification date if not date: id = self.tablename[len("bibindex"):] query = """SELECT last_updated FROM idxINDEX WHERE id='%s' """ % id res = run_sql(query) if not res: return if not res[0][0]: date = "0000-00-00" else: date = res[0][0] query = """SELECT b.id FROM bibrec AS b WHERE b.modification_date >= '%s' ORDER BY b.id ASC""" % date res = run_sql(query) list = create_range_list(res) if not list: if options["verbose"]: write_message( "No new records added. %s is up to date" % self.tablename) else: self.add_recIDs(list) def add_recID_range(self, recID1, recID2): empty_list_string = serialize_via_marshal([]) wlist = {} self.recIDs_in_mem.append([recID1,recID2]) # secondly fetch all needed tags: for tag in self.fields_to_index: if tag in tagToWordsFunctions.keys(): get_words_function = tagToWordsFunctions[ tag ] else: get_words_function = get_words_from_phrase bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = """SELECT bb.id_bibrec,b.value FROM %s AS b, %s AS bb WHERE bb.id_bibrec BETWEEN %d AND %d AND bb.id_bibxxx=b.id AND tag LIKE '%s'""" % (bibXXx, bibrec_bibXXx, recID1, recID2, tag) res = run_sql(query) nb_total_to_read = len(res) verbose_idx = 0 # for verbose pretty printing for row in res: recID,phrase = row if not wlist.has_key(recID): wlist[recID] = [] new_words = get_words_function(phrase) # ,self.separators wlist[recID] = list_union(new_words,wlist[recID]) # were there some words for these recIDs found? if len(wlist) == 0: return 0 recIDs = wlist.keys() for recID in recIDs: # was this record marked as deleted? if "DELETED" in self.get_field(recID, "980__c"): wlist[recID] = [] if options["verbose"] >= 9: write_message("... record %d was declared deleted, removing its word list" % recID) if options["verbose"] >= 9: write_message("... record %d, termlist: %s" % (recID, wlist[recID])) # Using cStringIO for speed. query_factory = cStringIO.StringIO() qwrite = query_factory.write qwrite( "INSERT INTO %sR (id_bibrec,termlist,type) VALUES" % self.tablename[:-1]) qwrite( "('" ) qwrite( str(recIDs[0]) ) qwrite( "','" ) qwrite( serialize_via_marshal(wlist[recIDs[0]]) ) qwrite( "','FUTURE')" ) for recID in recIDs[1:]: qwrite(",('") qwrite(str(recID)) qwrite("','") qwrite(serialize_via_marshal(wlist[recID])) qwrite("','FUTURE')") query = query_factory.getvalue() query_factory.close() run_sql(query) query_factory = cStringIO.StringIO() qwrite = query_factory.write qwrite("INSERT INTO %sR (id_bibrec,termlist,type) VALUES" % self.tablename[:-1]) qwrite("('") qwrite(str(recIDs[0])) qwrite("','") qwrite(serialize_via_marshal(wlist[recIDs[0]])) qwrite("','CURRENT')") for recID in recIDs[1:]: qwrite( ",('" ) qwrite( str(recID) ) qwrite( "','" ) qwrite( empty_list_string ) qwrite( "','CURRENT')" ) query = query_factory.getvalue() query_factory.close() try: run_sql(query) except MySQLdb.DatabaseError: # ok, we tried to add an existent record. No problem pass put = self.put for recID in recIDs: for w in wlist[recID]: put(recID, w, 1) return len(recIDs) def log_progress(self, start, done, todo): """Calculate progress and store it. start: start time, done: records processed, todo: total number of records""" time_elapsed = time.time() - start # consistency check if time_elapsed == 0 or done > todo: return time_recs_per_min = done/(time_elapsed/60.0) if options["verbose"]: write_message("%d records took %.1f seconds to complete.(%1.f recs/min)"\ % (done, time_elapsed, time_recs_per_min)) if time_recs_per_min: if options["verbose"]: write_message("Estimated runtime: %.1f minutes" % \ ((todo-done)/time_recs_per_min)) def put(self, recID, word, sign): "Adds/deletes a word to the word list." try: word = lower(word[:50]) if self.value.has_key(word): # the word 'word' exist already: update sign self.value[word][recID] = sign else: self.value[word] = {recID: sign} except: write_message("Error: Cannot put word %s with sign %d for recID %s." % (word, sign, recID)) def del_recIDs(self, recIDs): """Fetches records which id in the recIDs range list and adds them to the wordTable. The recIDs range list is of the form: [[i1_low,i1_high],[i2_low,i2_high], ..., [iN_low,iN_high]]. """ count = 0 for range in recIDs: self.del_recID_range(range[0],range[1]) count = count + range[1] - range[0] self.put_into_db() def del_recID_range(self, low, high): """Deletes records with 'recID' system number between low and high from memory words index table.""" if options["verbose"] > 2: write_message("%s fetching existing words for records #%d-#%d started" % \ (self.tablename, low, high)) self.recIDs_in_mem.append([low,high]) query = """SELECT id_bibrec,termlist FROM %sR as bb WHERE bb.id_bibrec BETWEEN '%d' AND '%d'""" % (self.tablename[:-1], low, high) recID_rows = run_sql(query) for recID_row in recID_rows: recID = recID_row[0] wlist = deserialize_via_marshal(recID_row[1]) for word in wlist: self.put(recID, word, -1) if options["verbose"] > 2: write_message("%s fetching existing words for records #%d-#%d ended" % \ (self.tablename, low, high)) def report_on_table_consistency(self): """Check reverse words index tables (e.g. idxWORD01R) for interesting states such as 'TEMPORARY' state. Prints small report (no of words, no of bad words). """ # find number of words: query = """SELECT COUNT(*) FROM %s""" % (self.tablename) res = run_sql(query, None, 1) if res: nb_words = res[0][0] else: nb_words = 0 # find number of records: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR""" % (self.tablename[:-1]) res = run_sql(query, None, 1) if res: nb_records = res[0][0] else: nb_records = 0 # report stats: if options["verbose"]: write_message("%s contains %d words from %d records" % (self.tablename, nb_words, nb_records)) # find possible bad states in reverse tables: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR WHERE type <> 'CURRENT'""" % (self.tablename[:-1]) res = run_sql(query) if res: nb_bad_records = res[0][0] else: nb_bad_records = 999999999 if nb_bad_records: write_message("EMERGENCY: %s needs to repair %d of %d records" % \ (self.tablename, nb_bad_records, nb_records)) else: if options["verbose"]: write_message("%s is in consistent state" % (self.tablename)) return nb_bad_records def repair(self): """Repair the whole table""" # find possible bad states in reverse tables: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR WHERE type <> 'CURRENT'""" % (self.tablename[:-1]) res = run_sql(query, None, 1) if res: nb_bad_records = res[0][0] else: nb_bad_records = 0 # find number of records: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR""" % (self.tablename[:-1]) res = run_sql(query) if res: nb_records = res[0][0] else: nb_records = 0 if nb_bad_records == 0: return query = """SELECT id_bibrec FROM %sR WHERE type <> 'CURRENT' ORDER BY id_bibrec""" \ % (self.tablename[:-1]) res = run_sql(query) recIDs = create_range_list(res) flush_count = 0 records_done = 0 records_to_go = 0 for range in recIDs: records_to_go = records_to_go + range[1] - range[0] + 1 time_started = time.time() # will measure profile time for range in recIDs: i_low = range[0] chunksize_count = 0 while i_low <= range[1]: # calculate chunk group of recIDs and treat it: i_high = min(i_low+options["flush"]-flush_count-1,range[1]) i_high = min(i_low+chunksize-chunksize_count-1, i_high) try: self.fix_recID_range(i_low, i_high) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) flush_count = flush_count + i_high - i_low + 1 chunksize_count = chunksize_count + i_high - i_low + 1 records_done = records_done + i_high - i_low + 1 if chunksize_count >= chunksize: chunksize_count = 0 # flush if necessary: if flush_count >= options["flush"]: self.put_into_db("emergency") self.clean() flush_count = 0 self.log_progress(time_started,records_done,records_to_go) # iterate: i_low = i_high + 1 if flush_count > 0: self.put_into_db("emergency") self.log_progress(time_started,records_done,records_to_go) write_message("%s inconsistencies repaired." % self.tablename) def chk_recID_range(self, low, high): """Check if the reverse index table is in proper state""" ## check db query = """SELECT COUNT(*) FROM %sR WHERE type <> 'CURRENT' AND id_bibrec BETWEEN '%d' AND '%d'""" % (self.tablename[:-1], low, high) res = run_sql(query, None, 1) if res[0][0]==0: if options["verbose"]: write_message("%s for %d-%d is in consistent state"%(self.tablename,low,high)) return # okay, words table is consistent ## inconsistency detected! write_message("EMERGENCY: %s inconsistencies detected..." % self.tablename) write_message("""EMERGENCY: Errors found. You should check consistency of the %s - %sR tables.\nRunning 'bibindex --repair' is recommended.""" \ % (self.tablename, self.tablename[:-1])) raise StandardError def fix_recID_range(self, low, high): """Try to fix reverse index database consistency (e.g. table idxWORD01R) in the low,high doc-id range. Possible states for a recID follow: CUR TMP FUT: very bad things have happened: warn! CUR TMP : very bad things have happened: warn! CUR FUT: delete FUT (crash before flushing) CUR : database is ok TMP FUT: add TMP to memory and del FUT from memory flush (revert to old state) TMP : very bad things have happened: warn! FUT: very bad things have happended: warn! """ state = {} query = "SELECT id_bibrec,type FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d'"\ % (self.tablename[:-1], low, high) res = run_sql(query) for row in res: if not state.has_key(row[0]): state[row[0]]=[] state[row[0]].append(row[1]) ok = 1 # will hold info on whether we will be able to repair for recID in state.keys(): if not 'TEMPORARY' in state[recID]: if 'FUTURE' in state[recID]: if 'CURRENT' not in state[recID]: write_message("EMERGENCY: Record %d is in inconsistent state. Can't repair it" % recID) ok = 0 else: write_message("EMERGENCY: Inconsistency in record %d detected" % recID) query = """DELETE FROM %sR WHERE id_bibrec='%d'""" % (self.tablename[:-1], recID) run_sql(query) write_message("EMERGENCY: Inconsistency in record %d repaired." % recID) else: if 'FUTURE' in state[recID] and not 'CURRENT' in state[recID]: self.recIDs_in_mem.append([recID,recID]) # Get the words file query = """SELECT type,termlist FROM %sR WHERE id_bibrec='%d'""" % (self.tablename[:-1], recID) if options["verbose"] >= 9: write_message(query) res = run_sql(query) for row in res: wlist = deserialize_via_marshal(row[1]) if options["verbose"] >= 9: write_message("Words are %s " % wlist) if row[0] == 'TEMPORARY': sign = 1 else: sign = -1 for word in wlist: self.put(recID, word, sign) else: write_message("EMERGENCY: %s for %d is in inconsistent state. Couldn't repair it." % (self.tablename, recID)) ok = 0 if not ok: write_message("""EMERGENCY: Unrepairable errors found. You should check consistency of the %s - %sR tables. Deleting affected records is recommended.""" % (self.tablename, self.tablename[:-1])) raise StandardError def task_run(row): """Run the indexing task. The row argument is the BibSched task queue row, containing if, arguments, etc. Return 1 in case of success and 0 in case of failure. """ global options, task_id, wordTables, stemmer, stopwords # read from SQL row: task_id = row[0] task_proc = row[1] options = loads(row[6]) task_status = row[7] # sanity check: if task_proc != "bibindex": write_message("The task #%d does not seem to be a BibIndex task." % task_id, sys.stderr) return 0 if task_status != "WAITING": write_message("The task #%d is %s. I expected WAITING." % (task_id, task_status), sys.stderr) return 0 # we can run the task now: if options["verbose"]: write_message("Task #%d started." % task_id) task_starting_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) task_update_status("RUNNING") # install signal handlers signal.signal(signal.SIGUSR1, task_sig_sleep) signal.signal(signal.SIGTERM, task_sig_stop) signal.signal(signal.SIGABRT, task_sig_suicide) signal.signal(signal.SIGCONT, task_sig_wakeup) signal.signal(signal.SIGINT, task_sig_unknown) ## go ahead and treat each table : for table in options["windex"]: wordTable = WordTable(table.keys()[0], table.values()[0]) wordTable.report_on_table_consistency() try: if options["cmd"] == "del": if options["id"]: wordTable.del_recIDs(options["id"]) elif options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) wordTable.del_recIDs(recIDs_range) else: write_message("Missing IDs of records to delete from index %s.", wordTable.tablename, sys.stderr) raise StandardError elif options["cmd"] == "add": if options["id"]: wordTable.add_recIDs(options["id"]) elif options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) wordTable.add_recIDs(recIDs_range) else: wordTable.add_date(options["modified"]) # only update last_updated if run via automatic mode: wordTable.update_last_updated(task_starting_time) elif options["cmd"] == "repair": wordTable.repair() else: write_message("Invalid command found processing %s" % \ wordTable.tablename, sys.stderr) raise StandardError except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) wordTable.report_on_table_consistency() # We are done. State it in the database, close and quit task_update_status("DONE") if options["verbose"]: write_message("Task #%d finished." % task_id) return 1 def command_line(): global options long_flags =["add","del","id=","modified=","collection=", "windex=", "check","repair","maxmem=", "flush=","user=","sleeptime=", "time=","help", "version", "verbose="] short_flags ="adi:m:c:w:krM:f:u:s:t:hVv:" format_string = "%Y-%m-%d %H:%M:%S" tables = None sleeptime = "" try: opts, args = getopt.getopt(sys.argv[1:], short_flags, long_flags) except getopt.GetoptError, err: write_message(err, sys.stderr) usage(1) if args: usage(1) options={"cmd":"add", "id":[], "modified":[], "collection":[], "maxmem":0, "flush":10000, "sleeptime":0, "verbose":1 } sched_time = time.strftime(format_string) user = "" # Check for key options try: for opt in opts: if opt == ("-h","") or opt == ("--help",""): usage(1) elif opt == ("-V","") or opt == ("--version",""): print bibindex_engine_version sys.exit(1) elif opt[0] in ["--verbose", "-v"]: options["verbose"] = int(opt[1]) elif opt == ("-a","") or opt == ("--add",""): options["cmd"] = "add" if ("-x","") in opts or ("--del","") in opts: usage(1) elif opt == ("-k","") or opt == ("--check",""): options["cmd"] = "check" elif opt == ("-r","") or opt == ("--repair",""): options["cmd"] = "repair" elif opt == ("-d","") or opt == ("--del",""): options["cmd"]="del" elif opt[0] in [ "-i", "--id" ]: options["id"] = options["id"] + split_ranges(opt[1]) elif opt[0] in [ "-m", "--modified" ]: options["modified"] = get_date_range(opt[1]) elif opt[0] in [ "-c", "--collection" ]: options["collection"] = opt[1] elif opt[0] in [ "-w", "--windex" ]: tables = opt[1] elif opt[0] in [ "-M", "--maxmem"]: options["maxmem"]=int(opt[1]) if options["maxmem"] < base_process_size + 1000: raise StandardError, "Memory usage should be higher than %d kB" % (base_process_size + 1000) elif opt[0] in [ "-f", "--flush"]: options["flush"]=int(opt[1]) elif opt[0] in [ "-u", "--user"]: user = opt[1] elif opt[0] in [ "-s", "--sleeptime" ]: get_datetime(opt[1]) # see if it is a valid shift sleeptime= opt[1] elif opt[0] in [ "-t", "--time" ]: sched_time= get_datetime(opt[1]) else: usage(1) except StandardError, e: write_message(e, sys.stderr) sys.exit(1) options["windex"]=get_word_tables(tables) if options["cmd"] == "check": for table in options["windex"]: wordTable = WordTable(table.keys()[0], table.values()[0]) wordTable.report_on_table_consistency() return user = authenticate(user) if options["verbose"] >= 9: print "" write_message("storing task options %s\n" % options) new_task_id = run_sql("""INSERT INTO schTASK (proc,user,runtime,sleeptime,arguments,status) VALUES ('bibindex',%s,%s,%s,%s,'WAITING')""", (user, sched_time, sleeptime, dumps(options))) print "Task #%d was successfully scheduled for execution." % new_task_id return def task_sig_sleep(sig, frame): """Signal handler for the 'sleep' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("sleeping...") task_update_status("SLEEPING") signal.pause() # wait for wake-up signal def task_sig_wakeup(sig, frame): """Signal handler for the 'wakeup' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("continuing...") task_update_status("CONTINUING") def task_sig_stop(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("stopping...") task_update_status("STOPPING") errcode = 0 try: task_sig_stop_commands() write_message("stopped") task_update_status("STOPPED") except StandardError, err: write_message("Error during stopping! %e" % err) task_update_status("STOPPINGFAILED") errcode = 1 sys.exit(errcode) def task_sig_stop_commands(): """Do all the commands necessary to stop the task before quitting. Useful for task_sig_stop() handler. """ write_message("stopping commands started") for table in wordTables: table.put_into_db() write_message("stopping commands ended") def task_sig_suicide(sig, frame): """Signal handler for the 'suicide' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("suiciding myself now...") task_update_status("SUICIDING") write_message("suicided") task_update_status("SUICIDED") sys.exit(0) def task_sig_unknown(sig, frame): """Signal handler for the other unknown signals sent by shell or user.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("unknown signal %d ignored" % sig) # do nothing for other signals def task_update_progress(msg): """Updates progress information in the BibSched task table.""" global task_id, options if options["verbose"] >= 9: write_message("Updating task progress to %s." % msg) return run_sql("UPDATE schTASK SET progress=%s where id=%s", (msg, task_id)) def task_update_status(val): """Updates state information in the BibSched task table.""" global task_id, options if options["verbose"] >= 9: write_message("Updating task status to %s." % val) return run_sql("UPDATE schTASK SET status=%s where id=%s", (val, task_id)) def test_fulltext_indexing(): """Tests fulltext indexing programs on PDF, PS, DOC, PPT, XLS. Prints list of words and word table on the screen. Does not integrate anything into the database. Useful when debugging problems with fulltext indexing: call this function instead of main(). """ options = {} options["verbose"] = 9 print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=atlnot&categ=Communication&id=com-indet-2002-012") # protected URL print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=agenda&categ=a00388&id=a00388s2t7") # XLS print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=agenda&categ=a02883&id=a02883s1t6/transparencies") # PPT print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=agenda&categ=a99149&id=a99149s1t10/transparencies") # DOC print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=preprint&categ=cern&id=lhc-project-report-601") # PDF sys.exit(0) def test_word_separators(phrase="hep-th/0101001"): """Tests word separating policy on various input.""" print "%s:" % phrase for word in get_words_from_phrase(phrase): print "\t-> %s" % word def main(): """Reads arguments and either runs the task, or starts user-interface (command line).""" if len(sys.argv) == 2: try: id = int(sys.argv[1]) except StandardError, err: command_line() sys.exit() res = run_sql("SELECT * FROM schTASK WHERE id='%d'" % (id), None, 1) if not res: write_message("Selected task not found.", sys.stderr) sys.exit(1) try: if not task_run(res[0]): write_message("Error occurred. Exiting.", sys.stderr) except StandardError, e: write_message("Unexpected error occurred: %s." % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) write_message("Exiting.") task_update_status("ERROR") else: command_line() diff --git a/modules/bibindex/lib/bibindex_engine.py.wml b/modules/bibindex/lib/bibindex_engine.py.wml index dccab5140..e8676fe08 100644 --- a/modules/bibindex/lib/bibindex_engine.py.wml +++ b/modules/bibindex/lib/bibindex_engine.py.wml @@ -1,1595 +1,1601 @@ ## $Id$ ## BibIndxes bibliographic data, reference and fulltext indexing utility. ## This file is part of the CERN Document Server Software (CDSware). ## Copyright (C) 2002 CERN. ## ## The CDSware is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## The CDSware is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDSware; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. # -*- coding: utf-8 -*- ## $Id$ ## DO NOT EDIT THIS FILE! IT WAS AUTOMATICALLY GENERATED FROM CDSware WML SOURCES. """ BibIndex indexing engine implementation. See bibindex executable for entry point. """ from marshal import loads,dumps from zlib import compress,decompress from string import split,translate,lower,upper import getopt import getpass import string import os import sre import sys import time import MySQLdb import Numeric import urllib import signal import tempfile import unicodedata import traceback import cStringIO from config import * from bibindex_engine_config import * from search_engine_config import cfg_max_recID from search_engine import perform_request_search, strip_accents from dbquery import run_sql from access_control_engine import acc_authorize_action from bibindex_engine_stopwords import is_stopword from bibindex_engine_stemmer import stem ## import optional modules: try: import psyco psyco.bind(get_words_from_phrase) psyco.bind(merge_with_old_recIDs) psyco.bind(serialize_via_numeric_array) psyco.bind(serialize_via_marshal) psyco.bind(deserialize_via_numeric_array) psyco.bind(deserialize_via_marshal) except: pass ## override urllib's default password-asking behaviour: class MyFancyURLopener(urllib.FancyURLopener): def prompt_user_passwd(self, host, realm): # supply some dummy credentials by default return (cfg_urlopener_username, cfg_urlopener_password) def http_error_401(self, url, fp, errcode, errmsg, headers): # do not bother with protected pages raise IOError, (999, 'unauthorized access') return None urllib._urlopener = MyFancyURLopener() def write_message(msg, stream=sys.stdout): """Write message and flush output stream (may be sys.stdout or sys.stderr).""" if stream == sys.stdout or stream == sys.stderr: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) stream.write("%s\n" % msg) stream.flush() else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) return ## precompile some often-used regexp for speed reasons: sre_subfields = sre.compile('\$\$\w'); sre_html = sre.compile("(?s)<[^>]*>|&#?\w+;") sre_block_punctuation_begin = sre.compile(r"^"+cfg_chars_punctuation+"+") sre_block_punctuation_end = sre.compile(cfg_chars_punctuation+"+$") sre_punctuation = sre.compile(cfg_chars_punctuation) sre_separators = sre.compile(cfg_chars_alphanumericseparators) sre_datetime_shift = sre.compile("([-\+]{0,1})([\d]+)([dhms])") nb_char_in_line = 50 # for verbose pretty printing chunksize = 1000 # default size of chunks that the records will be treated by wordTables = [] task_id = -1 base_process_size = 4500 # process base size options = {} # will hold task options ## Dictionary merging functions def intersection(dict, dict2): "Returns intersection of the two dictionaries." int_dict = {} if len(dict1) > len(dict2): for e in dict2: if dict1.has_key(e): int_dict[e] = 1 else: for e in dict1: if dict2.has_key(e): int_dict[e] = 1 return int_dict def union(dict1, dict2): "Returns union of the two dictionaries." union_dict = {} for e in dict1.keys(): union_dict[e] = 1 for e in dict2.keys(): union_dict[e] = 1 return union_dict def diff(dict1, dict2): "Returns dict1 - dict2." diff_dict = {} for e in dict1.keys(): if not dict2.has_key(e): diff_dict[e] = 1 return diff_dict def list_union(list1, list2): "Returns union of the two lists." union_dict = {} for e in list1: union_dict[e] = 1 for e in list2: union_dict[e] = 1 return union_dict.keys() ## safety function for killing slow MySQL threads: def kill_sleepy_mysql_threads(max_threads=cfg_max_mysql_threads, thread_timeout=cfg_mysql_thread_timeout): """Check the number of MySQL threads and if there are more than MAX_THREADS of them, lill all threads that are in a sleeping state for more than THREAD_TIMEOUT seconds. (This is useful for working around the the max_connection problem that appears during indexation in some not-yet-understood cases.) If some threads are to be killed, write info into the log file. """ res = run_sql("SHOW FULL PROCESSLIST") if len(res) > max_threads: for row in res: r_id,r_user,r_host,r_db,r_command,r_time,r_state,r_info = row if r_command == "Sleep" and int(r_time) > thread_timeout: run_sql("KILL %s", (r_id,)) if options["verbose"] >= 1: write_message("WARNING: too many MySQL threads, killing thread %s" % r_id) return ## MARC-21 tag/field access functions def get_fieldvalues(recID, tag): """Returns list of values of the MARC-21 'tag' fields for the record 'recID'.""" out = [] bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = "SELECT value FROM %s AS b, %s AS bb WHERE bb.id_bibrec=%s AND bb.id_bibxxx=b.id AND tag LIKE '%s'" \ % (bibXXx, bibrec_bibXXx, recID, tag) res = run_sql(query) for row in res: out.append(row[0]) return out def get_field_tags(field): """Returns a list of MARC tags for the field code 'field'. Returns empty list in case of error. Example: field='author', output=['100__%','700__%'].""" out = [] query = """SELECT t.value FROM tag AS t, field_tag AS ft, field AS f WHERE f.code='%s' AND ft.id_field=f.id AND t.id=ft.id_tag ORDER BY ft.score DESC""" % field res = run_sql(query) for row in res: out.append(row[0]) return out ## Fulltext word extraction functions def get_fulltext_urls_from_html_page(htmlpagebody): """Parses htmlpagebody data looking for url_directs referring to probable fulltexts. Returns an array of (ext,url_direct) to fulltexts. Note: it looks for file format extensions as defined by global 'conv_programs'structure. """ out = [] for ext in conv_programs.keys(): expr = sre.compile( r"\"(http://[\w]+\.+[\w]+[^\"'><]*\.)(" + \ ext + r")\"") match = expr.search(htmlpagebody) if match: out.append([ext,match.group()]) else: # FIXME: workaround for getfile, should use bibdoc tables expr_getfile = sre.compile(r"\"(http://.*getfile\.py\?.*format=" + ext + r"&version=.*)\"") match = expr_getfile.search(htmlpagebody) if match: out.append([ext,match.group()]) return out def get_words_from_fulltext(url_indirect,separators="[^\w]",split=string.split): """Returns all the words contained in the fulltext whose url is contained in the document pointed to in phrase. Please note the double indirection. url_indirect returns a document that has to be parsed to get the actual urls.""" if cfg_fulltext_index_local_files_only and string.find(url_indirect, weburl) < 0: return [] if options["verbose"] >= 2: write_message("... reading fulltext files from %s started" % url_indirect) url_direct = None fulltext_urls = None # check for direct link in url url_indirect_ext = lower(split(url_indirect,".")[-1]) if url_indirect_ext in conv_programs.keys(): fulltext_urls = [(url_indirect_ext,url_indirect)] # Indirect url. Try to fetch the real fulltext(s) if not fulltext_urls: # read "setlink" data try: htmlpagebody = urllib.urlopen(url_indirect).read() except: sys.stderr.write("Error: Cannot read %s.\n" % url_indirect) return [] fulltext_urls = get_fulltext_urls_from_html_page(htmlpagebody) if options["verbose"] >= 9: write_message("... fulltext_urls = %s" % fulltext_urls) words = {} # process as many urls as they were found: for (ext,url_direct) in fulltext_urls: if options["verbose"] >= 2: write_message(".... processing %s from %s started" % (ext,url_direct)) # sanity check: if not url_direct: break; # read fulltext file: try: url = urllib.urlopen(url_direct[1:-1]) except: sys.stderr.write("Error: Cannot read %s.\n" % url_direct[1:-1]) break # try other fulltext files... tmp_name = tempfile.mktemp('cdsware.tmp') tmp_fd = open(tmp_name, "w") data_chunk = url.read(8*1024) while data_chunk: tmp_fd.write(data_chunk) data_chunk = url.read(8*1024) tmp_fd.close() # try all available conversion programs according to their order: bingo = 0 for conv_program in conv_programs[ext]: if os.path.exists(conv_program): # intelligence on how to run various conversion programs: cmd = "" # wil keep command to run bingo = 0 # had we success? if os.path.basename(conv_program) == "pdftotext": cmd = "%s %s %s.txt" % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "pstotext": if ext == "ps.gz": # is there gzip available? if os.path.exists(conv_programs_helpers["gz"]): cmd = "%s -cd %s | %s > %s.txt" \ % (conv_programs_helpers["gz"], tmp_name, conv_program, tmp_name) else: cmd = "%s %s > %s.txt" \ % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "ps2ascii": if ext == "ps.gz": # is there gzip available? if os.path.exists(conv_programs_helpers["gz"]): cmd = "%s -cd %s | %s > %s.txt"\ % (conv_programs_helpers["gz"], tmp_name, conv_program, tmp_name) else: cmd = "%s %s %s.txt" \ % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "antiword": cmd = "%s %s > %s.txt" % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "catdoc": cmd = "%s %s > %s.txt" % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "wvText": cmd = "%s %s %s.txt" % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "ppthtml": # is there html2text available? if os.path.exists(conv_programs_helpers["html"]): cmd = "%s %s | %s > %s.txt"\ % (conv_program, tmp_name, conv_programs_helpers["html"], tmp_name) else: cmd = "%s %s > %s.txt" \ % (conv_program, tmp_name, tmp_name) elif os.path.basename(conv_program) == "xlhtml": # is there html2text available? if os.path.exists(conv_programs_helpers["html"]): cmd = "%s %s | %s > %s.txt" % \ (conv_program, tmp_name, conv_programs_helpers["html"], tmp_name) else: cmd = "%s %s > %s.txt" % \ (conv_program, tmp_name, tmp_name) else: sys.stderr.write("Error: Do not know how to handle %s conversion program.\n" % conv_program) # try to run it: try: if options["verbose"] >= 9: write_message("..... launching %s" % cmd) errcode = os.system(cmd) if errcode == 0 and os.path.exists("%s.txt" % tmp_name): bingo = 1 break # bingo! else: write_message("Error while running %s for %s.\n" % (cmd, url_direct), sys.stderr) except: write_message("Error running %s for %s.\n" % (cmd, url_direct), sys.stderr) # were we successful? if bingo: tmp_name_txt_file = open("%s.txt" % tmp_name) for phrase in tmp_name_txt_file.xreadlines(): for word in get_words_from_phrase(phrase): if not words.has_key(word): words[word] = 1; tmp_name_txt_file.close() else: if options["verbose"]: write_message("No conversion success for %s.\n" % (url_direct), sys.stderr) # delete temp files (they might not exist): try: os.unlink(tmp_name) os.unlink(tmp_name + ".txt") except StandardError: write_message("Error: Could not delete file. It didn't exist", sys.stderr) if options["verbose"] >= 2: write_message(".... processing %s from %s ended" % (ext,url_direct)) if options["verbose"] >= 2: write_message("... reading fulltext files from %s ended" % url_indirect) return words.keys() # tagToFunctions mapping. It offers an indirection level necesary for # indexing fulltext. The default is get_words_from_phrase tagToWordsFunctions = {'8564_u':get_words_from_fulltext} def get_words_from_phrase(phrase, split=string.split): - "Returns list of words from phrase 'phrase'." + """Return list of words found in PHRASE. Note that the phrase is + split into groups depending on the alphanumeric characters and + punctuation characters definition present in the config file. + """ words = {} - # chars_punctuation and chars_alphanumericseparators from config - if cfg_remove_html_code and string.find(phrase, " -1: - #Most likely html, remove html code phrase = sre_html.sub(' ', phrase) - phrase = str.lower(phrase) # 1st split phrase into blocks according to whitespace for block in split(strip_accents(phrase)): # 2nd remove leading/trailing punctuation and add block: block = sre_block_punctuation_begin.sub("", block) block = sre_block_punctuation_end.sub("", block) if block: - block = wash_word(block) + block = apply_stemming_and_stopwords_and_length_check(block) if block: words[block] = 1 # 3rd break each block into subblocks according to punctuation and add subblocks: for subblock in sre_punctuation.split(block): - subblock = wash_word(subblock) + subblock = apply_stemming_and_stopwords_and_length_check(subblock) if subblock: words[subblock] = 1 # 4th break each subblock into alphanumeric groups and add groups: for alphanumeric_group in sre_separators.split(subblock): - alphanumeric_group = wash_word(alphanumeric_group) + alphanumeric_group = apply_stemming_and_stopwords_and_length_check(alphanumeric_group) if alphanumeric_group: - words[alphanumeric_group] = 1 - + words[alphanumeric_group] = 1 return words.keys() -def wash_word(word): - if not is_stopword(word): - stemmed = stem(word=word) - if len(stemmed) >= cfg_min_word_length: - return stemmed - return "" +def apply_stemming_and_stopwords_and_length_check(word): + """Return WORD after applying stemming and stopword and length checks. + See the config file in order to influence these. + """ + # stem word, when configured so: + if cfg_stemmer_default_language != None: + word = stem(word, cfg_stemmer_default_language) + # now check against stopwords: + if is_stopword(word): + return "" + # finally check the word length: + if len(word) < cfg_min_word_length: + return "" + return word def remove_subfields(s): "Removes subfields from string, e.g. 'foo $$c bar' becomes 'foo bar'." return sre_subfields.sub(' ', s) def get_index_id(indexname): """Returns the words/phrase index id for INDEXNAME. Returns empty string in case there is no words table for this index. Example: field='author', output=4.""" out = 0 query = """SELECT w.id FROM idxINDEX AS w WHERE w.name='%s' LIMIT 1""" % indexname res = run_sql(query, None, 1) if res: out = res[0][0] return out def get_index_tags(indexname): """Returns the list of tags that are indexed inside INDEXNAME. Returns empty list in case there are no tags indexed in this index. Note: uses get_field_tags() defined before. Example: field='author', output=['100__%', '700__%'].""" out = [] query = """SELECT f.code FROM idxINDEX AS w, idxINDEX_field AS wf, field AS f WHERE w.name='%s' AND w.id=wf.id_idxINDEX AND f.id=wf.id_field""" % indexname res = run_sql(query) for row in res: out.extend(get_field_tags(row[0])) return out def get_all_indexes(): """Returns the list of the names of all defined words indexes. Returns empty list in case there are no tags indexed in this index. Example: output=['global', 'author'].""" out = [] query = """SELECT name FROM idxINDEX""" res = run_sql(query) for row in res: out.append(row[0]) return out def usage(code, msg=''): "Prints usage for this module." if msg: sys.stderr.write("Error: %s.\n" % msg) print >> sys.stderr, \ """ Usage: %s [options] Examples: %s -a -i 234-250,293,300-500 -u admin@cdsware %s -a -w author,fulltext -M 8192 -v3 %s -d -m +4d -A on --flush=10000 Indexing options: -a, --add add or update words for selected records -d, --del delete words for selected records -i, --id=low[-high] select according to doc recID -m, --modified=from[,to] select according to modification date -c, --collection=c1[,c2] select according to collection Repairing options: -k, --check check consistency for all records in the table(s) -r, --repair try to repair all records in the table(s) Specific options: -w, --windex=w1[,w2] word/phrase indexes to consider (all) -M, --maxmem=XXX maximum memory usage in kB (no limit) -f, --flush=NNN full consistent table flush after NNN records (10000) Scheduling options: -u, --user=USER user name to store task, password needed -s, --sleeptime=SLEEP time after which to repeat tasks (no) e.g.: 1s, 30m, 24h, 7d -t, --time=TIME moment for the task to be active (now) e.g.: +15s, 5m, 3h , 2002-10-27 13:57:26 General options: -h, --help print this help and exit -V, --version print version and exit -v, --verbose=LEVEL verbose level (from 0 to 9, default 1) """ % ((sys.argv[0],) * 4) sys.exit(code) def authenticate(user, header="BibIndex Task Submission", action="runbibindex"): """Authenticate the user against the user database. Check for its password, if it exists. Check for action access rights. Return user name upon authorization success, do system exit upon authorization failure. """ print header print "=" * len(header) if user == "": print >> sys.stdout, "\rUsername: ", user = string.strip(string.lower(sys.stdin.readline())) else: print >> sys.stdout, "\rUsername: ", user ## first check user pw: res = run_sql("select id,password from user where email=%s", (user,), 1) if not res: print "Sorry, %s does not exist." % user sys.exit(1) else: (uid_db, password_db) = res[0] if password_db: password_entered = getpass.getpass() if password_db == password_entered: pass else: print "Sorry, wrong credentials for %s." % user sys.exit(1) ## secondly check authorization for the action: (auth_code, auth_message) = acc_authorize_action(uid_db, action) if auth_code != 0: print auth_message sys.exit(1) return user def split_ranges(parse_string): recIDs = [] ranges = string.split(parse_string, ",") for range in ranges: tmp_recIDs = string.split(range, "-") if len(tmp_recIDs)==1: recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[0])]) else: if int(tmp_recIDs[0]) > int(tmp_recIDs[1]): # sanity check tmp = tmp_recIDs[0] tmp_recIDs[0] = tmp_recIDs[1] tmp_recIDs[1] = tmp recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[1])]) return recIDs def get_word_tables(tables): wordTables = [] if tables: indexes = string.split(tables, ",") for index in indexes: index_id = get_index_id(index) if index_id: wordTables.append({"idxWORD%02dF" % index_id: \ get_index_tags(index)}) else: write_message("Error: There is no %s words table." % index, sys.stderr) else: for index in get_all_indexes(): index_id = get_index_id(index) wordTables.append({"idxWORD%02dF" % index_id: \ get_index_tags(index)}) return wordTables def get_date_range(var): "Returns the two dates contained as a low,high tuple" limits = string.split(var, ",") if len(limits)==1: low = get_datetime(limits[0]) return low,None if len(limits)==2: low = get_datetime(limits[0]) high = get_datetime(limits[1]) return low,high def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"): """Returns a date string according to the format string. It can handle normal date strings and shifts with respect to now.""" date = time.time() factors = {"d":24*3600, "h":3600, "m":60, "s":1} m = sre_datetime_shift.match(var) if m: sign = m.groups()[0] == "-" and -1 or 1 factor = factors[m.groups()[2]] value = float(m.groups()[1]) date = time.localtime(date + sign * factor * value) date = time.strftime(format_string, date) else: date = time.strptime(var, format_string) date = time.strftime(format_string, date) return date def create_range_list(res): """Creates a range list from a recID select query result contained in res. The result is expected to have ascending numerical order.""" if not res: return [] row = res[0] if not row: return [] else: range_list = [[row[0],row[0]]] for row in res[1:]: id = row[0] if id == range_list[-1][1] + 1: range_list[-1][1] = id else: range_list.append([id,id]) return range_list def beautify_range_list(range_list): """Returns a non overlapping, maximal range list""" ret_list = [] for new in range_list: found = 0 for old in ret_list: if new[0] <= old[0] <= new[1] + 1 or new[0] - 1 <= old[1] <= new[1]: old[0] = min(old[0], new[0]) old[1] = max(old[1], new[1]) found = 1 break if not found: ret_list.append(new) return ret_list def serialize_via_numeric_array(arr): """Serialize Numeric array into a compressed string.""" return compress(Numeric.dumps(arr)) def deserialize_via_numeric_array(string): """Decompress and deserialize string into a Numeric array.""" return Numeric.loads(decompress(string)) def serialize_via_marshal(obj): """Serialize Python object via marshal into a compressed string.""" return MySQLdb.escape_string(compress(dumps(obj))) def deserialize_via_marshal(string): """Decompress and deserialize string into a Python object via marshal.""" return loads(decompress(string)) class WordTable: "A class to hold the words table." def __init__(self, tablename, fields_to_index, separators="[^\s]"): "Creates words table instance." self.tablename = tablename self.recIDs_in_mem = [] self.fields_to_index = fields_to_index self.separators = separators self.value = {} def get_field(self, recID, tag): """Returns list of values of the MARC-21 'tag' fields for the record 'recID'.""" out = [] bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = """SELECT value FROM %s AS b, %s AS bb WHERE bb.id_bibrec=%s AND bb.id_bibxxx=b.id AND tag LIKE '%s'""" % (bibXXx, bibrec_bibXXx, recID, tag); res = run_sql(query) for row in res: out.append(row[0]) return out def clean(self): "Cleans the words table." self.value={} def put_into_db(self, mode="normal", split=string.split): """Updates the current words table in the corresponding MySQL's idxFOO table. Mode 'normal' means normal execution, mode 'emergency' means words index reverting to old state. """ if options["verbose"]: write_message("%s %s wordtable flush started" % (self.tablename,mode)) write_message('...updating %d words into %s started' % \ (len(self.value), self.tablename)) task_update_progress("%s flushed %d/%d words" % (self.tablename, 0, len(self.value))) self.recIDs_in_mem = beautify_range_list(self.recIDs_in_mem) if mode == "normal": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='TEMPORARY' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='CURRENT'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) nb_words_total = len(self.value) nb_words_report = int(nb_words_total/10) nb_words_done = 0 for word in self.value.keys(): self.put_word_into_db(word) nb_words_done += 1 if nb_words_report!=0 and ((nb_words_done % nb_words_report) == 0): if options["verbose"]: write_message('......processed %d/%d words' % (nb_words_done, nb_words_total)) task_update_progress("%s flushed %d/%d words" % (self.tablename, nb_words_done, nb_words_total)) if options["verbose"] >= 9: write_message('...updating %d words into %s ended' % \ (nb_words_total, self.tablename)) if options["verbose"]: write_message('...updating reverse table %sR started' % self.tablename[:-1]) if mode == "normal": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='CURRENT' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='FUTURE'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) query = """DELETE FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='TEMPORARY'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) if options["verbose"] >= 9: write_message('End of updating wordTable into %s' % self.tablename) elif mode == "emergency": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='CURRENT' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='TEMPORARY'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) query = """DELETE FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='FUTURE'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) if options["verbose"] >= 9: write_message('End of emergency flushing wordTable into %s' % self.tablename) if options["verbose"]: write_message('...updating reverse table %sR ended' % self.tablename[:-1]) self.clean() self.recIDs_in_mem = [] if options["verbose"]: write_message("%s %s wordtable flush ended" % (self.tablename, mode)) task_update_progress("%s flush ended" % (self.tablename)) def load_old_recIDs(self,word): """Load existing hitlist for the word from the database index files.""" query = "SELECT hitlist FROM %s WHERE term=%%s" % self.tablename res = run_sql(query, (word,)) if res: return deserialize_via_numeric_array(res[0][0]) else: return None def merge_with_old_recIDs(self,word,set): """Merge the system numbers stored in memory (hash of recIDs with value +1 or -1 according to whether to add/delete them) with those stored in the database index and received in set universe of recIDs for the given word. Return 0 in case no change was done to SET, return 1 in case SET was changed. """ set_changed_p = 0 for recID,sign in self.value[word].items(): if sign == -1 and set[recID]==1: # delete recID if existent in set and if marked as to be deleted set[recID] = 0 set_changed_p = 1 elif set[recID] == 0: # add recID if not existent in set and if marked as to be added set[recID] = 1 set_changed_p = 1 return set_changed_p def put_word_into_db(self, word, split=string.split): """Flush a single word to the database and delete it from memory""" set = self.load_old_recIDs(word) if set: # merge the word recIDs found in memory: if self.merge_with_old_recIDs(word,set) == 0: # nothing to update: if options["verbose"] >= 9: write_message("......... unchanged hitlist for ``%s''" % word) pass else: # yes there were some new words: if options["verbose"] >= 9: write_message("......... updating hitlist for ``%s''" % word) run_sql("UPDATE %s SET hitlist=%%s WHERE term=%%s" % self.tablename, (serialize_via_numeric_array(set), word)) else: # the word is new, will create new set: set = Numeric.zeros(cfg_max_recID+1, Numeric.Int0) Numeric.put(set, self.value[word].keys(), 1) if options["verbose"] >= 9: write_message("......... inserting hitlist for ``%s''" % word) run_sql("INSERT INTO %s (term, hitlist) VALUES (%%s, %%s)" % self.tablename, (word, serialize_via_numeric_array(set))) if not set: # never store empty words run_sql("DELETE from %s WHERE term=%%s" % self.tablename, (word,)) del self.value[word] def display(self): "Displays the word table." keys = self.value.keys() keys.sort() for k in keys: if options["verbose"]: write_message("%s: %s" % (k, self.value[k])) def count(self): "Returns the number of words in the table." return len(self.value) def info(self): "Prints some information on the words table." if options["verbose"]: write_message("The words table contains %d words." % self.count()) def lookup_words(self, word=""): "Lookup word from the words table." if not word: done = 0 while not done: try: word = raw_input("Enter word: ") done = 1 except (EOFError, KeyboardInterrupt): return if self.value.has_key(word): if options["verbose"]: write_message("The word '%s' is found %d times." \ % (word, len(self.value[word]))) else: if options["verbose"]: write_message("The word '%s' does not exist in the word file."\ % word) def update_last_updated(self, starting_time=None): """Update last_updated column of the index table in the database. Puts starting time there so that if the task was interrupted for record download, the records will be reindexed next time.""" if starting_time is None: return None if options["verbose"] >= 9: write_message("updating last_updated to %s...", starting_time) return run_sql("UPDATE idxINDEX SET last_updated=%s WHERE id=%s", (starting_time, self.tablename[len("idxWORD"):-1],)) def add_recIDs(self, recIDs): """Fetches records which id in the recIDs range list and adds them to the wordTable. The recIDs range list is of the form: [[i1_low,i1_high],[i2_low,i2_high], ..., [iN_low,iN_high]]. """ global chunksize flush_count = 0 records_done = 0 records_to_go = 0 for range in recIDs: records_to_go = records_to_go + range[1] - range[0] + 1 time_started = time.time() # will measure profile time for range in recIDs: i_low = range[0] chunksize_count = 0 while i_low <= range[1]: # calculate chunk group of recIDs and treat it: i_high = min(i_low+options["flush"]-flush_count-1,range[1]) i_high = min(i_low+chunksize-chunksize_count-1, i_high) try: self.chk_recID_range(i_low, i_high) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) if options["verbose"]: write_message("%s adding records #%d-#%d started" % \ (self.tablename, i_low, i_high)) if cfg_check_mysql_threads: kill_sleepy_mysql_threads() task_update_progress("%s adding recs %d-%d" % (self.tablename, i_low, i_high)) self.del_recID_range(i_low, i_high) just_processed = self.add_recID_range(i_low, i_high) flush_count = flush_count + i_high - i_low + 1 chunksize_count = chunksize_count + i_high - i_low + 1 records_done = records_done + just_processed if options["verbose"]: write_message("%s adding records #%d-#%d ended " % \ (self.tablename, i_low, i_high)) if chunksize_count >= chunksize: chunksize_count = 0 # flush if necessary: if flush_count >= options["flush"]: self.put_into_db() self.clean() if options["verbose"]: write_message("%s backing up" % (self.tablename)) flush_count = 0 self.log_progress(time_started,records_done,records_to_go) # iterate: i_low = i_high + 1 if flush_count > 0: self.put_into_db() self.log_progress(time_started,records_done,records_to_go) def add_date(self, date): # If date is not set, then retrieve it from the database. # Reindex all formats newer than the modification date if not date: id = self.tablename[len("bibindex"):] query = """SELECT last_updated FROM idxINDEX WHERE id='%s' """ % id res = run_sql(query) if not res: return if not res[0][0]: date = "0000-00-00" else: date = res[0][0] query = """SELECT b.id FROM bibrec AS b WHERE b.modification_date >= '%s' ORDER BY b.id ASC""" % date res = run_sql(query) list = create_range_list(res) if not list: if options["verbose"]: write_message( "No new records added. %s is up to date" % self.tablename) else: self.add_recIDs(list) def add_recID_range(self, recID1, recID2): empty_list_string = serialize_via_marshal([]) wlist = {} self.recIDs_in_mem.append([recID1,recID2]) # secondly fetch all needed tags: for tag in self.fields_to_index: if tag in tagToWordsFunctions.keys(): get_words_function = tagToWordsFunctions[ tag ] else: get_words_function = get_words_from_phrase bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = """SELECT bb.id_bibrec,b.value FROM %s AS b, %s AS bb WHERE bb.id_bibrec BETWEEN %d AND %d AND bb.id_bibxxx=b.id AND tag LIKE '%s'""" % (bibXXx, bibrec_bibXXx, recID1, recID2, tag) res = run_sql(query) nb_total_to_read = len(res) verbose_idx = 0 # for verbose pretty printing for row in res: recID,phrase = row if not wlist.has_key(recID): wlist[recID] = [] new_words = get_words_function(phrase) # ,self.separators wlist[recID] = list_union(new_words,wlist[recID]) # were there some words for these recIDs found? if len(wlist) == 0: return 0 recIDs = wlist.keys() for recID in recIDs: # was this record marked as deleted? if "DELETED" in self.get_field(recID, "980__c"): wlist[recID] = [] if options["verbose"] >= 9: write_message("... record %d was declared deleted, removing its word list" % recID) if options["verbose"] >= 9: write_message("... record %d, termlist: %s" % (recID, wlist[recID])) # Using cStringIO for speed. query_factory = cStringIO.StringIO() qwrite = query_factory.write qwrite( "INSERT INTO %sR (id_bibrec,termlist,type) VALUES" % self.tablename[:-1]) qwrite( "('" ) qwrite( str(recIDs[0]) ) qwrite( "','" ) qwrite( serialize_via_marshal(wlist[recIDs[0]]) ) qwrite( "','FUTURE')" ) for recID in recIDs[1:]: qwrite(",('") qwrite(str(recID)) qwrite("','") qwrite(serialize_via_marshal(wlist[recID])) qwrite("','FUTURE')") query = query_factory.getvalue() query_factory.close() run_sql(query) query_factory = cStringIO.StringIO() qwrite = query_factory.write qwrite("INSERT INTO %sR (id_bibrec,termlist,type) VALUES" % self.tablename[:-1]) qwrite("('") qwrite(str(recIDs[0])) qwrite("','") qwrite(serialize_via_marshal(wlist[recIDs[0]])) qwrite("','CURRENT')") for recID in recIDs[1:]: qwrite( ",('" ) qwrite( str(recID) ) qwrite( "','" ) qwrite( empty_list_string ) qwrite( "','CURRENT')" ) query = query_factory.getvalue() query_factory.close() try: run_sql(query) except MySQLdb.DatabaseError: # ok, we tried to add an existent record. No problem pass put = self.put for recID in recIDs: for w in wlist[recID]: put(recID, w, 1) return len(recIDs) def log_progress(self, start, done, todo): """Calculate progress and store it. start: start time, done: records processed, todo: total number of records""" time_elapsed = time.time() - start # consistency check if time_elapsed == 0 or done > todo: return time_recs_per_min = done/(time_elapsed/60.0) if options["verbose"]: write_message("%d records took %.1f seconds to complete.(%1.f recs/min)"\ % (done, time_elapsed, time_recs_per_min)) if time_recs_per_min: if options["verbose"]: write_message("Estimated runtime: %.1f minutes" % \ ((todo-done)/time_recs_per_min)) def put(self, recID, word, sign): "Adds/deletes a word to the word list." try: word = lower(word[:50]) if self.value.has_key(word): # the word 'word' exist already: update sign self.value[word][recID] = sign else: self.value[word] = {recID: sign} except: write_message("Error: Cannot put word %s with sign %d for recID %s." % (word, sign, recID)) def del_recIDs(self, recIDs): """Fetches records which id in the recIDs range list and adds them to the wordTable. The recIDs range list is of the form: [[i1_low,i1_high],[i2_low,i2_high], ..., [iN_low,iN_high]]. """ count = 0 for range in recIDs: self.del_recID_range(range[0],range[1]) count = count + range[1] - range[0] self.put_into_db() def del_recID_range(self, low, high): """Deletes records with 'recID' system number between low and high from memory words index table.""" if options["verbose"] > 2: write_message("%s fetching existing words for records #%d-#%d started" % \ (self.tablename, low, high)) self.recIDs_in_mem.append([low,high]) query = """SELECT id_bibrec,termlist FROM %sR as bb WHERE bb.id_bibrec BETWEEN '%d' AND '%d'""" % (self.tablename[:-1], low, high) recID_rows = run_sql(query) for recID_row in recID_rows: recID = recID_row[0] wlist = deserialize_via_marshal(recID_row[1]) for word in wlist: self.put(recID, word, -1) if options["verbose"] > 2: write_message("%s fetching existing words for records #%d-#%d ended" % \ (self.tablename, low, high)) def report_on_table_consistency(self): """Check reverse words index tables (e.g. idxWORD01R) for interesting states such as 'TEMPORARY' state. Prints small report (no of words, no of bad words). """ # find number of words: query = """SELECT COUNT(*) FROM %s""" % (self.tablename) res = run_sql(query, None, 1) if res: nb_words = res[0][0] else: nb_words = 0 # find number of records: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR""" % (self.tablename[:-1]) res = run_sql(query, None, 1) if res: nb_records = res[0][0] else: nb_records = 0 # report stats: if options["verbose"]: write_message("%s contains %d words from %d records" % (self.tablename, nb_words, nb_records)) # find possible bad states in reverse tables: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR WHERE type <> 'CURRENT'""" % (self.tablename[:-1]) res = run_sql(query) if res: nb_bad_records = res[0][0] else: nb_bad_records = 999999999 if nb_bad_records: write_message("EMERGENCY: %s needs to repair %d of %d records" % \ (self.tablename, nb_bad_records, nb_records)) else: if options["verbose"]: write_message("%s is in consistent state" % (self.tablename)) return nb_bad_records def repair(self): """Repair the whole table""" # find possible bad states in reverse tables: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR WHERE type <> 'CURRENT'""" % (self.tablename[:-1]) res = run_sql(query, None, 1) if res: nb_bad_records = res[0][0] else: nb_bad_records = 0 # find number of records: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR""" % (self.tablename[:-1]) res = run_sql(query) if res: nb_records = res[0][0] else: nb_records = 0 if nb_bad_records == 0: return query = """SELECT id_bibrec FROM %sR WHERE type <> 'CURRENT' ORDER BY id_bibrec""" \ % (self.tablename[:-1]) res = run_sql(query) recIDs = create_range_list(res) flush_count = 0 records_done = 0 records_to_go = 0 for range in recIDs: records_to_go = records_to_go + range[1] - range[0] + 1 time_started = time.time() # will measure profile time for range in recIDs: i_low = range[0] chunksize_count = 0 while i_low <= range[1]: # calculate chunk group of recIDs and treat it: i_high = min(i_low+options["flush"]-flush_count-1,range[1]) i_high = min(i_low+chunksize-chunksize_count-1, i_high) try: self.fix_recID_range(i_low, i_high) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) flush_count = flush_count + i_high - i_low + 1 chunksize_count = chunksize_count + i_high - i_low + 1 records_done = records_done + i_high - i_low + 1 if chunksize_count >= chunksize: chunksize_count = 0 # flush if necessary: if flush_count >= options["flush"]: self.put_into_db("emergency") self.clean() flush_count = 0 self.log_progress(time_started,records_done,records_to_go) # iterate: i_low = i_high + 1 if flush_count > 0: self.put_into_db("emergency") self.log_progress(time_started,records_done,records_to_go) write_message("%s inconsistencies repaired." % self.tablename) def chk_recID_range(self, low, high): """Check if the reverse index table is in proper state""" ## check db query = """SELECT COUNT(*) FROM %sR WHERE type <> 'CURRENT' AND id_bibrec BETWEEN '%d' AND '%d'""" % (self.tablename[:-1], low, high) res = run_sql(query, None, 1) if res[0][0]==0: if options["verbose"]: write_message("%s for %d-%d is in consistent state"%(self.tablename,low,high)) return # okay, words table is consistent ## inconsistency detected! write_message("EMERGENCY: %s inconsistencies detected..." % self.tablename) write_message("""EMERGENCY: Errors found. You should check consistency of the %s - %sR tables.\nRunning 'bibindex --repair' is recommended.""" \ % (self.tablename, self.tablename[:-1])) raise StandardError def fix_recID_range(self, low, high): """Try to fix reverse index database consistency (e.g. table idxWORD01R) in the low,high doc-id range. Possible states for a recID follow: CUR TMP FUT: very bad things have happened: warn! CUR TMP : very bad things have happened: warn! CUR FUT: delete FUT (crash before flushing) CUR : database is ok TMP FUT: add TMP to memory and del FUT from memory flush (revert to old state) TMP : very bad things have happened: warn! FUT: very bad things have happended: warn! """ state = {} query = "SELECT id_bibrec,type FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d'"\ % (self.tablename[:-1], low, high) res = run_sql(query) for row in res: if not state.has_key(row[0]): state[row[0]]=[] state[row[0]].append(row[1]) ok = 1 # will hold info on whether we will be able to repair for recID in state.keys(): if not 'TEMPORARY' in state[recID]: if 'FUTURE' in state[recID]: if 'CURRENT' not in state[recID]: write_message("EMERGENCY: Record %d is in inconsistent state. Can't repair it" % recID) ok = 0 else: write_message("EMERGENCY: Inconsistency in record %d detected" % recID) query = """DELETE FROM %sR WHERE id_bibrec='%d'""" % (self.tablename[:-1], recID) run_sql(query) write_message("EMERGENCY: Inconsistency in record %d repaired." % recID) else: if 'FUTURE' in state[recID] and not 'CURRENT' in state[recID]: self.recIDs_in_mem.append([recID,recID]) # Get the words file query = """SELECT type,termlist FROM %sR WHERE id_bibrec='%d'""" % (self.tablename[:-1], recID) if options["verbose"] >= 9: write_message(query) res = run_sql(query) for row in res: wlist = deserialize_via_marshal(row[1]) if options["verbose"] >= 9: write_message("Words are %s " % wlist) if row[0] == 'TEMPORARY': sign = 1 else: sign = -1 for word in wlist: self.put(recID, word, sign) else: write_message("EMERGENCY: %s for %d is in inconsistent state. Couldn't repair it." % (self.tablename, recID)) ok = 0 if not ok: write_message("""EMERGENCY: Unrepairable errors found. You should check consistency of the %s - %sR tables. Deleting affected records is recommended.""" % (self.tablename, self.tablename[:-1])) raise StandardError def task_run(row): """Run the indexing task. The row argument is the BibSched task queue row, containing if, arguments, etc. Return 1 in case of success and 0 in case of failure. """ global options, task_id, wordTables, stemmer, stopwords # read from SQL row: task_id = row[0] task_proc = row[1] options = loads(row[6]) task_status = row[7] # sanity check: if task_proc != "bibindex": write_message("The task #%d does not seem to be a BibIndex task." % task_id, sys.stderr) return 0 if task_status != "WAITING": write_message("The task #%d is %s. I expected WAITING." % (task_id, task_status), sys.stderr) return 0 # we can run the task now: if options["verbose"]: write_message("Task #%d started." % task_id) task_starting_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) task_update_status("RUNNING") # install signal handlers signal.signal(signal.SIGUSR1, task_sig_sleep) signal.signal(signal.SIGTERM, task_sig_stop) signal.signal(signal.SIGABRT, task_sig_suicide) signal.signal(signal.SIGCONT, task_sig_wakeup) signal.signal(signal.SIGINT, task_sig_unknown) ## go ahead and treat each table : for table in options["windex"]: wordTable = WordTable(table.keys()[0], table.values()[0]) wordTable.report_on_table_consistency() try: if options["cmd"] == "del": if options["id"]: wordTable.del_recIDs(options["id"]) elif options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) wordTable.del_recIDs(recIDs_range) else: write_message("Missing IDs of records to delete from index %s.", wordTable.tablename, sys.stderr) raise StandardError elif options["cmd"] == "add": if options["id"]: wordTable.add_recIDs(options["id"]) elif options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) wordTable.add_recIDs(recIDs_range) else: wordTable.add_date(options["modified"]) # only update last_updated if run via automatic mode: wordTable.update_last_updated(task_starting_time) elif options["cmd"] == "repair": wordTable.repair() else: write_message("Invalid command found processing %s" % \ wordTable.tablename, sys.stderr) raise StandardError except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) wordTable.report_on_table_consistency() # We are done. State it in the database, close and quit task_update_status("DONE") if options["verbose"]: write_message("Task #%d finished." % task_id) return 1 def command_line(): global options long_flags =["add","del","id=","modified=","collection=", "windex=", "check","repair","maxmem=", "flush=","user=","sleeptime=", "time=","help", "version", "verbose="] short_flags ="adi:m:c:w:krM:f:u:s:t:hVv:" format_string = "%Y-%m-%d %H:%M:%S" tables = None sleeptime = "" try: opts, args = getopt.getopt(sys.argv[1:], short_flags, long_flags) except getopt.GetoptError, err: write_message(err, sys.stderr) usage(1) if args: usage(1) options={"cmd":"add", "id":[], "modified":[], "collection":[], "maxmem":0, "flush":10000, "sleeptime":0, "verbose":1 } sched_time = time.strftime(format_string) user = "" # Check for key options try: for opt in opts: if opt == ("-h","") or opt == ("--help",""): usage(1) elif opt == ("-V","") or opt == ("--version",""): print bibindex_engine_version sys.exit(1) elif opt[0] in ["--verbose", "-v"]: options["verbose"] = int(opt[1]) elif opt == ("-a","") or opt == ("--add",""): options["cmd"] = "add" if ("-x","") in opts or ("--del","") in opts: usage(1) elif opt == ("-k","") or opt == ("--check",""): options["cmd"] = "check" elif opt == ("-r","") or opt == ("--repair",""): options["cmd"] = "repair" elif opt == ("-d","") or opt == ("--del",""): options["cmd"]="del" elif opt[0] in [ "-i", "--id" ]: options["id"] = options["id"] + split_ranges(opt[1]) elif opt[0] in [ "-m", "--modified" ]: options["modified"] = get_date_range(opt[1]) elif opt[0] in [ "-c", "--collection" ]: options["collection"] = opt[1] elif opt[0] in [ "-w", "--windex" ]: tables = opt[1] elif opt[0] in [ "-M", "--maxmem"]: options["maxmem"]=int(opt[1]) if options["maxmem"] < base_process_size + 1000: raise StandardError, "Memory usage should be higher than %d kB" % (base_process_size + 1000) elif opt[0] in [ "-f", "--flush"]: options["flush"]=int(opt[1]) elif opt[0] in [ "-u", "--user"]: user = opt[1] elif opt[0] in [ "-s", "--sleeptime" ]: get_datetime(opt[1]) # see if it is a valid shift sleeptime= opt[1] elif opt[0] in [ "-t", "--time" ]: sched_time= get_datetime(opt[1]) else: usage(1) except StandardError, e: write_message(e, sys.stderr) sys.exit(1) options["windex"]=get_word_tables(tables) if options["cmd"] == "check": for table in options["windex"]: wordTable = WordTable(table.keys()[0], table.values()[0]) wordTable.report_on_table_consistency() return user = authenticate(user) if options["verbose"] >= 9: print "" write_message("storing task options %s\n" % options) new_task_id = run_sql("""INSERT INTO schTASK (proc,user,runtime,sleeptime,arguments,status) VALUES ('bibindex',%s,%s,%s,%s,'WAITING')""", (user, sched_time, sleeptime, dumps(options))) print "Task #%d was successfully scheduled for execution." % new_task_id return def task_sig_sleep(sig, frame): """Signal handler for the 'sleep' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("sleeping...") task_update_status("SLEEPING") signal.pause() # wait for wake-up signal def task_sig_wakeup(sig, frame): """Signal handler for the 'wakeup' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("continuing...") task_update_status("CONTINUING") def task_sig_stop(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("stopping...") task_update_status("STOPPING") errcode = 0 try: task_sig_stop_commands() write_message("stopped") task_update_status("STOPPED") except StandardError, err: write_message("Error during stopping! %e" % err) task_update_status("STOPPINGFAILED") errcode = 1 sys.exit(errcode) def task_sig_stop_commands(): """Do all the commands necessary to stop the task before quitting. Useful for task_sig_stop() handler. """ write_message("stopping commands started") for table in wordTables: table.put_into_db() write_message("stopping commands ended") def task_sig_suicide(sig, frame): """Signal handler for the 'suicide' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("suiciding myself now...") task_update_status("SUICIDING") write_message("suicided") task_update_status("SUICIDED") sys.exit(0) def task_sig_unknown(sig, frame): """Signal handler for the other unknown signals sent by shell or user.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("unknown signal %d ignored" % sig) # do nothing for other signals def task_update_progress(msg): """Updates progress information in the BibSched task table.""" global task_id, options if options["verbose"] >= 9: write_message("Updating task progress to %s." % msg) return run_sql("UPDATE schTASK SET progress=%s where id=%s", (msg, task_id)) def task_update_status(val): """Updates state information in the BibSched task table.""" global task_id, options if options["verbose"] >= 9: write_message("Updating task status to %s." % val) return run_sql("UPDATE schTASK SET status=%s where id=%s", (val, task_id)) def test_fulltext_indexing(): """Tests fulltext indexing programs on PDF, PS, DOC, PPT, XLS. Prints list of words and word table on the screen. Does not integrate anything into the database. Useful when debugging problems with fulltext indexing: call this function instead of main(). """ options = {} options["verbose"] = 9 print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=atlnot&categ=Communication&id=com-indet-2002-012") # protected URL print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=agenda&categ=a00388&id=a00388s2t7") # XLS print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=agenda&categ=a02883&id=a02883s1t6/transparencies") # PPT print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=agenda&categ=a99149&id=a99149s1t10/transparencies") # DOC print get_words_from_fulltext("http://doc.cern.ch/cgi-bin/setlink?base=preprint&categ=cern&id=lhc-project-report-601") # PDF sys.exit(0) def test_word_separators(phrase="hep-th/0101001"): """Tests word separating policy on various input.""" print "%s:" % phrase for word in get_words_from_phrase(phrase): print "\t-> %s" % word def main(): """Reads arguments and either runs the task, or starts user-interface (command line).""" if len(sys.argv) == 2: try: id = int(sys.argv[1]) except StandardError, err: command_line() sys.exit() res = run_sql("SELECT * FROM schTASK WHERE id='%d'" % (id), None, 1) if not res: write_message("Selected task not found.", sys.stderr) sys.exit(1) try: if not task_run(res[0]): write_message("Error occurred. Exiting.", sys.stderr) except StandardError, e: write_message("Unexpected error occurred: %s." % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) write_message("Exiting.") task_update_status("ERROR") else: command_line()