diff --git a/BlackDynamite/base.py b/BlackDynamite/base.py
index a09fff8..4216e6d 100755
--- a/BlackDynamite/base.py
+++ b/BlackDynamite/base.py
@@ -1,418 +1,436 @@
 #!/usr/bin/env python
 ################################################################
 from __future__ import print_function
 ################################################################
 import job
 import os
 import psycopg2
 import re
 import bdparser
 import sys
 import getpass
 import datetime
 import run
 import bdlogging
 import logging
+import jobselector
 ################################################################
 __all__ = ["Base"]
 print = bdlogging.invalidPrint
 logger = logging.getLogger(__name__)
 ################################################################


 class Base(object):
     """
     """

     def getRunFromID(self, run_id):
         myrun = run.Run(self)
         myrun["id"] = run_id
         myrun.id = run_id
         run_list = myrun.getMatchedObjectList()
         if len(run_list) != 1:
             raise Exception('Unknown run {0}'.format(run_id))
         return run_list[0]

     def getJobFromID(self, job_id):
         myjob = job.Job(self)
         myjob["id"] = job_id
         myjob.id = job_id
         job_list = myjob.getMatchedObjectList()
         if len(job_list) != 1:
             raise Exception('Unknown job {0}'.format(job_id))
         return job_list[0]

     def createBase(self, job_desc, run_desc, quantities={}, **kwargs):
         # logger.debug (quantities)
         self.createSchema(kwargs)
         self.createTable(job_desc)
         self.createTable(run_desc)
         self.createGenericTables()
         for qname, type in quantities.iteritems():
             self.pushQuantity(qname, type)
         if self.truerun:
             self.commit()

     def getObject(self, sqlobject):
         curs = self.connection.cursor()
         curs.execute("SELECT * FROM {0}.{1} WHERE id = {2}".format(
             self.schema, sqlobject.table_name, sqlobject.id))
         col_info = self.getColumnProperties(sqlobject)
         line = curs.fetchone()
         for i in range(0, len(col_info)):
             col_name = col_info[i][0]
             sqlobject[col_name] = line[i]

     def createSchema(self, params={"yes": False}):
         # create the schema of the simulation
         curs = self.connection.cursor()
         curs.execute(("SELECT schema_name FROM information_schema.schemata"
                       " WHERE schema_name = '{0}'").format(
                           self.schema).lower())
         if curs.rowcount:
             validated = bdparser.validate_question(
                 "Are you sure you want to drop the schema named '" +
                 self.schema + "'", params, False)
             if validated is True:
                 curs.execute("DROP SCHEMA {0} cascade".format(self.schema))
             else:
                 logger.debug("creation canceled: exit program")
                 sys.exit(-1)
         curs.execute("CREATE SCHEMA {0}".format(self.schema))

     def createTypeCodes(self):
         curs = self.connection.cursor()
         curs.execute("SELECT typname,oid from pg_type;")
         self.type_code = {}
         for i in curs:
             # logger.debug (i[0])
             if (i[0] == 'float8'):
                 self.type_code[i[1]] = float
             if (i[0] == 'text'):
                 self.type_code[i[1]] = str
             if (i[0] == 'int8'):
                 self.type_code[i[1]] = int
             if (i[0] == 'int4'):
                 self.type_code[i[1]] = int
             if (i[0] == 'bool'):
                 self.type_code[i[1]] = bool
             if (i[0] == 'timestamp'):
                 self.type_code[i[1]] = datetime.datetime

     def createTable(self, object):
         request = object.createTableRequest()
         curs = self.connection.cursor()
         # logger.debug (request)
         curs.execute(request)

     def createGenericTables(self,):
         sql_script_name = os.path.join(os.path.dirname(__file__),
                                        "build_tables.sql")
         curs = self.connection.cursor()
         # create generic tables
         query_list = list()
         with open(sql_script_name, "r") as fh:
             for line in fh:
                 query_list.append(re.sub("SCHEMAS_IDENTIFIER",
                                          self.schema, line))
         curs.execute("\n".join(query_list))

     def getColumnProperties(self, sqlobject):
         curs = self.connection.cursor()
         curs.execute("SELECT * FROM {0}.{1} LIMIT 0".format(
             self.schema, sqlobject.table_name))
         column_names = [desc[0] for desc in curs.description]
         column_type = [desc[1] for desc in curs.description]
         return zip(column_names, column_type)

     def setObjectItemTypes(self, sqlobject):
         col_info = self.getColumnProperties(sqlobject)
         for i, j in col_info:
             sqlobject.types[i] = self.type_code[j]
             # logger.debug (str(i) + " " + str(self.type_code[j]))

     def insert(self, sqlobject):
         sqlobject.prepare()
         curs = self.performRequest(*(sqlobject.insert()))
         sqlobject.id = curs.fetchone()[0]

     def performRequest(self, request, params=[]):
         curs = self.connection.cursor()
         # logger.debug (request)
         # logger.debug (params)
         try:
             curs.execute(request, params)
         except psycopg2.ProgrammingError as err:
             raise psycopg2.ProgrammingError(
                 ("While trying to execute the query '{0}' with parameters " +
                  "'{1}', I caught this: '{2}'").format(request, params, err))
         return curs

     def createParameterSpace(self, myjob, entry_nb=0,
                              tmp_job=None, nb_inserted=0):
+
+        """
+        This function is a recursive call to generate the points
+        in the parametric space
+
+        The entries of the jobs are treated one by one
+        in a recursive manner
+        """
+
         keys = myjob.entries.keys()
         nparam = len(keys)
-        if (entry_nb == nparam):
-            if (not tmp_job):
-                logger.debug("internal error")
-                sys.exit(-1)
-            if (len(tmp_job.getMatchedObjectList()) > 0):
+
+        # if this is the case I have done all the
+        # entries of the job
+        # it is time to insert it (after some checks)
+        if entry_nb == nparam:
+            if tmp_job is None:
+                raise RuntimeError("internal error")
+
+            # check if already inserted
+            jselect = jobselector.JobSelector(self)
+            jobs = jselect.selectJobs(tmp_job, quiet=True)
+            if len(jobs) > 0:
                 return nb_inserted
+
+            # insert it
             nb_inserted += 1
             logger.info("insert job #{0}".format(nb_inserted) + ': ' +
                         str(tmp_job.entries))
             self.insert(tmp_job)
             return nb_inserted

-        if not tmp_job:
+        if tmp_job is None:
             tmp_job = job.Job(self)

+        # the key that I am currently treating
         key = keys[entry_nb]
         e = myjob[key]

-        if (type(e) == list):
-            for typ in e:
-                tmp_job[key.lower()] = typ
-                nb_inserted = self.createParameterSpace(
-                    myjob, entry_nb+1, tmp_job, nb_inserted)
-        else:
-            tmp_job[key.lower()] = e
+        # if this is a list I have to create several parametric points
+        if not isinstance(e, list):
+            e = [e]
+        for value in e:
+            tmp_job[key.lower()] = value
             nb_inserted = self.createParameterSpace(
                 myjob, entry_nb+1, tmp_job, nb_inserted)

         if self.truerun:
             self.commit()
         return nb_inserted
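# Illustration (not part of the patch): the recursion in createParameterSpace
# above enumerates the cartesian product of every list-valued job entry.
# A minimal standalone sketch of that expansion, using only the standard
# library and the toy parameters (hono, lulu, toto) from the test block at
# the bottom of base.py:
import itertools

job_desc = {'hono': [1, 2, 3], 'lulu': [0.1, 0.2], 'toto': 'fixed'}

# promote scalars to one-element lists, as the patched code now does
entries = {k: (v if isinstance(v, list) else [v])
           for k, v in job_desc.items()}

keys = sorted(entries)
points = [dict(zip(keys, values))
          for values in itertools.product(*(entries[k] for k in keys))]
print(len(points))  # 3 * 2 * 1 = 6 parametric points, each inserted as a job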
     def pushQuantity(self, name, type_code, description=None):
         """ implemented type_codes:
         "int"
         "float"
         "int.vector"
         "float.vector"
         """

         if ((type_code == "int") or (type_code == int)):
             is_integer = True
             is_vector = False
         elif (type_code == "int.vector"):
             is_integer = True
             is_vector = True
         elif ((type_code == "float") or (type_code == float)):
             is_integer = False
             is_vector = False
         elif (type_code == "float.vector"):
             is_integer = False
             is_vector = True
         else:
             raise Exception(
                 "invalid type '{0}' for a quantity".format(type_code))

         curs = self.connection.cursor()
         curs.execute("""
         INSERT INTO {0}.quantities (name, is_integer, is_vector, description)
         VALUES (%s , %s , %s, %s) RETURNING id
         """.format(self.schema), (name, is_integer, is_vector, description))

         item = curs.fetchone()
         if (item is None):
             raise Exception("Could not create quantity \"" + name + "\"")
         return item[0]

     def commit(self):
         logger.debug("committing changes to base")
         self.connection.commit()

     def getUserList(self):
         curs = self.connection.cursor()
         curs.execute("""
         select tableowner from pg_tables where tablename =
'runs'; """) users = [desc[0] for desc in curs] return users def getStudySize(self, study): curs = self.connection.cursor() try: logger.info(study) curs.execute(""" select sz from (SELECT SUM(pg_total_relation_size(quote_ident(schemaname) || '.' || quote_ident(tablename)))::BIGINT FROM pg_tables WHERE schemaname = '{0}') as sz """.format(study)) size = curs.fetchone()[0] curs.execute(""" select pg_size_pretty(cast({0} as bigint)) """.format(size)) size = curs.fetchone()[0] curs.execute(""" select count({0}.runs.id) from {0}.runs """.format(study)) nruns = curs.fetchone()[0] curs.execute(""" select count({0}.jobs.id) from {0}.jobs """.format(study)) njobs = curs.fetchone()[0] except psycopg2.ProgrammingError: self.connection.rollback() size = '????' return {'size': size, 'nruns': nruns, 'njobs': njobs} def grantAccess(self, study, user): curs = self.connection.cursor() curs.execute(""" grant SELECT on ALL tables in schema {0} to {1}; grant USAGE on SCHEMA {0} to {1}; """.format(study, user)) self.commit() def revokeAccess(self, study, user): curs = self.connection.cursor() curs.execute(""" revoke SELECT on ALL tables in schema {0} from {1}; revoke USAGE on SCHEMA {0} from {1}; """.format(study, user)) self.commit() def getStudyOwner(self, schema): curs = self.connection.cursor() curs.execute(""" select grantor from information_schema.table_privileges where (table_name,table_schema,privilege_type) = ('runs','{0}','SELECT'); """.format(schema)) owners = [desc[0] for desc in curs] return owners[0] def getGrantedUsers(self, schema): curs = self.connection.cursor() curs.execute(""" select grantee from information_schema.table_privileges where (table_name,table_schema,privilege_type) = ('runs','{0}','SELECT'); """.format(schema)) granted_users = [desc[0] for desc in curs] return granted_users def getSchemaList(self, filter_names=True): curs = self.connection.cursor() curs.execute(""" SELECT distinct(table_schema) from information_schema.tables where table_name='runs' """) schemas = [desc[0] for desc in curs] filtered_schemas = [] if filter_names is True: for s in schemas: m = re.match('{0}_(.+)'.format(self.user), s) if m: s = m.group(1) filtered_schemas.append(s) else: filtered_schemas = schemas return filtered_schemas def checkStudy(self, dico): if "study" not in dico: logger.debug("*"*30) logger.debug("Parameter 'study' must be provided at command line") logger.debug("possibilities are:") schemas = self.getSchemaList() for s in schemas: logger.debug("\t" + s) logger.debug("") logger.debug("FATAL => ABORT") logger.debug("*"*30) sys.exit(-1) def close(self): if 'connection' in self.__dict__: logger.debug('closing database session') self.connection.close() del (self.__dict__['connection']) def __del__(self): self.close() def __init__(self, truerun=False, **kwargs): psycopg2_params = ["host", "user", "port", "password"] connection_params = bdparser.filterParams(psycopg2_params, kwargs) connection_params['dbname'] = 'blackdynamite' if ("password" in connection_params and connection_params["password"] == 'ask'): connection_params["password"] = getpass.getpass() logger.debug('connection arguments: {0}'.format(connection_params)) try: self.connection = psycopg2.connect(**connection_params) logger.debug('connected to base') except Exception as e: logger.error( "Connection failed: check your connection settings:\n" + str(e)) sys.exit(-1) assert(isinstance(self.connection, psycopg2._psycopg.connection)) self.dbhost = (kwargs["host"] if "host" in kwargs.keys() else "localhost") if 'user' in kwargs: 
self.user = kwargs["user"] else: self.user = os.getlogin() if ("should_not_check_study" not in kwargs): self.checkStudy(kwargs) if 'study' in kwargs: if re.match('(.+)_(.+)', kwargs["study"]): self.schema = kwargs["study"] else: self.schema = kwargs["user"] + '_' + kwargs["study"] self.createTypeCodes() self.truerun = truerun if("list_parameters" in kwargs and kwargs["list_parameters"] is True): myjob = job.Job(self) myjob.prepare() message = "" message += ("*"*65 + "\n") message += ("Job parameters:\n") message += ("*"*65 + "\n") params = [str(j[0]) + ": " + str(j[1]) for j in myjob.types.iteritems()] message += ("\n".join(params)+"\n") myrun = run.Run(self) myrun.prepare() message += ("*"*65 + "\n") message += ("Run parameters:\n") message += ("*"*65 + "\n") params = [str(j[0]) + ": " + str(j[1]) for j in myrun.types.iteritems()] message += ("\n".join(params)) logger.info("\n{0}".format(message)) sys.exit(0) ################################################################ if __name__ == "__main__": connection = psycopg2.connect(host="localhost") job_description = job.Job(dict(hono=int, lulu=float, toto=str)) base = Base("honoluluSchema", connection, job_description) base.create() connection.commit() base.pushJob(dict(hono=12, lulu=24.2, toto="toto")) base.pushQuantity("ekin", "float") connection.commit() diff --git a/BlackDynamite/conffile.py b/BlackDynamite/conffile.py index 477e4c1..e140c5e 100755 --- a/BlackDynamite/conffile.py +++ b/BlackDynamite/conffile.py @@ -1,77 +1,80 @@ #!/usr/bin/env python from __future__ import print_function ################################################################ import sqlobject import re import os +import selector ################################################################ __all__ = ["ConfFile", "addFile"] ################################################################ class ConfFile(sqlobject.SQLObject): """ """ + table_name = "configfiles" + def addFile(self, filename, params=None, regex_params=None, content=None): print("adding file " + filename) self.prepare() self.entries["filename"] = os.path.basename(filename) if (content): self.entries["file"] = content else: self.entries["file"] = open(filename, 'r').read() if regex_params: for p in params: # lowerp = p.lower() rp = "(" + regex_params rp = rp.replace("%p", ")(" + p) rp = rp.replace("%v", ")(.*)") rr = "\\1\\2__BLACKDYNAMITE__" + p + "__" # print (rp) # print (rr) self.entries["file"] = re.sub(rp, rr, self.entries["file"], flags=re.IGNORECASE) # print (self.entries["file"]) - - filelist = self.getMatchedObjectList() + file_select = selector.Selector(self.base) + + filelist = file_select.select(ConfFile, self) if (len(filelist) == 0): tmp_conffile = ConfFile(self.base) tmp_conffile.entries = dict(self.entries) del tmp_conffile.entries['filename'] md5filelist = tmp_conffile.getMatchedObjectList() if len(md5filelist) != 0: import md5 for f in md5filelist: raise Exception(""" There is already another file with same content but different name: this is an impossible situation for BlackDynamite. 
The file concerned is '{0}' md5:{1} ** If you want keep going, please rename the file before insertion ** """.format(f['filename'], md5.new(f['file']).hexdigest())) self.base.insert(self) elif (len(filelist) == 1): self.entries = filelist[0].entries self.id = filelist[0].id def __init__(self, connection): sqlobject.SQLObject.__init__(self, connection) - self.table_name = "configfiles" self.types["filename"] = str self.types["file"] = str def addFile(filename, base, **kwargs): cnffile = ConfFile(base) cnffile.addFile(filename, **kwargs) return cnffile diff --git a/BlackDynamite/constraints.py b/BlackDynamite/constraints.py index 39a8e1d..e21e23f 100644 --- a/BlackDynamite/constraints.py +++ b/BlackDynamite/constraints.py @@ -1,186 +1,184 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function -# import job -# import run -# -# -# class BDconstraints(object): -# -# "" -# -# def __init__(self, base, constraints): -# -# self.base = base -# if isinstance(constraints, dict): -# -# self.run_constraints = [] -# self.job_constraints = [] -# -# if "run_constraints" in constraints: -# self.run_constraints = constraints["run_constraints"] -# if "job_constraints" in constraints: -# self.job_constraints = constraints["job_constraints"] -# if "run_id" in constraints: -# self.run_constraints = [ -# 'id = {0}'.format(constraints['run_id'])] -# if "job_id" in constraints: -# self.job_constraints = [ -# 'id = {0}'.format(constraints['job_id'])] -# -# def setConstraint(self, sqlobject): -# # print (constraints) -# for cons in constraints: -# _regex = "\s*(\w*)\s*((?:\=~)|(?:!=)|<|>|=)\s*(.*)" -# match = re.match(_regex, cons) -# # print (match.groups()) -# if (not match or (not len(match.groups()) == 3)): -# print ("malformed constraint: " + cons) -# sys.exit(-1) -# key = match.group(1).lower().strip() -# op = match.group(2) -# if (op == "=~"): -# op = "~" -# # print (op) -# val = match.group(3) -# # print (key) -# # print (op) -# # print (val) -# if (key not in self.types): -# print ("unknown key '{0}'".format(key)) -# print (constraints) -# print ("possible keys are:") -# for k in self.types.keys(): -# print ("\t" + k) -# sys.exit(-1) -# # print (val) -# # print (key) -# # print (self.types[key]) -# val = self.types[key](val) -# if (key not in self.entries): -# self.entries[key] = [] -# self.entries[key].append(val) -# if (key not in self.operators): -# self.operators[key] = [] -# self.operators[key].append(op) -# -# def prepare(self): -# -# myjob = job.Job(self.base) -# myjob.prepare() -# myjob.setConstraint(const.job_constraints) -# myrun = run.Run(self.base) -# myrun.prepare() -# myrun.setConstraint(const.run_constraints) -# -# # if ("machine_name" in myrun.entries): -# # if (len(myrun["machine_name"]) > 1): -# # myrun["machine_name"] = myrun["machine_name"][0] -# -# condition_job, params_job = \ -# myjob.makeMatchingCondition(binary_operator) -# condition_run, params_run = \ -# myrun.makeMatchingCondition(binary_operator) -# -# def makeMatchingCondition(self, binary_operator="and"): -# self.prepare() -# condition = "" -# params = [] -# binary_operator = " " + binary_operator + " " -# for key, values in self.entries.items(): -# if (values is None): -# continue -# if (not type(values) == list): -# values = [values] -# for i in range(0, len(values)): -# try: -# if (not isinstance(self.types[key], type)): -# raise Exception("Key " + key -# + " needs to be given a type and not " -# + self.types[key]) -# except KeyError as err: -# raise KeyError( -# "{0}, keys are 
{1}".format(err, self.types.keys())) -# # print (str(key) + "type:" + str(self.types[key]) + " " -# # + str(values[i])) -# params.append(self.types[key](values[i])) -# op = "=" -# if (key in self.operators): -# op = self.operators[key][i] -# if (condition): -# condition += binary_operator -# if (isinstance(values[i], float) -# and op == "=" and values[i] != 0): -# condition += "@({0}.{1} / %s -1) < 1e-10".format( -# self.table_name, key) -# else: -# condition += self.table_name + "." + key -# condition += " " + op + " " -# condition += " %s " -# -# return condition, params -# -# def joinConditions(self, -# condition_job, condition_run, -# params_job, params_run): -# condition = '' -# if condition_job: -# condition = '(' + condition_job + ')' -# if (condition and condition_run): -# condition += ' and ' -# if (condition_run): -# condition += '(' + condition_run + ')' -# if (condition): -# condition += ' and ' -# condition += " runs.job_id = jobs.id " -# params = params_job + params_run -# return params -# + +import sqlobject +import job +import run +import pyparsing as pp + + +class BDconstraints(object): + + "" + + def __init__(self, base, ref_obj): + + self.base = base + self.constraints = [] + self.ref_obj = ref_obj + if isinstance(ref_obj, type): + self.ref_obj = ref_obj(base) + + + self.ref_obj.prepare() + + def getMatchingCondition(self, constraints): + if isinstance(constraints, dict): + + if "constraints" in constraints: + self.run_constraints = constraints["constraints"] + if "run_id" in constraints: + self.run_constraints = [ + 'runs.id = {0}'.format(constraints['run_id'])] + if "job_id" in constraints: + self.job_constraints = [ + 'jobs.id = {0}'.format(constraints['job_id'])] + + if isinstance(constraints, sqlobject.SQLObject): + sql_obj = constraints + for k, v in sql_obj.entries.iteritems(): + if isinstance(v, str): + self.constraints.append('({0}.{1} = {2})'.format( + sql_obj.table_name, k, v)) + + self.constraints = ' and '.join(self.constraints) + return self.constraints + + def makeConstraint(self, sqlobject): + # print (constraints) + for cons in constraints: + _regex = "\s*(\w*)\s*((?:\=~)|(?:!=)|<|>|=)\s*(.*)" + match = re.match(_regex, cons) + # print (match.groups()) + if (not match or (not len(match.groups()) == 3)): + print ("malformed constraint: " + cons) + sys.exit(-1) + key = match.group(1).lower().strip() + op = match.group(2) + if (op == "=~"): + op = "~" + # print (op) + val = match.group(3) + # print (key) + # print (op) + # print (val) + if (key not in self.types): + print ("unknown key '{0}'".format(key)) + print (constraints) + print ("possible keys are:") + for k in self.types.keys(): + print ("\t" + k) + sys.exit(-1) + # print (val) + # print (key) + # print (self.types[key]) + val = self.types[key](val) + if (key not in self.entries): + self.entries[key] = [] + self.entries[key].append(val) + if (key not in self.operators): + self.operators[key] = [] + self.operators[key].append(op) + + def makeMatchingCondition(self, binary_operator="and"): + self.prepare() + condition = "" + params = [] + binary_operator = " " + binary_operator + " " + for key, values in self.entries.items(): + if (values is None): + continue + if (not type(values) == list): + values = [values] + for i in range(0, len(values)): + try: + if (not isinstance(self.types[key], type)): + raise Exception("Key " + key + + " needs to be given a type and not " + + self.types[key]) + except KeyError as err: + raise KeyError( + "{0}, keys are {1}".format(err, self.types.keys())) + # print 
(str(key) + "type:" + str(self.types[key]) + " " + # + str(values[i])) + params.append(self.types[key](values[i])) + op = "=" + if (key in self.operators): + op = self.operators[key][i] + if (condition): + condition += binary_operator + if (isinstance(values[i], float) + and op == "=" and values[i] != 0): + condition += "@({0}.{1} / %s -1) < 1e-10".format( + self.table_name, key) + else: + condition += self.table_name + "." + key + condition += " " + op + " " + condition += " %s " + + return condition, params + + def joinConditions(self, + condition_job, condition_run, + params_job, params_run): + condition = '' + if condition_job: + condition = '(' + condition_job + ')' + if (condition and condition_run): + condition += ' and ' + if (condition_run): + condition += '(' + condition_run + ')' + if (condition): + condition += ' and ' + condition += " runs.job_id = jobs.id " + params = params_job + params_run + return params + ################################################################ # try parsing logic expressions ################################################################ -import pyparsing as pp # rule for entry in the sqlobject var = pp.Word(pp.alphas) prefix = (pp.Literal('runs') | pp.Literal('jobs')) + '.' entry = (pp.Optional(prefix) + var).setResultsName('entry') entry.setParseAction(lambda tokens: ''.join(tokens)) # rule to parse the operators operators = [ '+', # addition 2 + 3 5 '-', # subtraction 2 - 3 -1 '*', # multiplication 2 * 3 6 '/', # division (integer division truncates the result) 4 / 2 2 '%', # modulo (remainder) 5 % 4 1 '^', # exponentiation 2.0 ^ 3.0 8 '<', # less than '>', # greater than '<=', # less than or equal to '>=', # greater than or equal to '=', # equal '!=', # not equal '~', # Matches regular expression, case sensitive '~*', # Matches regular expression, case insensitive '!~', # Does not match regular expression, case sensitive '!~*' # Does not match regular expression, case insensitive ] ops = pp.Literal(operators[0]) for o in operators[1:]: ops |= pp.Literal(o) # parse a constraint of the form 'var operator value' and flatten it constraint = pp.Group(entry + ops + pp.Word(pp.alphanums)).setParseAction( lambda tokens: '(' + ' '.join([str(t) for t in tokens[0]]) + ')') separator = pp.Literal(',').setParseAction(lambda tokens: 'and') constraints = (constraint + pp.Optional(pp.OneOrMore(separator + constraint)))\ .setParseAction(lambda tokens: ' '.join(tokens)) hello = "runs.toto > 2, tata = 2, jobs.toto != 3" res = constraints.parseString(hello)[0] print (res) diff --git a/BlackDynamite/job.py b/BlackDynamite/job.py index a6271e8..169193b 100755 --- a/BlackDynamite/job.py +++ b/BlackDynamite/job.py @@ -1,15 +1,17 @@ #!/usr/bin/env python ################################################################ import sqlobject ################################################################ __all__ = ["Job"] ################################################################ class Job(sqlobject.SQLObject): """ """ + table_name = 'jobs' + def __init__(self, base): sqlobject.SQLObject.__init__(self, base) self.table_name = "jobs" diff --git a/BlackDynamite/jobselector.py b/BlackDynamite/jobselector.py index 8036913..e6ee26b 100755 --- a/BlackDynamite/jobselector.py +++ b/BlackDynamite/jobselector.py @@ -1,39 +1,28 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -*- py-which-shell: "python"; -*- ################################################################ __all__ = ["JobSelector"] +import constraints as BDcons +import selector import job 
################################################################ -class JobSelector(object): +class JobSelector(selector.Selector): """ """ - def selectJobs(self, job_constraints=None, sort_job_by=None): + def selectJobs(self, constraints=None, sort_by=None, quiet=False): - if job_constraints is None: - job_constraints = [] - if (type(job_constraints) == dict): - if ("job_constraints" in job_constraints): - job_constraints = job_constraints["job_constraints"] - else: - job_constraints = [] + job_list = self.select(job.Job, constraints=constraints, + sort_by=sort_by) - myjob = job.Job(self.base) - myjob.prepare() - - myjob.setConstraint(job_constraints) - - order_condition = "" - if (sort_job_by): - order_condition = " ORDER BY " + sort_job_by - job_list = myjob.getMatchedObjectList(order_condition) - if (not job_list): + if not job_list: print ("no jobs found") - print ("Selected jobs are: " + str([j.id for j in job_list])) + if quiet is False: + print ("Selected jobs are: " + str([j.id for j in job_list])) return job_list def __init__(self, base): - self.base = base + selector.Selector.__init__(self, base) diff --git a/BlackDynamite/run.py b/BlackDynamite/run.py index 688b432..8038b1e 100755 --- a/BlackDynamite/run.py +++ b/BlackDynamite/run.py @@ -1,463 +1,464 @@ #!/usr/bin/env python from __future__ import print_function ################################################################ import job import runconfig import conffile import sqlobject import sys import re import numpy as np import datetime import bdlogging import logging import bdparser import base import runselector import subprocess import socket ################################################################ __all__ = ['Run', 'getRunFromScript'] print = bdlogging.invalidPrint logger = logging.getLogger(__name__) ################################################################ class Run(sqlobject.SQLObject): """ """ + table_name = 'runs' + def getJob(self): return self.base.getJobFromID(self.entries["job_id"]) def start(self): self.entries['state'] = 'START' logger.debug('starting run') self.update() logger.debug('update done') self.base.commit() logger.debug('commited') def finish(self): self.entries['state'] = 'FINISHED' logger.debug('finish run') self.update() logger.debug('update done') self.base.commit() logger.debug('commited') def attachToJob(self, job): self["job_id"] = job.id self.base.insert(self) self.addConfigFile(self.execfile) for cnffile in self.configfiles: self.addConfigFile(cnffile) def getExecFile(self): return self.getUpdatedConfigFile(self.entries["exec"]) def setExecFile(self, file_name, **kwargs): # check if the file is already in the config files for f in self.configfiles: if f.entries["filename"] == file_name: self.execfile = f self.entries["exec"] = f.id return f.id # the file is not in the current config files # so it has to be added conf = conffile.addFile(file_name, self.base, **kwargs) self.configfiles.append(conf) self.execfile = conf self.entries["exec"] = conf.id return conf.id def listFiles(self): command = 'ls {0}'.format(self['run_path']) if not self['machine_name'] == socket.gethostname(): command = 'ssh {0} "{1}"'.format(self['machine_name'], command) logger.warning(command) p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE) out = p.stdout.readlines() out = [o.strip() for o in out] return out def getFile(self, filename, outpath='/tmp'): import subprocess import os dest_path = os.path.join( outpath, "BD-" + self.base.schema + "-cache", "run-{0}".format(self.id)) dest_file 
= os.path.join(dest_path, filename) if self['machine_name'] == socket.gethostname(): return self.getFullFileName(filename) # logger.warning(dest_path) # logger.warning(dest_file) try: os.makedirs(dest_path) except Exception as e: # logger.error(e) pass if os.path.isfile(dest_file): return dest_file cmd = 'scp {0}:{1} {2}'.format(self['machine_name'], self.getFullFileName(filename), dest_file) logger.warning(cmd) p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) logger.warning(p.stdout.read()) return dest_file def getFullFileName(self, filename): import os return os.path.join(self['run_path'], filename) def addConfigFiles(self, file_list, regex_params=None): if not type(file_list) == list: file_list = [file_list] self.prepare() params_list = self.types.keys() myjob = job.Job(self.base) myjob.prepare() params_list += myjob.types.keys() # logger.debug (regex_params) file_ids = [f.id for f in self.configfiles] files_to_add = [ conffile.addFile( fname, self.base, regex_params=regex_params, params=params_list) for fname in file_list] for f in files_to_add: if (f.id not in file_ids): self.configfiles.append(f) return self.configfiles def addConfigFile(self, configfile): myrun_config = runconfig.RunConfig(self.base) myrun_config.prepare() myrun_config.attachToRun(self) myrun_config.addConfigFile(configfile) self.base.insert(myrun_config) def getConfigFiles(self): # myjob = job.Job(self.base) # myjob["id"] = self.entries["job_id"] # myjob = self.getMatchedObjectList()[0] runconf = runconfig.RunConfig(self.base) runconf["run_id"] = self.id runconf_list = runconf.getMatchedObjectList() conffiles = [ self.getUpdatedConfigFile(f["configfile_id"]) for f in runconf_list] return conffiles def getConfigFile(self, file_id): # runconf = runconfig.RunConfig(self.base) conf = conffile.ConfFile(self.base) conf.prepare() conf["id"] = file_id conf = conf.getMatchedObjectList()[0] return conf def replaceBlackDynamiteVariables(self, text): myjob = job.Job(self.base) myjob.prepare() myjob["id"] = self.entries["job_id"] myjob = myjob.getMatchedObjectList()[0] for key, val in myjob.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): raise Exception("unset job parameter " + key) text = tmp for key, val in self.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): logger.debug(self.entries) raise Exception("unset run parameter " + key) text = tmp text = text.replace("__BLACKDYNAMITE__dbhost__", self.base.dbhost) text = text.replace("__BLACKDYNAMITE__study__", self.base.schema) text = text.replace("__BLACKDYNAMITE__run_id__", str(self.id)) return text def getUpdatedConfigFile(self, file_id): conf = self.getConfigFile(file_id) conf["file"] = self.replaceBlackDynamiteVariables(conf["file"]) return conf def listQuantities(self): request = "SELECT id,name FROM {0}.quantities".format(self.base.schema) curs = self.base.performRequest(request) all_quantities = [res[1] for res in curs] return all_quantities def getScalarQuantities(self, names, additional_request=None): request = ("SELECT id,name FROM {0}.quantities " "WHERE (is_vector) = (false)").format( self.base.schema) params = [] if (names): if (not type(names) == list): names = [names] request += " and (" for name in names: similar_op_match = re.match(r"\s*(~)\s*(.*)", name) op = " = " if (similar_op_match): op = " ~ " name = similar_op_match.group(2) request += " name " + op + "%s or" params.append(str(name)) request = 
request[:len(request)-3] + ")" # logger.debug (request) # logger.debug (params) curs = self.base.performRequest(request, params) quantities = [res[1] for res in curs] if (len(quantities) == 0): logger.debug("No quantity matches " + str(names)) logger.debug("Quantities declared in the database are \n" + "\n".join( [res for res in self.listQuantities()])) sys.exit(-1) return None try: quant_indexes = [quantities.index(n) for n in names] logger.debug(quant_indexes) quantities = names except: # quant_indexes = None pass # logger.debug (quant) results = [] for key in quantities: q = self.getScalarQuantity(key, additional_request) if (q is not None): results.append([key, q]) logger.debug("got Quantity " + str(key) + " : " + str(q.shape[0]) + " values") return results def getLastStep(self): request = """SELECT max(b.max),max(b.time) from ( SELECT max(step) as max,max(computed_at) as time from {0}.scalar_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.scalar_real where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_real where run_id = {1}) as b """.format(self.base.schema, self.id) # logger.debug (request) curs = self.base.performRequest(request, []) item = curs.fetchone() if (item is not None): return item[0], item[1] def getQuantityID(self, name, is_integer=None, is_vector=None): request = """ SELECT id,is_integer,is_vector FROM {0}.quantities WHERE (name) = (%s) """.format(self.base.schema) curs = self.base.performRequest(request, [name]) item = curs.fetchone() if (item is None): raise Exception("unknown quantity \"" + name + "\"") if ((is_integer is not None) and (not is_integer == item[1])): raise Exception("quantity \"" + name + "\" has is_integer = " + str(item[1])) if ((is_vector is not None) and (not is_vector == item[2])): raise Exception("quantity \"" + name + "\" has is_vector = " + str(item[2])) return item[0], item[1], item[2] def getScalarQuantity(self, name, additional_request=None): quantity_id, is_integer, is_vector = self.getQuantityID(name) if is_vector is True: raise Exception("Quantity " + name + " is not scalar") request = """ SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) """.format(self.base.schema, "scalar_real" if (is_integer is False) else "scalar_integer", self.id, quantity_id) if (additional_request): request += " and " + " and ".join(additional_request) request += " ORDER BY step" curs = self.base.performRequest(request, [name]) fetch = curs.fetchall() if (not fetch): return None return (np.array([(val[0], val[1]) for val in fetch])) def getVectorQuantity(self, name, step): quantity_id, is_integer, is_vector = self.getQuantityID(name) if (is_vector is False): raise Exception("Quantity " + name + " is not vectorial") request = """ SELECT measurement from {0}.{1} WHERE (run_id,quantity_id,step) = ({2},{3},{4}) """.format(self.base.schema, "vector_real" if (is_integer is False) else "vector_integer", self.id, quantity_id, step) curs = self.base.performRequest(request, [name]) fetch = curs.fetchone() if (fetch): return np.array(fetch[0]) return None def pushVectorQuantity(self, vec, step, name, is_integer, description=None): logger.debug('pushing {0}'.format(name)) try: quantity_id, is_integer, is_vector = self.getQuantityID( name, is_integer=is_integer, is_vector=True) except Exception as e: typecode = "int" if is_integer is False: typecode = "float" typecode += 
".vector" quantity_id = self.base.pushQuantity(name, typecode, description) array = [i for i in vec] # if is_integer is True: # array_format = ",".join(["{:d}".format(i) for i in vec]) request = """ INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s) """.format(self.base.schema, "vector_real" if is_integer is False else "vector_integer") curs = self.base.performRequest(request, [self.id, quantity_id, array, step]) logger.debug(curs) logger.debug('ready to commit') self.base.commit() logger.debug('commited') def pushScalarQuantity(self, val, step, name, is_integer, description=None): logger.debug('pushing {0}'.format(name)) try: quantity_id, is_integer, is_vector = self.getQuantityID( name, is_integer=is_integer, is_vector=False) except Exception as e: typecode = "int" if is_integer is False: typecode = "float" quantity_id = self.base.pushQuantity(name, typecode, description) request = """ INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s) """.format(self.base.schema, "scalar_real" if is_integer is False else "scalar_integer") curs = self.base.performRequest(request, [self.id, quantity_id, val, step]) logger.debug(curs) logger.debug('ready to commit') self.base.commit() logger.debug('commited') def getAllVectorQuantity(self, name): quantity_id, is_integer, is_vector = self.getQuantityID( name, is_vector=True) request = """ SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) order by step """.format(self.base.schema, "vector_real" if is_integer is False else "vector_integer", self.id, quantity_id) curs = self.base.performRequest(request, [name]) fetch = curs.fetchall() if (not fetch): return [None, None] matres = np.array([val[1] for val in fetch]) stepres = np.array([val[0] for val in fetch]) return (stepres, matres) def deleteData(self): request, params = ( "DELETE FROM {0}.scalar_real WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) request, params = ( "DELETE FROM {0}.scalar_integer WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) request, params = ( "DELETE FROM {0}.vector_real WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) request, params = ( "DELETE FROM {0}.vector_integer WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) def __init__(self, base): sqlobject.SQLObject.__init__(self, base) - self.table_name = "runs" self.foreign_keys["job_id"] = "jobs" self.types["machine_name"] = str self.types["run_path"] = str self.allowNull["run_path"] = True self.types["job_id"] = int self.types["nproc"] = int self.types["run_name"] = str self.types["wait_id"] = int self.allowNull["wait_id"] = True self.types["start_time"] = datetime.datetime self.allowNull["start_time"] = True self.types["state"] = str self.allowNull["state"] = True self.execfile = None self.configfiles = [] self.types["exec"] = str ################################################################ def getRunFromScript(): parser = bdparser.BDParser() parser.register_params(params={"run_id": int}) params = parser.parseBDParameters(argv=[]) mybase = base.Base(**params) runSelector = runselector.RunSelector(mybase) run_list = runSelector.selectRuns(params, params) if len(run_list) > 1: raise Exception('internal error') if len(run_list) == 0: raise Exception('internal error') myrun, myjob = run_list[0] # myrun.setEntries(params) return myrun, myjob diff 
--git a/BlackDynamite/runselector.py b/BlackDynamite/runselector.py index 3e8ed3f..88a67d9 100755 --- a/BlackDynamite/runselector.py +++ b/BlackDynamite/runselector.py @@ -1,129 +1,113 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -*- py-which-shell: "python"; -*- __all__ = ["RunSelector"] import copy # import run # import job import constraints as BDcons ################################################################ class RunSelector(object): """ """ def selectRuns(self, constraints, sort_by=None, quiet=False): request, params = self.selectRunsRequest( constraints, sort_by=sort_by) curs = self.base.performRequest(request, params) run_list = self.buildJoinedList(myjob, myrun, curs) if (not quiet): print("Selected runs are: " + str([r[0].id for r in run_list])) print("Selected jobs are: " + str([r[0]["job_id"] for r in run_list])) return run_list def selectRunsRequest(self, constraints, sort_by=None): const = BDcons.BDconstraints(self.base, constraints) - const.prepare() raise if (not type(sort_by) == list and sort_by is not None): sort_by = [sort_by] order_condition = "" if (sort_by is not None): order_condition = " ORDER BY (" for cond in sort_by: order_condition += cond + "," order_condition = order_condition[:len(order_condition)-1] + ")" # print ("order condition: " + order_condition) request = "SELECT * FROM {0}.jobs,{0}.runs ".format(self.base.schema) request += " WHERE " + condition request += order_condition # print (condition) # print (request) # print (params) raise def buildJoinedList(self, job_object, run_object, curs): coljob_info = self.base.getColumnProperties(job_object) colrun_info = self.base.getColumnProperties(run_object) run_list = [] for entries in curs: jobj = copy.deepcopy(job_object) robj = copy.deepcopy(run_object) ncoljob = len(coljob_info) ncolrun = len(colrun_info) for i in range(0, ncoljob): col_name = coljob_info[i][0] jobj[col_name] = entries[i] jobj.id = jobj["id"] for i in range(0, ncolrun): col_name = colrun_info[i][0] robj[col_name] = entries[i+ncoljob] robj.id = robj["id"] run_list.append([robj, jobj]) return run_list def getObjectList(self, order="id"): self.prepare() request = "SELECT * FROM {0}.{1} ORDER BY ".format( self.base.schema, self.table_name) + order curs = self.base.performRequest(request) return self.buildList(curs) def getMatchedObjectList(self, order_condition=""): condition, params = self.makeMatchingCondition() request = self.makeSelectRequest() # print (condition) # print (order_condition) if (condition): request += "WHERE " + condition if (order_condition): request += order_condition # print (request) # print (params) curs = self.base.performRequest(request, params) return self.buildList(curs) - def buildList(self, curs): - col_info = self.base.getColumnProperties(self) - - list_objects = [] - for entries in curs: - # print (col_info) - obj = copy.deepcopy(self) - for i in range(0, len(col_info)): - col_name = col_info[i][0] - obj[col_name] = entries[i] - obj.id = obj["id"] - # print (col_name + " " + str(entries[i])) - list_objects.append(obj) - # print (list_objects[0].entries) - return list_objects def __init__(self, base): self.base = base
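# Illustration (not part of the patch): ConfFile.addFile (above) tags parameter
# occurrences in a configuration file with __BLACKDYNAMITE__ placeholders.  The
# same substitution re-enacted standalone, with a made-up template and the
# assumed pattern regex_params = "%p *= *%v":
import re

text = "nproc = 4\nhono = 12\n"
regex_params = "%p *= *%v"

for p in ["nproc", "hono"]:
    rp = "(" + regex_params
    rp = rp.replace("%p", ")(" + p)   # e.g. "()(nproc *= *%v" for p == "nproc"
    rp = rp.replace("%v", ")(.*)")    # e.g. "()(nproc *= *)(.*)"
    rr = "\\1\\2__BLACKDYNAMITE__" + p + "__"
    text = re.sub(rp, rr, text, flags=re.IGNORECASE)

print(text)  # nproc = __BLACKDYNAMITE__nproc__ / hono = __BLACKDYNAMITE__hono__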
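# Illustration (not part of the patch): how a driver script could feed results
# back through the Run API shown above.  This is a hedged sketch: it assumes
# the BlackDynamite package is importable, that the script is launched by the
# usual BlackDynamite machinery (so getRunFromScript can recover the run id),
# and the quantity names "ekin" and "disp" are made up.
import numpy as np
from BlackDynamite import run as bdrun

myrun, myjob = bdrun.getRunFromScript()
myrun.start()
for step in range(10):
    myrun.pushScalarQuantity(step * 0.1, step, "ekin", is_integer=False)
    myrun.pushVectorQuantity(np.zeros(3), step, "disp", is_integer=False)
myrun.finish()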