diff --git a/modules/bibsched/doc/admin/bibsched-admin-guide.webdoc b/modules/bibsched/doc/admin/bibsched-admin-guide.webdoc index abd428ec9..716fa812b 100644 --- a/modules/bibsched/doc/admin/bibsched-admin-guide.webdoc +++ b/modules/bibsched/doc/admin/bibsched-admin-guide.webdoc @@ -1,185 +1,186 @@ ## -*- mode: html; coding: utf-8; -*- ## This file is part of CDS Invenio. ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008 CERN. ## ## CDS Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## CDS Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDS Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

Overview

BibSched -- the bibliographic task scheduler -- is central unit of the system that allows all other modules to access the bibliographic database in a controlled manner, preventing sharing violation threats and assuring the coherent execution of the database update tasks. The module comes with an administrative interface that allows to monitor the task queue including various possibilities of a manual intervention, for example to re-schedule queued tasks, change the task order, etc.

You can run the administrative interface by doing:

 $ bibsched
 

Note that in general you should run bibsched with the same rights of the Apache user of your system.

The bibsched can run in two modes: automatic and manual. In the automatic mode, it will execute tasks automatically as they arrive in the waiting queue. In the manual mode, the administrator has to launch the tasks manually.

Bibsched graphical interface

bibsched interface is text mode graphical interface to display running tasks. It has three views, one for listing done tasks, one for scheduled/running/failed tasks and a third one for displaying archived tasks. You can switch among these three views by pressing respectively "1", "2" or "3".

With the harrows you can move from one task to the other

By pressing "O" you can see all the details of the selected task

If the task is running or is already run, you can press "l" (lower case "l") to access the standard output produced by the task, if any. You can press "L" (upper case "L") to access the standard error produced by the task, if any.

By pressing "P" you can clean the list of DONE tasks and archive/delete them.

By pressing "Q" you can Quit the interface.

By pressing "A" you can switch from Auto to Manual mode and viceversa.

Manual mode

In manual mode, depending on the status of the task you are currently selecting you're given different actions.

You can press "R" for running Waiting tasks

You can press "D" for deleting non running tasks

You can press "N" for changing the priority of a waiting task. (the equivalent of the UNIX renice command)

On a running task you can press "K" to kill the task immediately in case of emergency. "T" for stopping it cleanly. "S" for putting it temporarily to sleep. A sleeping task can be waken up by pressing "W". Note that for stopping or putting to sleep a task, a signal is sent to the given bibtask and this, in turn, will acknowledge it and decide to stop or go to sleep whenever it thinks it's safe.

On a failed task you can press "K" the acknowledge the error. This is necessary in case you wish to put bibsched back to automatic mode.

Automatic mode

In automatic mode bibsched will take care of launching tasks based on their priority and runtime schedule. The available option are only those that allow you to query a given task (see the logs and the options).

If you have configured bibsched to allow for the execution of concurrent bibtasks, bibsched will take care of launching compatible tasks concurrently (note that this feature is currently experimental). Bibupload tasks will always be executed in the chronological order (to preserve input consistency).

Bibsched maintenance

bibsched produce two log files. bibsched.log and bibsched.err, located under the usual log directory of your CDS Invenio installation. The former will contain all the actions (either automatic of manual) that bibsched has performed. The latter will contain all the exceptional errors.

In case of a bibtask failing while bibsched is in automatic mode, bibsched will stop turning into manual mode, and sending an email to the administrator (and an emergency SMS in case it is configured to do so). Note that in case of failed bibtasks, bibsched will refuse to be put back to automatic mode, until either the task is reinitialized, or deleted or the error is acknowledged.

Priorities

A task can be scheduled with a given priority, represented by an integer number. When at a given time two or more tasks might be executed, the task with higher priority will be executed first.

If this task as priority higher than 10 and there are currently other task running, conflicting with the execution of this task (because either the maximum number of task runnable is already reached or because the other tasks should not run concurrently with this task), then the other tasks are stopped.

If the priority is less than 0 than the task will never be executed automatically.

Bibupload tasks are not affected by priority with respect to each other and will always be executed in the proper order.

Task logging

When executed each tasks will produced (if necessary) a couple of log files. One called bibsched_task_{task_id}.log and the other bibsched_task_{task_id}.err. In case of reschedulable task, each time the task is rescheduled it is being assigned the same task_id. That means that log information of successive execution of the given task will be appended at the end of already existing log files.

A log-rotation algorithm is applied when writing into the log file. By default each log will be no bigger than 1MB. After this limit is reached the log is rotated. Note that when viewing the log file inside the bibsched monitor interface, only the latest log will be displayed.

Task concurrency

A recent experimental feature of bibsched is the concurrent execution of compatible tasks. The current definition of when two tasks are considered compatible is: "If a two tasks have the same name (e.g. bibupload) then they're incompatible."

Sometimes you might want to consider compatible two tasks even when they have the same name. For this you can add a name specification via the bibtask command line option --task-specific-name. E.g. you might want to distinguish a generic bibupload from a bibupload carrying only preformatting information. For this just launch bibupload -N "bibformat", and it will be considered compatible with all the other bibuploads.

Submitting BibTasks

All the BibTasks have a common command line interface

 Scheduling options:
   -u, --user=USER       User name to submit the task as, password needed.
   -t, --runtime=TIME    Time to execute the task (now), e.g.: +15s, 5m, 3h, 2002-10-27 13:57:26
   -s, --sleeptime=SLEEP Sleeping frequency after which to repeat task (no), e.g.: 30m, 2h, 1d
   -L  --runtime-limit=LIMIT     Range of time when it's to execute the task, e.g.: Sunday 23:00-5:00
                                 with the syntax We[ekday][ fh[:fm][-th:[tm]]]
   -P, --priority=PRIORITY       Priority level (an integer, 0 is default)
   -N, --task-specific-name=TASK_SPECIFIC_NAME   Advanced option
 General options:
   -h, --help            Print this help.
   -V, --version         Print version information.
   -v, --verbose=LEVEL   Verbose level (0=min, 1=default, 9=max).
 

Configuration

Bibsched can be tweaked by adjusting some variables in the usual invenio(-local).conf file. Please refer to the documentation associated with each variable inside this file.

Bibsched command line interface

     Usage: /soft/cdsweb/bin/bibsched [options] [start|stop|restart|monitor|status]
 
     The following commands are available for bibsched:
 
     start      start bibsched in background
-    stop       stop a running bibsched
+    stop       stop running bibtasks and the bibsched daemon safely
+    halt       halt running bibsched while keeping bibtasks running
     restart    restart a running bibsched
     monitor    enter the interactive monitor
     status     get report about current status of the queue
     purge      purge the scheduler queue from old tasks
 
     Command options:
     -d, --daemon           Launch BibSched in the daemon mode (deprecated, use 'start')
     General options:
     -h, --help             Print this help.
     -V, --version          Print version information.
     Status options:
     -s, --status=LIST      Which BibTask status should be considered (default is Running,waiting)
     -S, --since=TIME       Since how long time to consider tasks e.g.: 30m, 2h, 1d (default
     is all)
     -t, --tasks=LIST       Comma separated list of BibTask to consider (default
     is all)
     Purge options:
     -s, --status=LIST      Which BibTask status should be considered (default is DONE)
     -S, --since=TIME       Since how long time to consider tasks e.g.: 30m, 2h, 1d (default
     is 30 days)
     -t, --tasks=LIST       Comma separated list of BibTask to consider (default
     is bibindex,bibreformat,webcoll,bibrank,inveniogc,bibupload,oaiarchive)
 

Bibtasks command line interface

Each bibtask has a common command interface in addition to the proper bibtask related options.

     Scheduling options:
     -u, --user=USER       User name to submit the task as, password needed.
     -t, --runtime=TIME    Time to execute the task (now), e.g.: +15s, 5m, 3h, 2002-10-27 13:57:26
     -s, --sleeptime=SLEEP Sleeping frequency after which to repeat task (no), e.g.: 30m, 2h, 1d
     -P, --priority=PRIORITY       Priority level (an integer, 0 is default)
     -N, --task_specific_name=TASK_SPECIFIC_NAME   Advanced option
     General options:
     -h, --help            Print this help.
     -V, --version         Print version information.
     -v, --verbose=LEVEL   Verbose level (0=min, 1=default, 9=max).
 
-

\ No newline at end of file +

diff --git a/modules/bibsched/lib/bibsched.py b/modules/bibsched/lib/bibsched.py index b407c71b1..5bc77a4d5 100644 --- a/modules/bibsched/lib/bibsched.py +++ b/modules/bibsched/lib/bibsched.py @@ -1,1269 +1,1311 @@ # -*- coding: utf-8 -*- ## ## This file is part of CDS Invenio. ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008 CERN. ## ## CDS Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## CDS Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDS Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """BibSched - task management, scheduling and executing system for CDS Invenio """ __revision__ = "$Id$" import os import string import sys import time import re import marshal import getopt import copy from socket import gethostname import signal from invenio.bibtask_config import CFG_BIBTASK_VALID_TASKS from invenio.config import \ CFG_PREFIX, \ CFG_BIBSCHED_REFRESHTIME, \ CFG_BIBSCHED_LOG_PAGER, \ CFG_BINDIR, \ CFG_LOGDIR, \ CFG_BIBSCHED_GC_TASKS_OLDER_THAN, \ CFG_BIBSCHED_GC_TASKS_TO_REMOVE, \ CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE, \ CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS, \ CFG_SITE_URL from invenio.dbquery import run_sql, escape_string from invenio.textutils import wrap_text_in_a_box from invenio.errorlib import register_exception, register_emergency CFG_VALID_STATUS = ('WAITING', 'SCHEDULED', 'RUNNING', 'CONTINUING', 'DELETED %', 'ABOUT TO STOP', 'ABOUT TO SLEEP', 'STOPPED', 'SLEEPING', 'KILLED') shift_re = re.compile("([-\+]{0,1})([\d]+)([dhms])") def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"): """Returns a date string according to the format string. It can handle normal date strings and shifts with respect to now.""" try: date = time.time() factors = {"d":24*3600, "h":3600, "m":60, "s":1} m = shift_re.match(var) if m: sign = m.groups()[0] == "-" and -1 or 1 factor = factors[m.groups()[2]] value = float(m.groups()[1]) date = time.localtime(date + sign * factor * value) date = time.strftime(format_string, date) else: date = time.strptime(var, format_string) date = time.strftime(format_string, date) return date except: return None def get_my_pid(process, args=''): if sys.platform.startswith('freebsd'): COMMAND = "ps -o pid,args | grep '%s %s' | grep -v 'grep' | sed -n 1p" % (process, args) else: COMMAND = "ps -C %s o '%%p%%a' | grep '%s %s' | grep -v 'grep' | sed -n 1p" % (process, process, args) answer = string.strip(os.popen(COMMAND).read()) if answer == '': answer = 0 else: answer = answer[:string.find(answer,' ')] return int(answer) def get_task_pid(task_name, task_id, ignore_error=False): """Return the pid of task_name/task_id""" try: pid = int(open(os.path.join(CFG_PREFIX, 'var', 'run', 'bibsched_task_%d.pid' % task_id)).read()) os.kill(pid, signal.SIGUSR2) return pid except (OSError, IOError): if ignore_error: return None register_exception() return get_my_pid(task_name, str(task_id)) def is_task_scheduled(task_name): """Check if a certain task_name is due for execution (WAITING or RUNNING)""" sql = "SELECT COUNT(proc) FROM schTASK WHERE proc = %s AND (status='WAITING' OR status='RUNNING')" return run_sql(sql, (task_name,))[0][0] > 0 def get_task_ids_by_descending_date(task_name, statuses=['SCHEDULED']): """Returns list of task ids, ordered by descending runtime.""" sql = "SELECT id FROM schTASK WHERE proc=%s AND (" + \ " OR ".join(["status = '%s'" % x for x in statuses]) + ") ORDER BY runtime DESC" return [x[0] for x in run_sql(sql, (task_name,))] def get_task_options(task_id): """Returns options for task_id read from the BibSched task queue table.""" res = run_sql("SELECT arguments FROM schTASK WHERE id=%s", (task_id,)) try: return marshal.loads(res[0][0]) except IndexError: return list() def gc_tasks(verbose=False, statuses=None, since=None, tasks=None): """Garbage collect the task queue.""" if tasks is None: tasks = CFG_BIBSCHED_GC_TASKS_TO_REMOVE + CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE if since is None: since = '-%id' % CFG_BIBSCHED_GC_TASKS_OLDER_THAN if statuses is None: statuses = ['DONE'] statuses = [status.upper() for status in statuses if status.upper() != 'RUNNING'] date = get_datetime(since) status_query = 'status in (%s)' % ','.join([repr(escape_string(status)) for status in statuses]) for task in tasks: if task in CFG_BIBSCHED_GC_TASKS_TO_REMOVE: res = run_sql("""DELETE FROM schTASK WHERE proc=%%s AND %s AND runtime<%%s""" % status_query, (task, date)) write_message('Deleted %s %s tasks (created before %s) with %s' % (res, task, date, status_query)) elif task in CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE: run_sql("""INSERT INTO hstTASK(id,proc,host,user, runtime,sleeptime,arguments,status,progress) SELECT id,proc,host,user, runtime,sleeptime,arguments,status,progress FROM schTASK WHERE proc=%%s AND %s AND runtime<%%s""" % status_query, (task, date)) res = run_sql("""DELETE FROM schTASK WHERE proc=%%s AND %s AND runtime<%%s""" % status_query, (task, date)) write_message('Archived %s %s tasks (created before %s) with %s' % (res, task, date, status_query)) def bibsched_set_status(task_id, status): """Update the status of task_id.""" return run_sql("UPDATE schTASK SET status=%s WHERE id=%s", (status, task_id)) def bibsched_set_progress(task_id, progress): """Update the progress of task_id.""" return run_sql("UPDATE schTASK SET progress=%s WHERE id=%s", (progress, task_id)) def bibsched_set_priority(task_id, priority): """Update the priority of task_id.""" return run_sql("UPDATE schTASK SET priority=%s WHERE id=%s", (priority, task_id)) def bibsched_send_signal(proc, task_id, signal): """Send a signal to a given task.""" try: pid = get_task_pid(proc, task_id, True) os.kill(pid, signal) except OSError: pass except KeyError: register_exception() class Manager: def __init__(self, old_stdout): import curses import curses.panel from curses.wrapper import wrapper self.old_stdout = old_stdout self.curses = curses self.helper_modules = CFG_BIBTASK_VALID_TASKS self.running = 1 #self.footer_move_mode = "[KeyUp/KeyDown Move] [M Select mode] [Q Quit]" self.footer_auto_mode = "Automatic Mode [A Manual] [1/2/3 Display] [P Purge] [l/L Log] [O Opts] [Q Quit]" self.footer_select_mode = "Manual Mode [A Automatic] [1/2/3 Display Type] [P Purge] [l/L Log] [O Opts] [Q Quit]" self.footer_waiting_item = "[R Run] [D Delete] [N Priority]" self.footer_running_item = "[S Sleep] [T Stop] [K Kill]" self.footer_stopped_item = "[I Initialise] [D Delete] [K Acknowledge]" self.footer_sleeping_item = "[W Wake Up]" self.item_status = "" self.selected_line = 2 self.rows = [] self.panel = None self.display = 2 self.first_visible_line = 0 #self.move_mode = 0 self.auto_mode = 0 self.currentrow = None self.current_attr = 0 wrapper(self.start) def handle_keys(self, chr): if chr == -1: return if self.auto_mode and (chr not in (self.curses.KEY_UP, self.curses.KEY_DOWN, self.curses.KEY_PPAGE, self.curses.KEY_NPAGE, ord("q"), ord("Q"), ord("a"), ord("A"), ord("1"), ord("2"), ord("3"), ord("p"), ord("P"), ord("o"), ord("O"), ord("l"), ord("L"))): self.display_in_footer("in automatic mode") self.stdscr.refresh() #elif self.move_mode and (chr not in (self.curses.KEY_UP, #self.curses.KEY_DOWN, #ord("m"), ord("M"), ord("q"), #ord("Q"))): #self.display_in_footer("in move mode") #self.stdscr.refresh() else: status = self.currentrow and self.currentrow[5] or None if chr == self.curses.KEY_UP: #if self.move_mode: #self.move_up() #else: self.selected_line = max(self.selected_line - 1, 2) self.repaint() if chr == self.curses.KEY_PPAGE: self.selected_line = max(self.selected_line - 10, 2) self.repaint() elif chr == self.curses.KEY_DOWN: #if self.move_mode: #self.move_down() #else: self.selected_line = min(self.selected_line + 1, len(self.rows) + 1 ) self.repaint() elif chr == self.curses.KEY_NPAGE: self.selected_line = min(self.selected_line + 10, len(self.rows) + 1 ) self.repaint() elif chr == self.curses.KEY_HOME: self.first_visible_line = 0 self.selected_line = 2 elif chr in (ord("a"), ord("A")): self.change_auto_mode() elif chr == ord("l"): self.openlog() elif chr == ord("L"): self.openlog(err=True) elif chr in (ord("w"), ord("W")): self.wakeup() elif chr in (ord("n"), ord("N")): self.change_priority() elif chr in (ord("r"), ord("R")): if status in ('WAITING', 'SCHEDULED'): self.run() elif chr in (ord("s"), ord("S")): self.sleep() elif chr in (ord("k"), ord("K")): if status in ('ERROR', 'DONE WITH ERRORS'): self.acknowledge() elif status is not None: self.kill() elif chr in (ord("t"), ord("T")): self.stop() elif chr in (ord("d"), ord("D")): self.delete() elif chr in (ord("i"), ord("I")): self.init() #elif chr in (ord("m"), ord("M")): #self.change_select_mode() elif chr in (ord("p"), ord("P")): self.purge_done() elif chr in (ord("o"), ord("O")): self.display_task_options() elif chr == ord("1"): self.display = 1 self.first_visible_line = 0 self.selected_line = 2 self.display_in_footer("only done processes are displayed") elif chr == ord("2"): self.display = 2 self.first_visible_line = 0 self.selected_line = 2 self.display_in_footer("only not done processes are displayed") elif chr == ord("3"): self.display = 3 self.first_visible_line = 0 self.selected_line = 2 self.display_in_footer("only archived processes are displayed") elif chr in (ord("q"), ord("Q")): if self.curses.panel.top_panel() == self.panel: self.panel.bottom() self.curses.panel.update_panels() else: self.running = 0 return def openlog(self, err=False): task_id = self.currentrow[0] status = self.currentrow[5] if err: logname = os.path.join(CFG_LOGDIR, 'bibsched_task_%d.err' % task_id) else: logname = os.path.join(CFG_LOGDIR, 'bibsched_task_%d.log' % task_id) if os.path.exists(logname): pager = CFG_BIBSCHED_LOG_PAGER or os.environ.get('PAGER', '/bin/more') if os.path.exists(pager): self.curses.endwin() os.system('%s %s' % (pager, logname)) print >> self.old_stdout, "\rPress ENTER to continue", self.old_stdout.flush() raw_input() self.curses.panel.update_panels() def display_task_options(self): """Nicely display information about current process.""" msg = ' id: %i\n\n' % self.currentrow[0] pid = get_task_pid(self.currentrow[1], self.currentrow[0], True) if pid is not None: msg += ' pid: %s\n\n' % pid msg += ' priority: %s\n\n' % self.currentrow[8] msg += ' proc: %s\n\n' % self.currentrow[1] msg += ' user: %s\n\n' % self.currentrow[2] msg += ' runtime: %s\n\n' % self.currentrow[3].strftime("%Y-%m-%d %H:%M:%S") msg += ' sleeptime: %s\n\n' % self.currentrow[4] msg += ' status: %s\n\n' % self.currentrow[5] msg += ' progress: %s\n\n' % self.currentrow[6] arguments = marshal.loads(self.currentrow[7]) if type(arguments) is dict: # FIXME: REMOVE AFTER MAJOR RELEASE 1.0 msg += ' options : %s\n\n' % arguments else: msg += 'executable : %s\n\n' % arguments[0] msg += ' arguments : %s\n\n' % ' '.join(arguments[1:]) msg += '\n\nPress a key to continue...' msg = wrap_text_in_a_box(msg, style='no_border') rows = msg.split('\n') height = len(rows) + 2 width = max([len(row) for row in rows]) + 4 try: self.win = self.curses.newwin( height, width, (self.height - height) / 2 + 1, (self.width - width) / 2 + 1 ) except self.curses.error: return self.panel = self.curses.panel.new_panel(self.win) self.panel.top() self.win.border() i = 1 for row in rows: self.win.addstr(i, 2, row, self.current_attr) i += 1 self.win.refresh() self.win.getch() def count_processes(self, status): out = 0 res = run_sql("SELECT COUNT(id) FROM schTASK WHERE status=%s GROUP BY status", (status,)) try: out = res[0][0] except: pass return out def change_priority(self): task_id = self.currentrow[0] priority = self.currentrow[8] new_priority = self._display_ask_number_box("Insert the desired priority for task %s. The smaller the number the less the priority. Note that a negative number will mean to always postpone the task while a number bigger than 10 will mean some tasks with less priority could be stopped in order to let this task run. The current priority is %s. New value:" % (task_id, priority)) try: new_priority = int(new_priority) except ValueError: return bibsched_set_priority(task_id, new_priority) def wakeup(self): task_id = self.currentrow[0] process = self.currentrow[1] status = self.currentrow[5] #if self.count_processes('RUNNING') + self.count_processes('CONTINUING') >= 1: #self.display_in_footer("a process is already running!") if status == "SLEEPING": bibsched_send_signal(process, task_id, signal.SIGCONT) self.display_in_footer("process woken up") else: self.display_in_footer("process is not sleeping") self.stdscr.refresh() def _display_YN_box(self, msg): """Utility to display confirmation boxes.""" msg += ' (Y/N)' msg = wrap_text_in_a_box(msg, style='no_border') rows = msg.split('\n') height = len(rows) + 2 width = max([len(row) for row in rows]) + 4 self.win = self.curses.newwin( height, width, (self.height - height) / 2 + 1, (self.width - width) / 2 + 1 ) self.panel = self.curses.panel.new_panel( self.win ) self.panel.top() self.win.border() i = 1 for row in rows: self.win.addstr(i, 2, row, self.current_attr) i += 1 self.win.refresh() while 1: c = self.win.getch() if c in (ord('y'), ord('Y')): self.curses.panel.update_panels() return True elif c in (ord('n'), ord('N')): self.curses.panel.update_panels() return False def _display_ask_number_box(self, msg): """Utility to display confirmation boxes.""" msg = wrap_text_in_a_box(msg, style='no_border') rows = msg.split('\n') height = len(rows) + 3 width = max([len(row) for row in rows]) + 4 self.win = self.curses.newwin( height, width, (self.height - height) / 2 + 1, (self.width - width) / 2 + 1 ) self.panel = self.curses.panel.new_panel( self.win ) self.panel.top() self.win.border() i = 1 for row in rows: self.win.addstr(i, 2, row, self.current_attr) i += 1 self.win.refresh() self.win.move(height - 2, 2) self.curses.echo() ret = self.win.getstr() self.curses.noecho() return ret def purge_done(self): """Garbage collector.""" if self._display_YN_box("You are going to purge the list of DONE tasks.\n\n" "%s tasks, submitted since %s days, will be archived.\n\n" "%s tasks, submitted since %s days, will be deleted.\n\n" "Are you sure?" % ( ','.join(CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE), CFG_BIBSCHED_GC_TASKS_OLDER_THAN, ','.join(CFG_BIBSCHED_GC_TASKS_TO_REMOVE), CFG_BIBSCHED_GC_TASKS_OLDER_THAN)): gc_tasks() self.display_in_footer("DONE processes purged") def run(self): task_id = self.currentrow[0] process = self.currentrow[1].split(':')[0] status = self.currentrow[5] #if self.count_processes('RUNNING') + self.count_processes('CONTINUING') >= 1: #self.display_in_footer("a process is already running!") if status in ("SCHEDULED", "WAITING"): if process in self.helper_modules: program = os.path.join(CFG_BINDIR, process) COMMAND = "%s %s > /dev/null 2> /dev/null &" % (program, str(task_id)) os.system(COMMAND) Log("manually running task #%d (%s)" % (task_id, process)) else: self.display_in_footer("Process %s is not in the list of allowed processes." % process) else: self.display_in_footer("Process status should be SCHEDULED or WAITING!") def acknowledge(self): task_id = self.currentrow[0] status = self.currentrow[5] if status in ('ERROR', 'DONE WITH ERRORS'): bibsched_set_status(task_id, 'ACK ' + status) self.display_in_footer("Acknowledged error") def sleep(self): task_id = self.currentrow[0] process = self.currentrow[1].split(':')[0] status = self.currentrow[5] if status in ('RUNNING', 'CONTINUING'): bibsched_send_signal(process, task_id, signal.SIGUSR1) self.display_in_footer("SLEEP signal sent to task #%s" % task_id) else: self.display_in_footer("Cannot put to sleep non-running processes") def kill(self): task_id = self.currentrow[0] process = self.currentrow[1] status = self.currentrow[5] if status in ('RUNNING', 'CONTINUING', 'ABOUT TO STOP', 'ABOUT TO SLEEP'): if self._display_YN_box("Are you sure you want to kill the %s process %s?" % (process, task_id)): bibsched_send_signal(process, task_id, signal.SIGKILL) bibsched_set_status(task_id, 'KILLED') self.display_in_footer("KILL signal sent to task #%s" % task_id) else: self.display_in_footer("Cannot kill non-running processes") def stop(self): task_id = self.currentrow[0] process = self.currentrow[1] status = self.currentrow[5] if status in ('RUNNING', 'CONTINUING'): bibsched_send_signal(process, task_id, signal.SIGTERM) self.display_in_footer("TERM signal sent to task #%s" % task_id) else: self.display_in_footer("Cannot stop non-running processes") def delete(self): task_id = self.currentrow[0] status = self.currentrow[5] if status not in ('RUNNING', 'CONTINUING', 'SLEEPING', 'SCHEDULED', 'ABOUT TO STOP', 'ABOUT TO SLEEP'): bibsched_set_status(task_id, "%s_DELETED" % status) self.display_in_footer("process deleted") self.selected_line = max(self.selected_line, 2) else: self.display_in_footer("Cannot delete running processes") def init(self): task_id = self.currentrow[0] status = self.currentrow[5] if status not in ('RUNNING', 'CONTINUING', 'SLEEPING'): bibsched_set_status(task_id, "WAITING") bibsched_set_progress(task_id, "") self.display_in_footer("process initialised") else: self.display_in_footer("Cannot initialise running processes") #def change_select_mode(self): #if self.move_mode: #self.move_mode = 0 #else: #status = self.currentrow[5] #if status in ("RUNNING" , "CONTINUING" , "SLEEPING"): #self.display_in_footer("cannot move running processes!") #else: #self.move_mode = 1 #self.stdscr.refresh() def change_auto_mode(self): if self.auto_mode: program = os.path.join(CFG_BINDIR, "bibsched") COMMAND = "%s -q stop" % program os.system(COMMAND) self.auto_mode = 0 else: program = os.path.join( CFG_BINDIR, "bibsched") COMMAND = "%s -q start" % program os.system(COMMAND) self.auto_mode = 1 self.move_mode = 0 self.stdscr.refresh() #def move_up(self): #self.display_in_footer("not implemented yet") #self.stdscr.refresh() #def move_down(self): #self.display_in_footer("not implemented yet") #self.stdscr.refresh() def put_line(self, row, header=False): col_w = [7 , 15, 10, 21, 7, 11, 25] maxx = self.width if self.y == self.selected_line - self.first_visible_line and self.y > 1: #if self.auto_mode: #attr = self.curses.color_pair(2) + self.curses.A_STANDOUT + self.curses.A_BOLD + self.current.A_REVERSE ##elif self.move_mode: ##attr = self.curses.color_pair(7) + self.curses.A_STANDOUT + self.curses.A_BOLD #else: #attr = self.curses.color_pair(8) + self.curses.A_STANDOUT + self.curses.A_BOLD + self.current.A_REVERSE self.item_status = row[5] self.currentrow = row if self.y == 0: if self.auto_mode: attr = self.curses.color_pair(2) + self.curses.A_STANDOUT + self.curses.A_BOLD #elif self.move_mode: #attr = self.curses.color_pair(7) + self.curses.A_STANDOUT + self.curses.A_BOLD else: attr = self.curses.color_pair(8) + self.curses.A_STANDOUT + self.curses.A_BOLD elif row[5] == "DONE": attr = self.curses.color_pair(5) + self.curses.A_BOLD elif row[5] == "STOPPED": attr = self.curses.color_pair(6) + self.curses.A_BOLD elif row[5].find("ERROR") > -1: attr = self.curses.color_pair(4) + self.curses.A_BOLD elif row[5] == "WAITING": attr = self.curses.color_pair(3) + self.curses.A_BOLD elif row[5] in ("RUNNING","CONTINUING") : attr = self.curses.color_pair(2) + self.curses.A_BOLD elif not header and row[8]: attr = self.curses.A_BOLD else: attr = self.curses.A_NORMAL if self.y == self.selected_line - self.first_visible_line and self.y > 1: self.current_attr = attr attr += self.curses.A_REVERSE if header: # Dirty hack. put_line should be better refactored. # row contains one less element: arguments myline = str(row[0]).ljust(col_w[0]) myline += str(row[1]).ljust(col_w[1]) myline += str(row[2])[:19].ljust(col_w[2]) myline += str(row[3]).ljust(col_w[3]) myline += str(row[4]).ljust(col_w[4]) myline += str(row[5]).ljust(col_w[5]) myline += str(row[6]).ljust(col_w[6]) else: myline = str(row[0]).ljust(col_w[0]) myline += (str(row[1]) + (row[8] and ' [%s]' % row[8] or '')).ljust(col_w[1]) myline += str(row[2]).ljust(col_w[2]) myline += str(row[3])[:19].ljust(col_w[3]) myline += str(row[4]).ljust(col_w[4]) myline += str(row[5]).ljust(col_w[5]) myline += str(row[6]).ljust(col_w[6]) myline = myline.ljust(maxx) try: self.stdscr.addnstr(self.y, 0, myline, maxx, attr) except self.curses.error: pass self.y = self.y+1 def display_in_footer(self, footer, i = 0, print_time_p=0): if print_time_p: footer = "%s %s" % (footer, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) maxx = self.stdscr.getmaxyx()[1] footer = footer.ljust(maxx) if self.auto_mode: colorpair = 2 #elif self.move_mode: #colorpair = 7 else: colorpair = 1 try: self.stdscr.addnstr(self.y - i, 0, footer, maxx - 1, self.curses.A_STANDOUT + self.curses.color_pair(colorpair) + self.curses.A_BOLD ) except self.curses.error: pass def repaint(self): if server_pid(): self.auto_mode = 1 else: if self.auto_mode == 1: self.curses.beep() self.auto_mode = 0 self.y = 0 self.stdscr.clear() self.height, self.width = self.stdscr.getmaxyx() maxy = self.height - 2 #maxx = self.width self.put_line(("ID", "PROC [PRI]", "USER", "RUNTIME", "SLEEP", "STATUS", "PROGRESS"), True) self.put_line(("------", "---------", "----", "-------------------", "-----", "-----", "--------"), True) if self.selected_line > maxy + self.first_visible_line - 1: self.first_visible_line = self.selected_line - maxy + 1 if self.selected_line < self.first_visible_line + 2: self.first_visible_line = self.selected_line - 2 for row in self.rows[self.first_visible_line:self.first_visible_line+maxy-2]: self.put_line(row) self.y = self.stdscr.getmaxyx()[0] - 1 if self.auto_mode: self.display_in_footer(self.footer_auto_mode, print_time_p=1) #elif self.move_mode: #self.display_in_footer(self.footer_move_mode, print_time_p=1) else: self.display_in_footer(self.footer_select_mode, print_time_p=1) footer2 = "" if self.item_status.find("DONE") > -1 or self.item_status in ("ERROR", "STOPPED", "KILLED"): footer2 += self.footer_stopped_item elif self.item_status in ("RUNNING", "CONTINUING", "ABOUT TO STOP", "ABOUT TO SLEEP"): footer2 += self.footer_running_item elif self.item_status == "SLEEPING": footer2 += self.footer_sleeping_item elif self.item_status == "WAITING": footer2 += self.footer_waiting_item self.display_in_footer(footer2, 1) self.stdscr.refresh() def start(self, stdscr): if self.curses.has_colors(): self.curses.start_color() self.curses.init_pair(8, self.curses.COLOR_WHITE, self.curses.COLOR_BLACK) self.curses.init_pair(1, self.curses.COLOR_WHITE, self.curses.COLOR_RED) self.curses.init_pair(2, self.curses.COLOR_GREEN, self.curses.COLOR_BLACK) self.curses.init_pair(3, self.curses.COLOR_MAGENTA, self.curses.COLOR_BLACK) self.curses.init_pair(4, self.curses.COLOR_RED, self.curses.COLOR_BLACK) self.curses.init_pair(5, self.curses.COLOR_BLUE, self.curses.COLOR_BLACK) self.curses.init_pair(6, self.curses.COLOR_CYAN, self.curses.COLOR_BLACK) self.curses.init_pair(7, self.curses.COLOR_YELLOW, self.curses.COLOR_BLACK) self.stdscr = stdscr self.base_panel = self.curses.panel.new_panel( self.stdscr ) self.base_panel.bottom() self.curses.panel.update_panels() self.height, self.width = stdscr.getmaxyx() self.stdscr.clear() if server_pid(): self.auto_mode = 1 ring = 4 while self.running: if ring == 4: if self.display == 1: table = "schTASK" where = "and status='DONE'" order = "runtime DESC" elif self.display == 2: table = "schTASK" where = "and status<>'DONE'" order = "runtime ASC" else: table = "hstTASK" order = "runtime DESC" where = '' self.rows = run_sql("""SELECT id,proc,user,runtime,sleeptime,status,progress,arguments,priority FROM %s WHERE status NOT LIKE '%%DELETED%%' %s ORDER BY %s""" % (table, where, order)) ring = 0 self.repaint() ring += 1 char = -1 try: char = timed_out(self.stdscr.getch, 1) if char == 27: # escaping sequence char = self.stdscr.getch() if char == 79: # arrow char = self.stdscr.getch() if char == 65: #arrow up char = self.curses.KEY_UP elif char == 66: #arrow down char = self.curses.KEY_DOWN elif char == 72: char = self.curses.KEY_PPAGE elif char == 70: char = self.curses.KEY_NPAGE elif char == 91: char = self.stdscr.getch() if char == 53: char = self.stdscr.getch() if char == 126: char = self.curses.KEY_HOME except TimedOutExc: char = -1 self.handle_keys(char) _refresh_tasks = True def _bibsched_sig_info(sig, frame): """Signal handler for the 'USR2' signal sent by a finished task.""" global _refresh_tasks _refresh_tasks = True write_message('A task has terminated. Refreshing the task list.') class BibSched: def __init__(self): self.helper_modules = CFG_BIBTASK_VALID_TASKS self.scheduled = None signal.signal(signal.SIGUSR2, _bibsched_sig_info) def tasks_safe_p(self, proc1, proc2): """Return True when the two tasks can run concurrently.""" return proc1 != proc2 and not proc1.startswith('bibupload') and not proc2.startswith('bibupload') def get_tasks_to_sleep_and_stop(self, proc, task_set): """Among the task_set, return the dict of task to stop and the dict of task to sleep. """ min_prio = None min_task_id = None min_proc = None to_stop = {} for this_task_id, (this_proc, this_priority) in task_set.iteritems(): if self.tasks_safe_p(proc, this_proc): if min_prio is None or this_priority < min_prio: min_prio = this_priority min_task_id = this_task_id min_proc = this_proc else: to_stop[this_task_id] = (this_proc, this_priority) if len(task_set) < CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS and not to_stop: ## All the task are safe and there are enough resources return {}, {} else: if to_stop: return to_stop, {} else: return {}, {min_task_id : (min_proc, min_prio)} def get_running_tasks(self, task_status): running_tasks = {} for status in ('RUNNING', 'CONTINUING', 'SLEEPING', 'ABOUT TO SLEEP', 'ABOUT TO STOP', 'SCHEDULED'): running_tasks.update(copy.deepcopy(task_status.get(status, {}))) return running_tasks def split_running_tasks_by_priority(self, task_status, task_id, priority): """Return two sets (by dict): the set of task_ids with lower priority and those with higher or equal priority.""" higher = {} lower = {} for other_task_id, (task_proc, dummy, task_priority) in self.get_running_tasks(task_status).iteritems(): if task_id == other_task_id: continue if task_priority < priority: lower[other_task_id] = (task_proc, task_priority) else: higher[other_task_id] = (task_proc, task_priority) return lower, higher def bibupload_in_the_queue(self, task_id, runtime): """Check if bibupload is scheduled/running before runtime. This is useful in order to enforce bibupload order.""" return run_sql("SELECT id, status FROM schTASK WHERE proc='bibupload' AND runtime<=%s AND id<%s AND (status='RUNNING' OR status='WAITING' OR status='CONTINUING' OR status='SLEEPING' OR status='SCHEDULED' OR status='ABOUT TO SLEEP' OR status='ABOUT TO STOP' OR status='ERROR' OR status='DONE WITH ERRORS')", (runtime, task_id)) def task_really_running_p(self, proc, task_id): """Ping the task and update its status to error if necessary.""" if run_sql("SELECT id FROM schTASK WHERE id=%s AND status in ('CONTINUING', 'RUNNING', 'ABOUT TO STOP', 'SLEEPING', 'ABOUT TO SLEEP')", (task_id, )): if not get_task_pid(proc, task_id): bibsched_set_status(task_id, "ERROR") return False return True return False def handle_row(self, task_status, task_id, proc, runtime, status, priority): """Perform needed action of the row representing a task. Return True when task_status need to be refreshed""" #write_message('%s id: %s, proc: %s, runtime: %s, status: %s, priority: %s' % (task_status, task_id, proc, runtime, status, priority)) #write_message("task_id: %s, proc: %s, runtime: %s, status: %s, priority: %s" % (task_id, proc, runtime, status, priority)) if task_id in task_status['RUNNING'] or task_id in task_status['CONTINUING']: if not self.task_really_running_p(proc, task_id): #write_message('update required') return True elif task_id in task_status['WAITING'] or task_id in task_status['SLEEPING']: #write_message("Trying to run %s" % task_id) if self.scheduled is not None and task_id != self.scheduled: ## Another task is scheduled for running. #write_message("cannot run because %s is already scheduled" % self.scheduled) return False nothing_was_scheduled = self.scheduled is None res = self.bibupload_in_the_queue(task_id, runtime) if _refresh_tasks: ## Some tasks have finished. Better refresh things... return True if priority < 0: return False if res: ## All bibupload must finish before. for (atask_id, astatus) in res: if astatus in ('ERROR', 'DONE WITH ERRORS'): raise StandardError('BibSched had to halt because a bibupload with id %s has status %s. Please do your checks and delete/reinitialize the failed bibupload.' % (atask_id, astatus)) #write_message("cannot run because these bibupload are scheduled: %s" % res) Log("Task #%d (%s) not yet run because there is a bibupload in the queue" % (task_id, proc)) return False self.scheduled = task_id if nothing_was_scheduled: Log("Task #%d (%s) scheduled for running" % (task_id, proc)) #write_message('Scheduled task %s' % self.scheduled) ## Schedule the task for running. lower, higher = self.split_running_tasks_by_priority(task_status, task_id, priority) #write_message('lower: %s' % lower) #write_message('higher: %s' % higher) for other_task_id, (other_proc, dummy) in higher.iteritems(): if not self.tasks_safe_p(proc, other_proc): ## There's at least a higher priority task running that ## cannot run at the same time of the given task. ## We give up #write_message("cannot run because task_id: %s, proc: %s is the queue and incompatible" % (other_task_id, other_proc)) return False ## No higer priority task have issue with the given task. if len(higher) >= CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS: ## Not enough resources. #write_message("cannot run because all resource (%s) are used (%s), higher: %s" % (CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS, len(higher), higher)) return False ## We check if it is necessary to stop/put to sleep some lower priority ## task. tasks_to_stop, tasks_to_sleep = self.get_tasks_to_sleep_and_stop(proc, lower) #write_message('tasks_to_stop: %s' % tasks_to_stop) #write_message('tasks_to_sleep: %s' % tasks_to_sleep) if tasks_to_stop and priority < 10: ## Only tasks with priority higher than 10 have the power ## to put task to stop. #write_message("cannot run because there are task to stop: %s and priority < 10" % tasks_to_stop) return False procname = proc.split(':')[0] if not tasks_to_stop and not tasks_to_sleep: self.scheduled = None if status in ("SLEEPING", "ABOUT TO SLEEP"): bibsched_send_signal(proc, task_id, signal.SIGCONT) Log("Task #%d (%s) woken up" % (task_id, proc)) return True elif procname in self.helper_modules: program = os.path.join(CFG_BINDIR, procname) ## Trick to log in bibsched.log the task exiting - exit_str = '&& echo "`date "+%%Y-%%m-%%d %%H:%%M:%%S"` --> Task #%d (%s) exited" >> %s/bibsched.log' % (task_id, proc, CFG_LOGDIR) - COMMAND = "(%s %s > /dev/null 2> /dev/null %s)&" % (program, str(task_id), exit_str) + exit_str = '&& echo "`date "+%%Y-%%m-%%d %%H:%%M:%%S"` --> Task #%d (%s) exited" >> %s' % (task_id, proc, os.path.join(CFG_LOGDIR, 'bibsched.log')) + COMMAND = "(%s %s > /dev/null 2> /dev/null %s) &" % (program, str(task_id), exit_str) bibsched_set_status(task_id, "SCHEDULED") Log("Task #%d (%s) started" % (task_id, proc)) os.system(COMMAND) return True else: ## It's not still safe to run the task. for other_task_id in tasks_to_stop: if other_task_id not in task_status['ABOUT TO STOP']: bibsched_set_status(other_task_id, 'ABOUT TO STOP') bibsched_send_signal(proc, other_task_id, signal.SIGTERM) for other_task_id in tasks_to_sleep: if other_task_id not in task_status['ABOUT TO SLEEP']: bibsched_set_status(other_task_id, 'ABOUT TO SLEEP') bibsched_send_signal(proc, other_task_id, signal.SIGUSR1) return True + def uniformize_bibupload_priorities(self): + """ + If a biupload with priority 10 happens to be submitted + after a bibupload with priority less than 10 (say 9), the first + bibupload is blocked until the second is executed. To give real + priority to the first bibupload, we should rise the priority of the + second one up to the priority of the first. + """ + max_priority = run_sql("SELECT max(priority) FROM schTASK WHERE proc='bibupload' AND status NOT LIKE 'DONE' AND status NOT like '%DELETED%' AND (runtime <= NOW() OR status='WAITING' OR status='SCHEDULED')") + if max_priority: + max_priority = max_priority[0][0] + run_sql("UPDATE schTASK SET priority=%s WHERE proc='bibupload' AND status NOT LIKE 'DONE' AND status NOT like '%%DELETED%%' AND (runtime <= NOW() OR status='WAITING' OR status='SCHEDULED')", (max_priority, )) + def watch_loop(self): global _refresh_tasks def get_rows(): """Return all the rows to work on.""" - return run_sql("SELECT id,proc,runtime,status,priority FROM schTASK WHERE status NOT LIKE 'DONE' AND status NOT LIKE '%%DELETED%%' AND (runtime<=NOW() OR status='RUNNING' OR status='ABOUT TO STOP' OR status='ABOUT TO SLEEP' OR status='SLEEPING' OR status='SCHEDULED') ORDER BY priority DESC, runtime ASC, id ASC") + return run_sql("SELECT id,proc,runtime,status,priority FROM schTASK WHERE status NOT LIKE 'DONE' AND status NOT LIKE '%DELETED%' AND (runtime<=NOW() OR status='RUNNING' OR status='ABOUT TO STOP' OR status='ABOUT TO SLEEP' OR status='SLEEPING' OR status='SCHEDULED' OR status='CONTINUING') ORDER BY priority DESC, runtime ASC, id ASC") def get_task_status(rows): """Return a handy data structure to analize the task status.""" ret = { 'RUNNING' : {}, 'CONTINUING' : {}, 'SLEEPING' : {}, 'WAITING' : {}, 'ABOUT TO STOP' : {}, 'ABOUT TO SLEEP' : {}, 'ERROR' : {}, 'DONE WITH ERRORS' : {}, 'SCHEDULED' : {} } for (id, proc, runtime, status, priority) in rows: if status not in ret: ret[status] = {} ret[status][id] = (proc, runtime, priority) return ret ## Cleaning up scheduled task not run because of bibsched being ## interrupted in the middle. run_sql("UPDATE schTASK SET status='WAITING' WHERE status='SCHEDULED'") try: while True: + self.uniformize_bibupload_priorities() rows = get_rows() _refresh_tasks = False task_status = get_task_status(rows) if task_status['ERROR'] or task_status['DONE WITH ERRORS']: raise StandardError('BibSched had to halt because at least a task is in status ERROR (%s) or DONE WITH ERRORS (%s)' % (task_status['ERROR'], task_status['DONE WITH ERRORS'])) for row in rows: if _refresh_tasks or self.handle_row(task_status, *row): # Things have changed let's restart break time.sleep(CFG_BIBSCHED_REFRESHTIME) except: register_exception(alert_admin=True) try: register_emergency('Emergency from %s: BibSched had to halt!' % CFG_SITE_URL) except NotImplementedError: pass raise class TimedOutExc(Exception): def __init__(self, value = "Timed Out"): Exception.__init__(self) self.value = value def __str__(self): return repr(self.value) def timed_out(f, timeout, *args, **kwargs): def handler(signum, frame): raise TimedOutExc() old = signal.signal(signal.SIGALRM, handler) signal.alarm(timeout) try: result = f(*args, **kwargs) finally: signal.signal(signal.SIGALRM, old) signal.alarm(0) return result def Log(message): log = open(CFG_LOGDIR + "/bibsched.log","a") log.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) log.write(message) log.write("\n") log.close() def redirect_stdout_and_stderr(): "This function redirects stdout and stderr to bibsched.log and bibsched.err file." old_stdout = sys.stdout sys.stdout = open(CFG_LOGDIR + "/bibsched.log", "a") sys.stderr = open(CFG_LOGDIR + "/bibsched.err", "a") return old_stdout def usage(exitcode=1, msg=""): """Prints usage info.""" if msg: sys.stderr.write("Error: %s.\n" % msg) sys.stderr.write("""\ Usage: %s [options] [start|stop|restart|monitor|status] The following commands are available for bibsched: start start bibsched in background - stop stop a running bibsched - restart restart a running bibsched + stop stop running bibtasks and the bibsched daemon safely + halt halt running bibsched while keeping bibtasks running + restart restart running bibsched monitor enter the interactive monitor status get report about current status of the queue purge purge the scheduler queue from old tasks Command options: -d, --daemon \t Launch BibSched in the daemon mode (deprecated, use 'start') General options: -h, --help \t Print this help. -V, --version \t Print version information. Status options: -s, --status=LIST\t Which BibTask status should be considered (default is Running,waiting) -S, --since=TIME\t Since how long time to consider tasks e.g.: 30m, 2h, 1d (default is all) -t, --tasks=LIST\t Comma separated list of BibTask to consider (default \t is all) Purge options: -s, --status=LIST\t Which BibTask status should be considered (default is DONE) -S, --since=TIME\t Since how long time to consider tasks e.g.: 30m, 2h, 1d (default is %s days) -t, --tasks=LIST\t Comma separated list of BibTask to consider (default \t is %s) """ % (sys.argv[0], CFG_BIBSCHED_GC_TASKS_OLDER_THAN, ','.join(CFG_BIBSCHED_GC_TASKS_TO_REMOVE + CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE))) #sys.stderr.write(" -v, --verbose=LEVEL \t Verbose level (0=min, 1=default, 9=max).\n") sys.exit(exitcode) pidfile = os.path.join(CFG_PREFIX, 'var', 'run', 'bibsched.pid') def error(msg): print >> sys.stderr, "error: " + msg sys.exit(1) def server_pid(): # The pid must be stored on the filesystem try: pid = int(open(pidfile).read()) except IOError: return None # Even if the pid is available, we check if it corresponds to an # actual process, as it might have been killed externally try: os.kill(pid, signal.SIGCONT) except OSError: return None return pid def start(verbose = True): """ Fork this process in the background and start processing requests. The process PID is stored in a pid file, so that it can be stopped later on.""" if verbose: sys.stdout.write("starting bibsched: ") sys.stdout.flush() pid = server_pid() if pid: error("another instance of bibsched (pid %d) is running" % pid) # start the child process using the "double fork" technique pid = os.fork() if pid > 0: sys.exit(0) os.setsid() os.chdir('/') pid = os.fork() if pid > 0: if verbose: sys.stdout.write('pid %d\n' % pid) Log("daemon started (pid %d)" % pid) open(pidfile, 'w').write('%d' % pid) return sys.stdin.close() redirect_stdout_and_stderr() sched = BibSched() sched.watch_loop() return -def stop(verbose=True, soft=False): +def halt(verbose=True, soft=False): pid = server_pid() if not pid: if soft: print >> sys.stderr, 'bibsched seems not to be running.' return else: error('bibsched seems not to be running.') try: os.kill(pid, signal.SIGKILL) except OSError: print >> sys.stderr, 'no bibsched process found' Log("daemon stopped (pid %d)" % pid) if verbose: print "stopping bibsched: pid %d" % pid os.unlink(pidfile) return def monitor(verbose = True): old_stdout = redirect_stdout_and_stderr() manager = Manager(old_stdout) return def write_message(msg, stream=None, verbose=1): """Write message and flush output stream (may be sys.stdout or sys.stderr). Useful for debugging stuff.""" if stream is None: stream = sys.stdout if msg: if stream == sys.stdout or stream == sys.stderr: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) try: stream.write("%s\n" % msg) except UnicodeEncodeError: stream.write("%s\n" % msg.encode('ascii', 'backslashreplace')) stream.flush() else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) def report_queue_status(verbose=True, status=None, since=None, tasks=None): """ Report about the current status of BibSched queue on standard output. """ def report_about_processes(status='RUNNING', since=None, tasks=None): """ Helper function to report about processes with the given status. """ if tasks is None: task_query = '' else: task_query = 'AND proc IN (%s)' % ( ','.join([repr(escape_string(task)) for task in tasks])) if since is None: since_query = '' else: # We're not interested in future task if since.startswith('+') or since.startswith('-'): since = since[1:] since = '-' + since since_query = "AND runtime >= '%s'" % get_datetime(since) res = run_sql("""SELECT id,proc,user,runtime,sleeptime,status,progress,priority FROM schTASK WHERE status=%%s %(task_query)s %(since_query)s ORDER BY id ASC""" % { 'task_query' : task_query, 'since_query' : since_query }, (status,)) write_message("%s processes: %d" % (status, len(res))) for (proc_id, proc_proc, proc_user, proc_runtime, proc_sleeptime, proc_status, proc_progress, proc_priority) in res: write_message(' * ID="%s" PRIORITY="%s" PROC="%s" USER="%s" RUNTIME="%s" SLEEPTIME="%s" STATUS="%s" PROGRESS="%s"' % \ (proc_id, proc_priority, proc_proc, proc_user, proc_runtime, proc_sleeptime, proc_status, proc_progress)) return write_message("BibSched queue status report for %s:" % gethostname()) if status is None: report_about_processes('Running', since, tasks) report_about_processes('Waiting', since, tasks) else: for state in status: report_about_processes(state, since, tasks) write_message("Done.") return def restart(verbose = True): - stop(verbose, soft=True) + halt(verbose, soft=True) start(verbose) return +def stop(verbose=True): + """ + * Stop bibsched + * Send stop signal to all the running tasks + * wait for all the tasks to stop + * return + """ + if verbose: + print "Stopping BibSched if running" + stop(verbose, soft=True) + run_sql("UPDATE schTASK SET status='WAITING' WHERE status='SCHEDULED'") + res = run_sql("SELECT id,proc FROM schTASK WHERE status NOT LIKE 'DONE' AND status NOT LIKE '%%DELETED%%' AND (status='RUNNING' OR status='ABOUT TO STOP' OR status='ABOUT TO SLEEP' OR status='SLEEPING' OR status='CONTINUING')") + if verbose: + print "Stopping all running BibTasks" + for task_id, proc in res: + bibsched_send_signal(proc, task_id, signal.SIGTERM) + while run_sql("SELECT id FROM schTASK WHERE status NOT LIKE 'DONE' AND status NOT LIKE '%%DELETED%%' AND (status='RUNNING' OR status='ABOUT TO STOP' OR status='ABOUT TO SLEEP' OR status='SLEEPING' OR status='CONTINUING')"): + if verbose: + sys.stdout.write('.') + sys.stdout.flush() + time.sleep(1) + if verbose: + print "\nStopped" + Log("BibSched and all BibTasks stopped") + return + def main(): from invenio.bibtask import check_running_process_user check_running_process_user() verbose = True status = None since = None tasks = None try: opts, args = getopt.gnu_getopt(sys.argv[1:], "hVdqS:s:t:", [ "help","version","daemon", "quiet", "since=", "status=", "task="]) except getopt.GetoptError, err: Log("Error: %s" % err) usage(1, err) for opt, arg in opts: if opt in ["-h", "--help"]: usage(0) elif opt in ["-V", "--version"]: print __revision__ sys.exit(0) elif opt in ["-d", "--daemon"]: redirect_stdout_and_stderr() sched = BibSched() Log("daemon started") sched.watch_loop() elif opt in ['-q', '--quiet']: verbose = False elif opt in ['-s', '--status']: status = arg.split(',') elif opt in ['-S', '--since']: since = arg elif opt in ['-t', '--task']: tasks = arg.split(',') else: usage(1) try: cmd = args [0] except IndexError: cmd = 'monitor' try: if cmd in ('status', 'purge'): { 'status' : report_queue_status, 'purge' : gc_tasks, } [cmd] (verbose, status, since, tasks) else: {'start': start, - 'stop': stop, + 'halt': halt, + 'stop': stop, 'restart': restart, 'monitor': monitor} [cmd] (verbose) except KeyError: usage(1, 'unkown command: %s' % cmd) return if __name__ == '__main__': main() diff --git a/modules/bibsched/lib/bibtask.py b/modules/bibsched/lib/bibtask.py index 9b6726a95..eacd74f82 100644 --- a/modules/bibsched/lib/bibtask.py +++ b/modules/bibsched/lib/bibtask.py @@ -1,867 +1,870 @@ # -*- coding: utf-8 -*- ## ## This file is part of CDS Invenio. ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008 CERN. ## ## CDS Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## CDS Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDS Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """CDS Invenio Bibliographic Task Class. BibTask class. A BibTask is an executable under CFG_BINDIR, whose name is stored in bibtask_config.CFG_BIBTASK_VALID_TASKS. A valid task must call the task_init function with the proper parameters. Generic task related parameters (user, sleeptime, runtime, task_id, task_name verbose) go to _task_params global dictionary accessible through task_get_task_param. Option specific to the particular BibTask go to _options global dictionary and are accessible via task_get_option/task_set_option. In order to log something properly, just use write_message(s) with the desired verbose level. task_update_status and task_update_progress can be used to update the status of the task (DONE, FAILED, DONE WITH ERRORS...) and it's progress (1 out 100..) within the bibsched monitor. It is possible to enqueue a BibTask via API call by means of task_low_level_submission. """ __revision__ = "$Id$" import getopt import getpass import marshal import os import pwd import re import signal import sys import time import datetime import popen2 import traceback import logging import logging.handlers from invenio.dbquery import run_sql, _db_login from invenio.access_control_engine import acc_authorize_action from invenio.config import CFG_PREFIX, CFG_BINDIR, CFG_LOGDIR, \ CFG_BIBSCHED_PROCESS_USER from invenio.errorlib import register_exception from invenio.access_control_config import CFG_EXTERNAL_AUTH_USING_SSO, \ CFG_EXTERNAL_AUTHENTICATION from invenio.webuser import get_user_preferences, get_email from invenio.bibtask_config import CFG_BIBTASK_VALID_TASKS, \ CFG_BIBTASK_DEFAULT_TASK_SETTINGS # Which tasks don't need to ask the user for authorization? CFG_VALID_PROCESSES_NO_AUTH_NEEDED = ("bibupload", ) CFG_TASK_IS_NOT_A_DEAMON = ("bibupload", ) def fix_argv_paths(paths, argv=None): """Given the argv vector of cli parameters, and a list of path that can be relative and may have been specified within argv, it substitute all the occurencies of these paths in argv. argv is changed in place and returned. """ if argv is None: argv = sys.argv for path in paths: for count in xrange(len(argv)): if path == argv[count]: argv[count] = os.path.realpath(path) return argv def task_low_level_submission(name, user, *argv): """Let special lowlevel enqueuing of a task on the bibsche queue. @param name is the name of the bibtask. It must be a valid executable under CFG_BINDIR. @param user is a string that will appear as the "user" submitting the task. Since task are submitted via API it make sense to set the user to the name of the module/function that called task_low_level_submission. @param argv will be merged with the default setting of the given task as they can be found in bibtask_config. In order to know which variable are valid and which is the semantic, please have a glimpse at bibtask_config and to the source of the task_submit_elaborate_specific_parameter function of the desired bibtask. @return the task_id when the task is correctly enqueued. Use with care! Please use absolute paths in argv! """ def get_priority(argv): """Try to get the priority by analysing the arguments.""" priority = 0 try: stripped_argv = [arg for arg in argv if not arg.startswith('-') or arg.startswith('-P') or arg.startswith('--priority')] opts, args = getopt.gnu_getopt(stripped_argv, 'P:', ['priority=']) for opt in opts: if opt[0] in ('-P', '--priority'): priority = opt[1] except: pass return priority task_id = None try: if not name in CFG_BIBTASK_VALID_TASKS: raise StandardError('%s is not a valid task name' % name) priority = get_priority(argv) argv = tuple([os.path.join(CFG_BINDIR, name)] + list(argv)) ## submit task: task_id = run_sql("""INSERT INTO schTASK (proc,user, runtime,sleeptime,status,progress,arguments,priority) VALUES (%s,%s,NOW(),'','WAITING','',%s,%s)""", (name, user, marshal.dumps(argv), priority)) except Exception: register_exception() if task_id: run_sql("""DELETE FROM schTASK WHERE id=%s""", (task_id, )) raise return task_id def task_init( authorization_action="", authorization_msg="", description="", help_specific_usage="", version=__revision__, specific_params=("", []), task_stop_helper_fnc=None, task_submit_elaborate_specific_parameter_fnc=None, task_submit_check_options_fnc=None, task_run_fnc=None): """ Initialize a BibTask. @param authorization_action is the name of the authorization action connected with this task; @param authorization_msg is the header printed when asking for an authorization password; @param description is the generic description printed in the usage page; @param help_specific_usage is the specific parameter help @param task_stop_fnc is a function that will be called whenever the task is stopped @param task_submit_elaborate_specific_parameter_fnc will be called passing a key and a value, for parsing specific cli parameters. Must return True if it has recognized the parameter. Must eventually update the options with bibtask_set_option; @param task_submit_check_options must check the validity of options (via bibtask_get_option) once all the options where parsed; @param task_run_fnc will be called as the main core function. Must return False in case of errors. """ global _task_params, _options _task_params = { "version" : version, "task_stop_helper_fnc" : task_stop_helper_fnc, "task_name" : os.path.basename(sys.argv[0]), "task_specific_name" : '', "user" : '', "verbose" : 1, "sleeptime" : '', "runtime" : time.strftime("%Y-%m-%d %H:%M:%S"), "priority" : 0, "runtime_limit" : None } to_be_submitted = True if len(sys.argv) == 2 and sys.argv[1].isdigit(): _task_params['task_id'] = int(sys.argv[1]) argv = _task_get_options(_task_params['task_id'], _task_params['task_name']) to_be_submitted = False else: argv = sys.argv if type(argv) is dict: # FIXME: REMOVE AFTER MAJOR RELEASE 1.0 # This is needed for old task submitted before CLI parameters # where stored in DB and _options dictionary was stored instead. _options = argv else: _task_build_params(_task_params['task_name'], argv, description, help_specific_usage, version, specific_params, task_submit_elaborate_specific_parameter_fnc, task_submit_check_options_fnc) write_message('argv=%s' % (argv, ), verbose=9) write_message('_options=%s' % (_options, ), verbose=9) write_message('_task_params=%s' % (_task_params, ), verbose=9) if to_be_submitted: _task_submit(argv, authorization_action, authorization_msg) else: try: if not _task_run(task_run_fnc): write_message("Error occurred. Exiting.", sys.stderr) except Exception, e: write_message("Unexpected error occurred: %s." % e, sys.stderr) write_message("Traceback is:", sys.stderr) traceback.print_tb(sys.exc_info()[2]) write_message("Exiting.", sys.stderr) task_update_status("ERROR") def _task_build_params( task_name, argv, description="", help_specific_usage="", version=__revision__, specific_params=("", []), task_submit_elaborate_specific_parameter_fnc=None, task_submit_check_options_fnc=None): """ Build the BibTask params. @param argv a list of string as in sys.argv @param description is the generic description printed in the usage page; @param help_specific_usage is the specific parameter help @param task_submit_elaborate_specific_parameter_fnc will be called passing a key and a value, for parsing specific cli parameters. Must return True if it has recognized the parameter. Must eventually update the options with bibtask_set_option; @param task_submit_check_options must check the validity of options (via bibtask_get_option) once all the options where parsed; """ global _task_params, _options _options = {} if task_name in CFG_BIBTASK_DEFAULT_TASK_SETTINGS: _options.update(CFG_BIBTASK_DEFAULT_TASK_SETTINGS[task_name]) # set user-defined options: try: (short_params, long_params) = specific_params opts, args = getopt.gnu_getopt(argv[1:], "hVv:u:s:t:P:N:L:" + short_params, [ "help", "version", "verbose=", "user=", "sleep=", "time=", "priority=", "task-specific-name=", "runtime-limit=" ] + long_params) except getopt.GetoptError, err: _usage(1, err, help_specific_usage=help_specific_usage, description=description) try: for opt in opts: if opt[0] in ("-h", "--help"): _usage(0, help_specific_usage=help_specific_usage, description=description) elif opt[0] in ("-V", "--version"): print _task_params["version"] sys.exit(0) elif opt[0] in ("-u", "--user"): _task_params["user"] = opt[1] elif opt[0] in ("-v", "--verbose"): _task_params["verbose"] = int(opt[1]) elif opt[0] in ("-s", "--sleeptime"): if task_name not in CFG_TASK_IS_NOT_A_DEAMON: get_datetime(opt[1]) # see if it is a valid shift _task_params["sleeptime"] = opt[1] elif opt[0] in ("-t", "--runtime"): _task_params["runtime"] = get_datetime(opt[1]) elif opt[0] in ("-P", "--priority"): _task_params["priority"] = int(opt[1]) elif opt[0] in ("-N", "--task-specific-name"): _task_params["task_specific_name"] = opt[1] elif opt[0] in ("-L", "--runtime-limit"): _task_params["runtime_limit"] = parse_runtime_limit(opt[1]) elif not callable(task_submit_elaborate_specific_parameter_fnc) or \ not task_submit_elaborate_specific_parameter_fnc(opt[0], opt[1], opts, args): _usage(1, help_specific_usage=help_specific_usage, description=description) except StandardError, e: _usage(e, help_specific_usage=help_specific_usage, description=description) if callable(task_submit_check_options_fnc): if not task_submit_check_options_fnc(): _usage(1, help_specific_usage=help_specific_usage, description=description) def task_set_option(key, value): """Set an value to key in the option dictionary of the task""" global _options try: _options[key] = value except NameError: _options = {key : value} def task_get_option(key, default=None): """Returns the value corresponding to key in the option dictionary of the task""" try: return _options.get(key, default) except NameError: return default def task_has_option(key): """Map the has_key query to _options""" try: return _options.has_key(key) except NameError: return False def task_get_task_param(key, default=None): """Returns the value corresponding to the particular task param""" try: return _task_params.get(key, default) except NameError: return default def task_set_task_param(key, value): """Set the value corresponding to the particular task param""" global _task_params try: _task_params[key] = value except NameError: _task_params = {key : value} def task_update_progress(msg): """Updates progress information in the BibSched task table.""" write_message("Updating task progress to %s." % msg, verbose=9) return run_sql("UPDATE schTASK SET progress=%s where id=%s", (msg, _task_params["task_id"])) def task_update_status(val): """Updates status information in the BibSched task table.""" write_message("Updating task status to %s." % val, verbose=9) return run_sql("UPDATE schTASK SET status=%s where id=%s", (val, _task_params["task_id"])) def task_read_status(): """Read status information in the BibSched task table.""" res = run_sql("SELECT status FROM schTASK where id=%s", (_task_params['task_id'],), 1) try: out = res[0][0] except: out = 'UNKNOWN' return out def write_messages(msgs, stream=sys.stdout, verbose=1): """Write many messages through write_message""" for msg in msgs.split('\n'): write_message(msg, stream, verbose) def write_message(msg, stream=sys.stdout, verbose=1): """Write message and flush output stream (may be sys.stdout or sys.stderr). Useful for debugging stuff.""" if msg and _task_params['verbose'] >= verbose: if stream == sys.stdout: print "%s --> %s" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), msg) logging.info(msg) elif stream == sys.stderr: print >> sys.stderr, "%s --> %s" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), msg) logging.error(msg) else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) _RE_SHIFT = re.compile("([-\+]{0,1})([\d]+)([dhms])") def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"): """Returns a date string according to the format string. It can handle normal date strings and shifts with respect to now.""" date = time.time() factors = {"d":24*3600, "h":3600, "m":60, "s":1} m = _RE_SHIFT.match(var) if m: sign = m.groups()[0] == "-" and -1 or 1 factor = factors[m.groups()[2]] value = float(m.groups()[1]) date = time.localtime(date + sign * factor * value) date = time.strftime(format_string, date) else: date = time.strptime(var, format_string) date = time.strftime(format_string, date) return date _RE_RUNTIMELIMIT_FULL = re.compile(r"(?P\w+)(\s+(?P\d\d?(:\d\d?)?)(-(?P\d\d?(:\d\d?)?))?)?") _RE_RUNTIMELIMIT_HOUR = re.compile(r'(?P\d\d?):(?P\d\d?)') def parse_runtime_limit(value): """ Parsing CLI option for runtime limit, supplied as VALUE. Value could be something like: Sunday 23:00-05:00, the format being Wee[kday][ hh[:mm][-hh:[mm]]]. The function would return the first range datetime in which now() is contained. """ def extract_time(value): value = _RE_RUNTIMELIMIT_HOUR.search(value).groupdict() hour = int(value['hour']) % 24 minutes = (value['minutes'] is not None and int(value['minutes']) or 0) % 60 return hour * 3600 + minutes * 60 today = datetime.datetime.today() try: g = _RE_RUNTIMELIMIT_FULL.search(value) if not g: raise ValueError pieces = g.groupdict() weekday = { 'mon' : 0, 'tue' : 1, 'wed' : 2, 'thu' : 3, 'fri' : 4, 'sat' : 5, 'sun' : 6, }[pieces['weekday'][:3].lower()] today_weekday = today.isoweekday() - 1 first_occasion_day = -((today_weekday - weekday) % 7) * 24 * 3600 next_occasion_day = first_occasion_day + 7 * 24 * 3600 if pieces['begin'] is None: pieces['begin'] = '00:00' if pieces['end'] is None: pieces['end'] = '00:00' beginning_time = extract_time(pieces['begin']) ending_time = extract_time(pieces['end']) if beginning_time >= ending_time: ending_time += 24 * 3600 reference_time = time.mktime(datetime.datetime(today.year, today.month, today.day).timetuple()) first_range = ( reference_time + first_occasion_day + beginning_time, reference_time + first_occasion_day + ending_time ) second_range = ( reference_time + next_occasion_day + beginning_time, reference_time + next_occasion_day + ending_time ) return first_range, second_range except: raise ValueError, '"%s" does not seem to be correct format for parse_runtime_limit() (Wee[kday][ hh[:mm][-hh:[mm]]]).' % value def task_sleep_now_if_required(can_stop_too=False): """This function should be called during safe state of BibTask, e.g. after flushing caches or outside of run_sql calls. """ write_message('Entering task_sleep_now_if_required with signal_request=%s' % _task_params['signal_request'], verbose=9) if _task_params['signal_request'] == 'sleep': _task_params['signal_request'] = None write_message("sleeping...") task_update_status("SLEEPING") signal.pause() # wait for wake-up signal elif _task_params['signal_request'] == 'ctrlz': _task_params['signal_request'] = None signal.signal(signal.SIGTSTP, signal.SIG_DFL) write_message("sleeping...") task_update_status("SLEEPING") os.kill(os.getpid(), signal.SIGTSTP) time.sleep(1) elif _task_params['signal_request'] == 'ctrlc' and can_stop_too: _task_params['signal_request'] = None signal.signal(signal.SIGINT, signal.SIG_DFL) write_message("stopped") task_update_status("STOPPED") os.kill(os.getpid(), signal.SIGINT) time.sleep(1) elif _task_params['signal_request'] == 'stop' and can_stop_too: _task_params['signal_request'] = None write_message("stopped") task_update_status("STOPPED") sys.exit(0) def authenticate(user, authorization_action, authorization_msg=""): """Authenticate the user against the user database. Check for its password, if it exists. Check for authorization_action access rights. Return user name upon authorization success, do system exit upon authorization failure. """ # With SSO it's impossible to check for pwd if CFG_EXTERNAL_AUTH_USING_SSO or os.path.basename(sys.argv[0]) in CFG_VALID_PROCESSES_NO_AUTH_NEEDED: return user if authorization_msg: print authorization_msg print "=" * len(authorization_msg) if user == "": print >> sys.stdout, "\rUsername: ", try: user = sys.stdin.readline().lower().strip() except EOFError: sys.stderr.write("\n") sys.exit(1) except KeyboardInterrupt: sys.stderr.write("\n") sys.exit(1) else: print >> sys.stdout, "\rUsername:", user ## first check user: # p_un passed may be an email or a nickname: res = run_sql("select id from user where email=%s", (user,), 1) + \ run_sql("select id from user where nickname=%s", (user,), 1) if not res: print "Sorry, %s does not exist." % user sys.exit(1) else: uid = res[0][0] ok = False login_method = get_user_preferences(uid)['login_method'] if not CFG_EXTERNAL_AUTHENTICATION[login_method][0]: #Local authentication, let's see if we want passwords. res = run_sql("select id from user where id=%s " "and password=AES_ENCRYPT(email,'')", (uid,), 1) if res: ok = True if not ok: try: password_entered = getpass.getpass() except EOFError: sys.stderr.write("\n") sys.exit(1) except KeyboardInterrupt: sys.stderr.write("\n") sys.exit(1) if not CFG_EXTERNAL_AUTHENTICATION[login_method][0]: res = run_sql("select id from user where id=%s " "and password=AES_ENCRYPT(email, %s)", (uid, password_entered), 1) if res: ok = True else: if CFG_EXTERNAL_AUTHENTICATION[login_method][0].auth_user(get_email(uid), password_entered): ok = True if not ok: print "Sorry, wrong credentials for %s." % user sys.exit(1) else: ## secondly check authorization for the authorization_action: (auth_code, auth_message) = acc_authorize_action(uid, authorization_action) if auth_code != 0: print auth_message sys.exit(1) return user def _task_submit(argv, authorization_action, authorization_msg): """Submits task to the BibSched task queue. This is what people will be invoking via command line.""" ## check as whom we want to submit? check_running_process_user() ## sanity check: remove eventual "task" option: ## authenticate user: _task_params['user'] = authenticate(_task_params["user"], authorization_action, authorization_msg) ## submit task: if _task_params['task_specific_name']: task_name = '%s:%s' % (_task_params['task_name'], _task_params['task_specific_name']) else: task_name = _task_params['task_name'] write_message("storing task options %s\n" % argv, verbose=9) _task_params['task_id'] = run_sql("""INSERT INTO schTASK (proc,user, runtime,sleeptime,status,progress,arguments,priority) VALUES (%s,%s,%s,%s,'WAITING','',%s, %s)""", (task_name, _task_params['user'], _task_params["runtime"], _task_params["sleeptime"], marshal.dumps(argv), _task_params['priority'])) ## update task number: write_message("Task #%d submitted." % _task_params['task_id']) return _task_params['task_id'] def _task_get_options(task_id, task_name): """Returns options for the task 'id' read from the BibSched task queue table.""" out = {} res = run_sql("SELECT arguments FROM schTASK WHERE id=%s AND proc LIKE %s", (task_id, task_name+'%')) try: out = marshal.loads(res[0][0]) except: write_message("Error: %s task %d does not seem to exist." \ % (task_name, task_id), sys.stderr) task_update_status('ERROR') sys.exit(1) write_message('Options retrieved: %s' % (out, ), verbose=9) return out def _task_run(task_run_fnc): """Runs the task by fetching arguments from the BibSched task queue. This is what BibSched will be invoking via daemon call. The task prints Fibonacci numbers for up to NUM on the stdout, and some messages on stderr. @param task_run_fnc will be called as the main core function. Must return False in case of errors. Return True in case of success and False in case of failure.""" ## We prepare the pid file inside /prefix/var/run/taskname_id.pid check_running_process_user() try: pidfile_name = os.path.join(CFG_PREFIX, 'var', 'run', 'bibsched_task_%d.pid' % _task_params['task_id']) pidfile = open(pidfile_name, 'w') pidfile.write(str(os.getpid())) pidfile.close() except OSError: register_exception(alert_admin=True) task_update_status("ERROR") return False ## Setting up the logging system logger = logging.getLogger() stderr_logger = logging.handlers.RotatingFileHandler(os.path.join(CFG_LOGDIR, 'bibsched_task_%d.err' % _task_params['task_id']), 'a', 1*1024*1024, 10) stdout_logger = logging.handlers.RotatingFileHandler(os.path.join(CFG_LOGDIR, 'bibsched_task_%d.log' % _task_params['task_id']), 'a', 1*1024*1024, 10) formatter = logging.Formatter('%(asctime)s --> %(message)s', '%Y-%m-%d %H:%M:%S') stderr_logger.setFormatter(formatter) stdout_logger.setFormatter(formatter) logger.addHandler(stderr_logger) logger.addHandler(stdout_logger) logger.setLevel(logging.INFO) ## check task status: task_status = task_read_status() if task_status not in ("WAITING", "SCHEDULED"): write_message("Error: The task #%d is %s. I expected WAITING or SCHEDULED." % (_task_params['task_id'], task_status), sys.stderr) return False time_now = time.time() if _task_params['runtime_limit'] is not None: if not _task_params['runtime_limit'][0][0] <= time_now <= _task_params['runtime_limit'][0][1]: if time_now <= _task_params['runtime_limit'][0][0]: new_runtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(_task_params['runtime_limit'][0][0])) else: new_runtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(_task_params['runtime_limit'][1][0])) progress = run_sql("SELECT progress FROM schTASK WHERE id=%s", (_task_params['task_id'], )) if progress: progress = progress[0][0] else: progress = '' g = re.match(r'Postponed \d+ time\(s\)', progress) if g: postponed_times = int(g.group(1)) else: postponed_times = 0 run_sql("UPDATE schTASK SET runtime=%s, status='WAITING', progress=%s WHERE id=%s", (new_runtime, 'Postponed %d time(s)' % (postponed_times + 1), _task_params['task_id'])) write_message("Task #%d postponed because outside of runtime limit" % _task_params['task_id']) return True ## initialize signal handler: _task_params['signal_request'] = None signal.signal(signal.SIGUSR1, _task_sig_sleep) signal.signal(signal.SIGUSR2, signal.SIG_IGN) signal.signal(signal.SIGTSTP, _task_sig_ctrlz) signal.signal(signal.SIGTERM, _task_sig_stop) signal.signal(signal.SIGQUIT, _task_sig_stop) signal.signal(signal.SIGABRT, _task_sig_suicide) signal.signal(signal.SIGCONT, _task_sig_wakeup) signal.signal(signal.SIGINT, _task_sig_ctrlc) ## we can run the task now: write_message("Task #%d started." % _task_params['task_id']) task_update_status("RUNNING") ## run the task: _task_params['task_starting_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) sleeptime = _task_params['sleeptime'] try: try: if callable(task_run_fnc) and task_run_fnc(): task_update_status("DONE") else: task_update_status("DONE WITH ERRORS") except SystemExit: pass except: register_exception(alert_admin=True) task_update_status("ERROR") finally: task_status = task_read_status() if sleeptime: new_runtime = get_datetime(sleeptime) ## The task is a daemon. We resubmit it if task_status == 'DONE': ## It has finished in a good way. We recycle the database row run_sql("UPDATE schTASK SET runtime=%s, status='WAITING', progress='' WHERE id=%s", (new_runtime, _task_params['task_id'])) write_message("Task #%d finished and resubmitted." % _task_params['task_id']) + elif task_status == 'STOPPED': + run_sql("UPDATE schTASK SET status='WAITING', progress='' WHERE id=%s", (_task_params['task_id'], )) + write_message("Task #%d stopped and resubmitted." % _task_params['task_id']) else: ## We keep the bad result and we resubmit with another id. #res = run_sql('SELECT proc,user,sleeptime,arguments,priority FROM schTASK WHERE id=%s', (_task_params['task_id'], )) #proc, user, sleeptime, arguments, priority = res[0] #run_sql("""INSERT INTO schTASK (proc,user, #runtime,sleeptime,status,arguments,priority) #VALUES (%s,%s,%s,%s,'WAITING',%s, %s)""", #(proc, user, new_runtime, sleeptime, arguments, priority)) write_message("Task #%d finished but not resubmitted. [%s]" % (_task_params['task_id'], task_status)) else: ## we are done: write_message("Task #%d finished. [%s]" % (_task_params['task_id'], task_status)) ## Removing the pid os.remove(pidfile_name) try: # Let's signal bibsched that we have finished. from invenio.bibsched import pidfile os.kill(int(open(pidfile).read()), signal.SIGUSR2) except: pass return True def _usage(exitcode=1, msg="", help_specific_usage="", description=""): """Prints usage info.""" if msg: sys.stderr.write("Error: %s.\n" % msg) sys.stderr.write("Usage: %s [options]\n" % sys.argv[0]) if help_specific_usage: sys.stderr.write("Command options:\n") sys.stderr.write(help_specific_usage) sys.stderr.write("Scheduling options:\n") sys.stderr.write(" -u, --user=USER\tUser name to submit the" " task as, password needed.\n") sys.stderr.write(" -t, --runtime=TIME\tTime to execute the" " task (now), e.g. +15s, 5m, 3h, 2002-10-27 13:57:26\n") sys.stderr.write(" -s, --sleeptime=SLEEP\tSleeping frequency after" " which to repeat task (no), e.g.: 30m, 2h, 1d\n") sys.stderr.write(" -L --runtime-limit=LIMIT\tTime limit when it is" " allowed to execute the task, e.g. Sunday 01:00-05:00\n" "\t\t\t\twith the syntax Wee[kday][ hh[:mm][-hh:[mm]]]\n") sys.stderr.write(" -P, --priority=PRIORITY\tPriority level (an integer, 0 is default)\n") sys.stderr.write(" -N, --task-specific-name=TASK_SPECIFIC_NAME\tAdvanced option\n") sys.stderr.write("General options:\n") sys.stderr.write(" -h, --help\t\tPrint this help.\n") sys.stderr.write(" -V, --version\t\tPrint version information.\n") sys.stderr.write(" -v, --verbose=LEVEL\tVerbose level (0=min," " 1=default, 9=max).\n") if description: sys.stderr.write(description) sys.exit(exitcode) def _task_sig_sleep(sig, frame): """Signal handler for the 'sleep' signal sent by BibSched.""" write_message("task_sig_sleep(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("sleeping as soon as possible...") _task_params['signal_request'] = 'sleep' _db_login(1) task_update_status("ABOUT TO SLEEP") def _task_sig_ctrlz(sig, frame): """Signal handler for the 'ctrlz' signal sent by BibSched.""" write_message("task_sig_ctrlz(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("sleeping as soon as possible...") _task_params['signal_request'] = 'ctrlz' _db_login(1) task_update_status("ABOUT TO STOP") def _task_sig_wakeup(sig, frame): """Signal handler for the 'wakeup' signal sent by BibSched.""" signal.signal(signal.SIGTSTP, _task_sig_ctrlz) write_message("task_sig_wakeup(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("continuing...") _task_params['signal_request'] = None _db_login(1) task_update_status("CONTINUING") def _task_sig_ctrlc(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" write_message("task_sig_ctrlc(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("stopping as soon as possible...") _db_login(1) # To avoid concurrency with an interrupted run_sql call task_update_status("STOPPING") _task_params['signal_request'] = 'ctrlc' def _task_sig_stop(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" write_message("task_sig_stop(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("stopping as soon as possible...") _db_login(1) # To avoid concurrency with an interrupted run_sql call task_update_status("STOPPING") _task_params['signal_request'] = 'stop' def _task_sig_suicide(sig, frame): """Signal handler for the 'suicide' signal sent by BibSched.""" write_message("task_sig_suicide(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("suiciding myself now...") task_update_status("SUICIDING") write_message("suicided") _db_login(1) task_update_status("SUICIDED") sys.exit(0) def _task_sig_unknown(sig, frame): """Signal handler for the other unknown signals sent by shell or user.""" # do nothing for unknown signals: write_message("unknown signal %d (frame %s) ignored" % (sig, frame)) _RE_PSLINE = re.compile('^\s*(.+?)\s+(.+?)\s*$') def guess_apache_process_user_from_ps(): """Guess Apache process user by parsing the list of running processes.""" apache_users = [] try: # Tested on Linux, Sun and MacOS X for line in os.popen('ps -A -o user,comm').readlines(): g = _RE_PSLINE.match(line) if g: username = g.group(2) process = os.path.basename(g.group(1)) if process in ('apache', 'apache2', 'httpd') : if username not in apache_users and username != 'root': apache_users.append(username) except Exception, e: print >> sys.stderr, "WARNING: %s" % e return tuple(apache_users) def guess_apache_process_user(): """ Return the possible name of the user running the Apache server process. (Look at running OS processes or look at OS users defined in /etc/passwd.) """ apache_users = guess_apache_process_user_from_ps() + ('apache2', 'apache', 'www-data') for username in apache_users: try: userline = pwd.getpwnam(username) return userline[0] except KeyError: pass print >> sys.stderr, "ERROR: Cannot detect Apache server process user. Please set the correct value in CFG_BIBSCHED_PROCESS_USER." sys.exit(1) def check_running_process_user(): """ Check that the user running this program is the same as the user configured in CFG_BIBSCHED_PROCESS_USER or as the user running the Apache webserver process. """ running_as_user = pwd.getpwuid(os.getuid())[0] if CFG_BIBSCHED_PROCESS_USER and running_as_user != CFG_BIBSCHED_PROCESS_USER: print >> sys.stderr, """ERROR: You must run "%(x_proc)s" as the user set up in your CFG_BIBSCHED_PROCESS_USER (seems to be "%(x_user)s"). You may want to do "sudo -u %(x_user)s %(x_proc)s ..." to do so. If you think this is not right, please set CFG_BIBSCHED_PROCESS_USER appropriately and rerun "inveniocfg --update-config-py".""" % \ {'x_proc': os.path.basename(sys.argv[0]), 'x_user': CFG_BIBSCHED_PROCESS_USER} sys.exit(1) elif running_as_user != guess_apache_process_user(): print >> sys.stderr, """ERROR: You must run "%(x_proc)s" as the same user that runs your Apache server process (seems to be "%(x_user)s"). You may want to do "sudo -u %(x_user)s %(x_proc)s ..." to do so. If you think this is not right, please set CFG_BIBSCHED_PROCESS_USER appropriately and rerun "inveniocfg --update-config-py".""" % \ {'x_proc': os.path.basename(sys.argv[0]), 'x_user': guess_apache_process_user()} sys.exit(1) return