diff --git a/BlackDynamite/run.py b/BlackDynamite/run.py index e503d1c..65c1c30 100755 --- a/BlackDynamite/run.py +++ b/BlackDynamite/run.py @@ -1,435 +1,438 @@ #!/usr/bin/env python from __future__ import print_function ################################################################ import job import runconfig import conffile import sqlobject import sys import re import numpy as np import datetime import bdlogging import logging ################################################################ __all__ = ["Run"] print = bdlogging.invalidPrint logger = logging.getLogger(__name__) ################################################################ class Run(sqlobject.SQLObject): """ """ def getJob(self): return self.base.getJobFromID(self.entries["job_id"]) def start(self): self.entries['state'] = 'START' logger.debug('starting run') self.update() logger.debug('update done') self.base.commit() logger.debug('commited') def finish(self): self.entries['state'] = 'FINISHED' logger.debug('finish run') self.update() logger.debug('update done') self.base.commit() logger.debug('commited') def attachToJob(self, job): self["job_id"] = job.id self.base.insert(self) self.addConfigFile(self.execfile) for cnffile in self.configfiles: self.addConfigFile(cnffile) def getExecFile(self): return self.getUpdatedConfigFile(self.entries["exec"]) def setExecFile(self, file_name): # check if the file is already in the config files for f in self.configfiles: if f.entries["filename"] == file_name: self.execfile = f self.entries["exec"] = f.id return f.id # the file is not in the current config files # so it as to be added conf = conffile.addFile(file_name, self.base, regex_params=regex_params, params=params_list) self.configfiles.append(conf) self.execfile = conf self.entries["exec"] = conf.id return conf.id def listFiles(self): import subprocess p = subprocess.Popen('ssh {0} "ls {1}"'.format( self['machine_name'], self['run_path']), shell=True, stdout=subprocess.PIPE) out = p.stdout.readlines() out = [o.strip() for o in out] return out def getFile(self, filename, outpath='/tmp'): import subprocess import os dest_path = os.path.join( outpath, "BD-" + self.base.schema + "-cache", "run-{0}".format(self.id)) dest_file = os.path.join(dest_path, filename) # logger.warning(dest_path) # logger.warning(dest_file) try: os.makedirs(dest_path) except Exception as e: # logger.error(e) pass if os.path.isfile(dest_file): return dest_file cmd = 'scp {0}:{1}/{2} {3}'.format( self['machine_name'], self['run_path'], filename, dest_file) logger.warning(cmd) p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) logger.warning(p.stdout.read()) return dest_file def addConfigFiles(self, file_list, regex_params=None): if not type(file_list) == list: file_list = [file_list] self.prepare() params_list = self.types.keys() myjob = job.Job(self.base) myjob.prepare() params_list += myjob.types.keys() # logger.debug (regex_params) file_ids = [f.id for f in self.configfiles] files_to_add = [ conffile.addFile( fname, self.base, regex_params=regex_params, params=params_list) for fname in file_list] for f in files_to_add: if (f.id not in file_ids): self.configfiles.append(f) return self.configfiles def addConfigFile(self, configfile): myrun_config = runconfig.RunConfig(self.base) myrun_config.prepare() myrun_config.attachToRun(self) myrun_config.addConfigFile(configfile) self.base.insert(myrun_config) def getConfigFiles(self): # myjob = job.Job(self.base) # myjob["id"] = self.entries["job_id"] # myjob = self.getMatchedObjectList()[0] runconf = runconfig.RunConfig(self.base) runconf["run_id"] = self.id runconf_list = runconf.getMatchedObjectList() conffiles = [ self.getUpdatedConfigFile(f["configfile_id"]) for f in runconf_list] return conffiles def getConfigFile(self, file_id): - runconf = runconfig.RunConfig(self.base) + # runconf = runconfig.RunConfig(self.base) conf = conffile.ConfFile(self.base) conf.prepare() conf["id"] = file_id conf = conf.getMatchedObjectList()[0] return conf def replaceBlackDynamiteVariables(self, text): myjob = job.Job(self.base) myjob.prepare() myjob["id"] = self.entries["job_id"] myjob = myjob.getMatchedObjectList()[0] for key, val in myjob.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): raise Exception("unset job parameter " + key) text = tmp for key, val in self.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): logger.debug(self.entries) raise Exception("unset run parameter " + key) text = tmp text = text.replace("__BLACKDYNAMITE__dbhost__", self.base.dbhost) text = text.replace("__BLACKDYNAMITE__study__", self.base.schema) text = text.replace("__BLACKDYNAMITE__run_id__", str(self.id)) return text def getUpdatedConfigFile(self, file_id): conf = self.getConfigFile(file_id) conf["file"] = self.replaceBlackDynamiteVariables(conf["file"]) return conf def listQuantities(self): request = "SELECT id,name FROM {0}.quantities".format(self.base.schema) curs = self.base.performRequest(request) all_quantities = [res[1] for res in curs] return all_quantities def getScalarQuantities(self, names, additional_request=None): request = ("SELECT id,name FROM {0}.quantities " "WHERE (is_vector) = (false)").format( self.base.schema) params = [] if (names): if (not type(names) == list): names = [names] request += " and (" for name in names: similar_op_match = re.match(r"\s*(~)\s*(.*)", name) op = " = " if (similar_op_match): op = " ~ " name = similar_op_match.group(2) request += " name " + op + "%s or" params.append(str(name)) request = request[:len(request)-3] + ")" # logger.debug (request) # logger.debug (params) curs = self.base.performRequest(request, params) quantities = [res[1] for res in curs] if (len(quantities) == 0): logger.debug("No quantity matches " + str(names)) logger.debug("Quantities declared in the database are \n" + "\n".join( [res for res in self.listQuantities()])) sys.exit(-1) return None try: quant_indexes = [quantities.index(n) for n in names] + logger.debug(quant_indexes) quantities = names except: # quant_indexes = None pass # logger.debug (quant) results = [] for key in quantities: q = self.getScalarQuantity(key, additional_request) if (q is not None): results.append([key, q]) logger.debug("got Quantity " + str(key) + " : " + str(q.shape[0]) + " values") return results def getLastStep(self): request = """SELECT max(b.max),max(b.time) from ( SELECT max(step) as max,max(computed_at) as time from {0}.scalar_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.scalar_real where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_real where run_id = {1}) as b """.format(self.base.schema, self.id) # logger.debug (request) curs = self.base.performRequest(request, []) item = curs.fetchone() if (item is not None): return item[0], item[1] def getQuantityID(self, name, is_integer=None, is_vector=None): request = """ SELECT id,is_integer,is_vector FROM {0}.quantities WHERE (name) = (%s) """.format(self.base.schema) curs = self.base.performRequest(request, [name]) item = curs.fetchone() if (item is None): raise Exception("unknown quantity \"" + name + "\"") if ((is_integer is not None) and (not is_integer == item[1])): raise Exception("quantity \"" + name + "\" has is_integer = " + str(item[1])) if ((is_vector is not None) and (not is_vector == item[2])): raise Exception("quantity \"" + name + "\" has is_vector = " + str(item[2])) return item[0], item[1], item[2] def getScalarQuantity(self, name, additional_request=None): quantity_id, is_integer, is_vector = self.getQuantityID(name) if is_vector is True: raise Exception("Quantity " + name + " is not scalar") request = """ SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) """.format(self.base.schema, "scalar_real" if (is_integer is False) else "scalar_integer", self.id, quantity_id) if (additional_request): request += " and " + " and ".join(additional_request) request += " ORDER BY step" curs = self.base.performRequest(request, [name]) fetch = curs.fetchall() if (not fetch): return None return (np.array([(val[0], val[1]) for val in fetch])) def getVectorQuantity(self, name, step): quantity_id, is_integer, is_vector = self.getQuantityID(name) if (is_vector is False): raise Exception("Quantity " + name + " is not vectorial") request = """ SELECT measurement from {0}.{1} WHERE (run_id,quantity_id,step) = ({2},{3},{4}) """.format(self.base.schema, "vector_real" if (is_integer is False) else "vector_integer", self.id, quantity_id, step) curs = self.base.performRequest(request, [name]) fetch = curs.fetchone() if (fetch): return np.array(fetch[0]) return None def pushVectorQuantity(self, vec, step, name, is_integer, description=None): logger.debug('pushing {0}'.format(name)) try: quantity_id, is_integer, is_vector = self.getQuantityID( name, is_integer=is_integer, is_vector=True) except Exception as e: typecode = "int" if is_integer is False: typecode = "float" typecode += ".vector" quantity_id = self.base.pushQuantity(name, typecode, description) array = [i for i in vec] - if is_integer is True: - array_format = ",".join(["{:d}".format(i) for i in vec]) + # if is_integer is True: + # array_format = ",".join(["{:d}".format(i) for i in vec]) request = """ INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s) """.format(self.base.schema, "vector_real" if is_integer is False else "vector_integer") curs = self.base.performRequest(request, [self.id, quantity_id, array, step]) + logger.debug(curs) logger.debug('ready to commit') self.base.commit() logger.debug('commited') def pushScalarQuantity(self, val, step, name, is_integer, description=None): logger.debug('pushing {0}'.format(name)) try: quantity_id, is_integer, is_vector = self.getQuantityID( name, is_integer=is_integer, is_vector=False) except Exception as e: typecode = "int" if is_integer is False: typecode = "float" quantity_id = self.base.pushQuantity(name, typecode, description) request = """ INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s) """.format(self.base.schema, "scalar_real" if is_integer is False else "scalar_integer") curs = self.base.performRequest(request, [self.id, quantity_id, val, step]) + logger.debug(curs) logger.debug('ready to commit') self.base.commit() logger.debug('commited') def getAllVectorQuantity(self, name): quantity_id, is_integer, is_vector = self.getQuantityID( name, is_vector=True) request = """ SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) order by step """.format(self.base.schema, "vector_real" if is_integer is False else "vector_integer", self.id, quantity_id) curs = self.base.performRequest(request, [name]) fetch = curs.fetchall() if (not fetch): return [None, None] matres = np.array([val[1] for val in fetch]) stepres = np.array([val[0] for val in fetch]) return (stepres, matres) def deleteData(self): request, params = ( "DELETE FROM {0}.scalar_real WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) request, params = ( "DELETE FROM {0}.scalar_integer WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) request, params = ( "DELETE FROM {0}.vector_real WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) request, params = ( "DELETE FROM {0}.vector_integer WHERE run_id={1}".format( self.base.schema, self.id), []) self.base.performRequest(request, params) def __init__(self, base): sqlobject.SQLObject.__init__(self, base) self.table_name = "runs" self.foreign_keys["job_id"] = "jobs" self.types["machine_name"] = str self.types["run_path"] = str self.allowNull["run_path"] = True self.types["job_id"] = int self.types["nproc"] = int self.types["run_name"] = str self.types["wait_id"] = int self.allowNull["wait_id"] = True self.types["start_time"] = datetime.datetime self.allowNull["start_time"] = True self.types["state"] = str self.allowNull["state"] = True self.execfile = None self.configfiles = [] self.types["exec"] = str ################################################################ diff --git a/BlackDynamite/runconfig.py b/BlackDynamite/runconfig.py index 5f868ea..a0fb0d6 100755 --- a/BlackDynamite/runconfig.py +++ b/BlackDynamite/runconfig.py @@ -1,24 +1,24 @@ #!/usr/bin/env python from __future__ import print_function -import job import sqlobject + class RunConfig(sqlobject.SQLObject): """ """ - def attachToRun(self,run): + def attachToRun(self, run): self["run_id"] = run.id - def addConfigFile(self,configfile): + def addConfigFile(self, configfile): self["configfile_id"] = configfile.id - def __init__ (self,base): - sqlobject.SQLObject.__init__(self,base) + def __init__(self, base): + sqlobject.SQLObject.__init__(self, base) self.table_name = "runconfig" self.foreign_keys["run_id"] = "runs" self.foreign_keys["configfile_id"] = "configfiles" self.types["run_id"] = int self.types["configfile_id"] = int