diff --git a/BlackDynamite/bdparser.py b/BlackDynamite/bdparser.py index dab5a90..37eda97 100755 --- a/BlackDynamite/bdparser.py +++ b/BlackDynamite/bdparser.py @@ -1,600 +1,592 @@ #!/usr/bin/env python from __future__ import print_function __all__ = [ "BDParser", "RunParser" ] import BlackDynamite as BD import sys import re import os import stat import pwd import base import run import argcomplete, argparse from argcomplete.completers import EnvironCompleter import traceback from types import ModuleType import imp ################################################################ import bdlogging,logging print = bdlogging.invalidPrint logger = logging.getLogger(__name__) ################################################################ class BDParser(object): """ """ def listPossibleHosts(self): logger.debug("in") bd_dir = os.path.expanduser("~/.blackdynamite") bd_hosts = os.path.join(bd_dir,'hosts') hosts = [] try: hosts += [h.strip() for h in open(bd_hosts)] except: pass return hosts def listPossibleModules(self,pre_args): logger.debug("in") paths = [] if ("PYTHONPATH" in os.environ): paths = os.environ["PYTHONPATH"].split(':') if ("module_path" in pre_args) : paths += pre_args["module_path"].split(':') paths += BD.__path__ paths += [ path + "/coating" for path in BD.__path__ ] module_list = [] paths = [p.strip() for p in paths if not p.strip() == ''] for p in paths: files = os.listdir(p) files = [f for f in files if os.path.splitext(f)[1] == '.py'] files = [f for f in files if not f[0] == '_'] matching_string = ".*blackdynamite.*" files = [os.path.splitext(f)[0] for f in files if re.match(matching_string,open(os.path.join(p,f)).read().replace('\n',' '),flags=re.IGNORECASE)] module_list += files logger.debug("found these files " + str(module_list)) return module_list def updatePossibleHosts(self,new_host): logger.debug("in") bd_dir = os.path.expanduser("~/.blackdynamite") bd_hosts = os.path.join(bd_dir,'hosts') hosts = set(self.listPossibleHosts()) hosts.add(new_host) f = open(bd_hosts,'w') for h in hosts: f.write(h+'\n') def completer(self,prefix,**kwargs): #bdlogging.activateFileLogging() try: params = vars(kwargs["parsed_args"]) if params['logging'] is True: bdlogging.activateFileLogging() logger.debug("in") logger.debug("BDparser prefix " + str(prefix) + "\n") for k, v in kwargs.iteritems(): logger.debug("kwargs[" + str(k) + "] = " + str(v) + "\n") logger.debug("dest " + str(vars(kwargs["action"])["dest"]) + "\n") for k in params.keys(): if params[k] is None: del params[k] key = vars(kwargs["action"])["dest"] logger.debug("key " + str(key) + "\n") if (key == "BDconf"): return self.listPossibleConf() if 'BDconf' in params: self.readConfFiles(params,params['BDconf']) if 'host' in params: self.readConfFile(params,params['host']+'.bd') logger.debug("key " + str(key) + "\n") for k in params.keys(): logger.debug("params = " + str(k)) if (key == "host"): return self.listPossibleHosts() if (key == "study"): params["should_not_check_study"] = True mybase = base.Base(**params) return mybase.getSchemaList() if (key == 'quantity'): mybase = base.Base(**params) myrun = run.Run(mybase) return myrun.listQuantities() if (key in self.admissible_params and self.admissible_params[key] == ModuleType): logger.debug("trying to complete module list for '{}'".format(key) ) return self.listPossibleModules(params) except Exception as e: logger.debug(traceback.format_exc()) logger.debug(str(e)) return [] def listPossibleConf(self): logger.debug("in") files = [] for dir in ["./", os.path.expanduser("~/.blackdynamite")]: for filename in os.listdir(dir): fileName, fileExtension = os.path.splitext(filename) if (fileExtension == ".bd"): files.append(filename) return files return files def readConfFiles(self,read_params,fnames): logger.debug("in") for f in fnames: self.readConfFile(read_params,f) def readConfFile(self,read_params,fname): logger.debug("in") logger.debug("readConfFileList {0}".format(self.readConfFileList)) if fname in self.readConfFileList: return self.readConfFileList.append(fname) if type(fname) == list: raise Exception('cannot use list in that function: ' + str(type(fname))) pre_args = {} for dir in ["./",os.path.expanduser("~/.blackdynamite")]: fullpath = os.path.join(dir,fname) if (os.path.isfile(fullpath)): fname = fullpath break try: with open(fname) as fh: logger.debug("loading file '{0}'".format(fname)) os.chmod(fname,stat.S_IREAD|stat.S_IWRITE) lines = [line.strip() for line in fh] regex = "(.*)=(.*)" for line in lines: match = re.match(regex,line) if (not match): print("malformed line:" + line) sys.exit(-1) param = match.group(1).strip() val = match.group(2).strip() pre_args[param] = val self.argv.append("--"+param) self.argv.append(val) logger.debug("read parameters: '{0}'".format(self.argv)) logger.debug("pre args : '{0}'".format(pre_args)) read_params.update(self.createParamsMap(pre_args)) except Exception as e: logger.debug("cannot open file " + fname + '\n' + str(e) + '\n' + str(traceback.format_exc())) logger.debug("out") def checkParam(self,p,dico): logger.debug("in") print("****************") print("Obsolete: should use the mandatory argument for the declare_params function") print("It was used by object " + str(self) + " for keyword " + p) print("FATAL => ABORT") print("****************") sys.exit(-1) def loadModule(self,read_params,myscript,pre_args): logger.debug("in") paths = [] if ("PYTHONPATH" in os.environ): paths = os.environ["PYTHONPATH"].split(':') if ("module_path" in pre_args) : paths += pre_args["module_path"].split(':') paths += BD.__path__ paths += [ path + "/coating" for path in BD.__path__ ] mymod = None for p in paths: try: modfile = os.path.join(p,myscript+".py") #print("loading file " + modfile) mymod = imp.load_source(myscript,modfile) break except IOError as io_err: logger.debug("loadModule " + str(io_err)) logger.debug("loadModule " + str(mymod)) if (mymod is None): logger.debug("cannot find module '" + myscript + "' from paths " + str(paths)) logger.debug("trace :" + traceback.format_exc()+ '\n') raise Exception("cannot find module '" + myscript + "' from paths " + str(paths)) return mymod def createParamsMap(self,pre_args): logger.debug("in") read_params = {} if ('logging' in pre_args) and (pre_args['logging'] is True): bdlogging.activateFileLogging() for opt,args in pre_args.iteritems(): logger.debug("createParamsMap1 " + str(opt) + " : " + str(args)) if (args is None): continue if (not type(args) == list): args = [args] if (type(args) == str) : args = [args] logger.debug("createParamsMap2 " + str(opt) + " : " + str(args)) for arg in args: if (arg is None): continue if (opt == 'BDconf'): if (not type(arg) == list): arg = [arg] self.readConfFiles(read_params,arg) continue if (opt == 'host'): self.updatePossibleHosts(arg) self.readConfFile(read_params,arg+'.bd') for param, typ in self.admissible_params.iteritems(): if opt == param: logger.debug("createParamsMap3 " + str(param) + " : " + str(typ)) if (typ == ModuleType): read_params[param] = self.loadModule(read_params,arg,pre_args) logger.debug("createParamsMap4 " + str(param) + " : " + str(typ)) elif (type(typ) == list): args = arg.split(",") if (not param in read_params): read_params[param] = [] if (not len(typ) == 1) : subtype = str else : subtype = typ[0] for i in range(0,len(args)): read_params[param].append(subtype(args[i])) else: read_params[param] = typ(arg) break return read_params def addModulesAdmissibleParameters(self,read_params): logger.debug("in") for k,v in read_params.iteritems(): if self.admissible_params[k] == ModuleType: mymod = read_params[k] modname = mymod.__name__ if "admissible_params" in mymod.__dict__: self.admissible_params.update(mymod.__dict__["admissible_params"]) self.group_params["'" + modname + "' module options" ] = mymod.__dict__["admissible_params"].keys() if "default_params" in mymod.__dict__: self.default_params.update(mymod.__dict__["default_params"]) if "help" in mymod.__dict__: self.help.update(mymod.__dict__["help"]) if "mandatory" in mymod.__dict__: self.mandatory.update(mymod.__dict__["mandatory"]) logger.debug(str(self.admissible_params)) def addModulesAdmissibleParametersForComplete(self,read_params): logger.debug("in") if not "_ARGCOMPLETE" in os.environ: return logger.debug("arg complete ? " + os.environ["_ARGCOMPLETE"]) # breaks = os.environ["COMP_WORDBREAKS"] tmp_read_params = {} breaks = " |=|&|<|>|;" logger.debug("break line " + os.environ["COMP_LINE"]) all_args = re.split(breaks,os.environ["COMP_LINE"]) logger.debug("break line " + str(all_args)) for i in range(0,len(all_args)): a = all_args[i] res = re.match("--(.*)",a) if res is None: continue a = res.group(1) logger.debug("treating a " + str(a) ) if a in self.admissible_params: if self.admissible_params[a] == ModuleType: logger.debug("here treating a " + str(a) ) if i+1 >= len(all_args): continue b = all_args[i+1] logger.debug("treating b " + str(b) ) res = re.match("--(.*)",b) if res is not None: continue if not b.strip() == '': tmp_read_params[a] = b if ("module_path" in read_params): tmp_read_params["module_path"] = read_params["module_path"] logger.debug("tmp_read_params " + str(tmp_read_params) ) try: tmp_read_params = self.createParamsMap(tmp_read_params) except Exception as e: logger.debug("trace :" + traceback.format_exc()+ '\n' + str(e)) logger.debug("AAAAAAAAA " + str(tmp_read_params)) try: self.addModulesAdmissibleParameters(tmp_read_params) except Exception as e: logger.debug("trace :" + traceback.format_exc()+ '\n' + str(e)) logger.debug("CCCCCCCCCC" + str(self.admissible_params)) def constructArgParser(self,add_help=True,add_mandatory=True): logger.debug("in") parser = argparse.ArgumentParser(description = "BlackDynamite option parser",formatter_class=argparse.ArgumentDefaultsHelpFormatter,add_help=add_help) self.params_group = {} group = parser.add_argument_group("General") self.params_group["General"] = group for g,param_list in self.group_params.iteritems(): group = parser.add_argument_group(g) for p in param_list: self.params_group[p] = group for param, typ in self.admissible_params.iteritems(): #print ("param {0}: {1}".format(param,typ) ) p_help = "help TODO" is_mandatory = (param in self.mandatory.keys() and self.mandatory[param] == True and add_mandatory) if (param in self.help) : p_help = self.help[param] if (param in self.params_group): grp = self.params_group[param] else : grp = self.params_group["General"] if (typ is None): raise Exception("Deprectated option type for " + param + " : should be changed to 'bool'") if (typ is bool): if (param in self.default_params and self.default_params[param] == True): grp.add_argument("--" + param, help = p_help, dest = param, action = 'store_false', required = is_mandatory) else: grp.add_argument("--" + param, help = p_help, dest = param, action = 'store_true', required = is_mandatory) elif (typ is list or typ == [str]): grp.add_argument("--" + param, action = 'append', dest = param, help = p_help, required = is_mandatory).completer = self.completer else: grp.add_argument("--" + param, dest = param, help = p_help, required = is_mandatory).completer = self.completer parser.set_defaults(**self.default_params) return parser def register_params(self,group="General",params=None,defaults=None,help=None, mandatory=None): logger.debug("in") if (params is not None): self.admissible_params.update(params) if group not in self.group_params: self.group_params[group] = [] self.group_params[group] += params.keys() if (defaults is not None): self.default_params.update(defaults) for key in defaults.keys(): self.mandatory[key] = False if (help is not None): self.help.update(help) if (mandatory is not None): self.mandatory.update(mandatory) for param,typ in self.admissible_params.iteritems(): if typ == bool and param not in self.default_params: self.default_params[param] = False def addEnvBDArguments(self,parser): logger.debug("in") parser = self.constructArgParser(add_help=False,add_mandatory=False) pre_args = vars(parser.parse_known_args(args=self.argv)[0]) for name, value in os.environ.iteritems(): m = re.match("BLACKDYNAMITE_(.*)",name) if (m): var = m.group(1).lower() if (var not in pre_args or pre_args[var] is None): if (var in self.admissible_params): self.argv.append("--" + var) self.argv.append(value) def parseBDParameters(self, argv = None): logger.debug("in") if argv is None: self.argv = list(sys.argv[1:]) else: self.argv = list(argv) logger.debug("program called with " + str(len(self.argv)) + " args " + str(self.argv) + "\n") logger.debug("env is\n\n") for k,v in os.environ.iteritems(): logger.debug("export " + k + "='" + v + "'\n") logger.debug("constructArgParser\n") parser = self.constructArgParser(add_help=False,add_mandatory=False) self.addEnvBDArguments(parser) logger.debug("parse_known_args\n") pre_args = parser.parse_known_args(args=self.argv)[0] logger.debug("createParamsMap\n") read_params = self.createParamsMap(vars(pre_args)) logger.debug("addModuleAdmissibleParameters\n") self.addModulesAdmissibleParameters(read_params) logger.debug("addModulesAdmissibleParametersForComplete\n") try: self.addModulesAdmissibleParametersForComplete(read_params) except KeyError as e: logger.debug("trace :" + traceback.format_exc()) logger.debug("constructArgParser\n") parser = self.constructArgParser() argcomplete.autocomplete(parser) pre_args = parser.parse_args(args=self.argv) read_params = self.createParamsMap(vars(pre_args)) if not "user" in read_params: read_params["user"] = pwd.getpwuid(os.getuid())[0] return read_params def __init__ (self): logger.debug("in") self.admissible_params = {} self.help = {} self.default_params = {} self.group_params = {} self.mandatory = {} self.admissible_params["study"] = str self.help["study"] = "Specify the study from the BlackDynamite database. This refers to the schemas in PostgreSQL language" self.admissible_params["host"] = str self.help["host"] = "Specify data base server address" self.admissible_params["port"] = int self.help["port"] = "Specify data base server port" self.admissible_params["user"] = str self.help["user"] = "Specify user name to connect to data base server" self.admissible_params["password"] = str self.help["password"] = "Provides the password" self.admissible_params["BDconf"] = list self.help["BDconf"] = "Path to a BlackDynamite file (*.bd) configuring current optons" self.admissible_params["truerun"] = bool self.help["truerun"] = "Set this flag if you want to truly perform the action on base. If not set all action are mainly dryrun" self.default_params["truerun"] = False self.admissible_params["constraints"] = [str] self.help["constraints"] = "This allows to constraint run/job selections by properties" self.default_params["constraints"] = None - self.admissible_params["job_constraints"] = [str] - self.help["job_constraints"] = "This allows to constraint run selections by job properties" - self.default_params["job_constraints"] = None - - self.admissible_params["run_constraints"] = [str] - self.help["run_constraints"] = "This allows to constraint run selections by run properties" - self.default_params["run_constraints"] = None - self.admissible_params["binary_operator"] = str self.default_params["binary_operator"] = 'and' self.help["binary_operator"] = 'Set the default binary operator to make requests to database' self.admissible_params["list_parameters"] = bool self.help["list_parameters"] = "Request to list the possible job/run parameters" self.admissible_params["yes"] = bool self.default_params["yes"] = False self.help["yes"] = "Answer all questions to yes." self.admissible_params["logging"] = bool self.help["logging"] = "Activate the file logging system" self.group_params["BDParser"] = ["study", "host", "port", "user", "password", "BDconf", "truerun", "constraints", "job_constraints", "run_constraints", "list_parameters"] self.readConfFileList = [] ################################################################ def validate_question(question, params, default_validated = True): logger.debug("in") if (default_validated): default_str = "(Y/n)" else: default_str = "(y/N)" if (params["yes"] == False): validated = raw_input("{0}? {1} ".format(question, default_str)) #print (validated) if (validated == "\n" or validated == ""): validated = default_validated elif(validated == "Y" or validated == "y"): validated = True else: validated = False else: logger.info("{0}? {1} Forced Y".format(question, default_str)) validated = True return validated ################################################################ def filterParams(sub_list,total_list): logger.debug("in") new_list = {} for p in sub_list: if (p in total_list and total_list[p] is not False): new_list[p] = total_list[p] return new_list ################################################################ class RunParser(BDParser): """ """ def parseBDParameters(self): logger.debug("in") params = BDParser.parseBDParameters(self) params['run_name'], nb_subs = re.subn('\s', '_', params['run_name']) return params def __init__ (self): logger.debug("in") BDParser.__init__(self) self.mandatory["machine_name"] = True self.mandatory["nproc"] = True self.mandatory["run_name"] = True self.admissible_params["machine_name"] = str self.help["machine_name"] = "Specify the name of the machine where the job is to be launched" self.admissible_params["nproc"] = int self.help["nproc"] = "Specify the number of processors onto which this run is supposed to be launched" self.admissible_params["run_name"] = str self.help["run_name"] = "User friendly name given to this run. This is usually helpful to recall a run kind" self.default_params = {} self.default_params["job_constraints"] = None self.group_params["RunParser"] = [ "machine_name", "nproc", "run_name"] diff --git a/BlackDynamite/constraints.py b/BlackDynamite/constraints.py index a7ad1ec..6b5e9c5 100644 --- a/BlackDynamite/constraints.py +++ b/BlackDynamite/constraints.py @@ -1,204 +1,201 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################ from __future__ import print_function ################################################################ import sqlobject import job import run import pyparsing as pp ################################################################ class BDconstraints(object): "" def __init__(self, base, constraints): self.constraints = constraints self.base = base self.conditions = None if isinstance(constraints, dict): if "constraints" in constraints: self.constraints = constraints["constraints"] if "run_id" in constraints: self.constraints = [ 'runs.id = {0}'.format(constraints['run_id'])] if "job_id" in constraints: self.constraints = [ 'jobs.id = {0}'.format(constraints['job_id'])] if not isinstance(self.constraints, list): self.constraints = [self.constraints] self.constraint_parser = BDconstraintsParser(self.base) def _pushCondition(self, _cond, _params): if self.conditions != '': self.conditions += ' and ' self.conditions += _cond self.params += _params def pushConditionFromSQLObject(self, _cstr): _cond = [] _params = [] sql_obj = _cstr for k, v in sql_obj.entries.iteritems(): _cond.append('({0}.{1} = %s)'.format( sql_obj.table_name, k)) _params.append(v) _cond = ' and '.join(_cond) self._pushCondition(_cond, _params) def pushConditionFromString(self, _cstr): _cond, _params = self.constraint_parser.parse(_cstr) self._pushCondition(_cond, _params) def getMatchingCondition(self): self.conditions = "" self.params = [] for _cstr in self.constraints: if isinstance(_cstr, str): self.pushConditionFromString(_cstr) if isinstance(_cstr, sqlobject.SQLObject): self.pushConditionFromSQLObject(_cstr) - print(self.conditions) - print(str(self.params)[:200]) + # print(self.conditions) + # print(self.params) return self.conditions, self.params ################################################################ class BDconstraintsParser(object): def __init__(self, base): self.base = base self._params = [] self.ref_run = run.Run(self.base) self.ref_job = job.Job(self.base) - self.ref_run.prepare() - self.ref_job.prepare() # rule for entry in the sqlobject var = pp.Word(pp.alphanums+'_') prefix = (pp.Literal('runs') | pp.Literal('jobs')) + pp.Literal('.') entry = pp.Optional(prefix) + var def check_varname(tokens): # print (tokens) res = pp.ParseResults(''.join(tokens)) if len(tokens) == 3: obj_type = tokens[0] var_name = tokens[2].lower() else: obj_type = None var_name = tokens[0].lower() if obj_type is None: job_var = var_name in self.ref_job.types run_var = var_name in self.ref_run.types if job_var and run_var: raise RuntimeError( 'ambiguous variable: {0}\n{1}'.format( res, self.base.getPossibleParameters())) if job_var: res.type = self.ref_job.types[var_name] elif run_var: res.type = self.ref_run.types[var_name] else: raise RuntimeError( 'unknown variable: {0}\n{1}'.format( res[0], self.base.getPossibleParameters())) else: if obj_type == 'runs': ref_obj = self.ref_run elif obj_type == 'jobs': ref_obj = self.ref_job if var_name not in ref_obj.types: raise RuntimeError( 'unknown variable: "{0}"\n{1}'.format( var_name, ref_obj.types)) res.type = ref_obj.types[var_name] return res entry = entry.setParseAction(check_varname) # rule to parse the operators operators = [ '+', # addition 2 + 3 5 '-', # subtraction 2 - 3 -1 '*', # multiplication 2 * 3 6 '/', # division (integer division truncates the result) '%', # modulo (remainder) 5 % 4 1 '^', # exponentiation 2.0 ^ 3.0 8 '<', # less than '>', # greater than '<=', # less than or equal to '>=', # greater than or equal to '=', # equal '!=', # not equal '~', # Matches regular expression, case sensitive '~*', # Matches regular expression, case insensitive '!~', # Does not match regular expression, case sensitive '!~*' # Does not match regular expression, case insensitive ] ops = pp.Literal(operators[0]) for o in operators[1:]: ops |= pp.Literal(o) # parse a constraint of the form 'var operator value' and flatten it constraint = pp.Group(entry + ops + pp.Word(pp.alphanums+'.')) def regroup_constraints(tokens): expected_type = tokens[0].type key = tokens[0][0] op = tokens[0][1] val = tokens[0][2] try: parse_res = entry.parseString(val) if parse_res.type != expected_type: raise RuntimeError('no the correct type') val = parse_res[0] except: - self._params.append(tokens[0]) + self._params.append(val) val = '%s' res = ('(' + ' '.join([str(key), str(op), str(val)]) + ')') return res constraint = constraint.setParseAction(regroup_constraints) separator = (pp.Literal(',').setParseAction( lambda tokens: 'and') | pp.Literal('and')) self.constraints = (constraint + pp.Optional( pp.OneOrMore(separator + constraint))).setParseAction( lambda tokens: ' '.join(tokens)) def parse(self, _str): - # print(_str) self._params = [] res = self.constraints.parseString(_str) res = ' '.join(res) return res, self._params diff --git a/BlackDynamite/run.py b/BlackDynamite/run.py index 8038b1e..77bc8fa 100755 --- a/BlackDynamite/run.py +++ b/BlackDynamite/run.py @@ -1,464 +1,463 @@ #!/usr/bin/env python from __future__ import print_function ################################################################ import job import runconfig import conffile import sqlobject import sys import re import numpy as np import datetime import bdlogging import logging import bdparser import base import runselector import subprocess import socket ################################################################ __all__ = ['Run', 'getRunFromScript'] print = bdlogging.invalidPrint logger = logging.getLogger(__name__) ################################################################ class Run(sqlobject.SQLObject): """ """ table_name = 'runs' def getJob(self): return self.base.getJobFromID(self.entries["job_id"]) def start(self): self.entries['state'] = 'START' logger.debug('starting run') self.update() logger.debug('update done') self.base.commit() logger.debug('commited') def finish(self): self.entries['state'] = 'FINISHED' logger.debug('finish run') self.update() logger.debug('update done') self.base.commit() logger.debug('commited') def attachToJob(self, job): self["job_id"] = job.id self.base.insert(self) self.addConfigFile(self.execfile) for cnffile in self.configfiles: self.addConfigFile(cnffile) def getExecFile(self): return self.getUpdatedConfigFile(self.entries["exec"]) def setExecFile(self, file_name, **kwargs): # check if the file is already in the config files for f in self.configfiles: if f.entries["filename"] == file_name: self.execfile = f self.entries["exec"] = f.id return f.id # the file is not in the current config files # so it has to be added conf = conffile.addFile(file_name, self.base, **kwargs) self.configfiles.append(conf) self.execfile = conf self.entries["exec"] = conf.id return conf.id def listFiles(self): command = 'ls {0}'.format(self['run_path']) if not self['machine_name'] == socket.gethostname(): command = 'ssh {0} "{1}"'.format(self['machine_name'], command) logger.warning(command) p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE) out = p.stdout.readlines() out = [o.strip() for o in out] return out def getFile(self, filename, outpath='/tmp'): import subprocess import os dest_path = os.path.join( outpath, "BD-" + self.base.schema + "-cache", "run-{0}".format(self.id)) dest_file = os.path.join(dest_path, filename) if self['machine_name'] == socket.gethostname(): return self.getFullFileName(filename) # logger.warning(dest_path) # logger.warning(dest_file) try: os.makedirs(dest_path) except Exception as e: # logger.error(e) pass if os.path.isfile(dest_file): return dest_file cmd = 'scp {0}:{1} {2}'.format(self['machine_name'], self.getFullFileName(filename), dest_file) logger.warning(cmd) p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) logger.warning(p.stdout.read()) return dest_file def getFullFileName(self, filename): import os return os.path.join(self['run_path'], filename) def addConfigFiles(self, file_list, regex_params=None): if not type(file_list) == list: file_list = [file_list] self.prepare() params_list = self.types.keys() myjob = job.Job(self.base) myjob.prepare() params_list += myjob.types.keys() # logger.debug (regex_params) file_ids = [f.id for f in self.configfiles] files_to_add = [ conffile.addFile( fname, self.base, regex_params=regex_params, params=params_list) for fname in file_list] for f in files_to_add: if (f.id not in file_ids): self.configfiles.append(f) return self.configfiles def addConfigFile(self, configfile): myrun_config = runconfig.RunConfig(self.base) myrun_config.prepare() myrun_config.attachToRun(self) myrun_config.addConfigFile(configfile) self.base.insert(myrun_config) def getConfigFiles(self): # myjob = job.Job(self.base) # myjob["id"] = self.entries["job_id"] # myjob = self.getMatchedObjectList()[0] runconf = runconfig.RunConfig(self.base) runconf["run_id"] = self.id runconf_list = runconf.getMatchedObjectList() + logger.info(runconf_list) conffiles = [ self.getUpdatedConfigFile(f["configfile_id"]) for f in runconf_list] return conffiles def getConfigFile(self, file_id): # runconf = runconfig.RunConfig(self.base) conf = conffile.ConfFile(self.base) - conf.prepare() conf["id"] = file_id conf = conf.getMatchedObjectList()[0] return conf def replaceBlackDynamiteVariables(self, text): myjob = job.Job(self.base) - myjob.prepare() myjob["id"] = self.entries["job_id"] myjob = myjob.getMatchedObjectList()[0] for key, val in myjob.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): raise Exception("unset job parameter " + key) text = tmp for key, val in self.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): logger.debug(self.entries) raise Exception("unset run parameter " + key) text = tmp text = text.replace("__BLACKDYNAMITE__dbhost__", self.base.dbhost) text = text.replace("__BLACKDYNAMITE__study__", self.base.schema) text = text.replace("__BLACKDYNAMITE__run_id__", str(self.id)) return text def getUpdatedConfigFile(self, file_id): conf = self.getConfigFile(file_id) conf["file"] = self.replaceBlackDynamiteVariables(conf["file"]) return conf def listQuantities(self): request = "SELECT id,name FROM {0}.quantities".format(self.base.schema) curs = self.base.performRequest(request) all_quantities = [res[1] for res in curs] return all_quantities def getScalarQuantities(self, names, additional_request=None): request = ("SELECT id,name FROM {0}.quantities " "WHERE (is_vector) = (false)").format( self.base.schema) params = [] if (names): if (not type(names) == list): names = [names] request += " and (" for name in names: similar_op_match = re.match(r"\s*(~)\s*(.*)", name) op = " = " if (similar_op_match): op = " ~ " name = similar_op_match.group(2) request += " name " + op + "%s or" params.append(str(name)) request = request[:len(request)-3] + ")" # logger.debug (request) # logger.debug (params) curs = self.base.performRequest(request, params) quantities = [res[1] for res in curs] if (len(quantities) == 0): logger.debug("No quantity matches " + str(names)) logger.debug("Quantities declared in the database are \n" + "\n".join( [res for res in self.listQuantities()])) sys.exit(-1) return None try: quant_indexes = [quantities.index(n) for n in names] logger.debug(quant_indexes) quantities = names except: # quant_indexes = None pass # logger.debug (quant) results = [] for key in quantities: q = self.getScalarQuantity(key, additional_request) if (q is not None): results.append([key, q]) logger.debug("got Quantity " + str(key) + " : " + str(q.shape[0]) + " values") return results def getLastStep(self): request = """SELECT max(b.max),max(b.time) from ( SELECT max(step) as max,max(computed_at) as time from {0}.scalar_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.scalar_real where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_real where run_id = {1}) as b """.format(self.base.schema, self.id) # logger.debug (request) curs = self.base.performRequest(request, []) item = curs.fetchone() if (item is not None): return item[0], item[1] def getQuantityID(self, name, is_integer=None, is_vector=None): request = """ SELECT id,is_integer,is_vector FROM {0}.quantities WHERE (name) = (%s) """.format(self.base.schema) curs = self.base.performRequest(request, [name]) item = curs.fetchone() if (item is None): raise Exception("unknown quantity \"" + name + "\"") if ((is_integer is not None) and (not is_integer == item[1])): raise Exception("quantity \"" + name + "\" has is_integer = " + str(item[1])) if ((is_vector is not None) and (not is_vector == item[2])): raise Exception("quantity \"" + name + "\" has is_vector = " + str(item[2])) return item[0], item[1], item[2] def getScalarQuantity(self, name, additional_request=None): quantity_id, is_integer, is_vector = self.getQuantityID(name) if is_vector is True: raise Exception("Quantity " + name + " is not scalar") request = """ SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) """.format(self.base.schema, "scalar_real" if (is_integer is False) else "scalar_integer", self.id, quantity_id) if (additional_request): request += " and " + " and ".join(additional_request) request += " ORDER BY step" curs = self.base.performRequest(request, [name]) fetch = curs.fetchall() if (not fetch): return None return (np.array([(val[0], val[1]) for val in fetch])) def getVectorQuantity(self, name, step): quantity_id, is_integer, is_vector = self.getQuantityID(name) if (is_vector is False): raise Exception("Quantity " + name + " is not vectorial") request = """ SELECT measurement from {0}.{1} WHERE (run_id,quantity_id,step) = ({2},{3},{4}) """.format(self.base.schema, "vector_real" if (is_integer is False) else "vector_integer", self.id, quantity_id, step) curs = self.base.performRequest(request, [name]) fetch = curs.fetchone() if (fetch): return np.array(fetch[0]) return None def pushVectorQuantity(self, vec, step, name, is_integer, description=None): logger.debug('pushing {0}'.format(name)) try: quantity_id, is_integer, is_vector = self.getQuantityID( name, is_integer=is_integer, is_vector=True) except Exception as e: typecode = "int" if is_integer is False: typecode = "float" typecode += ".vector" quantity_id = self.base.pushQuantity(name, typecode, description) array = [i for i in vec] # if is_integer is True: # array_format = ",".join(["{:d}".format(i) for i in vec]) request = """ INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s) """.format(self.base.schema, "vector_real" if is_integer is False else "vector_integer") curs = self.base.performRequest(request, [self.id, quantity_id, array, step]) logger.debug(curs) logger.debug('ready to commit') self.base.commit() logger.debug('commited') def pushScalarQuantity(self, val, step, name, is_integer, description=None): logger.debug('pushing {0}'.format(name)) try: quantity_id, is_integer, is_vector = self.getQuantityID( name, is_integer=is_integer, is_vector=False) except Exception as e: typecode = "int" if is_integer is False: typecode = "float" quantity_id = self.base.pushQuantity(name, typecode, description) request = """ INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s) """.format(self.base.schema, "scalar_real" if is_integer is False else "scalar_integer") curs = self.base.performRequest(request, [self.id, quantity_id, val, step]) logger.debug(curs) logger.debug('ready to commit') self.base.commit() logger.debug('commited') def getAllVectorQuantity(self, name): quantity_id, is_integer, is_vector = self.getQuantityID( name, is_vector=True) request = """ SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) order by step """.format(self.base.schema, "vector_real" if is_integer is False else "vector_integer", self.id, quantity_id) curs = self.base.performRequest(request, [name]) fetch = curs.fetchall() if (not fetch): return [None, None] matres = np.array([val[1] for val in fetch]) stepres = np.array([val[0] for val in fetch]) return (stepres, matres) def deleteData(self): request, params = ( "DELETE FROM {0}.scalar_real WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) request, params = ( "DELETE FROM {0}.scalar_integer WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) request, params = ( "DELETE FROM {0}.vector_real WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) request, params = ( "DELETE FROM {0}.vector_integer WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) def __init__(self, base): sqlobject.SQLObject.__init__(self, base) self.foreign_keys["job_id"] = "jobs" self.types["machine_name"] = str self.types["run_path"] = str self.allowNull["run_path"] = True self.types["job_id"] = int self.types["nproc"] = int self.types["run_name"] = str self.types["wait_id"] = int self.allowNull["wait_id"] = True self.types["start_time"] = datetime.datetime self.allowNull["start_time"] = True self.types["state"] = str self.allowNull["state"] = True self.execfile = None self.configfiles = [] self.types["exec"] = str ################################################################ def getRunFromScript(): parser = bdparser.BDParser() parser.register_params(params={"run_id": int}) params = parser.parseBDParameters(argv=[]) mybase = base.Base(**params) runSelector = runselector.RunSelector(mybase) run_list = runSelector.selectRuns(params, params) if len(run_list) > 1: raise Exception('internal error') if len(run_list) == 0: raise Exception('internal error') myrun, myjob = run_list[0] # myrun.setEntries(params) return myrun, myjob diff --git a/BlackDynamite/runconfig.py b/BlackDynamite/runconfig.py index a0fb0d6..9986464 100755 --- a/BlackDynamite/runconfig.py +++ b/BlackDynamite/runconfig.py @@ -1,24 +1,26 @@ #!/usr/bin/env python from __future__ import print_function import sqlobject class RunConfig(sqlobject.SQLObject): """ """ + table_name = 'runconfig' + def attachToRun(self, run): self["run_id"] = run.id def addConfigFile(self, configfile): self["configfile_id"] = configfile.id def __init__(self, base): sqlobject.SQLObject.__init__(self, base) self.table_name = "runconfig" self.foreign_keys["run_id"] = "runs" - self.foreign_keys["configfile_id"] = "configfiles" + self.foreign_keys["configfile_id"] = "configfiles" self.types["run_id"] = int self.types["configfile_id"] = int diff --git a/BlackDynamite/selector.py b/BlackDynamite/selector.py index 2a6ab08..54353ad 100644 --- a/BlackDynamite/selector.py +++ b/BlackDynamite/selector.py @@ -1,84 +1,86 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -*- py-which-shell: "python"; -*- ################################################################ import copy import constraints as BDcons ################################################################ class Selector(object): def __init__(self, base): self.base = base def buildList(self, curs, sqlobjs): + print (sqlobjs) if not isinstance(sqlobjs, list): sqlobjs = [sqlobjs] col_infos = [] + sqlobjs2 = [] for sqlobj in sqlobjs: if isinstance(sqlobj, type): sqlobj = sqlobj(self.base) - + sqlobjs2.append(sqlobj) col_infos.append(self.base.getColumnProperties(sqlobj)) + sqlobjs = sqlobjs2 + list_objects = [] for entries in curs: + print(entries) objs = [] + offset = 0 + print (sqlobjs) + for index, sqlobj in enumerate(sqlobjs): obj = copy.deepcopy(sqlobj) + print (type(obj)) + print (obj) col_info = col_infos[index] - for i in range(0, col_info): + ncol_info = len(col_info) + for i in range(0, ncol_info): col_name = col_info[i][0] - obj[col_name] = entries[i] - - objs.append(objs) - list_objects.append(objs) + obj[col_name] = entries[offset] + # print(col_name, entries[offset]) + offset += 1 + objs.append(obj) + if len(objs) == 1: + list_objects.append(objs[0]) + else: + list_objects.append(tuple(objs)) - # print len(list_objects) return list_objects def select(self, _types, constraints=None, sort_by=None): const = BDcons.BDconstraints(self.base, constraints) condition, params = const.getMatchingCondition() -################################################################ -#review -#if not isinstance(sort_by, list) and sort_by is not None): -# sort_by = [sort_by] -# -# order_condition = "" -# if (sort_by is not None): -# order_condition = " ORDER BY (" -# for cond in sort_by: -# order_condition += cond + "," -# order_condition = order_condition[:len(order_condition)-1] + ")" -################################################################ - if not isinstance(_types, list): _types = [_types] selected_tables = ['{0}.{1}'.format(self.base.schema, t.table_name) for t in _types] selected_tables = ','.join(selected_tables) request = "SELECT * FROM {0}".format(selected_tables) if condition: request += " WHERE " + condition + # print (sort_by) if sort_by: request += " ORDER BY " + sort_by - # print(request) - # print(params) + print(request) + print(params) curs = self.base.performRequest(request, params) obj_list = self.buildList(curs, _types) return obj_list diff --git a/BlackDynamite/sqlobject.py b/BlackDynamite/sqlobject.py index eb6724f..1508df3 100644 --- a/BlackDynamite/sqlobject.py +++ b/BlackDynamite/sqlobject.py @@ -1,211 +1,212 @@ #!/usr/bin/env python ################################################################ from __future__ import print_function ################################################################ import copy import re import sys import datetime ################################################################ class SQLObject(object): " The generic object related to entries in the database " def __getattr__(self, attr): if attr in self.entries: return self.entries[attr] else: raise AttributeError(attr) def __setattr__(self, attr, value): if attr == 'entries': object.__setattr__(self, attr, value) entries = self.entries if attr in entries: self.__setitem__(attr, value) else: object.__setattr__(self, attr, value) def __str__(self): # output = "" keys = set(self.entries.keys()) if 'id' in self.entries: keys.remove('id') keys = list(keys) if 'id' in self.entries: keys = ['id'] + keys outputs = [] for k in keys: v = self.entries[k] outputs += [k + ": " + str(v)] return "\n".join(outputs) def __getitem__(self, index): return self.entries[index.lower()] def __setitem__(self, index, value): self.entries[index.lower()] = value def commit(self): self.base.connection.commit() def setEntries(self, params): self.prepare() for p, val in params.iteritems(): if (p in self.types): self.entries[p] = val def setFields(self, constraints): for cons in constraints: _regex = "(\w*)\s*=\s*(.*)" match = re.match(_regex, cons) if (not match or (not len(match.groups()) == 2)): print ("malformed assignment: " + cons) sys.exit(-1) key = match.group(1).lower().strip() val = match.group(2) if (key not in self.types): print ("unknown key '{0}'".format(key)) print ("possible keys are:") for k in self.types.keys(): print ("\t" + k) sys.exit(-1) val = self.types[key](val) self.entries[key] = val def __init__(self, base): self.entries = {} self.foreign_keys = {} self.allowNull = {} self.types = {} self.base = base self.operators = {} + self._prepare() def __copy__(self): raise RuntimeError('this code is obsolete: needs to be reviewed') # _cp.types = self.types.copy() # _cp.entries = self.entries.copy() # _cp.id = self.id # _cp.foreign_keys = self.foreign_keys.copy() # _cp.allowNull = self.foreign_keys.copy() # _cp.connection = self.connection.copy() # return _cp def __deepcopy__(self, memo): _cp = type(self)(self.base) _cp.types = copy.deepcopy(self.types.copy(), memo) _cp.entries = copy.deepcopy(self.entries.copy(), memo) # _cp.id = self.id _cp.foreign_keys = copy.deepcopy(self.foreign_keys, memo) _cp.allowNull = copy.deepcopy(self.foreign_keys, memo) _cp.connection = self.base return _cp - def prepare(self): + def _prepare(self): self.base.setObjectItemTypes(self) def insert(self): params = list() # print (self.types) ex_msg = "" for key, value in self.types.items(): if key == "id": continue if ((key not in self.entries) and (key not in self.allowNull)): ex_msg += ( "key '" + key + "' must be given a value before proceeding insertion\n") if (not ex_msg == ""): raise Exception("\n****************\n"+ex_msg+"****************\n") for key, value in self.entries.items(): # print (key) # print (self.types[key]) # print (value) params.append(self.types[key.lower()](value)) request = """ INSERT INTO {0}.{1} ({2}) VALUES ({3}) RETURNING id """.format(self.base.schema, self.table_name, ','.join(self.entries.keys()), ','.join(["%s" for item in params])), params return request def delete(self): request, params = "DELETE FROM {0}.{1} WHERE id={2}".format( self.base.schema, self.table_name, self.id), [] self.base.performRequest(request, params) def update(self): params = list() keys = list() for key, value in self.entries.items(): if (value is None): continue _type = self.types[key] # print (_type) # print (key) # print (type(value)) if (_type == datetime.datetime): continue # _type = str keys.append(key) params.append(_type(value)) request = "UPDATE {0}.{1} SET ({2}) = ({3}) WHERE id = {4}".format( self.base.schema, self.table_name, ','.join(keys), ','.join(["%s" for item in params]), self.id) self.base.performRequest(request, params) def getquoted(self): raise RuntimeError('code needs review') # objs = [sql_adapt(member) for member in self._sql_members()] # for obj in objs: # if hasattr(obj, 'prepare'): # obj.prepare(self._conn) # quoted_objs = [obj.getquoted() for obj in objs] # return '(' + ', '.join(quoted_objs) + ')' def createTableRequest(self): query_string = "CREATE TABLE {0}.{1} ( id SERIAL PRIMARY KEY,".format( self.base.schema, self.table_name) for key, value in self.types.items(): if (value == float): type_string = "DOUBLE PRECISION" elif (value == int): type_string = "INTEGER" elif (value == str): type_string = "TEXT" elif (value == bool): type_string = "BOOLEAN" elif (value == datetime.datetime): type_string = "TIMESTAMP" else: print (value) raise Exception("type '{0}' not handled".format(value)) query_string += "{0} {1} ".format(key, type_string) if (key not in self.allowNull): query_string += " NOT NULL" query_string += "," for key, value in self.foreign_keys.items(): query_string += "FOREIGN KEY ({0}) REFERENCES {1}.{2},".format( key, self.base.schema, value) return query_string[:-1] + ");" def getMatchedObjectList(self): import selector sel = selector.Selector(self.base) return sel.select(self, self) diff --git a/scripts/launchRuns.py b/scripts/launchRuns.py index 8844cc7..8b61ac5 100755 --- a/scripts/launchRuns.py +++ b/scripts/launchRuns.py @@ -1,105 +1,105 @@ #!/usr/bin/env python ################################################################ import BlackDynamite as BD import os import sys import socket from types import ModuleType ################################################################ def main(argv=None): if (type(argv) == str): argv = argv.split() parser = BD.BDParser() parser.register_params( group="launchRuns.py", params={ "outpath": str, "generator": ModuleType, "nruns": int, "state": str, "machine_name": str}, defaults={ "machine_name": socket.gethostname(), "nruns": -1, "generator": "bashCoat"}) parser.help.update({ "nruns": ('Specify the number of runs to launch. ' 'This is useful when we want to launch ' 'only the first run from the stack.'), "generator": "Specify the launcher generator" }) params = parser.parseBDParameters(argv=argv) mybase = BD.Base(**params) if ("outpath" not in params): print('A directory where to create temp files ' 'should be provided. use --outpath ') sys.exit(-1) mydir = os.path.join(params["outpath"], "BD-" + params["study"] + "-runs") if not os.path.exists(mydir): os.makedirs(mydir) os.chdir(mydir) - run_constraints = [] - # job_constraints = [] runSelector = BD.RunSelector(mybase) - if ("run_constraints" in params): - run_constraints = params["run_constraints"] + constraints = [] + if ("constraints" in params): + constraints = params["constraints"] def item_matcher(name, item): return item.lower().lstrip().startswith(name) - if not any([item_matcher("state", item) for item in run_constraints]): - run_constraints.append("state = CREATED") + if not any([item_matcher("state", item) for item in constraints]): + constraints.append("state = CREATED") if not any([item_matcher("machine_name", item) - for item in run_constraints]): - run_constraints.append("machine_name = {0}".format( + for item in constraints]): + constraints.append("machine_name = {0}".format( params["machine_name"])) - run_list = runSelector.selectRuns(run_constraints, params) + run_list = runSelector.selectRuns(params) if (params["nruns"] > 0): run_list = [run_list[i] for i in range(0, min(params["nruns"], len(run_list)))] if (len(run_list) == 0): print("No runs to be launched") for r, j in run_list: print("Dealing with job {0.id}, run {1.id}".format(j, r)) r["run_path"] = os.path.join(mydir, "run-" + str(r.id)) + print (j.types) j.update() if not os.path.exists("run-" + str(r.id)): os.makedirs("run-" + str(r.id)) os.chdir("run-" + str(r.id)) conffiles = r.getConfigFiles() for conf in conffiles: print("create file " + conf["filename"]) f = open(conf["filename"], 'w') f.write(conf["file"]) f.close() print("launch in '" + mydir + "/" + "run-" + str(r.id) + "/'") mymod = params["generator"] print(mymod) mymod.launch(r, params) os.chdir("../") if (params["truerun"] is True): mybase.commit() if __name__ == '__main__': main()