Page MenuHomec4science

bibsched.wml
No OneTemporary

File Metadata

Created
Tue, Nov 5, 18:38

bibsched.wml

## $Id$
## Script that takes care of the task queue
## This file is part of the CERN Document Server Software (CDSware).
## Copyright (C) 2002, 2003, 2004, 2005 CERN.
##
## The CDSware is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## The CDSware is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with CDSware; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
## read config variables:
#include "config.wml"
#include "configbis.wml"
#include "cdswmllib.wml"
## start Python:
<protect>#!</protect><PYTHON>
<protect>## $Id$</protect>
<protect>## DO NOT EDIT THIS FILE! IT WAS AUTOMATICALLY GENERATED FROM CDSware WML SOURCES.</protect>
"""BibSched - task management, scheduling and executing system for CDSware
"""
__version__ = "<: print generate_pretty_version_string('$Id$'); :>"
## fill config variables:
pylibdir = "<LIBDIR>/python"
## okay, rest of the Python code goes below
#######
### -- local configuration section starts here ---
cfg_valid_processes = ["bibindex","bibupload","bibreformat","webcoll","bibtaskex","bibrank"] # which task names are recognized as valid (only these are ever launched)?
### -- local configuration section ends here ---
<protect>
## import interesting modules:
try:
import os
import imp
import string
import sys
import MySQLdb
import time
import sre
import getopt
import curses
import curses.panel
from curses.wrapper import wrapper
import signal
except ImportError, e:
print "Error: %s" % e
import sys
sys.exit(1)
try:
sys.path.append('%s' % pylibdir)
from cdsware.config import *
from cdsware.dbquery import run_sql
except ImportError, e:
print "Error: %s" % e
import sys
sys.exit(1)
def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"):
    """Return a date string formatted according to 'format_string'.

    'var' is either a shift relative to now, written as an optional sign,
    a number and a unit ('d' days, 'h' hours, 'm' minutes, 's' seconds),
    e.g. "+1d" or "-30m", or an absolute date that is already written in
    'format_string' (it is parsed and re-emitted, which validates it).

    Returns None when 'var' cannot be interpreted (wrong type, unparsable
    date, or a shift that lands outside the platform's time range).
    """
    # Local import: the file only imports the deprecated 'sre' alias.
    import re
    seconds_per_unit = {"d": 24 * 3600, "h": 3600, "m": 60, "s": 1}
    try:
        # match() anchors at the start only, exactly like the original.
        shift = re.match(r"([-+]?)(\d+)([dhms])", var)
        if shift:
            sign, amount, unit = shift.groups()
            offset = float(amount) * seconds_per_unit[unit]
            if sign == "-":
                offset = -offset
            moment = time.localtime(time.time() + offset)
        else:
            moment = time.strptime(var, format_string)
        return time.strftime(format_string, moment)
    except (TypeError, ValueError, OverflowError, OSError):
        # Only the failures that mean "bad input" are mapped to None;
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return None
def get_my_pid(process, args=''):
    """Return the PID of the first process named 'process' whose command
    line contains "<process> <args>", or 0 when no such process is listed.

    The lookup shells out to 'ps -C <process>' and filters with grep, so
    both arguments end up in a shell command line and must be trusted.
    """
    # NOTE(review): the grep pattern is a plain prefix match, so args "1"
    # would also match a command line ending in "12" -- confirm whether
    # callers can have several tasks of the same process alive at once.
    command = "ps -C %s o '%%p%%a' | grep '%s %s' | sed -n 1p" % (process, process, args)
    pipe = os.popen(command)
    try:
        output = pipe.read()
    finally:
        pipe.close()
    fields = output.split()
    if not fields:
        return 0
    # The first whitespace-separated field is the PID column.  Splitting
    # avoids the old slice-by-find(), which cut off the last character
    # whenever the line contained no space.
    return int(fields[0])
def get_output_channelnames(task_id):
    """Return [stdout_filename, stderr_filename] for the task 'task_id'.

    Both files live in 'logdir' and share the base name
    bibsched_task_<task_id>, with the extensions .log and .err.
    """
    return ["%s/bibsched_task_%d%s" % (logdir, task_id, extension)
            for extension in (".log", ".err")]
class Manager:
    """Interactive curses console for the BibSched task queue.

    Creating an instance immediately enters the curses main loop (through
    'wrapper(self.start)') and returns only once the user has quit.  The
    console lists the rows of the schTASK table and lets the user run,
    sleep, wake up, stop, kill, delete or re-initialise the selected task,
    and start or stop the scheduling daemon ("automatic mode").
    """

    def __init__(self):
        self.helper_modules = cfg_valid_processes
        self.running = 1
        self.footer_move_mode = "[KeyUp/KeyDown Move] [M Select mode] [Q Quit]"
        self.footer_auto_mode = "[A Manual mode] [1/2 Display Type] [Q Quit]"
        self.footer_select_mode = "[KeyUp/KeyDown/PgUp/PgDown Select] [L View Log] [1/2 Display Type] [M Move mode] [A Auto mode] [Q Quit]"
        self.footer_waiting_item = "[R Run] [D Delete]"
        self.footer_running_item = "[S Sleep] [T Stop] [K Kill]"
        self.footer_stopped_item = "[I Initialise] [D Delete]"
        self.footer_sleeping_item = "[W Wake Up]"
        self.item_status = ""
        # Screen line of the selection; lines 0 and 1 hold the table header.
        self.selected_line = 2
        self.rows = []
        self.panel = None
        # 1 = show only DONE tasks, 2 = show everything that is not DONE.
        self.display = 2
        self.first_visible_line = 0
        self.move_mode = 0
        self.auto_mode = 0
        self.bibsched_daemon_id = 0
        self.currentrow = ["", "", "", "", "", "", ""]
        wrapper(self.start)

    def handle_keys(self):
        """Read one pending key press (if any) and dispatch it."""
        key = self.stdscr.getch()
        if key == -1:
            # Nothing pending (the screen is in no-delay mode).
            return
        auto_keys = (curses.KEY_UP, curses.KEY_DOWN, curses.KEY_PPAGE,
                     curses.KEY_NPAGE, ord("q"), ord("Q"), ord("a"),
                     ord("A"), ord("1"), ord("2"))
        move_keys = (curses.KEY_UP, curses.KEY_DOWN, ord("m"), ord("M"),
                     ord("q"), ord("Q"))
        if self.auto_mode and key not in auto_keys:
            self.display_in_footer("in automatic mode")
            self.stdscr.refresh()
            return
        if self.move_mode and key not in move_keys:
            self.display_in_footer("in move mode")
            self.stdscr.refresh()
            return
        last_line = len(self.rows) + 1
        if key == curses.KEY_UP:
            if self.move_mode:
                self.move_up()
            else:
                self.selected_line = max(self.selected_line - 1, 2)
            self.repaint()
        elif key == curses.KEY_PPAGE:
            self.selected_line = max(self.selected_line - 10, 2)
            self.repaint()
        elif key == curses.KEY_DOWN:
            if self.move_mode:
                self.move_down()
            else:
                self.selected_line = min(self.selected_line + 1, last_line)
            self.repaint()
        elif key == curses.KEY_NPAGE:
            self.selected_line = min(self.selected_line + 10, last_line)
            self.repaint()
        elif key == curses.KEY_HOME:
            self.first_visible_line = 0
            self.selected_line = 2
            # Show the jump at once instead of waiting for the timed refresh.
            self.repaint()
        elif key in (ord("a"), ord("A")):
            self.change_auto_mode()
        elif key in (ord("l"), ord("L")):
            self.openlog()
        elif key in (ord("w"), ord("W")):
            self.wakeup()
        elif key in (ord("r"), ord("R")):
            self.run()
        elif key in (ord("s"), ord("S")):
            self.sleep()
        elif key in (ord("k"), ord("K")):
            self.kill()
        elif key in (ord("t"), ord("T")):
            self.stop()
        elif key in (ord("d"), ord("D")):
            self.delete()
        elif key in (ord("i"), ord("I")):
            self.init()
        elif key in (ord("m"), ord("M")):
            self.change_select_mode()
        elif key == ord("1"):
            self.display = 1
            self.first_visible_line = 0
            self.selected_line = 2
            self.display_in_footer("only done processes are displayed")
        elif key == ord("2"):
            self.display = 2
            self.first_visible_line = 0
            self.selected_line = 2
            self.display_in_footer("only not done processes are displayed")
        elif key in (ord("q"), ord("Q")):
            if curses.panel.top_panel() == self.panel:
                # The log panel is on top: Q only dismisses it.
                self.panel.bottom()
                curses.panel.update_panels()
            else:
                self.running = 0
        return

    def set_status(self, id, status):
        """Store 'status' for the task 'id' in schTASK."""
        return run_sql("UPDATE schTASK set status=%s WHERE id=%s", (status, id))

    def set_progress(self, id, progress):
        """Store 'progress' for the task 'id' in schTASK."""
        return run_sql("UPDATE schTASK set progress=%s WHERE id=%s", (progress, id))

    def openlog(self):
        """Open the log panel (currently only a placeholder message)."""
        self.win = curses.newwin(self.height - 2, self.width - 2, 1, 1)
        self.panel = curses.panel.new_panel(self.win)
        self.panel.top()
        self.win.border()
        self.win.addstr(1, 1, "Not implemented yet...")
        self.win.refresh()
        curses.panel.update_panels()

    def count_processes(self, status):
        """Return how many schTASK rows currently have the given status."""
        res = run_sql("SELECT COUNT(id) FROM schTASK WHERE status=%s GROUP BY status", (status,))
        try:
            return res[0][0]
        except (IndexError, TypeError):
            # GROUP BY yields no row at all when nothing matches.
            return 0

    def _something_is_running(self):
        """Tell whether any task is in the RUNNING or CONTINUING state."""
        return self.count_processes('RUNNING') + self.count_processes('CONTINUING') >= 1

    def _signal_selected_task(self, signum, signal_name, mark_stopped=0):
        """Send 'signum' to the process of the selected task.

        When the process cannot be found the task is marked STOPPED.  With
        'mark_stopped' set it is also marked STOPPED after a successful
        delivery (used for KILL, which the task cannot report by itself).
        """
        task_id = self.currentrow[0]
        process = self.currentrow[1]
        mypid = get_my_pid(process, str(task_id))
        if mypid != 0:
            os.kill(mypid, signum)
            if mark_stopped:
                self.set_status(task_id, 'STOPPED')
            self.display_in_footer("%s signal sent to process #%s" % (signal_name, mypid))
        else:
            self.set_status(task_id, 'STOPPED')
            self.display_in_footer("cannot find process...")
        self.stdscr.refresh()

    def wakeup(self):
        """Resume the selected SLEEPING task, unless something is running."""
        task_id = self.currentrow[0]
        process = self.currentrow[1]
        status = self.currentrow[5]
        if self._something_is_running():
            self.display_in_footer("a process is already running!")
        elif status == "SLEEPING":
            mypid = get_my_pid(process, str(task_id))
            if mypid != 0:
                os.kill(mypid, signal.SIGCONT)
                self.display_in_footer("process woken up")
            else:
                # Same feedback as sleep/stop/kill when the pid is unknown.
                self.display_in_footer("cannot find process...")
        else:
            self.display_in_footer("process is not sleeping")
        self.stdscr.refresh()

    def run(self):
        """Launch the selected STOPPED/WAITING task in the background.

        Periodic tasks (non-empty sleeptime) get their next occurrence
        queued as a new WAITING row.
        """
        task_id = self.currentrow[0]
        process = self.currentrow[1]
        status = self.currentrow[5]
        sleeptime = self.currentrow[4]
        if self._something_is_running():
            self.display_in_footer("a process is already running!")
        elif status == "STOPPED" or status == "WAITING":
            if process in self.helper_modules:
                program = os.path.join(bindir, process)
                fdout, fderr = get_output_channelnames(task_id)
                command = "%s %s >> %s 2>> %s &" % (program, str(task_id), fdout, fderr)
                os.system(command)
                Log("manually running task #%d (%s)" % (task_id, process))
                if sleeptime:
                    next_runtime = get_datetime(sleeptime)
                    run_sql("INSERT INTO schTASK (proc, user, runtime, sleeptime, arguments, status) "\
                            " VALUES (%s,%s,%s,%s,%s,'WAITING')",
                            (process, self.currentrow[2], next_runtime, sleeptime, self.currentrow[7]))
        else:
            self.display_in_footer("process status should be STOPPED or WAITING!")
        self.stdscr.refresh()

    def sleep(self):
        """Ask the selected running task to go to sleep (SIGUSR1)."""
        status = self.currentrow[5]
        if status != 'RUNNING' and status != 'CONTINUING':
            self.display_in_footer("this process is not running!")
            self.stdscr.refresh()
        else:
            self._signal_selected_task(signal.SIGUSR1, "SLEEP")

    def kill(self):
        """Kill the selected task (SIGKILL) and mark it STOPPED."""
        self._signal_selected_task(signal.SIGKILL, "KILL", mark_stopped=1)

    def stop(self):
        """Interrupt the selected task (SIGINT)."""
        self._signal_selected_task(signal.SIGINT, "INT")

    def delete(self):
        """Hide the selected task by appending _DELETED to its status."""
        task_id = self.currentrow[0]
        status = self.currentrow[5]
        if status not in ('RUNNING', 'CONTINUING', 'SLEEPING'):
            self.set_status(task_id, "%s_DELETED" % status)
            self.display_in_footer("process deleted")
            self.selected_line = max(self.selected_line, 2)
        else:
            self.display_in_footer("cannot delete running processes")
        self.stdscr.refresh()

    def init(self):
        """Put the selected task back into the WAITING state."""
        task_id = self.currentrow[0]
        status = self.currentrow[5]
        if status not in ('RUNNING', 'CONTINUING', 'SLEEPING'):
            self.set_status(task_id, "WAITING")
            self.set_progress(task_id, "None")
            self.display_in_footer("process initialised")
        else:
            self.display_in_footer("cannot initialise running processes")
        self.stdscr.refresh()

    def change_select_mode(self):
        """Toggle between select mode and move mode."""
        if self.move_mode:
            self.move_mode = 0
        else:
            status = self.currentrow[5]
            if status in ("RUNNING", "CONTINUING", "SLEEPING"):
                self.display_in_footer("cannot move running processes!")
            else:
                self.move_mode = 1
        self.stdscr.refresh()

    def change_auto_mode(self):
        """Toggle automatic mode by stopping or starting the daemon."""
        if self.auto_mode:
            if self.bibsched_daemon_id != 0:
                try:
                    os.kill(self.bibsched_daemon_id, signal.SIGKILL)
                except OSError:
                    # The cached pid is stale (daemon already gone); an
                    # uncaught error here would tear down the whole UI.
                    self.display_in_footer('Cannot find daemon process')
                else:
                    Log("daemon stopped")
                self.bibsched_daemon_id = 0
            else:
                self.display_in_footer('Cannot find daemon process')
            self.auto_mode = 0
        else:
            program = os.path.join(bindir, "bibsched")
            os.system("%s -d &" % program)
            self.bibsched_daemon_id = get_my_pid("bibsched", "-d")
            self.auto_mode = 1
            self.move_mode = 0
        self.stdscr.refresh()

    def move_up(self):
        """Move the selected task up in the queue (not implemented)."""
        self.display_in_footer("not implemented yet")
        self.stdscr.refresh()

    def move_down(self):
        """Move the selected task down in the queue (not implemented)."""
        self.display_in_footer("not implemented yet")
        self.stdscr.refresh()

    def _highlight_attr(self):
        """Return the attribute used for the header and the selected row."""
        if self.auto_mode:
            pair = 2
        elif self.move_mode:
            pair = 7
        else:
            pair = 8
        return curses.color_pair(pair) + curses.A_STANDOUT + curses.A_BOLD

    def put_line(self, row):
        """Draw 'row' on screen line self.y and advance self.y.

        Drawing the selected line also records the row in self.currentrow
        and its status in self.item_status.
        """
        col_w = [5, 11, 21, 21, 7, 11, 25]
        maxx = self.width
        status = row[5]
        if self.y == self.selected_line - self.first_visible_line and self.y > 1:
            attr = self._highlight_attr()
            self.item_status = status
            self.currentrow = row
        elif self.y == 0:
            attr = self._highlight_attr()
        elif status == "DONE":
            attr = curses.color_pair(5) + curses.A_BOLD
        elif status == "STOPPED":
            attr = curses.color_pair(6) + curses.A_BOLD
        elif "ERROR" in status:
            attr = curses.color_pair(4) + curses.A_BOLD
        elif status == "WAITING":
            attr = curses.color_pair(3) + curses.A_BOLD
        elif status in ("RUNNING", "CONTINUING"):
            attr = curses.color_pair(2) + curses.A_BOLD
        else:
            attr = curses.A_BOLD
        # The runtime column is cut to 19 characters (date and time).
        cells = [str(row[0]), str(row[1]), str(row[2]), str(row[3])[:19],
                 str(row[4]), str(row[5]), str(row[6])]
        myline = "".join([cell.ljust(width) for cell, width in zip(cells, col_w)])
        myline = myline.ljust(maxx)
        self.stdscr.addnstr(self.y, 0, myline, maxx, attr)
        self.y = self.y + 1

    def display_in_footer(self, footer, i=0, print_time_p=0):
        """Write 'footer' on the line 'i' rows above self.y.

        With 'print_time_p' set, the current local time is appended.
        """
        if print_time_p:
            footer = "%s %s" % (footer, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        maxx = self.stdscr.getmaxyx()[1]
        footer = footer.ljust(maxx)
        if self.auto_mode:
            colorpair = 2
        elif self.move_mode:
            colorpair = 7
        else:
            colorpair = 1
        self.stdscr.addnstr(self.y - i, 0, footer, maxx - 1, curses.A_STANDOUT + curses.color_pair(colorpair) + curses.A_BOLD)

    def repaint(self):
        """Redraw the header, the visible rows and both footer lines."""
        self.y = 0
        self.stdscr.clear()
        self.height, self.width = self.stdscr.getmaxyx()
        maxy = self.height - 2
        # The row list may have shrunk since the last paint: keep the
        # selection on an existing line and forget the previous row, so
        # that commands never act on a task that is no longer displayed.
        self.selected_line = max(2, min(self.selected_line, len(self.rows) + 1))
        self.item_status = ""
        self.currentrow = ["", "", "", "", "", "", ""]
        self.put_line(("ID", "PROC", "USER", "RUNTIME", "SLEEP", "STATUS", "PROGRESS"))
        self.put_line(("---", "----", "----", "-------------------", "-----", "-----", "--------"))
        # Scroll so that the selected line stays inside the window.
        if self.selected_line > maxy + self.first_visible_line - 1:
            self.first_visible_line = self.selected_line - maxy + 1
        if self.selected_line < self.first_visible_line + 2:
            self.first_visible_line = self.selected_line - 2
        first = self.first_visible_line
        for row in self.rows[first:first + maxy - 2]:
            self.put_line(row)
        self.y = self.stdscr.getmaxyx()[0] - 1
        if self.auto_mode:
            self.display_in_footer(self.footer_auto_mode, print_time_p=1)
        elif self.move_mode:
            self.display_in_footer(self.footer_move_mode, print_time_p=1)
        else:
            self.display_in_footer(self.footer_select_mode, print_time_p=1)
        footer2 = ""
        if "DONE" in self.item_status or self.item_status == "ERROR" or self.item_status == "STOPPED":
            footer2 += self.footer_stopped_item
        elif self.item_status == "RUNNING" or self.item_status == "CONTINUING":
            footer2 += self.footer_running_item
        elif self.item_status == "SLEEPING":
            footer2 += self.footer_sleeping_item
        elif self.item_status == "WAITING":
            footer2 += self.footer_waiting_item
        self.display_in_footer(footer2, 1)
        self.stdscr.refresh()

    def _load_rows(self):
        """Fetch the rows to display, according to self.display."""
        if self.display == 1:
            where = "and status='DONE'"
            order = "DESC"
        else:
            where = "and status!='DONE'"
            order = "ASC"
        # 'where' and 'order' are the constants above, never user input.
        return run_sql("SELECT id,proc,user,runtime,sleeptime,status,progress,arguments FROM schTASK WHERE status NOT LIKE '%%DELETED%%' %s ORDER BY runtime %s" % (where, order))

    def start(self, stdscr):
        """Main loop of the console; called by the curses wrapper."""
        curses.start_color()
        curses.init_pair(8, curses.COLOR_WHITE, curses.COLOR_BLACK)
        curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_RED)
        curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
        curses.init_pair(3, curses.COLOR_MAGENTA, curses.COLOR_BLACK)
        curses.init_pair(4, curses.COLOR_RED, curses.COLOR_BLACK)
        curses.init_pair(5, curses.COLOR_BLUE, curses.COLOR_BLACK)
        curses.init_pair(6, curses.COLOR_CYAN, curses.COLOR_BLACK)
        curses.init_pair(7, curses.COLOR_YELLOW, curses.COLOR_BLACK)
        self.stdscr = stdscr
        self.stdscr.nodelay(1)
        self.base_panel = curses.panel.new_panel(self.stdscr)
        self.base_panel.bottom()
        curses.panel.update_panels()
        self.height, self.width = stdscr.getmaxyx()
        self.stdscr.clear()
        # A daemon that is already running puts us in automatic mode.
        self.bibsched_daemon_id = get_my_pid("bibsched", "-d")
        if self.bibsched_daemon_id != 0:
            self.auto_mode = 1
        self.rows = self._load_rows()
        self.repaint()
        ticks = 0
        while self.running:
            time.sleep(0.1)
            self.handle_keys()
            if ticks == 20:
                # Reload the queue roughly every two seconds.
                self.rows = self._load_rows()
                ticks = 0
                self.repaint()
            else:
                ticks = ticks + 1
class BibSched:
    """The scheduling daemon: polls schTASK and reacts to each row.

    The dictionaries map task ids to process ids: 'running' holds the
    tasks believed to be alive, the '*_sent' ones the tasks that have been
    signalled and not yet acknowledged, 'sleep_done' the sleeping tasks.
    """

    def __init__(self):
        self.helper_modules = cfg_valid_processes
        self.running = {}
        self.sleep_done = {}
        self.sleep_sent = {}
        self.stop_sent = {}
        self.suicide_sent = {}

    def set_status(self, id, status):
        """Store 'status' for the task 'id' in schTASK."""
        return run_sql("UPDATE schTASK set status=%s WHERE id=%s", (status, id))

    def can_run(self, proc):
        """Tell whether a new task may start (only when none is running)."""
        return not self.running

    def get_running_processes(self):
        """Return one RUNNING/CONTINUING row of schTASK, or None."""
        res = run_sql("SELECT id,proc,user,UNIX_TIMESTAMP(runtime),sleeptime,arguments,status FROM schTASK "\
                      " WHERE status='RUNNING' or status='CONTINUING' LIMIT 1")
        try:
            return res[0]
        except (IndexError, TypeError):
            return None

    def _start_task(self, task_id, proc, user, sleeptime, arguments):
        """Run the task in the foreground and queue its next occurrence."""
        program = os.path.join(bindir, proc)
        fdout, fderr = get_output_channelnames(task_id)
        command = "%s %s >> %s 2>> %s" % (program, str(task_id), fdout, fderr)
        Log("task #%d (%s) started" % (task_id, proc))
        # No trailing '&': this call blocks until the task has ended.
        os.system(command)
        Log("task #%d (%s) ended" % (task_id, proc))
        pid = get_my_pid(proc, str(task_id))
        if pid:
            # Only remember a real pid.  Storing 0 would block can_run()
            # until a DONE row shows up (never, if the task failed) and a
            # later os.kill(0, ...) would signal our whole process group.
            self.running[task_id] = pid
        if sleeptime:
            next_runtime = get_datetime(sleeptime)
            run_sql("INSERT INTO schTASK (proc, user, runtime, sleeptime, arguments, status) "\
                    " VALUES (%s,%s,%s,%s,%s,'WAITING')",
                    (proc, user, next_runtime, sleeptime, arguments))

    def handle_row(self, row):
        """React to one schTASK row according to its status.

        'row' is (id, proc, user, runtime, sleeptime, arguments, status),
        with runtime as a UNIX timestamp.
        """
        task_id, proc, user, runtime, sleeptime, arguments, status = row
        if status == "SLEEP":
            if task_id in self.running:
                self.set_status(task_id, "SLEEP SENT")
                os.kill(self.running[task_id], signal.SIGUSR1)
                self.sleep_sent[task_id] = self.running[task_id]
        elif status == "SLEEPING":
            if task_id in self.sleep_sent:
                self.sleep_done[task_id] = self.sleep_sent.pop(task_id)
        elif status == "WAKEUP":
            if task_id in self.sleep_done:
                self.running[task_id] = self.sleep_done.pop(task_id)
                os.kill(self.running[task_id], signal.SIGCONT)
                self.set_status(task_id, "RUNNING")
        elif status == "STOP":
            if task_id in self.running:
                self.set_status(task_id, "STOP SENT")
                os.kill(self.running[task_id], signal.SIGUSR2)
                self.stop_sent[task_id] = self.running.pop(task_id)
        elif status == "STOPPED" and task_id in self.stop_sent:
            del self.stop_sent[task_id]
        elif status == "SUICIDE":
            if task_id in self.running:
                self.set_status(task_id, "SUICIDE SENT")
                os.kill(self.running[task_id], signal.SIGABRT)
                self.suicide_sent[task_id] = self.running.pop(task_id)
        elif status == "SUICIDED" and task_id in self.suicide_sent:
            del self.suicide_sent[task_id]
        elif "DONE" in status and task_id in self.running:
            del self.running[task_id]
        elif self.can_run(proc) and status == "WAITING" and runtime <= time.time():
            if proc in self.helper_modules:
                self._start_task(task_id, proc, user, sleeptime, arguments)

    def watch_loop(self):
        """Poll schTASK once per second, forever."""
        running_process = self.get_running_processes()
        if running_process:
            task_id = running_process[0]
            proc = running_process[1]
            # Look the pid up once so the test and the stored value agree.
            pid = get_my_pid(proc, str(task_id))
            if pid:
                self.running[task_id] = pid
            else:
                # Marked as running in the database, but no such process.
                self.set_status(task_id, "ERROR")
        rows = []
        while 1:
            for row in rows:
                self.handle_row(row)
            time.sleep(1)
            rows = run_sql("SELECT id,proc,user,UNIX_TIMESTAMP(runtime),sleeptime,arguments,status FROM schTASK ORDER BY runtime ASC")
def Log(message):
    """Append 'message', prefixed with the current local time, as one line
    to bibsched.log in 'logdir'."""
    log = open(logdir + "/bibsched.log", "a")
    try:
        log.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime()))
        log.write(message)
        log.write("\n")
    finally:
        # Close the file even when a write fails, so the handle never leaks.
        log.close()
def redirect_stdout_and_stderr():
    """Point sys.stdout and sys.stderr at the files bibsched.log and
    bibsched.err in 'logdir', both opened in append mode."""
    stdout_path = logdir + "/bibsched.log"
    stderr_path = logdir + "/bibsched.err"
    sys.stdout = open(stdout_path, "a")
    sys.stderr = open(stderr_path, "a")
def usage(exitcode=1, msg=""):
    """Write the usage text to stderr and exit with 'exitcode'.

    A non-empty 'msg' is reported first, as an "Error: ..." line.
    """
    emit = sys.stderr.write
    if msg:
        emit("Error: %s.\n" % msg)
    # The -v/--verbose option is not advertised.
    help_lines = (
        "Usage: %s [options]\n" % sys.argv[0],
        "Command options:\n",
        " -d, --daemon\t Launch BibSched in the daemon mode.\n",
        "General options:\n",
        " -h, --help \t\t Print this help.\n",
        " -V, --version \t\t Print version information.\n",
    )
    for line in help_lines:
        emit(line)
    sys.exit(exitcode)
def main():
    """Command line entry point.

    Without options the interactive console (Manager) is started; with
    -d/--daemon the scheduler loop is run.  -h/--help and -V/--version
    print the usage text or the version and exit.  Only the first option
    given is honoured.
    """
    try:
        # getopt wants long option names WITHOUT the leading dashes; with
        # the dashes included, --help/--version/--daemon were all rejected.
        opts, args = getopt.getopt(sys.argv[1:], "hVd", ["help", "version", "daemon"])
    except getopt.GetoptError:
        # sys.exc_info() keeps this valid on every Python version.
        err = sys.exc_info()[1]
        Log("Error: %s" % err)
        usage(1, err)
    if not opts:
        redirect_stdout_and_stderr()
        Manager()
    else:
        opt = opts[0] # we shall choose only the first option
        if opt[0] in ["-h", "--help"]:
            usage(0)
        elif opt[0] in ["-V", "--version"]:
            sys.stdout.write("%s\n" % __version__)
            sys.exit(0)
        elif opt[0] in ["-d", "--daemon"]:
            redirect_stdout_and_stderr()
            sched = BibSched()
            Log("daemon started")
            sched.watch_loop()
        else:
            usage(1)
    # Only reached after a redirect: closes the bibsched.err handle.
    sys.stderr.close()
    return

if __name__ == '__main__':
    main()
</protect>

Event Timeline