diff --git a/modules/bibrank/lib/bibrank_tag_based_indexer.py b/modules/bibrank/lib/bibrank_tag_based_indexer.py index ae062bd7d..e63dc8ba4 100644 --- a/modules/bibrank/lib/bibrank_tag_based_indexer.py +++ b/modules/bibrank/lib/bibrank_tag_based_indexer.py @@ -1,945 +1,933 @@ -##Ranking of records using different parameters and methods. +## $Id$ +## Ranking of records using different parameters and methods. ## This file is part of the CERN Document Server Software (CDSware). ## Copyright (C) 2002 CERN. ## ## The CDSware is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## The CDSware is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDSware; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. ## read config variables: -#include "config.wml" -#include "configbis.wml" #include "cdswmllib.wml" -## start Python: -#! # -*- coding: utf-8 -*- ## $Id$ ## DO NOT EDIT THIS FILE! IT WAS AUTOMATICALLY GENERATED FROM CDSware WML SOURCES. 
+ __version__ = "<: print generate_pretty_version_string('$Id$'); :>" -## fill config variables: -pylibdir = "/python" - -try: - from marshal import loads,dumps - from zlib import compress,decompress - from string import split,translate,lower,upper - import getopt - import getpass - import string - import os - import sre - import sys - import time - import MySQLdb - import Numeric - import urllib - import signal - import tempfile - import unicodedata - import traceback - import cStringIO - import re - import copy - import types - import ConfigParser -except ImportError, e: - import sys - -try: - sys.path.append('%s' % pylibdir) - from cdsware.config import * - from cdsware.search_engine_config import cfg_max_recID - from cdsware.search_engine import perform_request_search, strip_accents - from cdsware.search_engine import HitSet, get_index_id, create_basic_search_units - from cdsware.dbquery import run_sql -except ImportError, e: - import sys +from marshal import loads,dumps +from zlib import compress,decompress +from string import split,translate,lower,upper +import getopt +import getpass +import string +import os +import sre +import sys +import time +import MySQLdb +import Numeric +import urllib +import signal +import tempfile +import unicodedata +import traceback +import cStringIO +import re +import copy +import types +import ConfigParser + +from config import * +from search_engine_config import cfg_max_recID +from search_engine import perform_request_search, strip_accents +from search_engine import HitSet, get_index_id, create_basic_search_units +from dbquery import run_sql options = {} def single_tag_rank_method_exec(rank_method_code, name, config): """Creating the rank method data""" startCreate = time.time() rnkset = {} rnkset_old = fromDB(rank_method_code) date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) rnkset_new = single_tag_rank(config) rnkset = union_dicts(rnkset_old, rnkset_new) intoDB(rnkset, date, rank_method_code) def 
single_tag_rank(config): """Connect the given tag with the data from the kb file given""" if options["verbose"] >= 9: write_message("Loading knowledgebase file") kb_data = {} records = [] write_message("Reading knowledgebase file: %s" % config.get(config.get("rank_method", "function"), "kb_src")) input = open(config.get(config.get("rank_method", "function"), "kb_src"), 'r') data = input.readlines() for line in data: if not line[0:1] == "#": kb_data[string.strip((string.split(string.strip(line),"---"))[0])] = (string.split(string.strip(line), "---"))[1] write_message("Number of lines read from knowledgebase file: %s" % len(kb_data)) tag = config.get(config.get("rank_method", "function"),"tag") tags = split(config.get(config.get("rank_method", "function"), "check_mandatory_tags"),",") if tags == ['']: tags = "" records = [] for (recids,recide) in options["recid_range"]: write_message("......Processing records #%s-%s" % (recids, recide)) recs = run_sql("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id and id_bibrec >=%s and id_bibrec<=%s" % (tag[0:2], tag[0:2], tag, recids, recide)) valid = HitSet(Numeric.ones(cfg_max_recID + 1)) for key in tags: newset = HitSet() newset.addlist(run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE id_bibxxx=id AND tag='%s' AND id_bibxxx=id and id_bibrec >=%s and id_bibrec<=%s" % (tag[0:2], tag[0:2], key, recids, recide))) valid.intersect(newset) if tags: recs = filter(lambda x: valid.contains(x[0]), recs) records = records + list(recs) write_message("Number of records found with the necessary tags: %s" % len(records)) records = filter(lambda x: options["validset"].contains(x[0]), records) rnkset = {} for key,value in records: if kb_data.has_key(value): if not rnkset.has_key(key): rnkset[key] = float(kb_data[value]) else: if kb_data.has_key(rnkset[key]) and float(kb_data[value]) > float((rnkset[key])[1]): rnkset[key] = float(kb_data[value]) else: rnkset[key] = 0 write_message("Number of records 
available in rank method: %s" % len(rnkset)) return rnkset def get_lastupdated(rank_method_code): """Get the last time the rank method was updated""" res = run_sql("SELECT rnkMETHOD.last_updated FROM rnkMETHOD WHERE name='%s'" % rank_method_code) if res: return res[0][0] else: raise Exception("Is this the first run? Please do a complete update.") def intoDB(dict, date, rank_method_code): """Insert the rank method data into the database""" id = run_sql("SELECT id from rnkMETHOD where name='%s'" % rank_method_code) del_rank_method_codeDATA(rank_method_code) run_sql("INSERT INTO rnkMETHODDATA(id_rnkMETHOD, relevance_data) VALUES ('%s','%s')" % (id[0][0], serialize_via_marshal(dict))) run_sql("UPDATE rnkMETHOD SET last_updated='%s' WHERE name='%s'" % (date, rank_method_code)) def fromDB(rank_method_code): """Get the data for a rank method""" id = run_sql("SELECT id from rnkMETHOD where name='%s'" % rank_method_code) res = run_sql("SELECT relevance_data FROM rnkMETHODDATA WHERE id_rnkMETHOD=%s" % id[0][0]) if res: return deserialize_via_marshal(res[0][0]) else: return {} def del_rank_method_codeDATA(rank_method_code): """Delete the data for a rank method""" id = run_sql("SELECT id from rnkMETHOD where name='%s'" % rank_method_code) res = run_sql("DELETE FROM rnkMETHODDATA WHERE id_rnkMETHOD=%s" % id[0][0]) def del_recids(rank_method_code, range): """Delete some records from the rank method""" id = run_sql("SELECT id from rnkMETHOD where name='%s'" % rank_method_code) res = run_sql("SELECT relevance_data FROM rnkMETHODDATA WHERE id_rnkMETHOD=%s" % id[0][0]) if res: rec_dict = deserialize_via_marshal(res[0][0]) write_message("Old size: %s" % len(rec_dict)) for (recids,recide) in range: for i in range(int(recids), int(recide)): if rec_dict.has_key(i): del rec_dict[i] write_messag("New size: %s" % len(rec_dict)) date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) intoDB(rec_dict, date, rank_method_code) else: print "Create before deleting!" 
def union_dicts(dict1, dict2): "Returns union of the two dicts." union_dict = {} for (key, value) in dict1.iteritems(): union_dict[key] = value for (key, value) in dict2.iteritems(): union_dict[key] = value return union_dict def rank_method_code_statistics(rank_method_code): """Print statistics""" method = fromDB(rank_method_code) max = ('',-999999) maxcount = 0 min = ('',999999) mincount = 0 for (recID, value) in method.iteritems(): if value < min and value > 0: min = value if value > max: max = value for (recID, value) in method.iteritems(): if value == min: mincount += 1 if value == max: maxcount += 1 write_message("Showing statistic for selected method") write_message("Method name: %s" % getName(rank_method_code)) write_message("Short name: %s" % rank_method_code) write_message("Last run: %s" % get_lastupdated(rank_method_code)) write_message("Number of records: %s" % len(method)) write_message("Lowest value: %s - Number of records: %s" % (min, mincount)) write_message("Highest value: %s - Number of records: %s" % (max, maxcount)) write_message("Divided into 10 sets:") for i in range(1,11): setcount = 0 distinct_values = {} lower = -1.0 + ((float(max + 1) / 10)) * (i - 1) upper = -1.0 + ((float(max + 1) / 10)) * i for (recID, value) in method.iteritems(): if value >= lower and value <= upper: setcount += 1 distinct_values[value] = 1 write_message("Set %s (%s-%s) %s Distinct values: %s" % (i, lower, upper, len(distinct_values), setcount)) def check_method(rank_method_code): write_message("Checking rank method...") if len(fromDB(rank_method_code)) == 0: write_message("Rank method not yet executed, please run it to create the necessary data.") else: if len(add_date(rank_method_code)) > 0: write_message("Records modified, update recommended") else: write_message("No records modified, update not necessary") def write_message(msg, stream = sys.stdout): """Write message and flush output stream (may be sys.stdout or sys.stderr). 
Useful for debugging stuff.""" if stream == sys.stdout or stream == sys.stderr: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) stream.write("%s\n" % msg) stream.flush() else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) return def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"): """Returns a date string according to the format string. It can handle normal date strings and shifts with respect to now.""" date = time.time() shift_re = sre.compile("([-\+]{0,1})([\d]+)([dhms])") factors = {"d":24*3600, "h":3600, "m":60, "s":1} m = shift_re.match(var) if m: sign = m.groups()[0] == "-" and -1 or 1 factor = factors[m.groups()[2]] value = float(m.groups()[1]) date = time.localtime(date + sign * factor * value) date = time.strftime(format_string, date) else: date = time.strptime(var, format_string) date = time.strftime(format_string, date) return date def task_sig_sleep(sig, frame): """Signal handler for the 'sleep' signal sent by BibSched.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("sleeping...") task_update_status("SLEEPING") signal.pause() # wait for wake-up signal def task_sig_wakeup(sig, frame): """Signal handler for the 'wakeup' signal sent by BibSched.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("continuing...") task_update_status("CONTINUING") def task_sig_stop(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("stopping...") task_update_status("STOPPING") errcode = 0 try: task_sig_stop_commands() write_message("stopped") task_update_status("STOPPED") except StandardError, err: write_message("Error during stopping! %e" % err) task_update_status("STOPPINGFAILED") errcode = 1 sys.exit(errcode) def task_sig_stop_commands(): """Do all the commands necessary to stop the task before quitting. Useful for task_sig_stop() handler. 
""" write_message("stopping commands started") write_message("stopping commands ended") def task_sig_suicide(sig, frame): """Signal handler for the 'suicide' signal sent by BibSched.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("suiciding myself now...") task_update_status("SUICIDING") write_message("suicided") task_update_status("SUICIDED") sys.exit(0) def task_sig_unknown(sig, frame): """Signal handler for the other unknown signals sent by shell or user.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("unknown signal %d ignored" % sig) # do nothing for other signals def task_update_progress(msg): """Updates progress information in the BibSched task table.""" query = "UPDATE schTASK SET progress='%s' where id=%d" % (MySQLdb.escape_string(msg), task_id) if options["verbose"]>= 9: write_message(query) run_sql(query) return def task_update_status(val): """Updates state information in the BibSched task table.""" query = "UPDATE schTASK SET status='%s' where id=%d" % (MySQLdb.escape_string(val), task_id) if options["verbose"]>= 9: write_message(query) run_sql(query) return def split_ranges(parse_string): recIDs = [] ranges = string.split(parse_string, ",") for range in ranges: tmp_recIDs = string.split(range, "-") if len(tmp_recIDs)==1: recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[0])]) else: if int(tmp_recIDs[0]) > int(tmp_recIDs[1]): # sanity check tmp = tmp_recIDs[0] tmp_recIDs[0] = tmp_recIDs[1] tmp_recIDs[1] = tmp recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[1])]) return recIDs def bibrank_engine(row, run): """Run the indexing task. The row argument is the BibSched task queue row, containing if, arguments, etc. Return 1 in case of success and 0 in case of failure. 
""" try: import psyco psyco.bind(single_tag_rank) psyco.bind(single_tag_rank_method_exec) psyco.bind(serialize_via_numeric_array) psyco.bind(deserialize_via_numeric_array) #psyco.bind(authorimpact_exec) #psyco.bind(merge_exec) #psyco.bind(citationimpact_exec) psyco.bind(accessimpact_exec) except StandardError, e: print "Psyco ERROR",e startCreate = time.time() global options, task_id task_id = row[0] task_proc = row[1] options = loads(row[6]) task_starting_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) signal.signal(signal.SIGUSR1, task_sig_sleep) signal.signal(signal.SIGTERM, task_sig_stop) signal.signal(signal.SIGABRT, task_sig_suicide) signal.signal(signal.SIGCONT, task_sig_wakeup) signal.signal(signal.SIGINT, task_sig_unknown) sets = {} try: options["run"] = [] options["run"].append(run) for rank_method_code in options["run"]: cfg_name = getName(rank_method_code) if options["verbose"] >= 0: write_message("Running rank method: %s." % cfg_name) file = etcdir + "/bibrank/" + rank_method_code + ".cfg" config = ConfigParser.ConfigParser() try: config.readfp(open(file)) except StandardError, e: write_message("Cannot find configurationfile: %s" % file, sys.stderr) raise StandardError cfg_short = rank_method_code cfg_function = config.get("rank_method", "function") + "_exec" cfg_name = getName(cfg_short) options["validset"] = get_valid_range(rank_method_code) if options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) options["recid_range"] = recIDs_range elif options["id"]: options["recid_range"] = options["id"] elif options["modified"]: options["recid_range"] = add_date(rank_method_code, options["modified"]) elif options["last_updated"]: options["recid_range"] = add_date(rank_method_code) else: if options["verbose"] > 1: write_message("No records specified, updating all") min_id = run_sql("SELECT min(id) from 
bibrec")[0][0] max_id = run_sql("SELECT max(id) from bibrec")[0][0] options["recid_range"] = [[min_id, max_id]] if options["quick"] == "no" and options["verbose"] >= 9: write_message("Rebalance not yet enabled, parameter ignored.") if options["cmd"] == "del": del_recids(cfg_short, options["recid_range"]) elif options["cmd"] == "add": func_object = globals().get(cfg_function) func_object(rank_method_code, cfg_name, config) elif options["cmd"] == "stat": rank_method_code_statistics(rank_method_code) elif options["cmd"] == "check": check_method(rank_method_code) else: write_message("Invalid command found processing %s" % rank_method_code, sys.stderr) raise StandardError except StandardError, e: write_message("\nException caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) raise StandardError if options["verbose"]: showtime((time.time() - startCreate)) return 1 def get_valid_range(rank_method_code): """Return a range of records""" if options["verbose"] >=9: write_message("Getting records from collections enabled for rank method.") res = run_sql("SELECT collection.name FROM collection,collection_rnkMETHOD,rnkMETHOD WHERE collection.id=id_collection and id_rnkMETHOD=rnkMETHOD.id and rnkMETHOD.name='%s'" % rank_method_code) l_of_colls = [] for coll in res: l_of_colls.append(coll[0]) if len(l_of_colls) > 0: recIDs = perform_request_search(c=l_of_colls) else: recIDs = [] valid = HitSet() valid.addlist(recIDs) return valid def add_date(rank_method_code, date=""): """If date is not set, then retrieve it from the database. 
Reindex all formats newer than the modification date""" if not date: try: date = (get_lastupdated(rank_method_code),'') except Exception, e: date = "0000-00-00 00:00:00" query = """SELECT b.id FROM bibrec AS b WHERE b.modification_date >= '%s'""" % date[0] if date[1]: query += "and b.modification_date <= '%s'" % date[1] query += "ORDER BY b.id ASC""" res = run_sql(query) list = create_range_list(res) if not list: if options["verbose"]: write_message("No new records added since last time method was run") return list def getName(rank_method_code, ln=cdslang, type='ln'): """Returns the name of the method if it exists""" try: rnkid = run_sql("SELECT id FROM rnkMETHOD where name='%s'" % rank_method_code) if rnkid: rnkid = str(rnkid[0][0]) res = run_sql("SELECT value FROM rnkMETHODNAME where type='%s' and ln='%s' and id_rnkMETHOD=%s" % (type, ln, rnkid)) if not res: res = run_sql("SELECT value FROM rnkMETHODNAME WHERE ln='%s' and id_rnkMETHOD=%s and type='%s'" % (cdslang, rnkid, type)) if not res: return rank_method_code return res[0][0] else: raise Exception except Exception, e: write_message("Cannot run rank method, either given code for method is wrong, or it has not been added using the webinterface.") raise Exception def create_range_list(res): """Creates a range list from a recID select query result contained in res. 
The result is expected to have ascending numerical order.""" if not res: return [] row = res[0] if not row: return [] else: range_list = [[row[0],row[0]]] for row in res[1:]: id = row[0] if id == range_list[-1][1] + 1: range_list[-1][1] = id else: range_list.append([id,id]) return range_list def single_tag_rank_method(row, run): return bibrank_engine(row, run) def serialize_via_numeric_array_dumps(arr): return Numeric.dumps(arr) def serialize_via_numeric_array_compr(str): return compress(str) def serialize_via_numeric_array_escape(str): return MySQLdb.escape_string(str) def serialize_via_numeric_array(arr): """Serialize Numeric array into a compressed string.""" return serialize_via_numeric_array_escape(serialize_via_numeric_array_compr(serialize_via_numeric_array_dumps(arr))) def deserialize_via_numeric_array(string): """Decompress and deserialize string into a Numeric array.""" return Numeric.loads(decompress(string)) def serialize_via_marshal(obj): """Serialize Python object via marshal into a compressed string.""" return MySQLdb.escape_string(compress(dumps(obj))) def deserialize_via_marshal(string): """Decompress and deserialize string into a Python object via marshal.""" return loads(decompress(string)) def accessimpact_exec(rank_method_code, name, config): """Generating rankset based on number of downloads per document""" startCreate = time.time() options["dbhost"] = config.get("accessimpact", "dbhost") options["dbname"] = config.get("accessimpact", "dbname") options["dbuser"] = config.get("accessimpact", "dbuser") options["dbpass"] = config.get("accessimpact", "dbpass") date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) sysno_tag = config.get("accessimpact", "sysnr_tag") curr_repnr_tag = config.get("accessimpact", "curr_tag") old_repnr_tag = config.get("accessimpact", "old_tag") impacc = {} if 1: #not options["modified"]: imprec = run_sql2("SELECT imprecno,base,bsysno,bref FROM imprec") impacc = dict(run_sql2("SELECT imprecno,SUM(nbaccess) FROM 
impacc group BY imprecno")) cdssysno = run_sql("SELECT value,id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id" % (sysno_tag[0:2], sysno_tag[0:2], sysno_tag)) else: fromDB(starset) impacc = {} if options["verbose"] >= 9: write_message("Updating records modified after: %s" % options["modified"]) pre_impacc = dict(run_sql2("SELECT distinct imprecno,'' FROM impacc WHERE sdate >=%s", (options["modified"],))) imprec = [] cdssysno = [] for key in pre_impacc.keys(): test_impacc = run_sql2("SELECT imprecno,SUM(nbaccess) FROM impacc WHERE imprecno=%s GROUP BY imprecno", (key,)) impacc[test_impacc[0][0]] = test_impacc[0][1] data = run_sql2("SELECT imprecno,base,bsysno,bref FROM imprec WHERE imprecno=%s", (key,)) if data: data2 = run_sql2("SELECT imprecno FROM imprec WHERE bsysn=%s", (data[0][2],)) for key2 in data2: imprec.append((key2, data[0][1], data[0][2], data[0][3])) sysno = '0' * (9 - len(str(data[0][2]))) + str(data[0][2]) + data[0][1][0:3] data = run_sql("SELECT value,id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % sysno_tag[0:2], sysno_tag [0:2], sysno_tag, sysno) for key2,value in data: cdssysno.append((key2, value)) tempdict = {} for value,key in cdssysno: if not tempdict.has_key(value): tempdict[value] = [key] else: tempdict[value] = tempdict[value] + [key] tempdoc = {} count = 0 notcount = 0 for key, base, bsysno, value in imprec: if impacc.has_key(key): sysno = '0' * (9 - len(str(bsysno))) + str(bsysno) + base[0:3] data = () if tempdict.has_key(sysno): data = tempdict[sysno] else: data = run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % (curr_repnr_tag[0:2], curr_repnr_tag[0:2], curr_repnr_tag, value)) if len(data) == 0: data = run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % (old_repnr_tag[0:2], old_repnr_tag[0:2], old_repnr_tag, value)) if len(data) != 0: count = count + int(impacc[key]) for key2 in 
range(0, len(data)): if type(data[key2]) is tuple: key3 = data[key2][0] else: key3 = data[key2] if tempdoc.has_key(key3): tempdoc[key3] = int(tempdoc[key3] + float(impacc[key])) else: tempdoc[key3] = int(impacc[key]) else: notcount = notcount + int(impacc[key]) if options["verbose"] >= 9: try: write_message("Percentage of accesses matched with a record: %s%%,(%s/%s)" % (round((float(count) / float(count+notcount)) * 100, 3), notcount, count)) write_message("Number of records available in rank method: %s" % len(tempdoc)) except: print count, notcount intoDB(tempdoc, date, rank_method_code) #------------------------------------------------------------ #---------------BELOW IS OLD CODE, NOT WORKING ATM----------- #------------------------------------------------------------ def citationimpact_exec(rank_method_code, name, config): """Calculates rankset based on number of citations each document has""" startCreate = time.time() tempdoc = {} if options["verbose"] >= 1: write_message("Running: %s." 
% name) citation_tag = config.get("citationimpact", "citation_tag") curr_repnr_tag = config.get("citationimpact", "curr_tag") old_repnr_tag = config.get("citationimpact", "old_tag") if not options["modified"]: if options["verbose"] >= 9: write_message("Rebalancing") starset.setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) citations = run_sql("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id" % (citation_tag[0:2], citation_tag[0:2], citation_tag)) else: fromDB(starset) if options["modified"] == "last_updated": options["modified"] = starset.getLastUpdated() if options["verbose"] >= 9: write_message("Updating records modified after: %s" % options["modified"]) starset.setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) mod_data = run_sql("SELECT id FROM bibrec WHERE modification_date >=%s", (options["modified"]),) for id in mod_data: citation = run_sql("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id and id_bibrec=%s" % (citation_tag[0:2], citation_tag[0:2], citation_tag, id)) for id,value in citation: citations.append((id,value)) for key,value in citations: data = run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % (curr_repnr_tag[0:2], curr_repnr_tag[0:2], curr_repnr_tag, value)) data = run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % (old_repnr_tag[0:2], old_repnr_tag[0:2], old_repnr_tag, value)) if not options["modified"]: starset.setUnsorted(tempdoc) sort(starset) else: merge_two_sets(tempdoc, starset) #intoDB(starset) showtime((time.time() - startCreate)) def authorimpact_exec(rank_method_code, starsets, config): """Calculating the rankvalue a document has based on its authors""" startCreate = time.time() starset = starsets[rank_method_code] if options["verbose"] >= 1: write_message("Running: %s" % starset.getName()) tempdoc = single_tag_rank(starset, config) 
Auth1 = {} documents2 = {} authors = {} upd_authors = [] sql_src = [] p_author_tag = config.get("authorimpact", "primary_tag") s_author_tag = config.get("authorimpact", "secondary_tag") sql_src.append("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id" % (p_author_tag[0:2], p_author_tag[0:2], p_author_tag)) sql_src.append("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id" % (s_author_tag[0:2], s_author_tag[0:2], s_author_tag)) if not options["modified"]: increment = 50000 if options["verbose"] >= 9: write_message("Rebalancing") starset.setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) for key in sql_src: ij = -increment while ij <= (cfg_max_recID): ij = ij + increment data = run_sql(key + " AND id_bibrec>%i AND id_bibrec<=%i" % (ij, (ij + increment))) authorimpact_modified(data, Auth1) else: fromDB(starset) mod_data = run_sql("select id from bibrec where modification_date >= %s", (options["modified"],)) if options["modified"] == "last_updated": options["modified"] = starset.getLastUpdated() if options["verbose"] >= 9: write_message("Updating records modified after: %s" % options["modified"]) starset.setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) for key in sql_src: for id in mod_data: data = run_sql(key + " AND id_bibrec=%s" % id[0]) authorimpact_modified(data, Auth1) for key2,value in data: upd_authors.append((key2,value)) for key in Auth1.keys(): for key2 in sql_src: data = run_sql(key2 + " AND value=%s", (key,)) authorimpact_modified(data, Auth1) del data Auth = [] for key in Auth1.keys(): for key1 in range(0, len(Auth1[key])): Auth.append((Auth1[key][key1], key)) del Auth1 factor = 0.0 for key, value in Auth: if tempdoc.has_key(key) and tempdoc[key][1] > 0.0: factor = tempdoc[key][1] else: factor = 0.0 if not authors.has_key(value): authors[value] = (factor, 1) else: authors[value] = (authors[value][0] + factor, authors[value][1] + 1) if options["modified"]: 
Auth = upd_authors tempdoc = {} for key,value in Auth: if documents2.has_key(key) and authors[value][0] > 0.0: documents2[key] = (documents2[key][0] + authors[value][0], documents2[key][1] + authors[value][1]) elif authors[value][0] > 0.0: documents2[key] = authors[value] del Auth for key in documents2.keys(): tempdoc[key] = ("", float(documents2[key][0]) / float(documents2[key][1])) if options["verbose"] >= 9: write_message("Number of distinct authors: %s" % len(authors)) if not options["modified"]: for key in tempdoc.keys(): if len(tempdoc[key][0]) != 0: tempdoc[key] = ("", -1.0) starset.setUnsorted(tempdoc) sort(starset) else: merge_two_sets(tempdoc,starset) intoDB(starset) showtime((time.time() - startCreate)) def authorimpact_modified(data, Auth): """Adding data to the dictionary""" for key,value in data: if not Auth.has_key(value): Auth[value] = [] Auth[value].append(key) else: found=0 for key2 in range(0, len(Auth[value])): if Auth[value][key2] == key: found = 1 break if not found == 1: Auth[value].append(key) def merge_exec(rank_method_code, starsets, config): """Merge several methods into one starset""" startCreate = time.time() if options["verbose"] >= 1: write_message("Running: %s" % starsets[rank_method_code].getName()) starsets[rank_method_code].setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) threshold = {} finalset = {} permut = '' for nr in range(0, starsets[rank_method_code].getSize() + 1): finalset[nr] = HitSet() permut = permut + "%s" % nr starsets[rank_method_code].setWeigth(1.0) sum = 0.0 nr = 0 convert = {} size=-1 for key in starsets: if key != rank_method_code: sum = sum + starsets[key].getWeigth() convert[nr] = key nr=nr + 1 if size > -1 and size != len(starsets[key].getStars()) -1: write_message("The sets have different sizes, process cancelled") sys.exit() else: size = len(starsets[key].getStars()) -1 sum = 1.0 / sum for key in starsets: if key != rank_method_code: starsets[key].setWeigth(starsets[key].getWeigth() * 
sum) p = Permutation(permut, len(starsets)-1) for perm in p: tempset = copy.copy(starsets[convert[0]].getStar(int(perm[0]))) place = float(perm[0]) * float(starsets[convert[0]].getWeigth()) for i in range(1, len(perm)): tempset.intersect(starsets[convert[i]].getStar(int(perm[i]))) tempset.calculate_nbhits() place = place+float(perm[i]) * float(starsets[convert[i]].getWeigth()) finalset[int(round(place))].union(tempset) for i in range(0, starsets[rank_method_code].getSize() + 1): finalset[i].calculate_nbhits() threshold[i] = 0 starsets[rank_method_code].setStars(finalset) starsets[rank_method_code].setThreshold(threshold) intoDB(starsets[rank_method_code]) showtime((time.time() - startCreate)) def showtime(timeused): """Show time used for method""" if options["verbose"] >= 9: write_message("Time used: %d second(s)." % timeused) def stats2(starset): """Print statistics""" try: total = 0 write_message("Statistics: %s , Top Star size: %s%% , Overall Importance: %s%%" % (starset.getName(), round(float(starset.getTopStar()) * 100,2), round(float(starset.getWeigth())*100, 2))) for nr in range(0, starset.getSize() + 1): write_message("%s star(s): Range >= \t%s\t%s" % (nr, round(starset.getThreshold()[nr],3), (starset.getStar(nr))._nbhits)) total = total + (starset.getStar(nr))._nbhits write_message("Total: %s" % total) except StandardError, e: write_message("Error showing statistics: %s" % starset.getName(), sys.stderr()) raise StandardError def check(starset): """Check if rebalancing is necessary""" try: size = cfg_max_recID + 1 - starset.getStar(0)._nbhits if options["verbose"] >= 9: for nr in range(1, starset.getSize() + 1): write_message("%s---%f" % (nr, float(starset.getStar(nr)._nbhits) / float(size))) if (float(starset.getStar(starset.getSize())._nbhits) / float(size)) >= float(options["check"]): write_message("Rebalance: %s" % starset.getName()) except StandardError, e: write_message("Error checking: %s" % starset.getName(), sys.stderr) raise StandardError def 
compare_on_val(first, second): return cmp(second[1], first[1]) class Permutation: """Creates permutations""" def __init__(self, values, length): self.values = values self.length = length return def __len__(self): return len(self.values) ** self.length def __getitem__(self, n): """permutation number n""" if n >= len(self): raise IndexError res = [] lv = len(self.values) vals = self.values for ndx in range(self.length): res.append( vals[ n % lv ]) n = n / lv return res def _db_login(relogin = 0): """Login to the database""" global DB_CONN if relogin: DB_CONN = MySQLdb.connect(host=options["dbhost"], db=options["dbname"], user=options["dbuser"], passwd=options["dbpass"]) return DB_CONN else: #try: if 1==1: d = DB_CONN return d #except NameError: # DB_CONN = MySQLdb.connect(host=dbhost, db=dbname, user=dbuser, passwd=dbpass) # return DB_CONN def run_sql2(sql, param=None, n=0, with_desc=0): """ Runs SQL on the server and returns result""" db = _db_login(1) if param: param = tuple(param) try: cur = db.cursor() rc = cur.execute(sql, param) except: db = _db_login(relogin = 1) cur = db.cursor() rc = cur.execute(sql, param) if string.upper(string.split(sql)[0]) in ("SELECT", "SHOW", "DESC", "DESCRIBE"): if n: recset = cur.fetchmany(n) else: recset = cur.fetchall() if with_desc: return recset, cur.description else: return recset else: if string.upper(string.split(sql)[0]) == "INSERT": rc = cur.insert_id() return rc #def citationimpact(row): # return bibrank_engine(row) def accessimpact(row, run): return bibrank_engine(row, run) #def authorimpact(row): # return bibrank_engine(row) #def merge(row): -# return bibrank_engine(row) \ No newline at end of file +# return bibrank_engine(row) diff --git a/modules/bibrank/lib/bibrank_tag_based_indexer.py.wml b/modules/bibrank/lib/bibrank_tag_based_indexer.py.wml index ae062bd7d..e63dc8ba4 100644 --- a/modules/bibrank/lib/bibrank_tag_based_indexer.py.wml +++ b/modules/bibrank/lib/bibrank_tag_based_indexer.py.wml @@ -1,945 +1,933 @@ 
-##Ranking of records using different parameters and methods. +## $Id$ +## Ranking of records using different parameters and methods. ## This file is part of the CERN Document Server Software (CDSware). ## Copyright (C) 2002 CERN. ## ## The CDSware is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## The CDSware is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDSware; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. ## read config variables: -#include "config.wml" -#include "configbis.wml" #include "cdswmllib.wml" -## start Python: -#! # -*- coding: utf-8 -*- ## $Id$ ## DO NOT EDIT THIS FILE! IT WAS AUTOMATICALLY GENERATED FROM CDSware WML SOURCES. 
+ __version__ = "<: print generate_pretty_version_string('$Id$'); :>" -## fill config variables: -pylibdir = "/python" - -try: - from marshal import loads,dumps - from zlib import compress,decompress - from string import split,translate,lower,upper - import getopt - import getpass - import string - import os - import sre - import sys - import time - import MySQLdb - import Numeric - import urllib - import signal - import tempfile - import unicodedata - import traceback - import cStringIO - import re - import copy - import types - import ConfigParser -except ImportError, e: - import sys - -try: - sys.path.append('%s' % pylibdir) - from cdsware.config import * - from cdsware.search_engine_config import cfg_max_recID - from cdsware.search_engine import perform_request_search, strip_accents - from cdsware.search_engine import HitSet, get_index_id, create_basic_search_units - from cdsware.dbquery import run_sql -except ImportError, e: - import sys +from marshal import loads,dumps +from zlib import compress,decompress +from string import split,translate,lower,upper +import getopt +import getpass +import string +import os +import sre +import sys +import time +import MySQLdb +import Numeric +import urllib +import signal +import tempfile +import unicodedata +import traceback +import cStringIO +import re +import copy +import types +import ConfigParser + +from config import * +from search_engine_config import cfg_max_recID +from search_engine import perform_request_search, strip_accents +from search_engine import HitSet, get_index_id, create_basic_search_units +from dbquery import run_sql options = {} def single_tag_rank_method_exec(rank_method_code, name, config): """Creating the rank method data""" startCreate = time.time() rnkset = {} rnkset_old = fromDB(rank_method_code) date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) rnkset_new = single_tag_rank(config) rnkset = union_dicts(rnkset_old, rnkset_new) intoDB(rnkset, date, rank_method_code) def 
single_tag_rank(config): """Connect the given tag with the data from the kb file given""" if options["verbose"] >= 9: write_message("Loading knowledgebase file") kb_data = {} records = [] write_message("Reading knowledgebase file: %s" % config.get(config.get("rank_method", "function"), "kb_src")) input = open(config.get(config.get("rank_method", "function"), "kb_src"), 'r') data = input.readlines() for line in data: if not line[0:1] == "#": kb_data[string.strip((string.split(string.strip(line),"---"))[0])] = (string.split(string.strip(line), "---"))[1] write_message("Number of lines read from knowledgebase file: %s" % len(kb_data)) tag = config.get(config.get("rank_method", "function"),"tag") tags = split(config.get(config.get("rank_method", "function"), "check_mandatory_tags"),",") if tags == ['']: tags = "" records = [] for (recids,recide) in options["recid_range"]: write_message("......Processing records #%s-%s" % (recids, recide)) recs = run_sql("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id and id_bibrec >=%s and id_bibrec<=%s" % (tag[0:2], tag[0:2], tag, recids, recide)) valid = HitSet(Numeric.ones(cfg_max_recID + 1)) for key in tags: newset = HitSet() newset.addlist(run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE id_bibxxx=id AND tag='%s' AND id_bibxxx=id and id_bibrec >=%s and id_bibrec<=%s" % (tag[0:2], tag[0:2], key, recids, recide))) valid.intersect(newset) if tags: recs = filter(lambda x: valid.contains(x[0]), recs) records = records + list(recs) write_message("Number of records found with the necessary tags: %s" % len(records)) records = filter(lambda x: options["validset"].contains(x[0]), records) rnkset = {} for key,value in records: if kb_data.has_key(value): if not rnkset.has_key(key): rnkset[key] = float(kb_data[value]) else: if kb_data.has_key(rnkset[key]) and float(kb_data[value]) > float((rnkset[key])[1]): rnkset[key] = float(kb_data[value]) else: rnkset[key] = 0 write_message("Number of records 
available in rank method: %s" % len(rnkset)) return rnkset def get_lastupdated(rank_method_code): """Get the last time the rank method was updated""" res = run_sql("SELECT rnkMETHOD.last_updated FROM rnkMETHOD WHERE name='%s'" % rank_method_code) if res: return res[0][0] else: raise Exception("Is this the first run? Please do a complete update.") def intoDB(dict, date, rank_method_code): """Insert the rank method data into the database""" id = run_sql("SELECT id from rnkMETHOD where name='%s'" % rank_method_code) del_rank_method_codeDATA(rank_method_code) run_sql("INSERT INTO rnkMETHODDATA(id_rnkMETHOD, relevance_data) VALUES ('%s','%s')" % (id[0][0], serialize_via_marshal(dict))) run_sql("UPDATE rnkMETHOD SET last_updated='%s' WHERE name='%s'" % (date, rank_method_code)) def fromDB(rank_method_code): """Get the data for a rank method""" id = run_sql("SELECT id from rnkMETHOD where name='%s'" % rank_method_code) res = run_sql("SELECT relevance_data FROM rnkMETHODDATA WHERE id_rnkMETHOD=%s" % id[0][0]) if res: return deserialize_via_marshal(res[0][0]) else: return {} def del_rank_method_codeDATA(rank_method_code): """Delete the data for a rank method""" id = run_sql("SELECT id from rnkMETHOD where name='%s'" % rank_method_code) res = run_sql("DELETE FROM rnkMETHODDATA WHERE id_rnkMETHOD=%s" % id[0][0]) def del_recids(rank_method_code, range): """Delete some records from the rank method""" id = run_sql("SELECT id from rnkMETHOD where name='%s'" % rank_method_code) res = run_sql("SELECT relevance_data FROM rnkMETHODDATA WHERE id_rnkMETHOD=%s" % id[0][0]) if res: rec_dict = deserialize_via_marshal(res[0][0]) write_message("Old size: %s" % len(rec_dict)) for (recids,recide) in range: for i in range(int(recids), int(recide)): if rec_dict.has_key(i): del rec_dict[i] write_messag("New size: %s" % len(rec_dict)) date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) intoDB(rec_dict, date, rank_method_code) else: print "Create before deleting!" 
def union_dicts(dict1, dict2): "Returns union of the two dicts." union_dict = {} for (key, value) in dict1.iteritems(): union_dict[key] = value for (key, value) in dict2.iteritems(): union_dict[key] = value return union_dict def rank_method_code_statistics(rank_method_code): """Print statistics""" method = fromDB(rank_method_code) max = ('',-999999) maxcount = 0 min = ('',999999) mincount = 0 for (recID, value) in method.iteritems(): if value < min and value > 0: min = value if value > max: max = value for (recID, value) in method.iteritems(): if value == min: mincount += 1 if value == max: maxcount += 1 write_message("Showing statistic for selected method") write_message("Method name: %s" % getName(rank_method_code)) write_message("Short name: %s" % rank_method_code) write_message("Last run: %s" % get_lastupdated(rank_method_code)) write_message("Number of records: %s" % len(method)) write_message("Lowest value: %s - Number of records: %s" % (min, mincount)) write_message("Highest value: %s - Number of records: %s" % (max, maxcount)) write_message("Divided into 10 sets:") for i in range(1,11): setcount = 0 distinct_values = {} lower = -1.0 + ((float(max + 1) / 10)) * (i - 1) upper = -1.0 + ((float(max + 1) / 10)) * i for (recID, value) in method.iteritems(): if value >= lower and value <= upper: setcount += 1 distinct_values[value] = 1 write_message("Set %s (%s-%s) %s Distinct values: %s" % (i, lower, upper, len(distinct_values), setcount)) def check_method(rank_method_code): write_message("Checking rank method...") if len(fromDB(rank_method_code)) == 0: write_message("Rank method not yet executed, please run it to create the necessary data.") else: if len(add_date(rank_method_code)) > 0: write_message("Records modified, update recommended") else: write_message("No records modified, update not necessary") def write_message(msg, stream = sys.stdout): """Write message and flush output stream (may be sys.stdout or sys.stderr). 
Useful for debugging stuff.""" if stream == sys.stdout or stream == sys.stderr: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) stream.write("%s\n" % msg) stream.flush() else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) return def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"): """Returns a date string according to the format string. It can handle normal date strings and shifts with respect to now.""" date = time.time() shift_re = sre.compile("([-\+]{0,1})([\d]+)([dhms])") factors = {"d":24*3600, "h":3600, "m":60, "s":1} m = shift_re.match(var) if m: sign = m.groups()[0] == "-" and -1 or 1 factor = factors[m.groups()[2]] value = float(m.groups()[1]) date = time.localtime(date + sign * factor * value) date = time.strftime(format_string, date) else: date = time.strptime(var, format_string) date = time.strftime(format_string, date) return date def task_sig_sleep(sig, frame): """Signal handler for the 'sleep' signal sent by BibSched.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("sleeping...") task_update_status("SLEEPING") signal.pause() # wait for wake-up signal def task_sig_wakeup(sig, frame): """Signal handler for the 'wakeup' signal sent by BibSched.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("continuing...") task_update_status("CONTINUING") def task_sig_stop(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("stopping...") task_update_status("STOPPING") errcode = 0 try: task_sig_stop_commands() write_message("stopped") task_update_status("STOPPED") except StandardError, err: write_message("Error during stopping! %e" % err) task_update_status("STOPPINGFAILED") errcode = 1 sys.exit(errcode) def task_sig_stop_commands(): """Do all the commands necessary to stop the task before quitting. Useful for task_sig_stop() handler. 
""" write_message("stopping commands started") write_message("stopping commands ended") def task_sig_suicide(sig, frame): """Signal handler for the 'suicide' signal sent by BibSched.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("suiciding myself now...") task_update_status("SUICIDING") write_message("suicided") task_update_status("SUICIDED") sys.exit(0) def task_sig_unknown(sig, frame): """Signal handler for the other unknown signals sent by shell or user.""" if options["verbose"]>= 9: write_message("got signal %d" % sig) write_message("unknown signal %d ignored" % sig) # do nothing for other signals def task_update_progress(msg): """Updates progress information in the BibSched task table.""" query = "UPDATE schTASK SET progress='%s' where id=%d" % (MySQLdb.escape_string(msg), task_id) if options["verbose"]>= 9: write_message(query) run_sql(query) return def task_update_status(val): """Updates state information in the BibSched task table.""" query = "UPDATE schTASK SET status='%s' where id=%d" % (MySQLdb.escape_string(val), task_id) if options["verbose"]>= 9: write_message(query) run_sql(query) return def split_ranges(parse_string): recIDs = [] ranges = string.split(parse_string, ",") for range in ranges: tmp_recIDs = string.split(range, "-") if len(tmp_recIDs)==1: recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[0])]) else: if int(tmp_recIDs[0]) > int(tmp_recIDs[1]): # sanity check tmp = tmp_recIDs[0] tmp_recIDs[0] = tmp_recIDs[1] tmp_recIDs[1] = tmp recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[1])]) return recIDs def bibrank_engine(row, run): """Run the indexing task. The row argument is the BibSched task queue row, containing if, arguments, etc. Return 1 in case of success and 0 in case of failure. 
""" try: import psyco psyco.bind(single_tag_rank) psyco.bind(single_tag_rank_method_exec) psyco.bind(serialize_via_numeric_array) psyco.bind(deserialize_via_numeric_array) #psyco.bind(authorimpact_exec) #psyco.bind(merge_exec) #psyco.bind(citationimpact_exec) psyco.bind(accessimpact_exec) except StandardError, e: print "Psyco ERROR",e startCreate = time.time() global options, task_id task_id = row[0] task_proc = row[1] options = loads(row[6]) task_starting_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) signal.signal(signal.SIGUSR1, task_sig_sleep) signal.signal(signal.SIGTERM, task_sig_stop) signal.signal(signal.SIGABRT, task_sig_suicide) signal.signal(signal.SIGCONT, task_sig_wakeup) signal.signal(signal.SIGINT, task_sig_unknown) sets = {} try: options["run"] = [] options["run"].append(run) for rank_method_code in options["run"]: cfg_name = getName(rank_method_code) if options["verbose"] >= 0: write_message("Running rank method: %s." % cfg_name) file = etcdir + "/bibrank/" + rank_method_code + ".cfg" config = ConfigParser.ConfigParser() try: config.readfp(open(file)) except StandardError, e: write_message("Cannot find configurationfile: %s" % file, sys.stderr) raise StandardError cfg_short = rank_method_code cfg_function = config.get("rank_method", "function") + "_exec" cfg_name = getName(cfg_short) options["validset"] = get_valid_range(rank_method_code) if options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) options["recid_range"] = recIDs_range elif options["id"]: options["recid_range"] = options["id"] elif options["modified"]: options["recid_range"] = add_date(rank_method_code, options["modified"]) elif options["last_updated"]: options["recid_range"] = add_date(rank_method_code) else: if options["verbose"] > 1: write_message("No records specified, updating all") min_id = run_sql("SELECT min(id) from 
bibrec")[0][0] max_id = run_sql("SELECT max(id) from bibrec")[0][0] options["recid_range"] = [[min_id, max_id]] if options["quick"] == "no" and options["verbose"] >= 9: write_message("Rebalance not yet enabled, parameter ignored.") if options["cmd"] == "del": del_recids(cfg_short, options["recid_range"]) elif options["cmd"] == "add": func_object = globals().get(cfg_function) func_object(rank_method_code, cfg_name, config) elif options["cmd"] == "stat": rank_method_code_statistics(rank_method_code) elif options["cmd"] == "check": check_method(rank_method_code) else: write_message("Invalid command found processing %s" % rank_method_code, sys.stderr) raise StandardError except StandardError, e: write_message("\nException caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) raise StandardError if options["verbose"]: showtime((time.time() - startCreate)) return 1 def get_valid_range(rank_method_code): """Return a range of records""" if options["verbose"] >=9: write_message("Getting records from collections enabled for rank method.") res = run_sql("SELECT collection.name FROM collection,collection_rnkMETHOD,rnkMETHOD WHERE collection.id=id_collection and id_rnkMETHOD=rnkMETHOD.id and rnkMETHOD.name='%s'" % rank_method_code) l_of_colls = [] for coll in res: l_of_colls.append(coll[0]) if len(l_of_colls) > 0: recIDs = perform_request_search(c=l_of_colls) else: recIDs = [] valid = HitSet() valid.addlist(recIDs) return valid def add_date(rank_method_code, date=""): """If date is not set, then retrieve it from the database. 
Reindex all formats newer than the modification date""" if not date: try: date = (get_lastupdated(rank_method_code),'') except Exception, e: date = "0000-00-00 00:00:00" query = """SELECT b.id FROM bibrec AS b WHERE b.modification_date >= '%s'""" % date[0] if date[1]: query += "and b.modification_date <= '%s'" % date[1] query += "ORDER BY b.id ASC""" res = run_sql(query) list = create_range_list(res) if not list: if options["verbose"]: write_message("No new records added since last time method was run") return list def getName(rank_method_code, ln=cdslang, type='ln'): """Returns the name of the method if it exists""" try: rnkid = run_sql("SELECT id FROM rnkMETHOD where name='%s'" % rank_method_code) if rnkid: rnkid = str(rnkid[0][0]) res = run_sql("SELECT value FROM rnkMETHODNAME where type='%s' and ln='%s' and id_rnkMETHOD=%s" % (type, ln, rnkid)) if not res: res = run_sql("SELECT value FROM rnkMETHODNAME WHERE ln='%s' and id_rnkMETHOD=%s and type='%s'" % (cdslang, rnkid, type)) if not res: return rank_method_code return res[0][0] else: raise Exception except Exception, e: write_message("Cannot run rank method, either given code for method is wrong, or it has not been added using the webinterface.") raise Exception def create_range_list(res): """Creates a range list from a recID select query result contained in res. 
The result is expected to have ascending numerical order.""" if not res: return [] row = res[0] if not row: return [] else: range_list = [[row[0],row[0]]] for row in res[1:]: id = row[0] if id == range_list[-1][1] + 1: range_list[-1][1] = id else: range_list.append([id,id]) return range_list def single_tag_rank_method(row, run): return bibrank_engine(row, run) def serialize_via_numeric_array_dumps(arr): return Numeric.dumps(arr) def serialize_via_numeric_array_compr(str): return compress(str) def serialize_via_numeric_array_escape(str): return MySQLdb.escape_string(str) def serialize_via_numeric_array(arr): """Serialize Numeric array into a compressed string.""" return serialize_via_numeric_array_escape(serialize_via_numeric_array_compr(serialize_via_numeric_array_dumps(arr))) def deserialize_via_numeric_array(string): """Decompress and deserialize string into a Numeric array.""" return Numeric.loads(decompress(string)) def serialize_via_marshal(obj): """Serialize Python object via marshal into a compressed string.""" return MySQLdb.escape_string(compress(dumps(obj))) def deserialize_via_marshal(string): """Decompress and deserialize string into a Python object via marshal.""" return loads(decompress(string)) def accessimpact_exec(rank_method_code, name, config): """Generating rankset based on number of downloads per document""" startCreate = time.time() options["dbhost"] = config.get("accessimpact", "dbhost") options["dbname"] = config.get("accessimpact", "dbname") options["dbuser"] = config.get("accessimpact", "dbuser") options["dbpass"] = config.get("accessimpact", "dbpass") date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) sysno_tag = config.get("accessimpact", "sysnr_tag") curr_repnr_tag = config.get("accessimpact", "curr_tag") old_repnr_tag = config.get("accessimpact", "old_tag") impacc = {} if 1: #not options["modified"]: imprec = run_sql2("SELECT imprecno,base,bsysno,bref FROM imprec") impacc = dict(run_sql2("SELECT imprecno,SUM(nbaccess) FROM 
impacc group BY imprecno")) cdssysno = run_sql("SELECT value,id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id" % (sysno_tag[0:2], sysno_tag[0:2], sysno_tag)) else: fromDB(starset) impacc = {} if options["verbose"] >= 9: write_message("Updating records modified after: %s" % options["modified"]) pre_impacc = dict(run_sql2("SELECT distinct imprecno,'' FROM impacc WHERE sdate >=%s", (options["modified"],))) imprec = [] cdssysno = [] for key in pre_impacc.keys(): test_impacc = run_sql2("SELECT imprecno,SUM(nbaccess) FROM impacc WHERE imprecno=%s GROUP BY imprecno", (key,)) impacc[test_impacc[0][0]] = test_impacc[0][1] data = run_sql2("SELECT imprecno,base,bsysno,bref FROM imprec WHERE imprecno=%s", (key,)) if data: data2 = run_sql2("SELECT imprecno FROM imprec WHERE bsysn=%s", (data[0][2],)) for key2 in data2: imprec.append((key2, data[0][1], data[0][2], data[0][3])) sysno = '0' * (9 - len(str(data[0][2]))) + str(data[0][2]) + data[0][1][0:3] data = run_sql("SELECT value,id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % sysno_tag[0:2], sysno_tag [0:2], sysno_tag, sysno) for key2,value in data: cdssysno.append((key2, value)) tempdict = {} for value,key in cdssysno: if not tempdict.has_key(value): tempdict[value] = [key] else: tempdict[value] = tempdict[value] + [key] tempdoc = {} count = 0 notcount = 0 for key, base, bsysno, value in imprec: if impacc.has_key(key): sysno = '0' * (9 - len(str(bsysno))) + str(bsysno) + base[0:3] data = () if tempdict.has_key(sysno): data = tempdict[sysno] else: data = run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % (curr_repnr_tag[0:2], curr_repnr_tag[0:2], curr_repnr_tag, value)) if len(data) == 0: data = run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % (old_repnr_tag[0:2], old_repnr_tag[0:2], old_repnr_tag, value)) if len(data) != 0: count = count + int(impacc[key]) for key2 in 
range(0, len(data)): if type(data[key2]) is tuple: key3 = data[key2][0] else: key3 = data[key2] if tempdoc.has_key(key3): tempdoc[key3] = int(tempdoc[key3] + float(impacc[key])) else: tempdoc[key3] = int(impacc[key]) else: notcount = notcount + int(impacc[key]) if options["verbose"] >= 9: try: write_message("Percentage of accesses matched with a record: %s%%,(%s/%s)" % (round((float(count) / float(count+notcount)) * 100, 3), notcount, count)) write_message("Number of records available in rank method: %s" % len(tempdoc)) except: print count, notcount intoDB(tempdoc, date, rank_method_code) #------------------------------------------------------------ #---------------BELOW IS OLD CODE, NOT WORKING ATM----------- #------------------------------------------------------------ def citationimpact_exec(rank_method_code, name, config): """Calculates rankset based on number of citations each document has""" startCreate = time.time() tempdoc = {} if options["verbose"] >= 1: write_message("Running: %s." 
% name) citation_tag = config.get("citationimpact", "citation_tag") curr_repnr_tag = config.get("citationimpact", "curr_tag") old_repnr_tag = config.get("citationimpact", "old_tag") if not options["modified"]: if options["verbose"] >= 9: write_message("Rebalancing") starset.setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) citations = run_sql("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id" % (citation_tag[0:2], citation_tag[0:2], citation_tag)) else: fromDB(starset) if options["modified"] == "last_updated": options["modified"] = starset.getLastUpdated() if options["verbose"] >= 9: write_message("Updating records modified after: %s" % options["modified"]) starset.setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) mod_data = run_sql("SELECT id FROM bibrec WHERE modification_date >=%s", (options["modified"]),) for id in mod_data: citation = run_sql("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id and id_bibrec=%s" % (citation_tag[0:2], citation_tag[0:2], citation_tag, id)) for id,value in citation: citations.append((id,value)) for key,value in citations: data = run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % (curr_repnr_tag[0:2], curr_repnr_tag[0:2], curr_repnr_tag, value)) data = run_sql("SELECT id_bibrec FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id AND value='%s'" % (old_repnr_tag[0:2], old_repnr_tag[0:2], old_repnr_tag, value)) if not options["modified"]: starset.setUnsorted(tempdoc) sort(starset) else: merge_two_sets(tempdoc, starset) #intoDB(starset) showtime((time.time() - startCreate)) def authorimpact_exec(rank_method_code, starsets, config): """Calculating the rankvalue a document has based on its authors""" startCreate = time.time() starset = starsets[rank_method_code] if options["verbose"] >= 1: write_message("Running: %s" % starset.getName()) tempdoc = single_tag_rank(starset, config) 
Auth1 = {} documents2 = {} authors = {} upd_authors = [] sql_src = [] p_author_tag = config.get("authorimpact", "primary_tag") s_author_tag = config.get("authorimpact", "secondary_tag") sql_src.append("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id" % (p_author_tag[0:2], p_author_tag[0:2], p_author_tag)) sql_src.append("SELECT id_bibrec,value FROM bib%sx,bibrec_bib%sx WHERE tag='%s' AND id_bibxxx=id" % (s_author_tag[0:2], s_author_tag[0:2], s_author_tag)) if not options["modified"]: increment = 50000 if options["verbose"] >= 9: write_message("Rebalancing") starset.setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) for key in sql_src: ij = -increment while ij <= (cfg_max_recID): ij = ij + increment data = run_sql(key + " AND id_bibrec>%i AND id_bibrec<=%i" % (ij, (ij + increment))) authorimpact_modified(data, Auth1) else: fromDB(starset) mod_data = run_sql("select id from bibrec where modification_date >= %s", (options["modified"],)) if options["modified"] == "last_updated": options["modified"] = starset.getLastUpdated() if options["verbose"] >= 9: write_message("Updating records modified after: %s" % options["modified"]) starset.setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) for key in sql_src: for id in mod_data: data = run_sql(key + " AND id_bibrec=%s" % id[0]) authorimpact_modified(data, Auth1) for key2,value in data: upd_authors.append((key2,value)) for key in Auth1.keys(): for key2 in sql_src: data = run_sql(key2 + " AND value=%s", (key,)) authorimpact_modified(data, Auth1) del data Auth = [] for key in Auth1.keys(): for key1 in range(0, len(Auth1[key])): Auth.append((Auth1[key][key1], key)) del Auth1 factor = 0.0 for key, value in Auth: if tempdoc.has_key(key) and tempdoc[key][1] > 0.0: factor = tempdoc[key][1] else: factor = 0.0 if not authors.has_key(value): authors[value] = (factor, 1) else: authors[value] = (authors[value][0] + factor, authors[value][1] + 1) if options["modified"]: 
Auth = upd_authors tempdoc = {} for key,value in Auth: if documents2.has_key(key) and authors[value][0] > 0.0: documents2[key] = (documents2[key][0] + authors[value][0], documents2[key][1] + authors[value][1]) elif authors[value][0] > 0.0: documents2[key] = authors[value] del Auth for key in documents2.keys(): tempdoc[key] = ("", float(documents2[key][0]) / float(documents2[key][1])) if options["verbose"] >= 9: write_message("Number of distinct authors: %s" % len(authors)) if not options["modified"]: for key in tempdoc.keys(): if len(tempdoc[key][0]) != 0: tempdoc[key] = ("", -1.0) starset.setUnsorted(tempdoc) sort(starset) else: merge_two_sets(tempdoc,starset) intoDB(starset) showtime((time.time() - startCreate)) def authorimpact_modified(data, Auth): """Adding data to the dictionary""" for key,value in data: if not Auth.has_key(value): Auth[value] = [] Auth[value].append(key) else: found=0 for key2 in range(0, len(Auth[value])): if Auth[value][key2] == key: found = 1 break if not found == 1: Auth[value].append(key) def merge_exec(rank_method_code, starsets, config): """Merge several methods into one starset""" startCreate = time.time() if options["verbose"] >= 1: write_message("Running: %s" % starsets[rank_method_code].getName()) starsets[rank_method_code].setLastUpdated(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) threshold = {} finalset = {} permut = '' for nr in range(0, starsets[rank_method_code].getSize() + 1): finalset[nr] = HitSet() permut = permut + "%s" % nr starsets[rank_method_code].setWeigth(1.0) sum = 0.0 nr = 0 convert = {} size=-1 for key in starsets: if key != rank_method_code: sum = sum + starsets[key].getWeigth() convert[nr] = key nr=nr + 1 if size > -1 and size != len(starsets[key].getStars()) -1: write_message("The sets have different sizes, process cancelled") sys.exit() else: size = len(starsets[key].getStars()) -1 sum = 1.0 / sum for key in starsets: if key != rank_method_code: starsets[key].setWeigth(starsets[key].getWeigth() * 
sum) p = Permutation(permut, len(starsets)-1) for perm in p: tempset = copy.copy(starsets[convert[0]].getStar(int(perm[0]))) place = float(perm[0]) * float(starsets[convert[0]].getWeigth()) for i in range(1, len(perm)): tempset.intersect(starsets[convert[i]].getStar(int(perm[i]))) tempset.calculate_nbhits() place = place+float(perm[i]) * float(starsets[convert[i]].getWeigth()) finalset[int(round(place))].union(tempset) for i in range(0, starsets[rank_method_code].getSize() + 1): finalset[i].calculate_nbhits() threshold[i] = 0 starsets[rank_method_code].setStars(finalset) starsets[rank_method_code].setThreshold(threshold) intoDB(starsets[rank_method_code]) showtime((time.time() - startCreate)) def showtime(timeused): """Show time used for method""" if options["verbose"] >= 9: write_message("Time used: %d second(s)." % timeused) def stats2(starset): """Print statistics""" try: total = 0 write_message("Statistics: %s , Top Star size: %s%% , Overall Importance: %s%%" % (starset.getName(), round(float(starset.getTopStar()) * 100,2), round(float(starset.getWeigth())*100, 2))) for nr in range(0, starset.getSize() + 1): write_message("%s star(s): Range >= \t%s\t%s" % (nr, round(starset.getThreshold()[nr],3), (starset.getStar(nr))._nbhits)) total = total + (starset.getStar(nr))._nbhits write_message("Total: %s" % total) except StandardError, e: write_message("Error showing statistics: %s" % starset.getName(), sys.stderr()) raise StandardError def check(starset): """Check if rebalancing is necessary""" try: size = cfg_max_recID + 1 - starset.getStar(0)._nbhits if options["verbose"] >= 9: for nr in range(1, starset.getSize() + 1): write_message("%s---%f" % (nr, float(starset.getStar(nr)._nbhits) / float(size))) if (float(starset.getStar(starset.getSize())._nbhits) / float(size)) >= float(options["check"]): write_message("Rebalance: %s" % starset.getName()) except StandardError, e: write_message("Error checking: %s" % starset.getName(), sys.stderr) raise StandardError def 
compare_on_val(first, second): return cmp(second[1], first[1]) class Permutation: """Creates permutations""" def __init__(self, values, length): self.values = values self.length = length return def __len__(self): return len(self.values) ** self.length def __getitem__(self, n): """permutation number n""" if n >= len(self): raise IndexError res = [] lv = len(self.values) vals = self.values for ndx in range(self.length): res.append( vals[ n % lv ]) n = n / lv return res def _db_login(relogin = 0): """Login to the database""" global DB_CONN if relogin: DB_CONN = MySQLdb.connect(host=options["dbhost"], db=options["dbname"], user=options["dbuser"], passwd=options["dbpass"]) return DB_CONN else: #try: if 1==1: d = DB_CONN return d #except NameError: # DB_CONN = MySQLdb.connect(host=dbhost, db=dbname, user=dbuser, passwd=dbpass) # return DB_CONN def run_sql2(sql, param=None, n=0, with_desc=0): """ Runs SQL on the server and returns result""" db = _db_login(1) if param: param = tuple(param) try: cur = db.cursor() rc = cur.execute(sql, param) except: db = _db_login(relogin = 1) cur = db.cursor() rc = cur.execute(sql, param) if string.upper(string.split(sql)[0]) in ("SELECT", "SHOW", "DESC", "DESCRIBE"): if n: recset = cur.fetchmany(n) else: recset = cur.fetchall() if with_desc: return recset, cur.description else: return recset else: if string.upper(string.split(sql)[0]) == "INSERT": rc = cur.insert_id() return rc #def citationimpact(row): # return bibrank_engine(row) def accessimpact(row, run): return bibrank_engine(row, run) #def authorimpact(row): # return bibrank_engine(row) #def merge(row): -# return bibrank_engine(row) \ No newline at end of file +# return bibrank_engine(row) diff --git a/modules/bibrank/lib/bibrank_word_indexer.py b/modules/bibrank/lib/bibrank_word_indexer.py index 192df316f..cbb401156 100644 --- a/modules/bibrank/lib/bibrank_word_indexer.py +++ b/modules/bibrank/lib/bibrank_word_indexer.py @@ -1,1472 +1,1457 @@ - # $Id$ +## $Id$ ## BibRank word 
frequency indexer utility. ## This file is part of the CERN Document Server Software (CDSware). ## Copyright (C) 2002 CERN. ## ## The CDSware is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## The CDSware is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDSware; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. ## read config variables: -#include "config.wml" -#include "configbis.wml" #include "cdswmllib.wml" -## start Python: -#! # -*- coding: utf-8 -*- ## $Id$ ## DO NOT EDIT THIS FILE! IT WAS AUTOMATICALLY GENERATED FROM CDSware WML SOURCES. 
__version__ = "<: print generate_pretty_version_string('$Id$'); :>" -## fill config variables: -pylibdir = "/python" - ## programs used to convert fulltext files to text: conv_programs = {#"ps": ["",""], # switched off at the moment, since PDF is faster #"ps.gz": ["",""], "pdf": ["","",""], "doc": ["","",""], "ppt": [""], "xls": [""] } ## helper programs used if the above programs convert only to html or other intermediate file formats: conv_programs_helpers = {"html": "", "gz": "" } ## okay, rest of the Python code goes below ####### -## import interesting modules: -try: - from zlib import compress,decompress - from string import split,translate,lower,upper - import marshal - import getopt - import getpass - import string - import os - import sre - import sys - import time - import MySQLdb - import Numeric - import urllib - import signal - import tempfile - import unicodedata - import traceback - import cStringIO - import math - import re - import ConfigParser -except ImportError, e: - import sys - -try: - sys.path.append('%s' % pylibdir) - from cdsware.config import * - from cdsware.search_engine_config import cfg_max_recID - from cdsware.search_engine import perform_request_search, strip_accents, HitSet - from cdsware.dbquery import run_sql - from cdsware.bibindex_engine_stemmer import is_stemmer_available_for_language, stem - from cdsware.bibindex_engine_stopwords import is_stopword -except ImportError, e: - import sys +from zlib import compress,decompress +from string import split,translate,lower,upper +import marshal +import getopt +import getpass +import string +import os +import sre +import sys +import time +import MySQLdb +import Numeric +import urllib +import signal +import tempfile +import unicodedata +import traceback +import cStringIO +import math +import re +import ConfigParser + +from config import * +from search_engine_config import cfg_max_recID +from search_engine import perform_request_search, strip_accents, HitSet +from dbquery import run_sql 
+from bibindex_engine_stemmer import is_stemmer_available_for_language, stem +from bibindex_engine_stopwords import is_stopword ## safety parameters concerning MySQL thread-multiplication problem: cfg_check_mysql_threads = 0 # to check or not to check the problem? cfg_max_mysql_threads = 50 # how many threads (connections) we consider as still safe cfg_mysql_thread_timeout = 20 # we'll kill threads that were sleeping for more than X seconds ## override urllib's default password-asking behaviour: class MyFancyURLopener(urllib.FancyURLopener): def prompt_user_passwd(self, host, realm): # supply some dummy credentials by default return ("mysuperuser", "mysuperpass") def http_error_401(self, url, fp, errcode, errmsg, headers): # do not bother with protected pages raise IOError, (999, 'unauthorized access') return None #urllib._urlopener = MyFancyURLopener() ## precompile some often-used regexp for speed reasons: re_subfields = sre.compile('\$\$\w'); nb_char_in_line = 50 # for verbose pretty printing chunksize = 1000 # default size of chunks that the records will be treated by wordTables = [] base_process_size = 4500 # process base size ## Dictionary merging functions def dict_union(list1, list2): "Returns union of the two dictionaries." union_dict = {} for (e, count) in list1.iteritems(): union_dict[e] = count for (e, count) in list2.iteritems(): if not union_dict.has_key(e): union_dict[e] = count else: union_dict[e] = (union_dict[e][0] + count[0], count[1]) #for (e, count) in list2.iteritems(): # list1[e] = (list1.get(e, (0, 0))[0] + count[0], count[1]) #return list1 return union_dict ## safety function for killing slow MySQL threads: def kill_sleepy_mysql_threads(max_threads=cfg_max_mysql_threads, thread_timeout=cfg_mysql_thread_timeout): """Check the number of MySQL threads and if there are more than MAX_THREADS of them, lill all threads that are in a sleeping state for more than THREAD_TIMEOUT seconds. 
(This is useful for working around the the max_connection problem that appears during indexation in some not-yet-understood cases.) If some threads are to be killed, write info into the log file. """ res = run_sql("SHOW FULL PROCESSLIST") if len(res) > max_threads: for row in res: r_id,r_user,r_host,r_db,r_command,r_time,r_state,r_info = row if r_command == "Sleep" and int(r_time) > thread_timeout: run_sql("KILL %s", (r_id,)) if options["verbose"] >= 1: write_message("WARNING: too many MySQL threads, killing thread %s" % r_id) return # tagToFunctions mapping. It offers an indirection level necesary for # indexing fulltext. The default is get_words_from_phrase tagToWordsFunctions = {} def get_words_from_phrase(phrase, weight, lang="", chars_punctuation=r"[\.\,\:\;\?\!\"]", chars_alphanumericseparators=r"[1234567890\!\"\#\$\%\&\'\(\)\*\+\,\-\.\/\:\;\<\=\>\?\@\[\\\]\^\_\`\{\|\}\~]", split=string.split): "Returns list of words from phrase 'phrase'." words = {} phrase = strip_accents(phrase) phrase = lower(phrase) #Getting rid of strange characters phrase = re.sub("é", 'e', phrase) phrase = re.sub("è", 'e', phrase) phrase = re.sub("à", 'a', phrase) phrase = re.sub(" ", ' ', phrase) phrase = re.sub("«", ' ', phrase) phrase = re.sub("»", ' ', phrase) phrase = re.sub("ê", ' ', phrase) phrase = re.sub("&", ' ', phrase) if string.find(phrase, " -1: #Most likely html, remove html code phrase = re.sub("(?s)<[^>]*>|&#?\w+;", ' ', phrase) #removes http links phrase = re.sub("(?s)http://[^( )]*", '', phrase) phrase = re.sub(chars_punctuation, ' ', phrase) #By doing this like below, characters standing alone, like c a b is not added to the inedx, but when they are together with characters like c++ or c$ they are added. 
def split_ranges(parse_string):
    """Parse a string like "1-5,7,9-12" into [[1,5],[7,7],[9,12]];
    reversed bounds such as "5-1" are normalised to [1,5]."""
    recIDs = []
    for chunk in string.split(parse_string, ","):
        bounds = string.split(chunk, "-")
        if len(bounds) == 1:
            recIDs.append([int(bounds[0]), int(bounds[0])])
        else:
            low, high = int(bounds[0]), int(bounds[1])
            if low > high:  # sanity check
                low, high = high, low
            recIDs.append([low, high])
    return recIDs

def get_date_range(var):
    "Returns the two dates contained as a low,high tuple"
    limits = string.split(var, ",")
    if len(limits) == 1:
        return get_date(limits[0]), None
    if len(limits) == 2:
        return get_date(limits[0]), get_date(limits[1])

def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"):
    """Return a date string formatted per FORMAT_STRING.  VAR is either an
    absolute date string in that format, or a shift relative to now such
    as "+2d", "-30m" (units: d/h/m/s)."""
    now = time.time()
    shift_re = sre.compile("([-\+]{0,1})([\d]+)([dhms])")
    seconds_per_unit = {"d": 24*3600, "h": 3600, "m": 60, "s": 1}
    m = shift_re.match(var)
    if m:
        sign = m.groups()[0] == "-" and -1 or 1
        factor = seconds_per_unit[m.groups()[2]]
        value = float(m.groups()[1])
        return time.strftime(format_string, time.localtime(now + sign * factor * value))
    parsed = time.strptime(var, format_string)
    return time.strftime(format_string, parsed)

def create_range_list(res):
    """Collapse an ascending recID query result (sequence of 1-tuples)
    into a list of inclusive [low,high] ranges."""
    if not res:
        return []
    first = res[0]
    if not first:
        return []
    range_list = [[first[0], first[0]]]
    for row in res[1:]:
        rec = row[0]
        if rec == range_list[-1][1] + 1:
            range_list[-1][1] = rec
        else:
            range_list.append([rec, rec])
    return range_list

def beautify_range_list(range_list):
    """Returns a non overlapping, maximal range list"""
    merged = []
    for new in range_list:
        for old in merged:
            # overlap or adjacency: grow the existing range in place
            if new[0] <= old[0] <= new[1] + 1 or new[0] - 1 <= old[1] <= new[1]:
                old[0] = min(old[0], new[0])
                old[1] = max(old[1], new[1])
                break
        else:
            merged.append(new)
    return merged

def serialize_via_numeric_array_dumps(arr):
    """Dump a Numeric array into its string form."""
    return Numeric.dumps(arr)

def serialize_via_numeric_array_compr(str):
    """Compress an already-dumped Numeric array string."""
    return compress(str)

def serialize_via_numeric_array(arr):
    """Serialize Numeric array into a compressed string."""
    return serialize_via_numeric_array_compr(serialize_via_numeric_array_dumps(arr))

def deserialize_via_numeric_array(string):
    """Decompress and deserialize string into a Numeric array."""
    return Numeric.loads(decompress(string))

def serialize_via_marshal(obj):
    """Serialize a Python object via marshal into a compressed,
    SQL-escaped string."""
    return MySQLdb.escape_string(compress(marshal.dumps(obj)))

def deserialize_via_marshal(string):
    """Decompress and deserialize string into a Python object via marshal."""
    return marshal.loads(decompress(string))
marshal.loads(decompress(string)) class WordTable: "A class to hold the words table." def __init__(self, tablename, fields_to_index, separators="[^\s]"): "Creates words table instance." self.tablename = tablename self.recIDs_in_mem = [] self.fields_to_index = fields_to_index self.separators = separators self.value = {} def get_field(self, recID, tag): """Returns list of values of the MARC-21 'tag' fields for the record 'recID'.""" out = [] bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = """SELECT value FROM %s AS b, %s AS bb WHERE bb.id_bibrec=%s AND bb.id_bibxxx=b.id AND tag LIKE '%s'""" % (bibXXx, bibrec_bibXXx, recID, tag); res = run_sql(query) for row in res: out.append(row[0]) return out def clean(self): "Cleans the words table." self.value={} def put_into_db(self, mode="normal", split=string.split): """Updates the current words table in the corresponding MySQL's rnkWORD table. Mode 'normal' means normal execution, mode 'emergency' means words index reverting to old state. 
""" if options["verbose"]: write_message("%s %s wordtable flush started" % (self.tablename,mode)) write_message('...updating %d words into %sR started' % \ (len(self.value), self.tablename[:-1])) task_update_progress("%s flushed %d/%d words" % (self.tablename, 0, len(self.value))) self.recIDs_in_mem = beautify_range_list(self.recIDs_in_mem) if mode == "normal": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='TEMPORARY' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='CURRENT'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) nb_words_total = len(self.value) nb_words_report = int(nb_words_total/10) nb_words_done = 0 for word in self.value.keys(): self.put_word_into_db(word, self.value[word]) nb_words_done += 1 if nb_words_report!=0 and ((nb_words_done % nb_words_report) == 0): if options["verbose"]: write_message('......processed %d/%d words' % (nb_words_done, nb_words_total)) task_update_progress("%s flushed %d/%d words" % (self.tablename, nb_words_done, nb_words_total)) if options["verbose"] >= 9: write_message('...updating %d words into %s ended' % \ (nb_words_total, self.tablename)) #if options["verbose"]: # write_message('...updating reverse table %sR started' % self.tablename[:-1]) if mode == "normal": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='CURRENT' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='FUTURE'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) query = """DELETE FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='TEMPORARY'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) if options["verbose"] >= 9: write_message('End of updating wordTable into %s' % self.tablename) elif mode == "emergency": write_message("emergency") for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='CURRENT' WHERE id_bibrec BETWEEN '%d' 
AND '%d' AND type='TEMPORARY'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) query = """DELETE FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='FUTURE'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) if options["verbose"] >= 9: write_message('End of emergency flushing wordTable into %s' % self.tablename) #if options["verbose"]: # write_message('...updating reverse table %sR ended' % self.tablename[:-1]) self.clean() self.recIDs_in_mem = [] if options["verbose"]: write_message("%s %s wordtable flush ended" % (self.tablename, mode)) task_update_progress("%s flush ended" % (self.tablename)) def load_old_recIDs(self,word): """Load existing hitlist for the word from the database index files.""" query = "SELECT hitlist FROM %s WHERE term=%%s" % self.tablename res = run_sql(query, (word,)) if res: return deserialize_via_marshal(res[0][0]) else: return None def merge_with_old_recIDs(self,word,recIDs, set): """Merge the system numbers stored in memory (hash of recIDs with value[0] > 0 or -1 according to whether to add/delete them) with those stored in the database index and received in set universe of recIDs for the given word. Return 0 in case no change was done to SET, return 1 in case SET was changed. 
""" set_changed_p = 0 for recID,sign in recIDs.iteritems(): if sign[0] == -1 and set.has_key(recID): # delete recID if existent in set and if marked as to be deleted del set[recID] set_changed_p = 1 elif sign[0] > -1 and not set.has_key(recID): # add recID if not existent in set and if marked as to be added set[recID] = sign set_changed_p = 1 elif sign[0] > -1 and sign[0] != set[recID][0]: set[recID] = sign set_changed_p = 1 return set_changed_p def put_word_into_db(self, word, recIDs, split=string.split): """Flush a single word to the database and delete it from memory""" set = self.load_old_recIDs(word) #write_message("%s %s" % (word, self.value[word])) if set: # merge the word recIDs found in memory: options["modified_words"][word] = 1 if self.merge_with_old_recIDs(word, recIDs, set) == 0: # nothing to update: if options["verbose"] >= 9: write_message("......... unchanged hitlist for ``%s''" % word) pass else: # yes there were some new words: if options["verbose"] >= 9: write_message("......... updating hitlist for ``%s''" % word) run_sql("UPDATE %s SET hitlist='%s' WHERE term='%s'" % (self.tablename, serialize_via_marshal(set), MySQLdb.escape_string(word))) else: # the word is new, will create new set: if options["verbose"] >= 9: write_message("......... inserting hitlist for ``%s''" % word) set = self.value[word] if len(set) > 0: #new word, add to list options["modified_words"][word] = 1 run_sql("INSERT INTO %s (term, hitlist) VALUES ('%s', '%s')" % (self.tablename, MySQLdb.escape_string(word), serialize_via_marshal(set))) if not set: # never store empty words run_sql("DELETE from %s WHERE term=%%s" % self.tablename, (word,)) del self.value[word] def display(self): "Displays the word table." keys = self.value.keys() keys.sort() for k in keys: if options["verbose"]: write_message("%s: %s" % (k, self.value[k])) def count(self): "Returns the number of words in the table." return len(self.value) def info(self): "Prints some information on the words table." 
if options["verbose"]: write_message("The words table contains %d words." % self.count()) def lookup_words(self, word=""): "Lookup word from the words table." if not word: done = 0 while not done: try: word = raw_input("Enter word: ") done = 1 except (EOFError, KeyboardInterrupt): return if self.value.has_key(word): if options["verbose"]: write_message("The word '%s' is found %d times." \ % (word, len(self.value[word]))) else: if options["verbose"]: write_message("The word '%s' does not exist in the word file."\ % word) def update_last_updated(self, rank_method_code, starting_time=None): """Update last_updated column of the index table in the database. Puts starting time there so that if the task was interrupted for record download, the records will be reindexed next time.""" if starting_time is None: return None if options["verbose"] >= 9: write_message("updating last_updated to %s...", starting_time) return run_sql("UPDATE rnkMETHOD SET last_updated=%s WHERE name=%s", (starting_time, rank_method_code,)) def add_recIDs(self, recIDs): """Fetches records which id in the recIDs range list and adds them to the wordTable. The recIDs range list is of the form: [[i1_low,i1_high],[i2_low,i2_high], ..., [iN_low,iN_high]]. 
""" global chunksize flush_count = 0 records_done = 0 records_to_go = 0 for range in recIDs: records_to_go = records_to_go + range[1] - range[0] + 1 time_started = time.time() # will measure profile time for range in recIDs: i_low = range[0] chunksize_count = 0 while i_low <= range[1]: # calculate chunk group of recIDs and treat it: i_high = min(i_low+options["flush"]-flush_count-1,range[1]) i_high = min(i_low+chunksize-chunksize_count-1, i_high) try: self.chk_recID_range(i_low, i_high) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) if options["verbose"]: write_message("%s adding records #%d-#%d started" % \ (self.tablename, i_low, i_high)) if cfg_check_mysql_threads: kill_sleepy_mysql_threads() task_update_progress("%s adding recs %d-%d" % (self.tablename, i_low, i_high)) self.del_recID_range(i_low, i_high) just_processed = self.add_recID_range(i_low, i_high) flush_count = flush_count + i_high - i_low + 1 chunksize_count = chunksize_count + i_high - i_low + 1 records_done = records_done + just_processed if options["verbose"]: write_message("%s adding records #%d-#%d ended " % \ (self.tablename, i_low, i_high)) if chunksize_count >= chunksize: chunksize_count = 0 # flush if necessary: if flush_count >= options["flush"]: self.put_into_db() self.clean() if options["verbose"]: write_message("%s backing up" % (self.tablename)) flush_count = 0 self.log_progress(time_started,records_done,records_to_go) # iterate: i_low = i_high + 1 if flush_count > 0: self.put_into_db() self.log_progress(time_started,records_done,records_to_go) def add_date(self, date=""): # If date is not set, then retrieve it from the database. 
# Reindex all formats newer than the modification date if not date: write_message("Using the last update time for the rank method") id = self.tablename[len("bibindex"):] query = """SELECT last_updated FROM rnkMETHOD WHERE name='%s' """ % options["current_run"] res = run_sql(query) if not res: return if not res[0][0]: date = ("0000-00-00",'') else: date = (res[0][0],'') query = """SELECT b.id FROM bibrec AS b WHERE b.modification_date >= '%s'""" % date[0] if date[1]: query += "and b.modification_date <= '%s'" % date[1] query += "ORDER BY b.id ASC""" res = run_sql(query) list = create_range_list(res) if not list: if options["verbose"]: write_message( "No new records added. %s is up to date" % self.tablename) else: self.add_recIDs(list) return list def add_recID_range(self, recID1, recID2): empty_list_string = serialize_via_marshal([]) wlist = {} normalize = {} self.recIDs_in_mem.append([recID1,recID2]) # secondly fetch all needed tags: for (tag, weight, lang) in self.fields_to_index: if tag in tagToWordsFunctions.keys(): get_words_function = tagToWordsFunctions[ tag ] else: get_words_function = get_words_from_phrase bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = """SELECT bb.id_bibrec,b.value FROM %s AS b, %s AS bb WHERE bb.id_bibrec BETWEEN %d AND %d AND bb.id_bibxxx=b.id AND tag LIKE '%s'""" % (bibXXx, bibrec_bibXXx, recID1, recID2, tag) res = run_sql(query) nb_total_to_read = len(res) verbose_idx = 0 # for verbose pretty printing for row in res: recID, phrase = row if options["validset"].contains(recID): if not wlist.has_key(recID): wlist[recID] = {} new_words = get_words_function(phrase, weight, lang) # ,self.separators wlist[recID] = dict_union(new_words,wlist[recID]) # were there some words for these recIDs found? if len(wlist) == 0: return 0 recIDs = wlist.keys() for recID in recIDs: # was this record marked as deleted? 
if "DELETED" in self.get_field(recID, "980__c"): wlist[recID] = {} if options["verbose"] >= 9: write_message("... record %d was declared deleted, removing its word list" % recID) if options["verbose"] >= 9: write_message("... record %d, termlist: %s" % (recID, wlist[recID])) query_factory = cStringIO.StringIO() qwrite = query_factory.write qwrite( "INSERT INTO %sR (id_bibrec,termlist,type) VALUES" % self.tablename[:-1]) qwrite( "('" ) qwrite( str(recIDs[0]) ) qwrite( "','" ) qwrite( serialize_via_marshal(wlist[recIDs[0]]) ) qwrite( "','FUTURE')" ) for recID in recIDs[1:]: qwrite(",('") qwrite(str(recID)) qwrite("','") qwrite(serialize_via_marshal(wlist[recID])) qwrite("','FUTURE')") query = query_factory.getvalue() query_factory.close() run_sql(query) query_factory = cStringIO.StringIO() qwrite = query_factory.write qwrite("INSERT INTO %sR (id_bibrec,termlist,type) VALUES" % self.tablename[:-1]) qwrite("('") qwrite(str(recIDs[0])) qwrite("','") qwrite(serialize_via_marshal(wlist[recIDs[0]])) qwrite("','CURRENT')") for recID in recIDs[1:]: qwrite( ",('" ) qwrite( str(recID) ) qwrite( "','" ) qwrite( empty_list_string ) qwrite( "','CURRENT')" ) query = query_factory.getvalue() query_factory.close() try: run_sql(query) except MySQLdb.DatabaseError: pass put = self.put for recID in recIDs: for (w, count) in wlist[recID].iteritems(): put(recID, w, count) return len(recIDs) def log_progress(self, start, done, todo): """Calculate progress and store it. 
start: start time, done: records processed, todo: total number of records""" time_elapsed = time.time() - start # consistency check if time_elapsed == 0 or done > todo: return time_recs_per_min = done/(time_elapsed/60.0) if options["verbose"]: write_message("%d records took %.1f seconds to complete.(%1.f recs/min)"\ % (done, time_elapsed, time_recs_per_min)) if time_recs_per_min: if options["verbose"]: write_message("Estimated runtime: %.1f minutes" % \ ((todo-done)/time_recs_per_min)) def put(self, recID, word, sign): "Adds/deletes a word to the word list." try: word = lower(word[:50]) if self.value.has_key(word): # the word 'word' exist already: update sign self.value[word][recID] = sign # PROBLEM ? else: self.value[word] = {recID: sign} except: write_message("Error: Cannot put word %s with sign %d for recID %s." % (word, sign, recID)) def del_recIDs(self, recIDs): """Fetches records which id in the recIDs range list and adds them to the wordTable. The recIDs range list is of the form: [[i1_low,i1_high],[i2_low,i2_high], ..., [iN_low,iN_high]]. 
""" count = 0 for range in recIDs: self.del_recID_range(range[0],range[1]) count = count + range[1] - range[0] self.put_into_db() def del_recID_range(self, low, high): """Deletes records with 'recID' system number between low and high from memory words index table.""" if options["verbose"] > 2: write_message("%s fetching existing words for records #%d-#%d started" % \ (self.tablename, low, high)) self.recIDs_in_mem.append([low,high]) query = """SELECT id_bibrec,termlist FROM %sR as bb WHERE bb.id_bibrec BETWEEN '%d' AND '%d'""" % (self.tablename[:-1], low, high) recID_rows = run_sql(query) for recID_row in recID_rows: recID = recID_row[0] wlist = deserialize_via_marshal(recID_row[1]) for word in wlist: self.put(recID, word, (-1, 0)) if options["verbose"] > 2: write_message("%s fetching existing words for records #%d-#%d ended" % \ (self.tablename, low, high)) def report_on_table_consistency(self): """Check reverse words index tables (e.g. rnkWORD01R) for interesting states such as 'TEMPORARY' state. Prints small report (no of words, no of bad words). 
""" # find number of words: query = """SELECT COUNT(*) FROM %s""" % (self.tablename) res = run_sql(query, None, 1) if res: nb_words = res[0][0] else: nb_words = 0 # find number of records: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR""" % (self.tablename[:-1]) res = run_sql(query, None, 1) if res: nb_records = res[0][0] else: nb_records = 0 # report stats: if options["verbose"]: write_message("%s contains %d words from %d records" % (self.tablename, nb_words, nb_records)) # find possible bad states in reverse tables: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR WHERE type <> 'CURRENT'""" % (self.tablename[:-1]) res = run_sql(query) if res: nb_bad_records = res[0][0] else: nb_bad_records = 999999999 if nb_bad_records: write_message("EMERGENCY: %s needs to repair %d of %d records" % \ (self.tablename, nb_bad_records, nb_records)) else: if options["verbose"]: write_message("%s is in consistent state" % (self.tablename)) return nb_bad_records def repair(self): """Repair the whole table""" # find possible bad states in reverse tables: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR WHERE type <> 'CURRENT'""" % (self.tablename[:-1]) res = run_sql(query, None, 1) if res: nb_bad_records = res[0][0] else: nb_bad_records = 0 # find number of records: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR""" % (self.tablename[:-1]) res = run_sql(query) if res: nb_records = res[0][0] else: nb_records = 0 if nb_bad_records == 0: return query = """SELECT id_bibrec FROM %sR WHERE type <> 'CURRENT' ORDER BY id_bibrec""" \ % (self.tablename[:-1]) res = run_sql(query) recIDs = create_range_list(res) flush_count = 0 records_done = 0 records_to_go = 0 for range in recIDs: records_to_go = records_to_go + range[1] - range[0] + 1 time_started = time.time() # will measure profile time for range in recIDs: i_low = range[0] chunksize_count = 0 while i_low <= range[1]: # calculate chunk group of recIDs and treat it: i_high = 
min(i_low+options["flush"]-flush_count-1,range[1]) i_high = min(i_low+chunksize-chunksize_count-1, i_high) try: self.fix_recID_range(i_low, i_high) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) flush_count = flush_count + i_high - i_low + 1 chunksize_count = chunksize_count + i_high - i_low + 1 records_done = records_done + i_high - i_low + 1 if chunksize_count >= chunksize: chunksize_count = 0 # flush if necessary: if flush_count >= options["flush"]: self.put_into_db("emergency") self.clean() flush_count = 0 self.log_progress(time_started,records_done,records_to_go) # iterate: i_low = i_high + 1 if flush_count > 0: self.put_into_db("emergency") self.log_progress(time_started,records_done,records_to_go) write_message("%s inconsistencies repaired." % self.tablename) def chk_recID_range(self, low, high): """Check if the reverse index table is in proper state""" ## check db query = """SELECT COUNT(*) FROM %sR WHERE type <> 'CURRENT' AND id_bibrec BETWEEN '%d' AND '%d'""" % (self.tablename[:-1], low, high) res = run_sql(query, None, 1) if res[0][0]==0: if options["verbose"]: write_message("%s for %d-%d is in consistent state"%(self.tablename,low,high)) return # okay, words table is consistent ## inconsistency detected! write_message("EMERGENCY: %s inconsistencies detected..." % self.tablename) write_message("""EMERGENCY: Errors found. You should check consistency of the %s - %sR tables.\nRunning 'bibindex --repair' is recommended.""" \ % (self.tablename, self.tablename[:-1])) raise StandardError def fix_recID_range(self, low, high): """Try to fix reverse index database consistency (e.g. table rnkWORD01R) in the low,high doc-id range. Possible states for a recID follow: CUR TMP FUT: very bad things have happened: warn! CUR TMP : very bad things have happened: warn! 
CUR FUT: delete FUT (crash before flushing) CUR : database is ok TMP FUT: add TMP to memory and del FUT from memory flush (revert to old state) TMP : very bad things have happened: warn! FUT: very bad things have happended: warn! """ state = {} query = "SELECT id_bibrec,type FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d'"\ % (self.tablename[:-1], low, high) res = run_sql(query) for row in res: if not state.has_key(row[0]): state[row[0]]=[] state[row[0]].append(row[1]) ok = 1 # will hold info on whether we will be able to repair for recID in state.keys(): if not 'TEMPORARY' in state[recID]: if 'FUTURE' in state[recID]: if 'CURRENT' not in state[recID]: write_message("EMERGENCY: Record %d is in inconsistent state. Can't repair it" % recID) ok = 0 else: write_message("EMERGENCY: Inconsistency in record %d detected" % recID) query = """DELETE FROM %sR WHERE id_bibrec='%d'""" % (self.tablename[:-1], recID) run_sql(query) write_message("EMERGENCY: Inconsistency in record %d repaired." % recID) else: if 'FUTURE' in state[recID] and not 'CURRENT' in state[recID]: self.recIDs_in_mem.append([recID,recID]) # Get the words file query = """SELECT type,termlist FROM %sR WHERE id_bibrec='%d'""" % (self.tablename[:-1], recID) if options["verbose"] >= 9: write_message(query) res = run_sql(query) for row in res: wlist = deserialize_via_marshal(row[1]) if options["verbose"] >= 9: write_message("Words are %s " % wlist) if row[0] == 'TEMPORARY': sign = 1 else: sign = -1 for word in wlist: self.put(recID, word, wlist[word]) else: write_message("EMERGENCY: %s for %d is in inconsistent state. Couldn't repair it." % (self.tablename, recID)) ok = 0 if not ok: write_message("""EMERGENCY: Unrepairable errors found. You should check consistency of the %s - %sR tables. Deleting affected records is recommended.""" % (self.tablename, self.tablename[:-1])) raise StandardError def word_index(row, run): """Run the indexing task. 
The row argument is the BibSched task queue row, containing if, arguments, etc. Return 1 in case of success and 0 in case of failure. """ ## import optional modules: try: import psyco psyco.bind(get_words_from_phrase) psyco.bind(WordTable.merge_with_old_recIDs) psyco.bind(serialize_via_numeric_array) psyco.bind(serialize_via_marshal) psyco.bind(deserialize_via_numeric_array) psyco.bind(deserialize_via_marshal) psyco.bind(update_rnkWORD) psyco.bind(check_rnkWORD) except StandardError,e: print "Warning: Psyco", e pass global options, task_id, wordTables, languages # read from SQL row: task_id = row[0] task_proc = row[1] options = marshal.loads(row[6]) # install signal handlers signal.signal(signal.SIGUSR1, task_sig_sleep) signal.signal(signal.SIGTERM, task_sig_stop) signal.signal(signal.SIGABRT, task_sig_suicide) signal.signal(signal.SIGCONT, task_sig_wakeup) signal.signal(signal.SIGINT, task_sig_unknown) ## go ahead and treat each table: options["run"] = [] options["run"].append(run) for rank_method_code in options["run"]: method_starting_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) write_message("Running rank method: %s" % getName(rank_method_code)) try: file = etcdir + "/bibrank/" + rank_method_code + ".cfg" config = ConfigParser.ConfigParser() config.readfp(open(file)) except StandardError, e: write_message("Cannot find configurationfile: %s" % file, sys.stderr) raise StandardError options["current_run"] = rank_method_code options["modified_words"] = {} options["table"] = config.get(config.get("rank_method", "function"), "table") options["use_stemming"] = config.get(config.get("rank_method","function"),"stemming") options["remove_stopword"] = config.get(config.get("rank_method","function"),"stopword") tags = get_tags(config) #get the tags to include options["validset"] = get_valid_range(rank_method_code) #get the records from the collections the method is enabled for function = config.get("rank_method","function") wordTable = 
WordTable(options["table"], tags) wordTable.report_on_table_consistency() try: if options["cmd"] == "del": if options["id"]: wordTable.del_recIDs(options["id"]) elif options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) wordTable.del_recIDs(recIDs_range) else: write_message("Missing IDs of records to delete from index %s.", wordTable.tablename, sys.stderr) raise StandardError elif options["cmd"] == "add": if options["id"]: wordTable.add_recIDs(options["id"]) elif options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) wordTable.add_recIDs(recIDs_range) elif options["last_updated"]: wordTable.add_date("") wordTable.update_last_updated(rank_method_code, method_starting_time) else: wordTable.add_recIDs([[0,cfg_max_recID]]) #wordTable.add_date(options["modified"]) # only update last_updated if run via automatic mode: elif options["cmd"] == "repair": wordTable.repair() check_rnkWORD(options["table"]) elif options["cmd"] == "check": check_rnkWORD(options["table"]) options["modified_words"] = {} elif options["cmd"] == "stat": rank_method_code_statistics(options["table"]) else: write_message("Invalid command found processing %s" % \ wordTable.tablename, sys.stderr) raise StandardError update_rnkWORD(options["table"], options["modified_words"]) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) sys.exit(1) wordTable.report_on_table_consistency() # We are done. 
State it in the database, close and quit return 1 def get_tags(config): """Get the tags that should be used creating the index and each tag's parameter""" tags = [] function = config.get("rank_method","function") i = 1 shown_error = 0 #try: if 1: while config.has_option(function,"tag%s"% i): tag = config.get(function, "tag%s" % i) tag = string.split(tag, ",") tag[1] = int(string.strip(tag[1])) tag[2] = string.strip(tag[2]) #check if stemmer for language is available if config.get(function,"stemming") and stem("information", "en") != "inform": if shown_error == 0: write_message("Warning: PyStemmer not found. Please read INSTALL.") shown_error = 1 elif tag[2] and tag[2] != "none" and config.get(function,"stemming") and not is_stemmer_available_for_language(tag[2]): write_message("Warning: Language '%s' not available in PyStemmer." % tag[2]) tags.append(tag) i += 1 #except Exception: # write_message("Could not read data from configuration file, please check for errors") # raise StandardError return tags def get_valid_range(rank_method_code): """Returns which records are valid for this rank method, according to which collections it is enabled for.""" #if options["verbose"] >=9: # write_message("Getting records from collections enabled for rank method.") #res = run_sql("SELECT collection.name FROM collection,collection_rnkMETHOD,rnkMETHOD WHERE collection.id=id_collection and id_rnkMETHOD=rnkMETHOD.id and rnkMETHOD.name='%s'" % rank_method_code) #l_of_colls = [] #for coll in res: # l_of_colls.append(coll[0]) #if len(l_of_colls) > 0: # recIDs = perform_request_search(c=l_of_colls) #else: # recIDs = [] valid = HitSet(Numeric.ones(cfg_max_recID+1, Numeric.Int0)) #valid.addlist(recIDs) return valid def write_message(msg, stream=sys.stdout): """Prints message and flush output stream (may be sys.stdout or sys.stderr).""" if stream == sys.stdout or stream == sys.stderr: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) stream.write("%s\n" % msg) 
stream.flush() else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) def check_term(term, termlength): """Check if term contains not allowed characters, or for any other reasons for not using this term.""" try: if len(term) <= termlength: return False reg = re.compile(r"[1234567890\!\"\#\$\%\&\'\(\)\*\+\,\-\.\/\:\;\<\=\>\?\@\[\\\]\^\_\`\{\|\}\~]") if re.search(reg, term): return False term = str.replace(term, "-", "") term = str.replace(term, ".", "") term = str.replace(term, ",", "") if int(term): return False except StandardError, e: pass return True def check_rnkWORD(table): """Checks for any problems in rnkWORD tables.""" i = 0 errors = {} termslist = run_sql("SELECT term FROM %s" % table) N = run_sql("select max(id_bibrec) from %sR" % table[:-1])[0][0] write_message("Checking integrity of rank values in %s" % table) terms = map(lambda x: x[0], termslist) while i < len(terms): current_terms = "" for j in range(i, ((i+5000)< len(terms) and (i+5000) or len(terms))): current_terms += "'%s'," % terms[j] terms_docs = run_sql("SELECT term, hitlist FROM %s WHERE term in (%s)" % (table, current_terms[:-1])) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if (term_docs.has_key("Gi") and term_docs["Gi"][1] == 0) or not term_docs.has_key("Gi"): write_message("ERROR: Missing value for term: %s (%s) in %s: %s" % (t, repr(t), table, len(term_docs))) errors[t] = 1 i += 5000 write_message("Checking integrity of rank values in %sR" % table[:-1]) i = 0 while i < N: docs_terms = run_sql("SELECT id_bibrec, termlist FROM %sR WHERE id_bibrec>=%s and id_bibrec<=%s" % (table[:-1], i, i+5000)) for (j, termlist) in docs_terms: termlist = deserialize_via_marshal(termlist) for (t, tf) in termlist.iteritems(): if tf[1] == 0 and not errors.has_key(t): errors[t] = 1 write_message("ERROR: Gi missing for record %s and term: %s (%s) in %s" % (j,t,repr(t), table)) terms_docs = run_sql("SELECT term, hitlist FROM %s WHERE term='%s'" 
% (table, t)) termlist = deserialize_via_marshal(terms_docs[0][1]) i += 5000 if len(errors) == 0: write_message("No direct errors found, but nonconsistent data may exist.") else: write_message("%s errors found during integrity check, repair and rebalancing recommended." % len(errors)) options["modified_words"] = errors def rank_method_code_statistics(table): """Shows some statistics about this rank method.""" maxID = run_sql("select max(id) from %s" % table) maxID = maxID[0][0] terms = {} Gi = {} write_message("Showing statistics of terms in index:") write_message("Important: For the 'Least used terms', the number of terms is shown first, and the number of occurences second.") write_message("Least used terms---Most important terms---Least important terms") i = 0 while i < maxID: terms_docs=run_sql("SELECT term, hitlist FROM %s WHERE id>= %s and id < %s" % (table, i, i + 10000)) for (t, hitlist) in terms_docs: term_docs=deserialize_via_marshal(hitlist) terms[len(term_docs)] = terms.get(len(term_docs), 0) + 1 if term_docs.has_key("Gi"): Gi[t] = term_docs["Gi"] i=i + 10000 terms=terms.items() terms.sort(lambda x, y: cmp(y[1], x[1])) Gi=Gi.items() Gi.sort(lambda x, y: cmp(y[1], x[1])) for i in range(0, 20): write_message("%s/%s---%s---%s" % (terms[i][0],terms[i][1], Gi[i][0],Gi[len(Gi) - i - 1][0])) def update_rnkWORD(table, terms): """Updates rnkWORDF and rnkWORDR with Gi and Nj values. For each term in rnkWORDF, a Gi value for the term is added. And for each term in each document, the Nj value for that document is added. In rnkWORDR, the Gi value for each term in each document is added. For description on how things are computed, look in the hacking docs. 
table - name of forward index to update terms - modified terms""" stime = time.time() Gi = {} Nj = {} N = run_sql("select count(id_bibrec) from %sR" % table[:-1])[0][0] write_message("Beginning post-processing of %s terms" % len(terms)) if len(terms) == 0: write_message("No terms to process, ending...") return "" #Locating all documents related to the modified/new/deleted terms, if fast update, #only take into account new/modified occurences write_message("Phase 1: Finding records containing modified terms") terms = terms.keys() i = 0 while i < len(terms): terms_docs = get_from_forward_index(terms, i, (i+5000), table) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if term_docs.has_key("Gi"): del term_docs["Gi"] for (j, tf) in term_docs.iteritems(): if (options["quick"] == "yes" and tf[1] == 0) or options["quick"] == "no": Nj[j] = 0 write_message("Phase 1: ......processed %s/%s terms" % ((i+5000>len(terms) and len(terms) or (i+5000)), len(terms))) i += 5000 write_message("Phase 1: Finished finding records containing modified terms") #Find all terms in the records found in last phase write_message("Phase 2: Finding all terms in affected records") records = Nj.keys() i = 0 while i < len(records): docs_terms = get_from_reverse_index(records, i, (i + 5000), table) for (j, termlist) in docs_terms: doc_terms = deserialize_via_marshal(termlist) for (t, tf) in doc_terms.iteritems(): Gi[t] = 0 write_message("Phase 2: ......processed %s/%s records " % ((i+5000>len(records) and len(records) or (i+5000)), len(records))) i += 5000 write_message("Phase 2: Finished finding all terms in affected records") terms = Gi.keys() Gi = {} i = 0 if options["quick"] == "no": #Calculating Fi and Gi value for each term write_message("Phase 3: Calculating importance of all affected terms") while i < len(terms): terms_docs = get_from_forward_index(terms, i, (i+5000), table) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if 
term_docs.has_key("Gi"): del term_docs["Gi"] Fi = 0 Gi[t] = 1 for (j, tf) in term_docs.iteritems(): Fi += tf[0] for (j, tf) in term_docs.iteritems(): if tf[0] != Fi: Gi[t] = Gi[t] + ((float(tf[0]) / Fi) * math.log(float(tf[0]) / Fi) / math.log(2)) / math.log(N) write_message("Phase 3: ......processed %s/%s terms" % ((i+5000>len(terms) and len(terms) or (i+5000)), len(terms))) i += 5000 write_message("Phase 3: Finished calculating importance of all affected terms") else: #Using existing Gi value instead of calculating a new one. Missing some accurancy. write_message("Phase 3: Getting approximate importance of all affected terms") while i < len(terms): terms_docs = get_from_forward_index(terms, i, (i+5000), table) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if term_docs.has_key("Gi"): Gi[t] = term_docs["Gi"][1] elif len(term_docs) == 1: Gi[t] = 1 else: Fi = 0 Gi[t] = 1 for (j, tf) in term_docs.iteritems(): Fi += tf[0] for (j, tf) in term_docs.iteritems(): if tf[0] != Fi: Gi[t] = Gi[t] + ((float(tf[0]) / Fi) * math.log(float(tf[0]) / Fi) / math.log(2)) / math.log(N) write_message("Phase 3: ......processed %s/%s terms" % ((i+5000>len(terms) and len(terms) or (i+5000)), len(terms))) i += 5000 write_message("Phase 3: Finished getting approximate importance of all affected terms") write_message("Phase 4: Calculating normalization value for all affected records and updating %sR" % table[:-1]) records = Nj.keys() i = 0 while i < len(records): #Calculating the normalization value for each document, and adding the Gi value to each term in each document. 
docs_terms = get_from_reverse_index(records, i, (i + 5000), table) for (j, termlist) in docs_terms: doc_terms = deserialize_via_marshal(termlist) for (t, tf) in doc_terms.iteritems(): if Gi.has_key(t): Nj[j] = Nj.get(j, 0) + math.pow(Gi[t] * (1 + math.log(tf[0])), 2) Git = int(math.floor(Gi[t]*100)) if Git >= 0: Git += 1 doc_terms[t] = (tf[0], Git) else: Nj[j] = Nj.get(j, 0) + math.pow(tf[1] * (1 + math.log(tf[0])), 2) Nj[j] = 1.0 / math.sqrt(Nj[j]) Nj[j] = int(Nj[j] * 100) if Nj[j] >= 0: Nj[j] += 1 run_sql("UPDATE %sR SET termlist='%s' WHERE id_bibrec=%s" % (table[:-1], serialize_via_marshal(doc_terms), j)) write_message("Phase 4: ......processed %s/%s records" % ((i+5000>len(records) and len(records) or (i+5000)), len(records))) i += 5000 write_message("Phase 4: Finished calculating normalization value for all affected records and updating %sR" % table[:-1]) write_message("Phase 5: Updating %s with new normalization values" % table) i = 0 terms = Gi.keys() while i < len(terms): #Adding the Gi value to each term, and adding the normalization value to each term in each document. 
terms_docs = get_from_forward_index(terms, i, (i+5000), table) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if term_docs.has_key("Gi"): del term_docs["Gi"] for (j, tf) in term_docs.iteritems(): if Nj.has_key(j): term_docs[j] = (tf[0], Nj[j]) Git = int(math.floor(Gi[t]*100)) if Git >= 0: Git += 1 term_docs["Gi"] = (0, Git) run_sql("UPDATE %s SET hitlist='%s' WHERE term='%s'" % (table, serialize_via_marshal(term_docs), MySQLdb.escape_string(t))) write_message("Phase 5: ......processed %s/%s terms" % ((i+5000>len(terms) and len(terms) or (i+5000)), len(terms))) i += 5000 write_message("Phase 5: Finished updating %s with new normalization values" % table) write_message("Time used for post-processing: %.1fmin" % ((time.time() - stime) / 60)) write_message("Finished post-processing") def get_from_forward_index(terms, start, stop, table): current_terms = "" for j in range(start, (stop < len(terms) and stop or len(terms))): current_terms += "'%s'," % terms[j] terms_docs = run_sql("SELECT term, hitlist FROM %s WHERE term IN (%s)" % (table,current_terms[:-1])) return terms_docs def get_from_reverse_index(records, start, stop, table): current_recs = "%s" % records[start:stop] current_recs = current_recs[1:-1] docs_terms = run_sql("SELECT id_bibrec, termlist FROM %sR WHERE id_bibrec IN (%s)" % (table[:-1],current_recs)) return docs_terms def test_word_separators(phrase="hep-th/0101001"): """Tests word separating policy on various input.""" print "%s:" % phrase gwfp = get_words_from_phrase(phrase) for (word, count) in gwfp.iteritems(): print "\t-> %s - %s" % (word, count) def task_sig_sleep(sig, frame): """Signal handler for the 'sleep' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("sleeping...") task_update_status("SLEEPING") signal.pause() # wait for wake-up signal def task_sig_wakeup(sig, frame): """Signal handler for the 'wakeup' signal sent by BibSched.""" if options["verbose"] >= 
9: write_message("got signal %d" % sig) write_message("continuing...") task_update_status("CONTINUING") def task_sig_stop(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("stopping...") task_update_status("STOPPING") errcode = 0 try: task_sig_stop_commands() write_message("stopped") task_update_status("STOPPED") except StandardError, err: write_message("Error during stopping! %e" % err) task_update_status("STOPPINGFAILED") errcode = 1 sys.exit(errcode) def task_sig_stop_commands(): """Do all the commands necessary to stop the task before quitting. Useful for task_sig_stop() handler. """ write_message("stopping commands started") for table in wordTables: table.put_into_db() write_message("stopping commands ended") def task_sig_suicide(sig, frame): """Signal handler for the 'suicide' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("suiciding myself now...") task_update_status("SUICIDING") write_message("suicided") task_update_status("SUICIDED") sys.exit(0) def task_sig_unknown(sig, frame): """Signal handler for the other unknown signals sent by shell or user.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("unknown signal %d ignored" % sig) # do nothing for other signals def task_update_progress(msg): """Updates progress information in the BibSched task table.""" global task_id, options if options["verbose"] >= 9: write_message("Updating task progress to %s." % msg) return run_sql("UPDATE schTASK SET progress=%s where id=%s", (msg, task_id)) def task_update_status(val): """Updates state information in the BibSched task table.""" global task_id, options if options["verbose"] >= 9: write_message("Updating task status to %s." 
% val) return run_sql("UPDATE schTASK SET status=%s where id=%s", (val, task_id)) def getName(methname, ln=cdslang, type='ln'): """Returns the name of the rank method, either in default language or given language. methname = short name of the method ln - the language to get the name in type - which name "type" to get.""" try: rnkid = run_sql("SELECT id FROM rnkMETHOD where name='%s'" % methname) if rnkid: rnkid = str(rnkid[0][0]) res = run_sql("SELECT value FROM rnkMETHODNAME where type='%s' and ln='%s' and id_rnkMETHOD=%s" % (type, ln, rnkid)) if not res: res = run_sql("SELECT value FROM rnkMETHODNAME WHERE ln='%s' and id_rnkMETHOD=%s and type='%s'" % (cdslang, rnkid, type)) if not res: return methname return res[0][0] else: raise Exception except Exception, e: write_message("Cannot run rank method, either given code for method is wrong, or it has not been added using the webinterface.") raise Exception def word_similarity(row, run): """Call correct method""" return word_index(row, run) diff --git a/modules/bibrank/lib/bibrank_word_indexer.py.wml b/modules/bibrank/lib/bibrank_word_indexer.py.wml index 192df316f..cbb401156 100644 --- a/modules/bibrank/lib/bibrank_word_indexer.py.wml +++ b/modules/bibrank/lib/bibrank_word_indexer.py.wml @@ -1,1472 +1,1457 @@ - # $Id$ +## $Id$ ## BibRank word frequency indexer utility. ## This file is part of the CERN Document Server Software (CDSware). ## Copyright (C) 2002 CERN. ## ## The CDSware is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## The CDSware is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. 
## ## You should have received a copy of the GNU General Public License ## along with CDSware; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. ## read config variables: -#include "config.wml" -#include "configbis.wml" #include "cdswmllib.wml" -## start Python: -#! # -*- coding: utf-8 -*- ## $Id$ ## DO NOT EDIT THIS FILE! IT WAS AUTOMATICALLY GENERATED FROM CDSware WML SOURCES. __version__ = "<: print generate_pretty_version_string('$Id$'); :>" -## fill config variables: -pylibdir = "/python" - ## programs used to convert fulltext files to text: conv_programs = {#"ps": ["",""], # switched off at the moment, since PDF is faster #"ps.gz": ["",""], "pdf": ["","",""], "doc": ["","",""], "ppt": [""], "xls": [""] } ## helper programs used if the above programs convert only to html or other intermediate file formats: conv_programs_helpers = {"html": "", "gz": "" } ## okay, rest of the Python code goes below ####### -## import interesting modules: -try: - from zlib import compress,decompress - from string import split,translate,lower,upper - import marshal - import getopt - import getpass - import string - import os - import sre - import sys - import time - import MySQLdb - import Numeric - import urllib - import signal - import tempfile - import unicodedata - import traceback - import cStringIO - import math - import re - import ConfigParser -except ImportError, e: - import sys - -try: - sys.path.append('%s' % pylibdir) - from cdsware.config import * - from cdsware.search_engine_config import cfg_max_recID - from cdsware.search_engine import perform_request_search, strip_accents, HitSet - from cdsware.dbquery import run_sql - from cdsware.bibindex_engine_stemmer import is_stemmer_available_for_language, stem - from cdsware.bibindex_engine_stopwords import is_stopword -except ImportError, e: - import sys +from zlib import compress,decompress +from string import split,translate,lower,upper +import marshal 
+import getopt +import getpass +import string +import os +import sre +import sys +import time +import MySQLdb +import Numeric +import urllib +import signal +import tempfile +import unicodedata +import traceback +import cStringIO +import math +import re +import ConfigParser + +from config import * +from search_engine_config import cfg_max_recID +from search_engine import perform_request_search, strip_accents, HitSet +from dbquery import run_sql +from bibindex_engine_stemmer import is_stemmer_available_for_language, stem +from bibindex_engine_stopwords import is_stopword ## safety parameters concerning MySQL thread-multiplication problem: cfg_check_mysql_threads = 0 # to check or not to check the problem? cfg_max_mysql_threads = 50 # how many threads (connections) we consider as still safe cfg_mysql_thread_timeout = 20 # we'll kill threads that were sleeping for more than X seconds ## override urllib's default password-asking behaviour: class MyFancyURLopener(urllib.FancyURLopener): def prompt_user_passwd(self, host, realm): # supply some dummy credentials by default return ("mysuperuser", "mysuperpass") def http_error_401(self, url, fp, errcode, errmsg, headers): # do not bother with protected pages raise IOError, (999, 'unauthorized access') return None #urllib._urlopener = MyFancyURLopener() ## precompile some often-used regexp for speed reasons: re_subfields = sre.compile('\$\$\w'); nb_char_in_line = 50 # for verbose pretty printing chunksize = 1000 # default size of chunks that the records will be treated by wordTables = [] base_process_size = 4500 # process base size ## Dictionary merging functions def dict_union(list1, list2): "Returns union of the two dictionaries." 
union_dict = {} for (e, count) in list1.iteritems(): union_dict[e] = count for (e, count) in list2.iteritems(): if not union_dict.has_key(e): union_dict[e] = count else: union_dict[e] = (union_dict[e][0] + count[0], count[1]) #for (e, count) in list2.iteritems(): # list1[e] = (list1.get(e, (0, 0))[0] + count[0], count[1]) #return list1 return union_dict ## safety function for killing slow MySQL threads: def kill_sleepy_mysql_threads(max_threads=cfg_max_mysql_threads, thread_timeout=cfg_mysql_thread_timeout): """Check the number of MySQL threads and if there are more than MAX_THREADS of them, lill all threads that are in a sleeping state for more than THREAD_TIMEOUT seconds. (This is useful for working around the the max_connection problem that appears during indexation in some not-yet-understood cases.) If some threads are to be killed, write info into the log file. """ res = run_sql("SHOW FULL PROCESSLIST") if len(res) > max_threads: for row in res: r_id,r_user,r_host,r_db,r_command,r_time,r_state,r_info = row if r_command == "Sleep" and int(r_time) > thread_timeout: run_sql("KILL %s", (r_id,)) if options["verbose"] >= 1: write_message("WARNING: too many MySQL threads, killing thread %s" % r_id) return # tagToFunctions mapping. It offers an indirection level necesary for # indexing fulltext. The default is get_words_from_phrase tagToWordsFunctions = {} def get_words_from_phrase(phrase, weight, lang="", chars_punctuation=r"[\.\,\:\;\?\!\"]", chars_alphanumericseparators=r"[1234567890\!\"\#\$\%\&\'\(\)\*\+\,\-\.\/\:\;\<\=\>\?\@\[\\\]\^\_\`\{\|\}\~]", split=string.split): "Returns list of words from phrase 'phrase'." 
words = {} phrase = strip_accents(phrase) phrase = lower(phrase) #Getting rid of strange characters phrase = re.sub("é", 'e', phrase) phrase = re.sub("è", 'e', phrase) phrase = re.sub("à", 'a', phrase) phrase = re.sub(" ", ' ', phrase) phrase = re.sub("«", ' ', phrase) phrase = re.sub("»", ' ', phrase) phrase = re.sub("ê", ' ', phrase) phrase = re.sub("&", ' ', phrase) if string.find(phrase, " -1: #Most likely html, remove html code phrase = re.sub("(?s)<[^>]*>|&#?\w+;", ' ', phrase) #removes http links phrase = re.sub("(?s)http://[^( )]*", '', phrase) phrase = re.sub(chars_punctuation, ' ', phrase) #By doing this like below, characters standing alone, like c a b is not added to the inedx, but when they are together with characters like c++ or c$ they are added. for word in split(phrase): if options["remove_stopword"] == "True" and not is_stopword(word, 1) and check_term(word, 0): if lang and lang !="none" and options["use_stemming"]: word = stem(word, lang) if not words.has_key(word): words[word] = (0,0) words[word] = (words[word][0] + weight, 0) elif options["remove_stopword"] == "True" and not is_stopword(word, 1): phrase = re.sub(chars_alphanumericseparators, ' ', word) for word_ in split(phrase): if lang and lang !="none" and options["use_stemming"]: word_ = stem(word_, lang) if word_: if not words.has_key(word_): words[word_] = (0,0) words[word_] = (words[word_][0] + weight, 0) return words def split_ranges(parse_string): recIDs = [] ranges = string.split(parse_string, ",") for range in ranges: tmp_recIDs = string.split(range, "-") if len(tmp_recIDs)==1: recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[0])]) else: if int(tmp_recIDs[0]) > int(tmp_recIDs[1]): # sanity check tmp = tmp_recIDs[0] tmp_recIDs[0] = tmp_recIDs[1] tmp_recIDs[1] = tmp recIDs.append([int(tmp_recIDs[0]), int(tmp_recIDs[1])]) return recIDs def get_date_range(var): "Returns the two dates contained as a low,high tuple" limits = string.split(var, ",") if len(limits)==1: low = 
get_date(limits[0]) return low,None if len(limits)==2: low = get_date(limits[0]) high = get_date(limits[1]) return low,high def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"): """Returns a date string according to the format string. It can handle normal date strings and shifts with respect to now.""" date = time.time() shift_re=sre.compile("([-\+]{0,1})([\d]+)([dhms])") factors = {"d":24*3600, "h":3600, "m":60, "s":1} m = shift_re.match(var) if m: sign = m.groups()[0] == "-" and -1 or 1 factor = factors[m.groups()[2]] value = float(m.groups()[1]) date = time.localtime(date + sign * factor * value) date = time.strftime(format_string, date) else: date = time.strptime(var, format_string) date = time.strftime(format_string, date) return date def create_range_list(res): """Creates a range list from a recID select query result contained in res. The result is expected to have ascending numerical order.""" if not res: return [] row = res[0] if not row: return [] else: range_list = [[row[0],row[0]]] for row in res[1:]: id = row[0] if id == range_list[-1][1] + 1: range_list[-1][1] = id else: range_list.append([id,id]) return range_list def beautify_range_list(range_list): """Returns a non overlapping, maximal range list""" ret_list = [] for new in range_list: found = 0 for old in ret_list: if new[0] <= old[0] <= new[1] + 1 or new[0] - 1 <= old[1] <= new[1]: old[0] = min(old[0], new[0]) old[1] = max(old[1], new[1]) found = 1 break if not found: ret_list.append(new) return ret_list def serialize_via_numeric_array_dumps(arr): return Numeric.dumps(arr) def serialize_via_numeric_array_compr(str): return compress(str) def serialize_via_numeric_array(arr): """Serialize Numeric array into a compressed string.""" return serialize_via_numeric_array_compr(serialize_via_numeric_array_dumps(arr)) def deserialize_via_numeric_array(string): """Decompress and deserialize string into a Numeric array.""" return Numeric.loads(decompress(string)) def serialize_via_marshal(obj): 
"""Serialize Python object via marshal into a compressed string.""" return MySQLdb.escape_string(compress(marshal.dumps(obj))) def deserialize_via_marshal(string): """Decompress and deserialize string into a Python object via marshal.""" return marshal.loads(decompress(string)) class WordTable: "A class to hold the words table." def __init__(self, tablename, fields_to_index, separators="[^\s]"): "Creates words table instance." self.tablename = tablename self.recIDs_in_mem = [] self.fields_to_index = fields_to_index self.separators = separators self.value = {} def get_field(self, recID, tag): """Returns list of values of the MARC-21 'tag' fields for the record 'recID'.""" out = [] bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = """SELECT value FROM %s AS b, %s AS bb WHERE bb.id_bibrec=%s AND bb.id_bibxxx=b.id AND tag LIKE '%s'""" % (bibXXx, bibrec_bibXXx, recID, tag); res = run_sql(query) for row in res: out.append(row[0]) return out def clean(self): "Cleans the words table." self.value={} def put_into_db(self, mode="normal", split=string.split): """Updates the current words table in the corresponding MySQL's rnkWORD table. Mode 'normal' means normal execution, mode 'emergency' means words index reverting to old state. 
""" if options["verbose"]: write_message("%s %s wordtable flush started" % (self.tablename,mode)) write_message('...updating %d words into %sR started' % \ (len(self.value), self.tablename[:-1])) task_update_progress("%s flushed %d/%d words" % (self.tablename, 0, len(self.value))) self.recIDs_in_mem = beautify_range_list(self.recIDs_in_mem) if mode == "normal": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='TEMPORARY' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='CURRENT'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) nb_words_total = len(self.value) nb_words_report = int(nb_words_total/10) nb_words_done = 0 for word in self.value.keys(): self.put_word_into_db(word, self.value[word]) nb_words_done += 1 if nb_words_report!=0 and ((nb_words_done % nb_words_report) == 0): if options["verbose"]: write_message('......processed %d/%d words' % (nb_words_done, nb_words_total)) task_update_progress("%s flushed %d/%d words" % (self.tablename, nb_words_done, nb_words_total)) if options["verbose"] >= 9: write_message('...updating %d words into %s ended' % \ (nb_words_total, self.tablename)) #if options["verbose"]: # write_message('...updating reverse table %sR started' % self.tablename[:-1]) if mode == "normal": for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='CURRENT' WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='FUTURE'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) query = """DELETE FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='TEMPORARY'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) if options["verbose"] >= 9: write_message('End of updating wordTable into %s' % self.tablename) elif mode == "emergency": write_message("emergency") for group in self.recIDs_in_mem: query = """UPDATE %sR SET type='CURRENT' WHERE id_bibrec BETWEEN '%d' 
AND '%d' AND type='TEMPORARY'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) query = """DELETE FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d' AND type='FUTURE'""" % \ (self.tablename[:-1], group[0], group[1]) if options["verbose"] >= 9: write_message(query) run_sql(query) if options["verbose"] >= 9: write_message('End of emergency flushing wordTable into %s' % self.tablename) #if options["verbose"]: # write_message('...updating reverse table %sR ended' % self.tablename[:-1]) self.clean() self.recIDs_in_mem = [] if options["verbose"]: write_message("%s %s wordtable flush ended" % (self.tablename, mode)) task_update_progress("%s flush ended" % (self.tablename)) def load_old_recIDs(self,word): """Load existing hitlist for the word from the database index files.""" query = "SELECT hitlist FROM %s WHERE term=%%s" % self.tablename res = run_sql(query, (word,)) if res: return deserialize_via_marshal(res[0][0]) else: return None def merge_with_old_recIDs(self,word,recIDs, set): """Merge the system numbers stored in memory (hash of recIDs with value[0] > 0 or -1 according to whether to add/delete them) with those stored in the database index and received in set universe of recIDs for the given word. Return 0 in case no change was done to SET, return 1 in case SET was changed. 
""" set_changed_p = 0 for recID,sign in recIDs.iteritems(): if sign[0] == -1 and set.has_key(recID): # delete recID if existent in set and if marked as to be deleted del set[recID] set_changed_p = 1 elif sign[0] > -1 and not set.has_key(recID): # add recID if not existent in set and if marked as to be added set[recID] = sign set_changed_p = 1 elif sign[0] > -1 and sign[0] != set[recID][0]: set[recID] = sign set_changed_p = 1 return set_changed_p def put_word_into_db(self, word, recIDs, split=string.split): """Flush a single word to the database and delete it from memory""" set = self.load_old_recIDs(word) #write_message("%s %s" % (word, self.value[word])) if set: # merge the word recIDs found in memory: options["modified_words"][word] = 1 if self.merge_with_old_recIDs(word, recIDs, set) == 0: # nothing to update: if options["verbose"] >= 9: write_message("......... unchanged hitlist for ``%s''" % word) pass else: # yes there were some new words: if options["verbose"] >= 9: write_message("......... updating hitlist for ``%s''" % word) run_sql("UPDATE %s SET hitlist='%s' WHERE term='%s'" % (self.tablename, serialize_via_marshal(set), MySQLdb.escape_string(word))) else: # the word is new, will create new set: if options["verbose"] >= 9: write_message("......... inserting hitlist for ``%s''" % word) set = self.value[word] if len(set) > 0: #new word, add to list options["modified_words"][word] = 1 run_sql("INSERT INTO %s (term, hitlist) VALUES ('%s', '%s')" % (self.tablename, MySQLdb.escape_string(word), serialize_via_marshal(set))) if not set: # never store empty words run_sql("DELETE from %s WHERE term=%%s" % self.tablename, (word,)) del self.value[word] def display(self): "Displays the word table." keys = self.value.keys() keys.sort() for k in keys: if options["verbose"]: write_message("%s: %s" % (k, self.value[k])) def count(self): "Returns the number of words in the table." return len(self.value) def info(self): "Prints some information on the words table." 
if options["verbose"]: write_message("The words table contains %d words." % self.count()) def lookup_words(self, word=""): "Lookup word from the words table." if not word: done = 0 while not done: try: word = raw_input("Enter word: ") done = 1 except (EOFError, KeyboardInterrupt): return if self.value.has_key(word): if options["verbose"]: write_message("The word '%s' is found %d times." \ % (word, len(self.value[word]))) else: if options["verbose"]: write_message("The word '%s' does not exist in the word file."\ % word) def update_last_updated(self, rank_method_code, starting_time=None): """Update last_updated column of the index table in the database. Puts starting time there so that if the task was interrupted for record download, the records will be reindexed next time.""" if starting_time is None: return None if options["verbose"] >= 9: write_message("updating last_updated to %s...", starting_time) return run_sql("UPDATE rnkMETHOD SET last_updated=%s WHERE name=%s", (starting_time, rank_method_code,)) def add_recIDs(self, recIDs): """Fetches records which id in the recIDs range list and adds them to the wordTable. The recIDs range list is of the form: [[i1_low,i1_high],[i2_low,i2_high], ..., [iN_low,iN_high]]. 
""" global chunksize flush_count = 0 records_done = 0 records_to_go = 0 for range in recIDs: records_to_go = records_to_go + range[1] - range[0] + 1 time_started = time.time() # will measure profile time for range in recIDs: i_low = range[0] chunksize_count = 0 while i_low <= range[1]: # calculate chunk group of recIDs and treat it: i_high = min(i_low+options["flush"]-flush_count-1,range[1]) i_high = min(i_low+chunksize-chunksize_count-1, i_high) try: self.chk_recID_range(i_low, i_high) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) if options["verbose"]: write_message("%s adding records #%d-#%d started" % \ (self.tablename, i_low, i_high)) if cfg_check_mysql_threads: kill_sleepy_mysql_threads() task_update_progress("%s adding recs %d-%d" % (self.tablename, i_low, i_high)) self.del_recID_range(i_low, i_high) just_processed = self.add_recID_range(i_low, i_high) flush_count = flush_count + i_high - i_low + 1 chunksize_count = chunksize_count + i_high - i_low + 1 records_done = records_done + just_processed if options["verbose"]: write_message("%s adding records #%d-#%d ended " % \ (self.tablename, i_low, i_high)) if chunksize_count >= chunksize: chunksize_count = 0 # flush if necessary: if flush_count >= options["flush"]: self.put_into_db() self.clean() if options["verbose"]: write_message("%s backing up" % (self.tablename)) flush_count = 0 self.log_progress(time_started,records_done,records_to_go) # iterate: i_low = i_high + 1 if flush_count > 0: self.put_into_db() self.log_progress(time_started,records_done,records_to_go) def add_date(self, date=""): # If date is not set, then retrieve it from the database. 
# Reindex all formats newer than the modification date if not date: write_message("Using the last update time for the rank method") id = self.tablename[len("bibindex"):] query = """SELECT last_updated FROM rnkMETHOD WHERE name='%s' """ % options["current_run"] res = run_sql(query) if not res: return if not res[0][0]: date = ("0000-00-00",'') else: date = (res[0][0],'') query = """SELECT b.id FROM bibrec AS b WHERE b.modification_date >= '%s'""" % date[0] if date[1]: query += "and b.modification_date <= '%s'" % date[1] query += "ORDER BY b.id ASC""" res = run_sql(query) list = create_range_list(res) if not list: if options["verbose"]: write_message( "No new records added. %s is up to date" % self.tablename) else: self.add_recIDs(list) return list def add_recID_range(self, recID1, recID2): empty_list_string = serialize_via_marshal([]) wlist = {} normalize = {} self.recIDs_in_mem.append([recID1,recID2]) # secondly fetch all needed tags: for (tag, weight, lang) in self.fields_to_index: if tag in tagToWordsFunctions.keys(): get_words_function = tagToWordsFunctions[ tag ] else: get_words_function = get_words_from_phrase bibXXx = "bib" + tag[0] + tag[1] + "x" bibrec_bibXXx = "bibrec_" + bibXXx query = """SELECT bb.id_bibrec,b.value FROM %s AS b, %s AS bb WHERE bb.id_bibrec BETWEEN %d AND %d AND bb.id_bibxxx=b.id AND tag LIKE '%s'""" % (bibXXx, bibrec_bibXXx, recID1, recID2, tag) res = run_sql(query) nb_total_to_read = len(res) verbose_idx = 0 # for verbose pretty printing for row in res: recID, phrase = row if options["validset"].contains(recID): if not wlist.has_key(recID): wlist[recID] = {} new_words = get_words_function(phrase, weight, lang) # ,self.separators wlist[recID] = dict_union(new_words,wlist[recID]) # were there some words for these recIDs found? if len(wlist) == 0: return 0 recIDs = wlist.keys() for recID in recIDs: # was this record marked as deleted? 
if "DELETED" in self.get_field(recID, "980__c"): wlist[recID] = {} if options["verbose"] >= 9: write_message("... record %d was declared deleted, removing its word list" % recID) if options["verbose"] >= 9: write_message("... record %d, termlist: %s" % (recID, wlist[recID])) query_factory = cStringIO.StringIO() qwrite = query_factory.write qwrite( "INSERT INTO %sR (id_bibrec,termlist,type) VALUES" % self.tablename[:-1]) qwrite( "('" ) qwrite( str(recIDs[0]) ) qwrite( "','" ) qwrite( serialize_via_marshal(wlist[recIDs[0]]) ) qwrite( "','FUTURE')" ) for recID in recIDs[1:]: qwrite(",('") qwrite(str(recID)) qwrite("','") qwrite(serialize_via_marshal(wlist[recID])) qwrite("','FUTURE')") query = query_factory.getvalue() query_factory.close() run_sql(query) query_factory = cStringIO.StringIO() qwrite = query_factory.write qwrite("INSERT INTO %sR (id_bibrec,termlist,type) VALUES" % self.tablename[:-1]) qwrite("('") qwrite(str(recIDs[0])) qwrite("','") qwrite(serialize_via_marshal(wlist[recIDs[0]])) qwrite("','CURRENT')") for recID in recIDs[1:]: qwrite( ",('" ) qwrite( str(recID) ) qwrite( "','" ) qwrite( empty_list_string ) qwrite( "','CURRENT')" ) query = query_factory.getvalue() query_factory.close() try: run_sql(query) except MySQLdb.DatabaseError: pass put = self.put for recID in recIDs: for (w, count) in wlist[recID].iteritems(): put(recID, w, count) return len(recIDs) def log_progress(self, start, done, todo): """Calculate progress and store it. 
start: start time, done: records processed, todo: total number of records""" time_elapsed = time.time() - start # consistency check if time_elapsed == 0 or done > todo: return time_recs_per_min = done/(time_elapsed/60.0) if options["verbose"]: write_message("%d records took %.1f seconds to complete.(%1.f recs/min)"\ % (done, time_elapsed, time_recs_per_min)) if time_recs_per_min: if options["verbose"]: write_message("Estimated runtime: %.1f minutes" % \ ((todo-done)/time_recs_per_min)) def put(self, recID, word, sign): "Adds/deletes a word to the word list." try: word = lower(word[:50]) if self.value.has_key(word): # the word 'word' exist already: update sign self.value[word][recID] = sign # PROBLEM ? else: self.value[word] = {recID: sign} except: write_message("Error: Cannot put word %s with sign %d for recID %s." % (word, sign, recID)) def del_recIDs(self, recIDs): """Fetches records which id in the recIDs range list and adds them to the wordTable. The recIDs range list is of the form: [[i1_low,i1_high],[i2_low,i2_high], ..., [iN_low,iN_high]]. 
""" count = 0 for range in recIDs: self.del_recID_range(range[0],range[1]) count = count + range[1] - range[0] self.put_into_db() def del_recID_range(self, low, high): """Deletes records with 'recID' system number between low and high from memory words index table.""" if options["verbose"] > 2: write_message("%s fetching existing words for records #%d-#%d started" % \ (self.tablename, low, high)) self.recIDs_in_mem.append([low,high]) query = """SELECT id_bibrec,termlist FROM %sR as bb WHERE bb.id_bibrec BETWEEN '%d' AND '%d'""" % (self.tablename[:-1], low, high) recID_rows = run_sql(query) for recID_row in recID_rows: recID = recID_row[0] wlist = deserialize_via_marshal(recID_row[1]) for word in wlist: self.put(recID, word, (-1, 0)) if options["verbose"] > 2: write_message("%s fetching existing words for records #%d-#%d ended" % \ (self.tablename, low, high)) def report_on_table_consistency(self): """Check reverse words index tables (e.g. rnkWORD01R) for interesting states such as 'TEMPORARY' state. Prints small report (no of words, no of bad words). 
""" # find number of words: query = """SELECT COUNT(*) FROM %s""" % (self.tablename) res = run_sql(query, None, 1) if res: nb_words = res[0][0] else: nb_words = 0 # find number of records: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR""" % (self.tablename[:-1]) res = run_sql(query, None, 1) if res: nb_records = res[0][0] else: nb_records = 0 # report stats: if options["verbose"]: write_message("%s contains %d words from %d records" % (self.tablename, nb_words, nb_records)) # find possible bad states in reverse tables: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR WHERE type <> 'CURRENT'""" % (self.tablename[:-1]) res = run_sql(query) if res: nb_bad_records = res[0][0] else: nb_bad_records = 999999999 if nb_bad_records: write_message("EMERGENCY: %s needs to repair %d of %d records" % \ (self.tablename, nb_bad_records, nb_records)) else: if options["verbose"]: write_message("%s is in consistent state" % (self.tablename)) return nb_bad_records def repair(self): """Repair the whole table""" # find possible bad states in reverse tables: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR WHERE type <> 'CURRENT'""" % (self.tablename[:-1]) res = run_sql(query, None, 1) if res: nb_bad_records = res[0][0] else: nb_bad_records = 0 # find number of records: query = """SELECT COUNT(DISTINCT(id_bibrec)) FROM %sR""" % (self.tablename[:-1]) res = run_sql(query) if res: nb_records = res[0][0] else: nb_records = 0 if nb_bad_records == 0: return query = """SELECT id_bibrec FROM %sR WHERE type <> 'CURRENT' ORDER BY id_bibrec""" \ % (self.tablename[:-1]) res = run_sql(query) recIDs = create_range_list(res) flush_count = 0 records_done = 0 records_to_go = 0 for range in recIDs: records_to_go = records_to_go + range[1] - range[0] + 1 time_started = time.time() # will measure profile time for range in recIDs: i_low = range[0] chunksize_count = 0 while i_low <= range[1]: # calculate chunk group of recIDs and treat it: i_high = 
min(i_low+options["flush"]-flush_count-1,range[1]) i_high = min(i_low+chunksize-chunksize_count-1, i_high) try: self.fix_recID_range(i_low, i_high) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) task_update_status("ERROR") task_sig_stop_commands() sys.exit(1) flush_count = flush_count + i_high - i_low + 1 chunksize_count = chunksize_count + i_high - i_low + 1 records_done = records_done + i_high - i_low + 1 if chunksize_count >= chunksize: chunksize_count = 0 # flush if necessary: if flush_count >= options["flush"]: self.put_into_db("emergency") self.clean() flush_count = 0 self.log_progress(time_started,records_done,records_to_go) # iterate: i_low = i_high + 1 if flush_count > 0: self.put_into_db("emergency") self.log_progress(time_started,records_done,records_to_go) write_message("%s inconsistencies repaired." % self.tablename) def chk_recID_range(self, low, high): """Check if the reverse index table is in proper state""" ## check db query = """SELECT COUNT(*) FROM %sR WHERE type <> 'CURRENT' AND id_bibrec BETWEEN '%d' AND '%d'""" % (self.tablename[:-1], low, high) res = run_sql(query, None, 1) if res[0][0]==0: if options["verbose"]: write_message("%s for %d-%d is in consistent state"%(self.tablename,low,high)) return # okay, words table is consistent ## inconsistency detected! write_message("EMERGENCY: %s inconsistencies detected..." % self.tablename) write_message("""EMERGENCY: Errors found. You should check consistency of the %s - %sR tables.\nRunning 'bibindex --repair' is recommended.""" \ % (self.tablename, self.tablename[:-1])) raise StandardError def fix_recID_range(self, low, high): """Try to fix reverse index database consistency (e.g. table rnkWORD01R) in the low,high doc-id range. Possible states for a recID follow: CUR TMP FUT: very bad things have happened: warn! CUR TMP : very bad things have happened: warn! 
CUR FUT: delete FUT (crash before flushing) CUR : database is ok TMP FUT: add TMP to memory and del FUT from memory flush (revert to old state) TMP : very bad things have happened: warn! FUT: very bad things have happended: warn! """ state = {} query = "SELECT id_bibrec,type FROM %sR WHERE id_bibrec BETWEEN '%d' AND '%d'"\ % (self.tablename[:-1], low, high) res = run_sql(query) for row in res: if not state.has_key(row[0]): state[row[0]]=[] state[row[0]].append(row[1]) ok = 1 # will hold info on whether we will be able to repair for recID in state.keys(): if not 'TEMPORARY' in state[recID]: if 'FUTURE' in state[recID]: if 'CURRENT' not in state[recID]: write_message("EMERGENCY: Record %d is in inconsistent state. Can't repair it" % recID) ok = 0 else: write_message("EMERGENCY: Inconsistency in record %d detected" % recID) query = """DELETE FROM %sR WHERE id_bibrec='%d'""" % (self.tablename[:-1], recID) run_sql(query) write_message("EMERGENCY: Inconsistency in record %d repaired." % recID) else: if 'FUTURE' in state[recID] and not 'CURRENT' in state[recID]: self.recIDs_in_mem.append([recID,recID]) # Get the words file query = """SELECT type,termlist FROM %sR WHERE id_bibrec='%d'""" % (self.tablename[:-1], recID) if options["verbose"] >= 9: write_message(query) res = run_sql(query) for row in res: wlist = deserialize_via_marshal(row[1]) if options["verbose"] >= 9: write_message("Words are %s " % wlist) if row[0] == 'TEMPORARY': sign = 1 else: sign = -1 for word in wlist: self.put(recID, word, wlist[word]) else: write_message("EMERGENCY: %s for %d is in inconsistent state. Couldn't repair it." % (self.tablename, recID)) ok = 0 if not ok: write_message("""EMERGENCY: Unrepairable errors found. You should check consistency of the %s - %sR tables. Deleting affected records is recommended.""" % (self.tablename, self.tablename[:-1])) raise StandardError def word_index(row, run): """Run the indexing task. 
The row argument is the BibSched task queue row, containing if, arguments, etc. Return 1 in case of success and 0 in case of failure. """ ## import optional modules: try: import psyco psyco.bind(get_words_from_phrase) psyco.bind(WordTable.merge_with_old_recIDs) psyco.bind(serialize_via_numeric_array) psyco.bind(serialize_via_marshal) psyco.bind(deserialize_via_numeric_array) psyco.bind(deserialize_via_marshal) psyco.bind(update_rnkWORD) psyco.bind(check_rnkWORD) except StandardError,e: print "Warning: Psyco", e pass global options, task_id, wordTables, languages # read from SQL row: task_id = row[0] task_proc = row[1] options = marshal.loads(row[6]) # install signal handlers signal.signal(signal.SIGUSR1, task_sig_sleep) signal.signal(signal.SIGTERM, task_sig_stop) signal.signal(signal.SIGABRT, task_sig_suicide) signal.signal(signal.SIGCONT, task_sig_wakeup) signal.signal(signal.SIGINT, task_sig_unknown) ## go ahead and treat each table: options["run"] = [] options["run"].append(run) for rank_method_code in options["run"]: method_starting_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) write_message("Running rank method: %s" % getName(rank_method_code)) try: file = etcdir + "/bibrank/" + rank_method_code + ".cfg" config = ConfigParser.ConfigParser() config.readfp(open(file)) except StandardError, e: write_message("Cannot find configurationfile: %s" % file, sys.stderr) raise StandardError options["current_run"] = rank_method_code options["modified_words"] = {} options["table"] = config.get(config.get("rank_method", "function"), "table") options["use_stemming"] = config.get(config.get("rank_method","function"),"stemming") options["remove_stopword"] = config.get(config.get("rank_method","function"),"stopword") tags = get_tags(config) #get the tags to include options["validset"] = get_valid_range(rank_method_code) #get the records from the collections the method is enabled for function = config.get("rank_method","function") wordTable = 
WordTable(options["table"], tags) wordTable.report_on_table_consistency() try: if options["cmd"] == "del": if options["id"]: wordTable.del_recIDs(options["id"]) elif options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) wordTable.del_recIDs(recIDs_range) else: write_message("Missing IDs of records to delete from index %s.", wordTable.tablename, sys.stderr) raise StandardError elif options["cmd"] == "add": if options["id"]: wordTable.add_recIDs(options["id"]) elif options["collection"]: l_of_colls = string.split(options["collection"], ",") recIDs = perform_request_search(c=l_of_colls) recIDs_range = [] for recID in recIDs: recIDs_range.append([recID,recID]) wordTable.add_recIDs(recIDs_range) elif options["last_updated"]: wordTable.add_date("") wordTable.update_last_updated(rank_method_code, method_starting_time) else: wordTable.add_recIDs([[0,cfg_max_recID]]) #wordTable.add_date(options["modified"]) # only update last_updated if run via automatic mode: elif options["cmd"] == "repair": wordTable.repair() check_rnkWORD(options["table"]) elif options["cmd"] == "check": check_rnkWORD(options["table"]) options["modified_words"] = {} elif options["cmd"] == "stat": rank_method_code_statistics(options["table"]) else: write_message("Invalid command found processing %s" % \ wordTable.tablename, sys.stderr) raise StandardError update_rnkWORD(options["table"], options["modified_words"]) except StandardError, e: write_message("Exception caught: %s" % e, sys.stderr) if options["verbose"] >= 9: traceback.print_tb(sys.exc_info()[2]) sys.exit(1) wordTable.report_on_table_consistency() # We are done. 
State it in the database, close and quit return 1 def get_tags(config): """Get the tags that should be used creating the index and each tag's parameter""" tags = [] function = config.get("rank_method","function") i = 1 shown_error = 0 #try: if 1: while config.has_option(function,"tag%s"% i): tag = config.get(function, "tag%s" % i) tag = string.split(tag, ",") tag[1] = int(string.strip(tag[1])) tag[2] = string.strip(tag[2]) #check if stemmer for language is available if config.get(function,"stemming") and stem("information", "en") != "inform": if shown_error == 0: write_message("Warning: PyStemmer not found. Please read INSTALL.") shown_error = 1 elif tag[2] and tag[2] != "none" and config.get(function,"stemming") and not is_stemmer_available_for_language(tag[2]): write_message("Warning: Language '%s' not available in PyStemmer." % tag[2]) tags.append(tag) i += 1 #except Exception: # write_message("Could not read data from configuration file, please check for errors") # raise StandardError return tags def get_valid_range(rank_method_code): """Returns which records are valid for this rank method, according to which collections it is enabled for.""" #if options["verbose"] >=9: # write_message("Getting records from collections enabled for rank method.") #res = run_sql("SELECT collection.name FROM collection,collection_rnkMETHOD,rnkMETHOD WHERE collection.id=id_collection and id_rnkMETHOD=rnkMETHOD.id and rnkMETHOD.name='%s'" % rank_method_code) #l_of_colls = [] #for coll in res: # l_of_colls.append(coll[0]) #if len(l_of_colls) > 0: # recIDs = perform_request_search(c=l_of_colls) #else: # recIDs = [] valid = HitSet(Numeric.ones(cfg_max_recID+1, Numeric.Int0)) #valid.addlist(recIDs) return valid def write_message(msg, stream=sys.stdout): """Prints message and flush output stream (may be sys.stdout or sys.stderr).""" if stream == sys.stdout or stream == sys.stderr: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) stream.write("%s\n" % msg) 
stream.flush() else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) def check_term(term, termlength): """Check if term contains not allowed characters, or for any other reasons for not using this term.""" try: if len(term) <= termlength: return False reg = re.compile(r"[1234567890\!\"\#\$\%\&\'\(\)\*\+\,\-\.\/\:\;\<\=\>\?\@\[\\\]\^\_\`\{\|\}\~]") if re.search(reg, term): return False term = str.replace(term, "-", "") term = str.replace(term, ".", "") term = str.replace(term, ",", "") if int(term): return False except StandardError, e: pass return True def check_rnkWORD(table): """Checks for any problems in rnkWORD tables.""" i = 0 errors = {} termslist = run_sql("SELECT term FROM %s" % table) N = run_sql("select max(id_bibrec) from %sR" % table[:-1])[0][0] write_message("Checking integrity of rank values in %s" % table) terms = map(lambda x: x[0], termslist) while i < len(terms): current_terms = "" for j in range(i, ((i+5000)< len(terms) and (i+5000) or len(terms))): current_terms += "'%s'," % terms[j] terms_docs = run_sql("SELECT term, hitlist FROM %s WHERE term in (%s)" % (table, current_terms[:-1])) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if (term_docs.has_key("Gi") and term_docs["Gi"][1] == 0) or not term_docs.has_key("Gi"): write_message("ERROR: Missing value for term: %s (%s) in %s: %s" % (t, repr(t), table, len(term_docs))) errors[t] = 1 i += 5000 write_message("Checking integrity of rank values in %sR" % table[:-1]) i = 0 while i < N: docs_terms = run_sql("SELECT id_bibrec, termlist FROM %sR WHERE id_bibrec>=%s and id_bibrec<=%s" % (table[:-1], i, i+5000)) for (j, termlist) in docs_terms: termlist = deserialize_via_marshal(termlist) for (t, tf) in termlist.iteritems(): if tf[1] == 0 and not errors.has_key(t): errors[t] = 1 write_message("ERROR: Gi missing for record %s and term: %s (%s) in %s" % (j,t,repr(t), table)) terms_docs = run_sql("SELECT term, hitlist FROM %s WHERE term='%s'" 
% (table, t)) termlist = deserialize_via_marshal(terms_docs[0][1]) i += 5000 if len(errors) == 0: write_message("No direct errors found, but nonconsistent data may exist.") else: write_message("%s errors found during integrity check, repair and rebalancing recommended." % len(errors)) options["modified_words"] = errors def rank_method_code_statistics(table): """Shows some statistics about this rank method.""" maxID = run_sql("select max(id) from %s" % table) maxID = maxID[0][0] terms = {} Gi = {} write_message("Showing statistics of terms in index:") write_message("Important: For the 'Least used terms', the number of terms is shown first, and the number of occurences second.") write_message("Least used terms---Most important terms---Least important terms") i = 0 while i < maxID: terms_docs=run_sql("SELECT term, hitlist FROM %s WHERE id>= %s and id < %s" % (table, i, i + 10000)) for (t, hitlist) in terms_docs: term_docs=deserialize_via_marshal(hitlist) terms[len(term_docs)] = terms.get(len(term_docs), 0) + 1 if term_docs.has_key("Gi"): Gi[t] = term_docs["Gi"] i=i + 10000 terms=terms.items() terms.sort(lambda x, y: cmp(y[1], x[1])) Gi=Gi.items() Gi.sort(lambda x, y: cmp(y[1], x[1])) for i in range(0, 20): write_message("%s/%s---%s---%s" % (terms[i][0],terms[i][1], Gi[i][0],Gi[len(Gi) - i - 1][0])) def update_rnkWORD(table, terms): """Updates rnkWORDF and rnkWORDR with Gi and Nj values. For each term in rnkWORDF, a Gi value for the term is added. And for each term in each document, the Nj value for that document is added. In rnkWORDR, the Gi value for each term in each document is added. For description on how things are computed, look in the hacking docs. 
table - name of forward index to update terms - modified terms""" stime = time.time() Gi = {} Nj = {} N = run_sql("select count(id_bibrec) from %sR" % table[:-1])[0][0] write_message("Beginning post-processing of %s terms" % len(terms)) if len(terms) == 0: write_message("No terms to process, ending...") return "" #Locating all documents related to the modified/new/deleted terms, if fast update, #only take into account new/modified occurences write_message("Phase 1: Finding records containing modified terms") terms = terms.keys() i = 0 while i < len(terms): terms_docs = get_from_forward_index(terms, i, (i+5000), table) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if term_docs.has_key("Gi"): del term_docs["Gi"] for (j, tf) in term_docs.iteritems(): if (options["quick"] == "yes" and tf[1] == 0) or options["quick"] == "no": Nj[j] = 0 write_message("Phase 1: ......processed %s/%s terms" % ((i+5000>len(terms) and len(terms) or (i+5000)), len(terms))) i += 5000 write_message("Phase 1: Finished finding records containing modified terms") #Find all terms in the records found in last phase write_message("Phase 2: Finding all terms in affected records") records = Nj.keys() i = 0 while i < len(records): docs_terms = get_from_reverse_index(records, i, (i + 5000), table) for (j, termlist) in docs_terms: doc_terms = deserialize_via_marshal(termlist) for (t, tf) in doc_terms.iteritems(): Gi[t] = 0 write_message("Phase 2: ......processed %s/%s records " % ((i+5000>len(records) and len(records) or (i+5000)), len(records))) i += 5000 write_message("Phase 2: Finished finding all terms in affected records") terms = Gi.keys() Gi = {} i = 0 if options["quick"] == "no": #Calculating Fi and Gi value for each term write_message("Phase 3: Calculating importance of all affected terms") while i < len(terms): terms_docs = get_from_forward_index(terms, i, (i+5000), table) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if 
term_docs.has_key("Gi"): del term_docs["Gi"] Fi = 0 Gi[t] = 1 for (j, tf) in term_docs.iteritems(): Fi += tf[0] for (j, tf) in term_docs.iteritems(): if tf[0] != Fi: Gi[t] = Gi[t] + ((float(tf[0]) / Fi) * math.log(float(tf[0]) / Fi) / math.log(2)) / math.log(N) write_message("Phase 3: ......processed %s/%s terms" % ((i+5000>len(terms) and len(terms) or (i+5000)), len(terms))) i += 5000 write_message("Phase 3: Finished calculating importance of all affected terms") else: #Using existing Gi value instead of calculating a new one. Missing some accurancy. write_message("Phase 3: Getting approximate importance of all affected terms") while i < len(terms): terms_docs = get_from_forward_index(terms, i, (i+5000), table) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if term_docs.has_key("Gi"): Gi[t] = term_docs["Gi"][1] elif len(term_docs) == 1: Gi[t] = 1 else: Fi = 0 Gi[t] = 1 for (j, tf) in term_docs.iteritems(): Fi += tf[0] for (j, tf) in term_docs.iteritems(): if tf[0] != Fi: Gi[t] = Gi[t] + ((float(tf[0]) / Fi) * math.log(float(tf[0]) / Fi) / math.log(2)) / math.log(N) write_message("Phase 3: ......processed %s/%s terms" % ((i+5000>len(terms) and len(terms) or (i+5000)), len(terms))) i += 5000 write_message("Phase 3: Finished getting approximate importance of all affected terms") write_message("Phase 4: Calculating normalization value for all affected records and updating %sR" % table[:-1]) records = Nj.keys() i = 0 while i < len(records): #Calculating the normalization value for each document, and adding the Gi value to each term in each document. 
docs_terms = get_from_reverse_index(records, i, (i + 5000), table) for (j, termlist) in docs_terms: doc_terms = deserialize_via_marshal(termlist) for (t, tf) in doc_terms.iteritems(): if Gi.has_key(t): Nj[j] = Nj.get(j, 0) + math.pow(Gi[t] * (1 + math.log(tf[0])), 2) Git = int(math.floor(Gi[t]*100)) if Git >= 0: Git += 1 doc_terms[t] = (tf[0], Git) else: Nj[j] = Nj.get(j, 0) + math.pow(tf[1] * (1 + math.log(tf[0])), 2) Nj[j] = 1.0 / math.sqrt(Nj[j]) Nj[j] = int(Nj[j] * 100) if Nj[j] >= 0: Nj[j] += 1 run_sql("UPDATE %sR SET termlist='%s' WHERE id_bibrec=%s" % (table[:-1], serialize_via_marshal(doc_terms), j)) write_message("Phase 4: ......processed %s/%s records" % ((i+5000>len(records) and len(records) or (i+5000)), len(records))) i += 5000 write_message("Phase 4: Finished calculating normalization value for all affected records and updating %sR" % table[:-1]) write_message("Phase 5: Updating %s with new normalization values" % table) i = 0 terms = Gi.keys() while i < len(terms): #Adding the Gi value to each term, and adding the normalization value to each term in each document. 
terms_docs = get_from_forward_index(terms, i, (i+5000), table) for (t, hitlist) in terms_docs: term_docs = deserialize_via_marshal(hitlist) if term_docs.has_key("Gi"): del term_docs["Gi"] for (j, tf) in term_docs.iteritems(): if Nj.has_key(j): term_docs[j] = (tf[0], Nj[j]) Git = int(math.floor(Gi[t]*100)) if Git >= 0: Git += 1 term_docs["Gi"] = (0, Git) run_sql("UPDATE %s SET hitlist='%s' WHERE term='%s'" % (table, serialize_via_marshal(term_docs), MySQLdb.escape_string(t))) write_message("Phase 5: ......processed %s/%s terms" % ((i+5000>len(terms) and len(terms) or (i+5000)), len(terms))) i += 5000 write_message("Phase 5: Finished updating %s with new normalization values" % table) write_message("Time used for post-processing: %.1fmin" % ((time.time() - stime) / 60)) write_message("Finished post-processing") def get_from_forward_index(terms, start, stop, table): current_terms = "" for j in range(start, (stop < len(terms) and stop or len(terms))): current_terms += "'%s'," % terms[j] terms_docs = run_sql("SELECT term, hitlist FROM %s WHERE term IN (%s)" % (table,current_terms[:-1])) return terms_docs def get_from_reverse_index(records, start, stop, table): current_recs = "%s" % records[start:stop] current_recs = current_recs[1:-1] docs_terms = run_sql("SELECT id_bibrec, termlist FROM %sR WHERE id_bibrec IN (%s)" % (table[:-1],current_recs)) return docs_terms def test_word_separators(phrase="hep-th/0101001"): """Tests word separating policy on various input.""" print "%s:" % phrase gwfp = get_words_from_phrase(phrase) for (word, count) in gwfp.iteritems(): print "\t-> %s - %s" % (word, count) def task_sig_sleep(sig, frame): """Signal handler for the 'sleep' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("sleeping...") task_update_status("SLEEPING") signal.pause() # wait for wake-up signal def task_sig_wakeup(sig, frame): """Signal handler for the 'wakeup' signal sent by BibSched.""" if options["verbose"] >= 
9: write_message("got signal %d" % sig) write_message("continuing...") task_update_status("CONTINUING") def task_sig_stop(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("stopping...") task_update_status("STOPPING") errcode = 0 try: task_sig_stop_commands() write_message("stopped") task_update_status("STOPPED") except StandardError, err: write_message("Error during stopping! %e" % err) task_update_status("STOPPINGFAILED") errcode = 1 sys.exit(errcode) def task_sig_stop_commands(): """Do all the commands necessary to stop the task before quitting. Useful for task_sig_stop() handler. """ write_message("stopping commands started") for table in wordTables: table.put_into_db() write_message("stopping commands ended") def task_sig_suicide(sig, frame): """Signal handler for the 'suicide' signal sent by BibSched.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("suiciding myself now...") task_update_status("SUICIDING") write_message("suicided") task_update_status("SUICIDED") sys.exit(0) def task_sig_unknown(sig, frame): """Signal handler for the other unknown signals sent by shell or user.""" if options["verbose"] >= 9: write_message("got signal %d" % sig) write_message("unknown signal %d ignored" % sig) # do nothing for other signals def task_update_progress(msg): """Updates progress information in the BibSched task table.""" global task_id, options if options["verbose"] >= 9: write_message("Updating task progress to %s." % msg) return run_sql("UPDATE schTASK SET progress=%s where id=%s", (msg, task_id)) def task_update_status(val): """Updates state information in the BibSched task table.""" global task_id, options if options["verbose"] >= 9: write_message("Updating task status to %s." 
% val) return run_sql("UPDATE schTASK SET status=%s where id=%s", (val, task_id)) def getName(methname, ln=cdslang, type='ln'): """Returns the name of the rank method, either in default language or given language. methname = short name of the method ln - the language to get the name in type - which name "type" to get.""" try: rnkid = run_sql("SELECT id FROM rnkMETHOD where name='%s'" % methname) if rnkid: rnkid = str(rnkid[0][0]) res = run_sql("SELECT value FROM rnkMETHODNAME where type='%s' and ln='%s' and id_rnkMETHOD=%s" % (type, ln, rnkid)) if not res: res = run_sql("SELECT value FROM rnkMETHODNAME WHERE ln='%s' and id_rnkMETHOD=%s and type='%s'" % (cdslang, rnkid, type)) if not res: return methname return res[0][0] else: raise Exception except Exception, e: write_message("Cannot run rank method, either given code for method is wrong, or it has not been added using the webinterface.") raise Exception def word_similarity(row, run): """Call correct method""" return word_index(row, run)