diff --git a/bin/getRunInfo.py b/bin/getRunInfo.py index f37f5ec..2fa09ba 100755 --- a/bin/getRunInfo.py +++ b/bin/getRunInfo.py @@ -1,217 +1,220 @@ #!/usr/bin/env python import BlackDynamite as BD import os, sys, stat import subprocess import getopt import socket import datetime ################################################################ def printSummary(mybase, params): runSelector = BD.RunSelector(mybase) run_list = runSelector.selectRuns(params,params,quiet=True) print ("************************ run summary => study {0} *********************************".format(mybase.schema)) if (len(run_list) == 0): print ("no runs found") sys.exit(0) request = "SELECT run_name,state,count(state) from {0}.runs ".format(mybase.schema) if (len(run_list) > 0): request += "where id in (" + ",".join([str(r.id) for r,j in run_list ]) + ")" request += " group by state,run_name order by run_name,state" # print (request) curs = mybase.performRequest(request,[]) stats = {} for i in curs: if i[0] not in stats: stats[i[0]] = [] stats[i[0]].append([i[1] , int(i[2]) ]) for run_name,st in stats.iteritems(): tot = 0 for n,count in st: tot += count for n,count in st: print ("{:20} {:>20} => {:5} ({:>5.1f}%)".format(run_name,n,count,100.*count/tot)) print ("") sys.exit(0) ################################################################ def getRunInfo(run_id, mybase): myrun = BD.Run(mybase) myrun["id"] = run_id myrun.id = run_id run_list = myrun.getMatchedObjectList() if (len(run_list) == 0): print ("no run found with id " + str(run_id)) sys.exit(1) myrun = run_list[0] myjob = BD.Job(mybase) myjob.id = myrun["job_id"] myjob["id"] = myrun["job_id"] job_list = myjob.getMatchedObjectList() if (len(job_list) == 0): print ("no job found with id " + myjob.id) sys.exit(1) myjob = job_list[0] list_entries = myjob.entries.keys() print ("************************ job info *********************************") for entry in list_entries: if (myjob[entry]): print (entry + ": " + str(myjob[entry])) print ("************************ run info *********************************") list_entries = myrun.entries.keys() regular_run_entries = ("run_name", \ "job_id" , \ "state", \ "start_time", \ "machine_name" , \ "exec", \ "nproc", \ "wait_id" \ ) for entry in regular_run_entries: if (myrun[entry]): print (entry + ": " + str(myrun[entry])) list_entries.remove(entry) for entry in list_entries: if (myrun[entry]): print (entry + ": " + str(myrun[entry])) conffiles = myrun.getConfigFiles() for conf in conffiles: print ("****************************************************************") print ("file #" + str(conf.id) + ": " + conf["filename"]) print ("****************************************************************") print (conf["file"]) ################################################################ def getInfoNames(): infos = [] infos.append("run_name") infos.append("id") infos.append("job_id") if "infos" in params: infos += params['infos'] else: infos += ["state","nproc","machine_name"] infos.append("start_time") infos.append("last step") infos.append("last update") infos.append("Time/step") infos.append("Total Time") return infos ################################################################ def getFormatString(infos): format_string = " {:<20} | {:^6} | {:^6} |" if "infos" in params: format_string += " {:^10} |" * len(params['infos']) else: format_string += " {:<15} | {:^5} | {:<20} |" format_string += " {:14} | {:9} | {:14} | {:10} | {:14} |" return format_string ################################################################ def formatTimeDelta(t): if (t < datetime.timedelta(seconds=1)): if (t < datetime.timedelta(microseconds=1000)): t = str(t.microseconds) + u'\u00B5'.encode('UTF-8') + "s" else: t = str(1./1000.*t.microseconds) + 'ms' else: ms = t.microseconds t -= datetime.timedelta(microseconds=ms) t = str(t) return t ################################################################ def getTimeInfos(r): step,steptime = r.getLastStep() start_time = r['start_time'] time_perstep = None total_time = None if (step): time_perstep = (steptime-start_time)/step total_time = steptime-start_time time_perstep = formatTimeDelta(time_perstep) total_time = formatTimeDelta(total_time) if start_time: start_time = start_time.strftime("%H:%M %d/%m/%y") if steptime : steptime = steptime.strftime("%H:%M %d/%m/%y") run_infos = [start_time,step,steptime,time_perstep,total_time] return run_infos ################################################################ def getRunInfos(r,j): run_infos = [] for col in info_names: key_run = col.replace('%r.','').strip() if not key_run == 'start_time' and key_run in r.entries: run_infos.append(r[key_run]) else: key_job = col.replace('%j.','').strip() if key_job in j.entries: run_infos.append(j[key_job]) run_infos += getTimeInfos(r) return run_infos ################################################################ parser = BD.BDParser() parser.register_params(group="getRunInfo", params={"run_id":int, "order":str, "summary":bool, "infos":[str]}, defaults={"order":"id"}, help={"run_id":"Select a run_id for complete output", "summary":"Output a summary of the completeness of the study", "order": "specify the column which serves to order the lines"}) params = parser.parseBDParameters() mybase = BD.Base(**params) if (params["summary"] == True): printSummary(mybase,params) if ("run_id" in params): getRunInfo(params["run_id"],mybase) else: info_names = getInfoNames() format_string = getFormatString(info_names) header = format_string.format(*info_names) separator = "-" * len(header) print (separator) print (header) print (separator) run_constraints = [] job_constraints = [] runSelector = BD.RunSelector(mybase) run_list = runSelector.selectRuns(params,params,sort_by="runs." + params["order"],quiet=True) for r,j in run_list: line = format_string.format(*getRunInfos(r,j)) print (line) + + +#mybase.close() diff --git a/python/BlackDynamite/base.py b/python/BlackDynamite/base.py index 73c84b0..c1752b0 100755 --- a/python/BlackDynamite/base.py +++ b/python/BlackDynamite/base.py @@ -1,282 +1,282 @@ #!/usr/bin/env python from __future__ import print_function __all__ = [ "Base" ] import job import os import psycopg2 import re import copy import numpy as np import psycopg2 import bdparser import sys import getpass import datetime import run ################################################################ -import bdlogging -import logging +import bdlogging,logging print = bdlogging.invalidPrint -logger = bdlogging.BDLogging.getLogger(__name__,level=logging.DEBUG) +logger = logging.getLogger(__name__) ################################################################ class Base(object): """ """ def createBase(self,job_desc,run_desc,quantities={},**kwargs): #logger.debug (quantities) self.createSchema(kwargs) self.createTable(job_desc) self.createTable(run_desc) self.createGenericTables() for qname,type in quantities.iteritems(): self.pushQuantity(qname,type) if (self.truerun): self.commit() def getObject(self,sqlobject): curs = self.connection.cursor() curs.execute("SELECT * FROM {0}.{1} WHERE id = {2}".format(self.schema,sqlobject.table_name,sqlobject.id)) col_info = self.getColumnProperties(sqlobject) line = curs.fetchone() for i in range(0,len(col_info)): col_name = col_info[i][0] sqlobject[col_name] = line[i] def createSchema(self, params = {"yes": False}): # create the schema of the simulation curs = self.connection.cursor() curs.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{0}'".format(self.schema).lower()) if (curs.rowcount): validated = bdparser.validate_question("Are you sure you want to drop the schema named '" + self.schema + "'", params, False) if (validated == True): curs.execute("DROP SCHEMA {0} cascade".format(self.schema)) else: logger.debug ("creation canceled: exit program") sys.exit(-1) curs.execute("CREATE SCHEMA {0}".format(self.schema)) def createTypeCodes(self): curs = self.connection.cursor() curs.execute("SELECT typname,oid from pg_type;") self.type_code = {} for i in curs: # logger.debug (i[0]) if (i[0] == 'float8'): self.type_code[i[1]] = float if (i[0] == 'text'): self.type_code[i[1]] = str if (i[0] == 'int8'): self.type_code[i[1]] = int if (i[0] == 'int4'): self.type_code[i[1]] = int if (i[0] == 'bool'): self.type_code[i[1]] = bool if (i[0] == 'timestamp'): self.type_code[i[1]] = datetime.datetime def createTable(self, object): request = object.createTableRequest() curs = self.connection.cursor() # logger.debug (request) curs.execute(request) def createGenericTables(self,): sql_script_name = os.path.join(os.path.dirname(__file__),"build_tables.sql") curs = self.connection.cursor() # create generic tables query_list = list() with open(sql_script_name,"r") as fh: for line in fh: query_list.append(re.sub("SCHEMAS_IDENTIFIER",self.schema,line)) curs.execute("\n".join(query_list)) def getColumnProperties(self, sqlobject): curs = self.connection.cursor() curs.execute("SELECT * FROM {0}.{1} LIMIT 0".format(self.schema,sqlobject.table_name)) column_names = [ desc[0] for desc in curs.description] column_type = [ desc[1] for desc in curs.description] return zip(column_names,column_type) def setObjectItemTypes(self, sqlobject): col_info = self.getColumnProperties(sqlobject) for i,j in col_info: sqlobject.types[i] = self.type_code[j] # logger.debug (str(i) + " " + str(self.type_code[j])) def insert(self, sqlobject): sqlobject.prepare() curs = self.performRequest(*(sqlobject.insert())) sqlobject.id = curs.fetchone()[0] def performRequest(self, request, params=[]): curs = self.connection.cursor() # logger.debug (request) # logger.debug (params) try: curs.execute(request,params) except psycopg2.ProgrammingError as err: raise psycopg2.ProgrammingError( ("While trying to execute the query '{0}' with parameters " + "'{1}', I caught this: '{2}'").format(request, params, err)) return curs def createParameterSpace(self,myjob,entry_nb=0,tmp_job=None,nb_inserted = 0): keys = myjob.entries.keys() nparam = len(keys) if (entry_nb == nparam): if (not tmp_job): logger.debug ("internal error") sys.exit(-1) if (len(tmp_job.getMatchedObjectList()) > 0): return nb_inserted nb_inserted += 1 - logger.debug ("insert job #{0}".format(nb_inserted) + ': ' + str(tmp_job.entries)) + logger.info("insert job #{0}".format(nb_inserted) + ': ' + str(tmp_job.entries)) self.insert(tmp_job) return nb_inserted if (not tmp_job): tmp_job = job.Job(self) key = keys[entry_nb] e = myjob[key] if (type(e) == list): for typ in e: tmp_job[key.lower()] = typ nb_inserted = self.createParameterSpace(myjob,entry_nb+1,tmp_job,nb_inserted) else: tmp_job[key.lower()] = e nb_inserted = self.createParameterSpace(myjob,entry_nb+1,tmp_job,nb_inserted) if (self.truerun): self.commit() return nb_inserted def pushQuantity(self,name, type_code, description=None): """ implemented type_codes: "int" "float" "int.vector" "float.vector" """ if ((type_code == "int") or (type_code == int)): is_integer = True is_vector = False elif (type_code == "int.vector"): is_integer = True is_vector = True elif ((type_code == "float") or (type_code == float)): is_integer = False is_vector = False elif (type_code == "float.vector"): is_integer = False is_vector = True else: raise Exception("invalid type '{0}' for a quantity".format(type_code)) curs = self.connection.cursor() curs.execute("INSERT INTO {0}.quantities (name,is_integer,is_vector,description) VALUES (%s , %s , %s, %s) RETURNING id".format(self.schema),(name,is_integer,is_vector,description) ) item = curs.fetchone() if (item is None): raise Exception("Counld not create quantity \"" + name + "\"") return item[0] def commit(self): logger.debug("commiting changes to base") self.connection.commit() def getSchemaList(self): curs = self.connection.cursor() curs.execute("SELECT distinct(table_schema) from information_schema.tables where table_name='runs'") schemas = [ desc[0] for desc in curs] return schemas def checkStudy(self,dico): if not "study" in dico: logger.debug ("*"*30) logger.debug ("Parameter 'study' must be provided at command line") logger.debug ("possibilities are:") schemas = self.getSchemaList() for s in schemas: logger.debug ("\t" + s) logger.debug ("") logger.debug ("FATAL => ABORT") logger.debug ("*"*30) sys.exit(-1) def close(self): if 'connection' in self.__dict__: + logger.debug ('closing database session' ) self.connection.close() + del (self.__dict__['connection']) def __del__(self): - logger.debug ('closing database session') self.close() def __init__ (self, truerun=False,**kwargs): psycopg2_params = ["host","user","port","password"] connection_params = bdparser.filterParams(psycopg2_params,kwargs) if "password" in connection_params and connection_params["password"] == 'ask': connection_params["password"] = getpass.getpass() logger.debug('connection arguments: {0}'.format(connection_params)) try: self.connection = psycopg2.connect(**connection_params) logger.debug('connected to base') except Exception as e: logger.error("Connection failed: check your connection settings:\n" + str(e)) sys.exit(-1) assert(isinstance(self.connection,psycopg2._psycopg.connection)) self.dbhost = kwargs["host"] if "host" in kwargs.keys() else "localhost" if ("should_not_check_study" not in kwargs): self.checkStudy(kwargs) self.schema = kwargs["study"] self.createTypeCodes() self.truerun = truerun if("list_parameters" in kwargs and kwargs["list_parameters"] == True): myjob = job.Job(self) myjob.prepare() logger.debug ("****************************************************************") logger.debug ("Job parameters:") logger.debug ("****************************************************************") params = [str(j[0]) + ": " + str(j[1]) for j in myjob.types.iteritems() ] logger.debug("\n".join(params)) myrun = run.Run(self) myrun.prepare() logger.debug ("****************************************************************") logger.debug ("Run parameters:") logger.debug ("****************************************************************") params = [str(j[0]) + ": " + str(j[1]) for j in myrun.types.iteritems() ] logger.debug("\n".join(params)) sys.exit(0) ################################################################ if __name__ == "__main__": connection = psycopg2.connect(host = "localhost") job_description = job.Job(dict(hono=int,lulu=float,toto=str)) base = Base("honoluluSchema",connection,job_description) base.create() connection.commit() base.pushJob(dict(hono=12,lulu=24.2,toto="toto")) base.pushQuantity("ekin", "float") connection.commit() diff --git a/python/BlackDynamite/bdlogging.py b/python/BlackDynamite/bdlogging.py index 55ed608..67d27d3 100644 --- a/python/BlackDynamite/bdlogging.py +++ b/python/BlackDynamite/bdlogging.py @@ -1,85 +1,98 @@ #!/usr/bin/env python from __future__ import print_function __all__ = [ "BDLogging" ] import logging,traceback,os ################################################################ def invalidPrint(x): raise Exception('print should not be used in that class: use the logging system instead: "{0}"'.format(x)) ################################################################ -class BDlogger: - - def __init__(self,name,fileHandler): - self.logger_screen = logging.getLogger(name) - self.logger_file = logging.getLogger(name + 'file') - self.logger_file.addHandler(fileHandler) - self.logger_screen.addHandler(logging.StreamHandler()) +file_handler = None +def setFileHandler(logger,streamformatter): + if globals()['file_handler'] == None: +# print ("resetting the file for {0}".format(logger.name)) + f = open('bd.log','w') + f.close() + globals()['file_handler'] = logging.FileHandler('bd.log') + file_handler = globals()['file_handler'] + file_handler.setFormatter(streamformatter) + if not file_handler in logger.handlers: + logger.addHandler(file_handler) +################################################################ +Parent = logging.getLoggerClass() +class BDlogger(Parent): + + def __init__(self,name): + self.name = name + self.logger_screen = Parent(name + 'screen') + self.logger_file = Parent(name + 'file') + self.streamformatter = logging.Formatter(fmt='%(levelname)s:%(foo)50s:%(f)15s:%(l)s:' + ' '*10 + '%(message)s') + self.screen_handler = logging.StreamHandler() + self.screen_handler.setFormatter(self.streamformatter) + self.logger_screen.addHandler(self.screen_handler) + self.enable_file = False + self.setScreenLevel(logging.INFO) + def setLogFileLevel(self,level): self.logger_file.setLevel(level) def setScreenLevel(self,level): self.logger_screen.setLevel(level) + def activateFileLog(self,level = logging.DEBUG): + setFileHandler(self.logger_file,self.streamformatter) + self.enable_file = True + self.setLogFileLevel(level) + def getExtra(self): extra = {} tr = traceback.extract_stack(limit=3) tr = tr[0] fname = os.path.basename(tr[0]) extra['foo'] = tr[2] extra['f'] = fname extra['l'] = tr[1] return extra def debug(self,x): self.logger_screen.debug(x,extra=self.getExtra()) - self.logger_file.debug(x,extra=self.getExtra()) + if self.enable_file is True: + self.logger_file.debug(x,extra=self.getExtra()) def warning(self,x): self.logger_screen.warning(x,extra=self.getExtra()) - self.logger_file.warning(x,extra=self.getExtra()) + if self.enable_file is True: + self.logger_file.warning(x,extra=self.getExtra()) def info(self,x): self.logger_screen.info(x,extra=self.getExtra()) - self.logger_file.info(x,extra=self.getExtra()) + if self.enable_file is True: + self.logger_file.info(x,extra=self.getExtra()) def error(self,x): self.logger_screen.error(x,extra=self.getExtra()) - self.logger_file.error(x,extra=self.getExtra()) - - -class BDLogging: - - """ - """ - - filehandler = None - - @classmethod - def getLogger(cls,name,level=logging.WARNING,emptylog=False): + if self.enable_file is True: + self.logger_file.error(x,extra=self.getExtra()) +################################################################ +logging.setLoggerClass(BDlogger) +################################################################ +import logging +logger = logging.getLogger(__name__) +################################################################ - if cls.filehandler is None: - if emptylog: - cls.filehandler = logging.FileHandler('bd.log', mode='w') - else: - cls.filehandler = logging.FileHandler('bd.log') - streamformatter = logging.Formatter(fmt='%(levelname)s:%(foo)50s:%(f)15s:%(l)s:' + ' '*10 + '%(message)s') - - cls.filehandler.setFormatter(streamformatter) - - logger = BDlogger(name,cls.filehandler) - logger.setScreenLevel(logging.WARNING) - logger.setLogFileLevel(level) - logger.debug('Starting to log') - return logger +def activateFileLogging(): + for name,log in logging.Logger.manager.loggerDict.iteritems(): + if isinstance(log,BDlogger): log.activateFileLog() + diff --git a/python/BlackDynamite/bdparser.py b/python/BlackDynamite/bdparser.py index ec0ccc9..bba9a7e 100755 --- a/python/BlackDynamite/bdparser.py +++ b/python/BlackDynamite/bdparser.py @@ -1,582 +1,588 @@ #!/usr/bin/env python from __future__ import print_function __all__ = [ "BDParser", "RunParser" ] import BlackDynamite as BD import sys import re import os import stat import pwd import base import run import argcomplete, argparse from argcomplete.completers import EnvironCompleter import traceback from types import ModuleType import imp ################################################################ import bdlogging,logging print = bdlogging.invalidPrint -#logger = bdlogging.BDLogging.getLogger(__name__,level=logging.DEBUG) -logger = bdlogging.BDLogging.getLogger(__name__) +logger = logging.getLogger(__name__) ################################################################ class BDParser(object): """ """ def listPossibleHosts(self): logger.debug("in") bd_dir = os.path.expanduser("~/.blackdynamite") bd_hosts = os.path.join(bd_dir,'hosts') hosts = [] try: hosts += [h.strip() for h in open(bd_hosts)] except: pass return hosts def listPossibleModules(self,pre_args): logger.debug("in") paths = [] if ("PYTHONPATH" in os.environ): paths = os.environ["PYTHONPATH"].split(':') if ("module_path" in pre_args) : paths += pre_args["module_path"].split(':') paths += BD.__path__ paths += [ path + "/coating" for path in BD.__path__ ] module_list = [] paths = [p.strip() for p in paths if not p.strip() == ''] for p in paths: files = os.listdir(p) files = [f for f in files if os.path.splitext(f)[1] == '.py'] files = [f for f in files if not f[0] == '_'] matching_string = ".*blackdynamite.*" files = [os.path.splitext(f)[0] for f in files if re.match(matching_string,open(os.path.join(p,f)).read().replace('\n',' '),flags=re.IGNORECASE)] module_list += files logger.debug("found these files " + str(module_list)) return module_list def updatePossibleHosts(self,new_host): logger.debug("in") bd_dir = os.path.expanduser("~/.blackdynamite") bd_hosts = os.path.join(bd_dir,'hosts') hosts = set(self.listPossibleHosts()) hosts.add(new_host) f = open(bd_hosts,'w') for h in hosts: f.write(h+'\n') def completer(self,prefix,**kwargs): - logger.debug("in") + #bdlogging.activateFileLogging() + try: params = vars(kwargs["parsed_args"]) + if params['logging'] is True: bdlogging.activateFileLogging() + logger.debug("in") logger.debug("BDparser prefix " + str(prefix) + "\n") for k, v in kwargs.iteritems(): logger.debug("kwargs[" + str(k) + "] = " + str(v) + "\n") logger.debug("dest " + str(vars(kwargs["action"])["dest"]) + "\n") for k in params.keys(): if params[k] is None: del params[k] key = vars(kwargs["action"])["dest"] logger.debug("key " + str(key) + "\n") if (key == "BDconf"): return self.listPossibleConf() if 'BDconf' in params: self.readConfFiles(params,params['BDconf']) if 'host' in params: self.readConfFile(params,params['host']+'.bd') logger.debug("key " + str(key) + "\n") for k in params.keys(): logger.debug("params = " + str(k)) if (key == "host"): return self.listPossibleHosts() if (key == "study"): params["should_not_check_study"] = True mybase = base.Base(**params) return mybase.getSchemaList() if (key == 'quantity'): mybase = base.Base(**params) myrun = run.Run(mybase) return myrun.listQuantities() if (key in self.admissible_params and self.admissible_params[key] == ModuleType): logger.debug("trying to complete module list for '{}'".format(key) ) return self.listPossibleModules(params) except Exception as e: logger.debug(traceback.format_exc()) logger.debug(str(e)) return [] def listPossibleConf(self): logger.debug("in") files = [] for dir in ["./", os.path.expanduser("~/.blackdynamite")]: for filename in os.listdir(dir): fileName, fileExtension = os.path.splitext(filename) if (fileExtension == ".bd"): files.append(filename) return files return files def readConfFiles(self,read_params,fnames): logger.debug("in") for f in fnames: self.readConfFile(read_params,f) def readConfFile(self,read_params,fname): logger.debug("in") if fname in self.readConfFileList: return self.readConfFileList.append(fname) if type(fname) == list: raise Exception('cannot use list in that function: ' + str(type(fname))) pre_args = {} for dir in ["./",os.path.expanduser("~/.blackdynamite")]: fullpath = os.path.join(dir,fname) if (os.path.isfile(fullpath)): fname = fullpath break try: with open(fname) as fh: #print("loading file '{0}'".format(fname)) os.chmod(fname,stat.S_IREAD|stat.S_IWRITE) lines = [line.strip() for line in fh] regex = "(.*)=(.*)" for line in lines: match = re.match(regex,line) if (not match): print("malformed line:" + line) sys.exit(-1) param = match.group(1).strip() val = match.group(2).strip() pre_args[param] = val self.argv.append("--"+param) self.argv.append(val) read_params.update(self.createParamsMap(pre_args)) except Exception as e: logger.debug("cannot open file " + fname + '\n' + str(e) + '\n' + str(traceback.format_exc())) def checkParam(self,p,dico): logger.debug("in") print("****************") print("Obsolete: should use the mandatory argument for the declare_params function") print("It was used by object " + str(self) + " for keyword " + p) print("FATAL => ABORT") print("****************") sys.exit(-1) def loadModule(self,read_params,myscript,pre_args): logger.debug("in") paths = [] if ("PYTHONPATH" in os.environ): paths = os.environ["PYTHONPATH"].split(':') if ("module_path" in pre_args) : paths += pre_args["module_path"].split(':') paths += BD.__path__ paths += [ path + "/coating" for path in BD.__path__ ] mymod = None for p in paths: try: modfile = os.path.join(p,myscript+".py") #print("loading file " + modfile) mymod = imp.load_source(myscript,modfile) break except IOError as io_err: logger.debug("loadModule " + str(io_err)) logger.debug("loadModule " + str(mymod)) if (mymod is None): logger.debug("cannot find module '" + myscript + "' from paths " + str(paths)) logger.debug("trace :" + traceback.format_exc()+ '\n') raise Exception("cannot find module '" + myscript + "' from paths " + str(paths)) return mymod def createParamsMap(self,pre_args): - logger.debug("in") + logger.debug("in") read_params = {} + if ('logging' in pre_args) and (pre_args['logging'] is True): + bdlogging.activateFileLogging() + + for opt,args in pre_args.iteritems(): logger.debug("createParamsMap1 " + str(opt) + " : " + str(args)) if (args is None): continue if (not type(args) == list): args = [args] logger.debug("createParamsMap2 " + str(opt) + " : " + str(args)) for arg in args: if (arg is None): continue if (opt == 'BDconf'): self.readConfFiles(read_params,arg) continue if (opt == 'host'): self.updatePossibleHosts(arg) self.readConfFile(read_params,arg+'.bd') for param, typ in self.admissible_params.iteritems(): if opt == param: logger.debug("createParamsMap3 " + str(param) + " : " + str(typ)) if (typ == ModuleType): read_params[param] = self.loadModule(read_params,arg,pre_args) logger.debug("createParamsMap4 " + str(param) + " : " + str(typ)) elif (type(typ) == list): args = arg.split(",") if (not param in read_params): read_params[param] = [] if (not len(typ) == 1) : subtype = str else : subtype = typ[0] for i in range(0,len(args)): read_params[param].append(subtype(args[i])) else: read_params[param] = typ(arg) break return read_params def addModulesAdmissibleParameters(self,read_params): logger.debug("in") for k,v in read_params.iteritems(): if self.admissible_params[k] == ModuleType: mymod = read_params[k] modname = mymod.__name__ if "admissible_params" in mymod.__dict__: self.admissible_params.update(mymod.__dict__["admissible_params"]) self.group_params["'" + modname + "' module options" ] = mymod.__dict__["admissible_params"].keys() if "default_params" in mymod.__dict__: self.default_params.update(mymod.__dict__["default_params"]) if "help" in mymod.__dict__: self.help.update(mymod.__dict__["help"]) if "mandatory" in mymod.__dict__: self.mandatory.update(mymod.__dict__["mandatory"]) logger.debug(str(self.admissible_params)) def addModulesAdmissibleParametersForComplete(self,read_params): logger.debug("in") if not "_ARGCOMPLETE" in os.environ: return logger.debug("arg complete ? " + os.environ["_ARGCOMPLETE"]) # breaks = os.environ["COMP_WORDBREAKS"] tmp_read_params = {} breaks = " |=|&|<|>|;" logger.debug("break line " + os.environ["COMP_LINE"]) all_args = re.split(breaks,os.environ["COMP_LINE"]) logger.debug("break line " + str(all_args)) for i in range(0,len(all_args)): a = all_args[i] res = re.match("--(.*)",a) if res is None: continue a = res.group(1) logger.debug("treating a " + str(a) ) if a in self.admissible_params: if self.admissible_params[a] == ModuleType: logger.debug("here treating a " + str(a) ) if i+1 >= len(all_args): continue b = all_args[i+1] logger.debug("treating b " + str(b) ) res = re.match("--(.*)",b) if res is not None: continue if not b.strip() == '': tmp_read_params[a] = b if ("module_path" in read_params): tmp_read_params["module_path"] = read_params["module_path"] logger.debug("tmp_read_params " + str(tmp_read_params) ) try: tmp_read_params = self.createParamsMap(tmp_read_params) except Exception as e: logger.debug("trace :" + traceback.format_exc()+ '\n' + str(e)) logger.debug("AAAAAAAAA " + str(tmp_read_params)) try: self.addModulesAdmissibleParameters(tmp_read_params) except Exception as e: logger.debug("trace :" + traceback.format_exc()+ '\n' + str(e)) logger.debug("CCCCCCCCCC" + str(self.admissible_params)) def constructArgParser(self,add_help=True,add_mandatory=True): logger.debug("in") parser = argparse.ArgumentParser(description = "BlackDynamite option parser",formatter_class=argparse.ArgumentDefaultsHelpFormatter,add_help=add_help) self.params_group = {} group = parser.add_argument_group("General") self.params_group["General"] = group for g,param_list in self.group_params.iteritems(): group = parser.add_argument_group(g) for p in param_list: self.params_group[p] = group for param, typ in self.admissible_params.iteritems(): #print ("param {0}: {1}".format(param,typ) ) p_help = "help TODO" is_mandatory = (param in self.mandatory.keys() and self.mandatory[param] == True and add_mandatory) if (param in self.help) : p_help = self.help[param] if (param in self.params_group): grp = self.params_group[param] else : grp = self.params_group["General"] if (typ is None): raise Exception("Deprectated option type for " + param + " : should be changed to 'bool'") if (typ is bool): if (param in self.default_params and self.default_params[param] == True): grp.add_argument("--" + param, help = p_help, dest = param, action = 'store_false', required = is_mandatory) else: grp.add_argument("--" + param, help = p_help, dest = param, action = 'store_true', required = is_mandatory) elif (typ is list or typ == [str]): grp.add_argument("--" + param, action = 'append', dest = param, help = p_help, required = is_mandatory).completer = self.completer else: grp.add_argument("--" + param, dest = param, help = p_help, required = is_mandatory).completer = self.completer parser.set_defaults(**self.default_params) return parser def register_params(self,group="General",params=None,defaults=None,help=None, mandatory=None): logger.debug("in") if (params is not None): self.admissible_params.update(params) if group not in self.group_params: self.group_params[group] = [] self.group_params[group] += params.keys() if (defaults is not None): self.default_params.update(defaults) for key in defaults.keys(): self.mandatory[key] = False if (help is not None): self.help.update(help) if (mandatory is not None): self.mandatory.update(mandatory) for param,typ in self.admissible_params.iteritems(): if typ == bool and param not in self.default_params: self.default_params[param] = False def addEnvBDArguments(self,parser): logger.debug("in") parser = self.constructArgParser(add_help=False,add_mandatory=False) pre_args = vars(parser.parse_known_args(args=self.argv)[0]) for name, value in os.environ.iteritems(): m = re.match("BLACKDYNAMITE_(.*)",name) if (m): var = m.group(1).lower() if (var not in pre_args or pre_args[var] is None): if (var in self.admissible_params): self.argv.append("--" + var) self.argv.append(value) def parseBDParameters(self, argv = None): logger.debug("in") if argv == None: self.argv = list(sys.argv[1:]) else: self.argv = list(argv) logger.debug("program called with " + str(len(self.argv)) + " args " + str(self.argv) + "\n") logger.debug("env is\n\n") for k,v in os.environ.iteritems(): logger.debug("export " + k + "='" + v + "'\n") logger.debug("constructArgParser\n") parser = self.constructArgParser(add_help=False,add_mandatory=False) self.addEnvBDArguments(parser) logger.debug("parse_known_args\n") pre_args = parser.parse_known_args(args=self.argv)[0] logger.debug("createParamsMap\n") read_params = self.createParamsMap(vars(pre_args)) logger.debug("addModuleAdmissibleParameters\n") self.addModulesAdmissibleParameters(read_params) logger.debug("addModulesAdmissibleParametersForComplete\n") try: self.addModulesAdmissibleParametersForComplete(read_params) except KeyError as e: logger.debug("trace :" + traceback.format_exc()) logger.debug("constructArgParser\n") parser = self.constructArgParser() argcomplete.autocomplete(parser) pre_args = parser.parse_args(args=self.argv) read_params = self.createParamsMap(vars(pre_args)) if not "user" in read_params: read_params["user"] = pwd.getpwuid(os.getuid())[0] return read_params def __init__ (self): logger.debug("in") self.admissible_params = {} self.help = {} self.default_params = {} self.group_params = {} self.mandatory = {} self.admissible_params["study"] = str self.help["study"] = "Specify the study from the BlackDynamite database. This refers to the schemas in PostgreSQL language" self.admissible_params["host"] = str self.help["host"] = "Specify data base server address" self.admissible_params["port"] = int self.help["port"] = "Specify data base server port" self.admissible_params["user"] = str self.help["user"] = "Specify user name to connect to data base server" self.admissible_params["password"] = str self.help["password"] = "Provides the password" self.admissible_params["BDconf"] = list self.help["BDconf"] = "Path to a BlackDynamite file (*.bd) configuring current optons" self.admissible_params["truerun"] = bool self.help["truerun"] = "Set this flag if you want to truly perform the action on base. If not set all action are mainly dryrun" self.default_params["truerun"] = False self.admissible_params["job_constraints"] = [str] self.help["job_constraints"] = "This allows to constraint run selections by job properties" self.default_params["job_constraints"] = None self.admissible_params["run_constraints"] = [str] self.help["run_constraints"] = "This allows to constraint run selections by run properties" self.default_params["run_constraints"] = None self.admissible_params["binary_operator"] = str self.default_params["binary_operator"] = 'and' self.help["binary_operator"] = 'Set the default binary operator to make requests to database' self.admissible_params["list_parameters"] = bool self.help["list_parameters"] = "Request to list the possible job/run parameters" self.admissible_params["yes"] = bool self.default_params["yes"] = False self.help["yes"] = "Answer all questions to yes." self.admissible_params["logging"] = bool - self.help["logging"] = "Activate the logging system" + self.help["logging"] = "Activate the file logging system" self.group_params["BDParser"] = ["study", "host", "port", "user", "password", "BDconf", "truerun", "job_constraints", "run_constraints", "list_parameters"] self.readConfFileList = [] ################################################################ def validate_question(question, params, default_validated = True): logger.debug("in") if (default_validated): default_str = "(Y/n)" else: default_str = "(y/N)" if (params["yes"] == False): validated = raw_input("{0}? {1} ".format(question, default_str)) #print (validated) if (validated == "\n" or validated == ""): validated = default_validated elif(validated == "Y" or validated == "y"): validated = True else: validated = False else: print("{0}? {1} Forced Y".format(question, default_str)) validated = True return validated ################################################################ def filterParams(sub_list,total_list): logger.debug("in") new_list = {} for p in sub_list: if (p in total_list and total_list[p] is not False): new_list[p] = total_list[p] return new_list ################################################################ class RunParser(BDParser): """ """ def parseBDParameters(self): logger.debug("in") params = BDParser.parseBDParameters(self) params['run_name'], nb_subs = re.subn('\s', '_', params['run_name']) return params def __init__ (self): logger.debug("in") BDParser.__init__(self) self.mandatory["machine_name"] = True self.mandatory["nproc"] = True self.mandatory["run_name"] = True self.admissible_params["machine_name"] = str self.help["machine_name"] = "Specify the name of the machine where the job is to be launched" self.admissible_params["nproc"] = int self.help["nproc"] = "Specify the number of processors onto which this run is supposed to be launched" self.admissible_params["run_name"] = str self.help["run_name"] = "User friendly name given to this run. This is usually helpful to recall a run kind" self.default_params = {} self.default_params["job_constraints"] = None self.group_params["RunParser"] = [ "machine_name", "nproc", "run_name"] diff --git a/python/BlackDynamite/run.py b/python/BlackDynamite/run.py index 5d7c69d..941785b 100755 --- a/python/BlackDynamite/run.py +++ b/python/BlackDynamite/run.py @@ -1,352 +1,351 @@ #!/usr/bin/env python from __future__ import print_function __all__ = [ "Run" ] import job import runconfig import conffile import base import bdparser as bdp import sqlobject import sys import re import numpy as np import datetime ################################################################ -import bdlogging -import logging +import bdlogging,logging print = bdlogging.invalidPrint -logger = bdlogging.BDLogging.getLogger(__name__,level=logging.DEBUG) +logger = logging.getLogger(__name__) ################################################################ class Run(sqlobject.SQLObject): """ """ def start(self): self.entries['state'] = 'START' logger.debug ('starting run') self.update() logger.debug ('update done') self.base.commit() logger.debug ('commited') def attachToJob(self,job): self["job_id"] = job.id self.base.insert(self) self.addConfigFile(self.execfile) for cnffile in self.configfiles: self.addConfigFile(cnffile) def getExecFile(self): return self.getUpdatedConfigFile(self.entries["exec"]) def setExecFile(self, file_name): # check if the file is already in the config files for f in self.configfiles: if f.entries["filename"] == file_name: self.execfile = f self.entries["exec"] = f.id return f.id # the file is not in the current config files so it as to be added conf = conffile.addFile(file_name, self.base, regex_params = regex_params, params = params_list) self.configfiles.append(conf) self.execfile = conf self.entries["exec"] = conf.id return conf.id def addConfigFiles(self,file_list,regex_params=None): if not type(file_list) == list: file_list = [file_list] self.prepare() params_list = self.types.keys() myjob = job.Job(self.base) myjob.prepare() params_list += myjob.types.keys() #logger.debug (regex_params) file_ids = [f.id for f in self.configfiles] files_to_add = [conffile.addFile(fname,self.base, regex_params=regex_params, params=params_list) for fname in file_list] for f in files_to_add: if (f.id not in file_ids): self.configfiles.append(f) return self.configfiles def addConfigFile(self,configfile): myrun_config = runconfig.RunConfig(self.base) myrun_config.prepare() myrun_config.attachToRun(self) myrun_config.addConfigFile(configfile) self.base.insert(myrun_config) def getConfigFiles(self): # myjob = job.Job(self.base) # myjob["id"] = self.entries["job_id"] # myjob = self.getMatchedObjectList()[0] runconf = runconfig.RunConfig(self.base) runconf["run_id"] = self.id runconf_list = runconf.getMatchedObjectList() conffiles = [self.getUpdatedConfigFile(f["configfile_id"]) for f in runconf_list] return conffiles def getConfigFile(self,file_id): runconf = runconfig.RunConfig(self.base) conf = conffile.ConfFile(self.base) conf.prepare() conf["id"] = file_id conf = conf.getMatchedObjectList()[0] return conf def replaceBlackDynamiteVariables(self,text): myjob = job.Job(self.base) myjob.prepare() myjob["id"] = self.entries["job_id"] myjob = myjob.getMatchedObjectList()[0] for key,val in myjob.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__",str(val)) if ((not tmp == text) and val is None): raise Exception( "unset job parameter " + key) text = tmp for key,val in self.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__",str(val)) if ((not tmp == text) and val is None): logger.debug(self.entries) raise Exception( "unset run parameter " + key) text = tmp text = text.replace("__BLACKDYNAMITE__dbhost__",self.base.dbhost) text = text.replace("__BLACKDYNAMITE__study__",self.base.schema) text = text.replace("__BLACKDYNAMITE__run_id__",str(self.id)) return text def getUpdatedConfigFile(self,file_id): conf = self.getConfigFile(file_id) # myjob = job.Job(self.base) # myjob.prepare() # myjob["id"] = self.entries["job_id"] # myjob = myjob.getMatchedObjectList()[0] # for key,val in myjob.entries.iteritems(): # tmp = conf["file"].replace("__BLACKDYNAMITE__" + key + "__",str(val)) # if ((not tmp == conf["file"]) and val is None): # raise Exception( "unset job parameter " + key) # conf["file"] = tmp # # for key,val in self.entries.iteritems(): # tmp = conf["file"].replace("__BLACKDYNAMITE__" + key + "__",str(val)) # if ((not tmp == conf["file"]) and not val): # raise Exception( "unset run parameter " + key) # conf["file"] = tmp # # conf["file"] = conf["file"].replace("__BLACKDYNAMITE__dbhost__",self.base.dbhost) # conf["file"] = conf["file"].replace("__BLACKDYNAMITE__study__",self.base.schema) # conf["file"] = conf["file"].replace("__BLACKDYNAMITE__run_id__",str(self.id)) conf["file"] = self.replaceBlackDynamiteVariables(conf["file"]) return conf def listQuantities(self): request = "SELECT id,name FROM {0}.quantities WHERE (is_vector) = (false)".format(self.base.schema) curs = self.base.performRequest(request) all_quantities = [res[1] for res in curs] return all_quantities def getScalarQuantities(self,names,additional_request=None): request = "SELECT id,name FROM {0}.quantities WHERE (is_vector) = (false)".format(self.base.schema) params = [] if (names): if (not type(names) == list): names = [names] request += " and (" for name in names: similar_op_match = re.match(r"\s*(~)\s*(.*)", name) op = " = " if (similar_op_match): op = " ~ " name = similar_op_match.group(2) request += " name " + op + "%s or" params.append(str(name)) request = request[:len(request)-3] + ")" # logger.debug (request) # logger.debug (params) curs = self.base.performRequest(request,params) quantities = [res[1] for res in curs] if (len(quantities) == 0): logger.debug ("No quantity matches " + str(names)) logger.debug ("Quantities declared in the database are \n" + "\n".join([res for res in self.listQuantities()])) sys.exit(-1) return None try: quant_indexes = [quantities.index(n) for n in names] quantities = names except: quant_indexes = None # logger.debug (quant) results = [] for key in quantities: q = self.getScalarQuantity(key,additional_request) if (q is not None): results.append([key,q]) logger.debug ("got Quantity " + str(key) + " : " + str(q.shape[0]) + " values") return results def getLastStep(self): request = """SELECT max(b.max),max(b.time) from ( SELECT max(step) as max,max(computed_at) as time from {0}.scalar_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.scalar_real where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_real where run_id = {1}) as b""".format(self.base.schema,self.id) # logger.debug (request) curs = self.base.performRequest(request,[]) item = curs.fetchone() if (item is not None): return item[0],item[1] def getQuantityID(self,name, is_integer = None, is_vector = None): request = "SELECT id,is_integer,is_vector FROM {0}.quantities WHERE (name) = (%s)".format(self.base.schema) curs = self.base.performRequest(request,[name]) item = curs.fetchone() if (item is None): raise Exception("unknown quantity \"" + name + "\"") if ((is_integer is not None) and (not is_integer == item[1])): raise Exception("quantity \"" + name + "\" has is_integer = " + str(item[1])) if ((is_vector is not None) and (not is_vector == item[2])): raise Exception("quantity \"" + name + "\" has is_vector = " + str(item[2])) return item[0],item[1],item[2] def getScalarQuantity(self,name,additional_request=None): quantity_id,is_integer,is_vector = self.getQuantityID(name) if (is_vector == True): raise Exception("Quantity " + name + " is not scalar") request = "SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3})".format(self.base.schema,"scalar_real" if (is_integer == False) else "scalar_integer",self.id,quantity_id) if (additional_request): request += " and " + " and ".join(additional_request) request += " ORDER BY step" curs = self.base.performRequest(request,[name]) fetch = curs.fetchall() if (not fetch): return None return (np.array([(val[0],val[1]) for val in fetch])) def getVectorQuantity(self,name, step): quantity_id,is_integer,is_vector = self.getQuantityID(name) if (is_vector == False): raise Exception("Quantity " + name + " is not vectorial") request = "SELECT measurement from {0}.{1} WHERE (run_id,quantity_id,step) = ({2},{3},{4})".format(self.base.schema,"vector_real" if (is_integer == False) else "vector_integer",self.id,quantity_id,step) curs = self.base.performRequest(request,[name]) fetch = curs.fetchone() if (fetch): return np.array(fetch[0]) return None def pushVectorQuantity(self,vec,step,name, is_integer, description = None): logger.debug ('pushing {0}'.format(name)) try: quantity_id,is_integer,is_vector = self.getQuantityID(name,is_integer = is_integer, is_vector = True) except Exception as e: typecode = "int" if is_integer == False: typecode = "float" typecode += ".vector" quantity_id = self.base.pushQuantity(name,typecode,description) array = [i for i in vec] if (is_integer == True): array_format = ",".join(["{:d}".format(i) for i in vec]) request = "INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s)".format(self.base.schema,"vector_real" if (is_integer == False) else "vector_integer") curs = self.base.performRequest(request,[self.id,quantity_id,array,step]) logger.debug ('ready to commit') self.base.commit() logger.debug ('commited') def pushScalarQuantity(self,val,step, name, is_integer, description = None): logger.debug ('pushing {0}'.format(name)) try: quantity_id,is_integer,is_vector = self.getQuantityID(name,is_integer = is_integer, is_vector = False) except Exception as e: typecode = "int" if is_integer == False: typecode = "float" quantity_id = self.base.pushQuantity(name,typecode,description) request = "INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s)".format(self.base.schema,"scalar_real" if (is_integer == False) else "scalar_integer") curs = self.base.performRequest(request,[self.id,quantity_id,val,step]) logger.debug ('ready to commit') self.base.commit() logger.debug ('commited') def getAllVectorQuantity(self,name): quantity_id,is_integer,is_vector = self.getQuantityID(name,is_vector = True) request = "SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) order by step".format(self.base.schema,"vector_real" if (is_integer == False) else "vector_integer",self.id,quantity_id) curs = self.base.performRequest(request,[name]) fetch = curs.fetchall() if (not fetch): return [None,None] matres = np.array([val[1] for val in fetch]) stepres = np.array([val[0] for val in fetch]) return (stepres,matres) def deleteData(self): request,params = "DELETE FROM {0}.scalar_real WHERE run_id={1}".format(self.base.schema,self.id),[] self.base.performRequest(request,params) request,params = "DELETE FROM {0}.scalar_integer WHERE run_id={1}".format(self.base.schema,self.id),[] self.base.performRequest(request,params) request,params = "DELETE FROM {0}.vector_real WHERE run_id={1}".format(self.base.schema,self.id),[] self.base.performRequest(request,params) request,params = "DELETE FROM {0}.vector_integer WHERE run_id={1}".format(self.base.schema,self.id),[] self.base.performRequest(request,params) def __init__ (self,base): sqlobject.SQLObject.__init__(self,base) self.table_name = "runs" self.foreign_keys["job_id"] = "jobs" self.types["machine_name"] = str self.types["run_path"] = str self.allowNull["run_path"] = True self.types["job_id"] = int self.types["nproc"] = int self.types["run_name"] = str self.types["wait_id"] = int self.allowNull["wait_id"] = True self.types["start_time"] = datetime.datetime self.allowNull["start_time"] = True self.types["state"] = str self.allowNull["state"] = True self.execfile = None self.configfiles = [] self.types["exec"] = str ################################################################