Page MenuHomec4science

run.py
No OneTemporary

File Metadata

Created
Fri, May 31, 19:58
#!/usr/bin/env python
from __future__ import print_function
__all__ = [ "Run" ]
import job
import runconfig
import conffile
import base
import bdparser as bdp
import sqlobject
import sys
import re
import numpy as np
import datetime
################################################################
# Logging setup for this module.
import bdlogging
import logging

# Rebind ``print`` for this module to bdlogging.invalidPrint; presumably
# this flags stray print calls so that ``logger`` is used instead
# (TODO confirm against bdlogging).
print = bdlogging.invalidPrint
logger = logging.getLogger(__name__)
################################################################
class Run(sqlobject.SQLObject):
    """
    Entry of the ``runs`` table, attached to a job through ``job_id``.

    In addition to its database entries, a Run keeps in memory the list of
    configuration files (``configfiles``) and the executable file
    (``execfile``) that ``attachToJob`` registers in the database.
    It also offers helpers to push and fetch the scalar and vector
    quantities stored for the run.
    """

    def __init__(self, base):
        """Declare the columns of the ``runs`` table on top of SQLObject."""
        sqlobject.SQLObject.__init__(self, base)
        self.table_name = "runs"
        self.foreign_keys["job_id"] = "jobs"
        self.types["machine_name"] = str
        self.types["run_path"] = str
        self.allowNull["run_path"] = True
        self.types["job_id"] = int
        self.types["nproc"] = int
        self.types["run_name"] = str
        self.types["wait_id"] = int
        self.allowNull["wait_id"] = True
        self.types["start_time"] = datetime.datetime
        self.allowNull["start_time"] = True
        self.types["state"] = str
        self.allowNull["state"] = True
        self.execfile = None
        self.configfiles = []
        self.types["exec"] = str

    ################################################################
    # private helpers
    ################################################################

    def _parameterNames(self):
        """Return the column names of this run followed by those of a job."""
        self.prepare()
        # list() keeps this working when keys() returns a view (Python 3)
        names = list(self.types.keys())
        myjob = job.Job(self.base)
        myjob.prepare()
        names += list(myjob.types.keys())
        return names

    @staticmethod
    def _measurementTable(kind, is_integer):
        """
        Return the measurement table name for ``kind`` ("scalar"/"vector").

        Only a value equal to False selects the ``_real`` table; anything
        else (True, None, ...) selects the ``_integer`` one.
        """
        # '==' rather than 'is' on purpose: 0 and numpy booleans count too
        is_real = (is_integer == False)  # noqa: E712
        return kind + ("_real" if is_real else "_integer")

    @staticmethod
    def _substituteEntries(text, entries, owner):
        """
        Replace every ``__BLACKDYNAMITE__<key>__`` of ``text`` by
        ``str(entries[key])``.

        Raises an Exception when a placeholder present in ``text`` maps to
        a None entry; ``owner`` ("job" or "run") is used in the message.
        """
        for key, val in entries.items():
            replaced = text.replace("__BLACKDYNAMITE__" + key + "__",
                                    str(val))
            if replaced != text and val is None:
                logger.debug(entries)
                raise Exception("unset " + owner + " parameter " + key)
            text = replaced
        return text

    def _resolveQuantityID(self, name, is_integer, is_vector, description):
        """
        Return ``(quantity_id, is_integer)`` for quantity ``name``.

        The quantity is looked up with ``getQuantityID``; when that raises
        (unknown quantity or mismatching flags) ``self.base.pushQuantity``
        is called with the matching typecode instead.
        """
        try:
            quantity_id, is_integer, _ = self.getQuantityID(
                name, is_integer=is_integer, is_vector=is_vector)
        except Exception:
            typecode = "int"
            if is_integer == False:  # noqa: E712
                typecode = "float"
            if is_vector:
                typecode += ".vector"
            quantity_id = self.base.pushQuantity(name, typecode, description)
        return quantity_id, is_integer

    ################################################################
    # life cycle
    ################################################################

    def start(self):
        """Set the state entry to 'START', update the row and commit."""
        self.entries['state'] = 'START'
        logger.debug('starting run')
        self.update()
        logger.debug('update done')
        self.base.commit()
        logger.debug('commited')

    def attachToJob(self, job):
        """
        Link this run to ``job``, insert it, and register its files.

        The executable file is registered first, then every entry of
        ``self.configfiles``.
        """
        self["job_id"] = job.id
        self.base.insert(self)
        # NOTE(review): setExecFile always leaves the exec file inside
        # self.configfiles, so it looks like it gets registered twice here;
        # confirm whether the duplicate run/config row is intended.
        self.addConfigFile(self.execfile)
        for cnffile in self.configfiles:
            self.addConfigFile(cnffile)

    ################################################################
    # configuration files
    ################################################################

    def getExecFile(self):
        """Return the executable file with its placeholders substituted."""
        return self.getUpdatedConfigFile(self.entries["exec"])

    def setExecFile(self, file_name, regex_params=None):
        """
        Select ``file_name`` as the executable file and return its id.

        A file of ``self.configfiles`` with that filename is reused when
        there is one; otherwise the file is created with
        ``conffile.addFile`` and appended to ``self.configfiles``.
        ``regex_params`` is forwarded to ``conffile.addFile``.
        """
        # check if the file is already in the config files
        for f in self.configfiles:
            if f.entries["filename"] == file_name:
                self.execfile = f
                self.entries["exec"] = f.id
                return f.id

        # the file is not in the current config files so it has to be added
        conf = conffile.addFile(file_name, self.base,
                                regex_params=regex_params,
                                params=self._parameterNames())
        self.configfiles.append(conf)
        self.execfile = conf
        self.entries["exec"] = conf.id
        return conf.id

    def addConfigFiles(self, file_list, regex_params=None):
        """
        Add one file name, or a list/tuple of them, to ``self.configfiles``.

        Files whose id is already present are not appended again.
        Returns ``self.configfiles``.
        """
        if not isinstance(file_list, (list, tuple)):
            file_list = [file_list]

        params_list = self._parameterNames()
        file_ids = [f.id for f in self.configfiles]
        files_to_add = [conffile.addFile(fname, self.base,
                                         regex_params=regex_params,
                                         params=params_list)
                        for fname in file_list]

        for f in files_to_add:
            if f.id not in file_ids:
                self.configfiles.append(f)
                # remember the id so a name repeated in file_list is
                # appended only once
                file_ids.append(f.id)
        return self.configfiles

    def addConfigFile(self, configfile):
        """Insert a RunConfig row associating ``configfile`` to this run."""
        myrun_config = runconfig.RunConfig(self.base)
        myrun_config.prepare()
        myrun_config.attachToRun(self)
        myrun_config.addConfigFile(configfile)
        self.base.insert(myrun_config)

    def getConfigFiles(self):
        """
        Return the configuration files recorded for this run.

        Every RunConfig matching ``run_id == self.id`` is fetched and its
        file is returned through ``getUpdatedConfigFile``.
        """
        runconf = runconfig.RunConfig(self.base)
        runconf["run_id"] = self.id
        runconf_list = runconf.getMatchedObjectList()
        return [self.getUpdatedConfigFile(f["configfile_id"])
                for f in runconf_list]

    def getConfigFile(self, file_id):
        """
        Return the first ConfFile whose id is ``file_id``.

        Indexing the match list fails (IndexError for a plain list) when
        nothing matches.
        """
        conf = conffile.ConfFile(self.base)
        conf.prepare()
        conf["id"] = file_id
        return conf.getMatchedObjectList()[0]

    def replaceBlackDynamiteVariables(self, text):
        """
        Substitute the ``__BLACKDYNAMITE__<name>__`` placeholders of ``text``.

        Job entries are substituted first, then run entries, then the
        special names ``dbhost``, ``study`` and ``run_id``.
        Raises an Exception when a placeholder found in ``text`` refers to
        a job or run entry that is None.
        """
        myjob = job.Job(self.base)
        myjob.prepare()
        myjob["id"] = self.entries["job_id"]
        myjob = myjob.getMatchedObjectList()[0]

        text = self._substituteEntries(text, myjob.entries, "job")
        text = self._substituteEntries(text, self.entries, "run")

        text = text.replace("__BLACKDYNAMITE__dbhost__", self.base.dbhost)
        text = text.replace("__BLACKDYNAMITE__study__", self.base.schema)
        text = text.replace("__BLACKDYNAMITE__run_id__", str(self.id))
        return text

    def getUpdatedConfigFile(self, file_id):
        """Return config file ``file_id`` with its content substituted."""
        conf = self.getConfigFile(file_id)
        conf["file"] = self.replaceBlackDynamiteVariables(conf["file"])
        return conf

    ################################################################
    # quantities: queries
    ################################################################

    def listQuantities(self):
        """Return the names of all non-vector quantities of the schema."""
        request = ("SELECT id,name FROM {0}.quantities "
                   "WHERE (is_vector) = (false)").format(self.base.schema)
        curs = self.base.performRequest(request)
        return [res[1] for res in curs]

    def getScalarQuantities(self, names, additional_request=None):
        """
        Fetch several scalar quantities of this run.

        ``names`` is a name or a list/tuple of names; a name prefixed with
        ``~`` is matched with the SQL ``~`` operator instead of ``=``.
        A false value (None, empty list) selects every scalar quantity.
        ``additional_request`` is forwarded to ``getScalarQuantity``.

        Returns a list of ``[name, values]`` pairs, skipping quantities
        for which ``getScalarQuantity`` returns None.  When no quantity
        matches, the process is terminated with ``sys.exit(-1)``.
        """
        request = ("SELECT id,name FROM {0}.quantities "
                   "WHERE (is_vector) = (false)").format(self.base.schema)
        params = []

        if names:
            if isinstance(names, (list, tuple)):
                names = list(names)
            else:
                names = [names]

            clauses = []
            for name in names:
                similar_op_match = re.match(r"\s*(~)\s*(.*)", name)
                if similar_op_match:
                    clauses.append("name ~ %s")
                    name = similar_op_match.group(2)
                else:
                    clauses.append("name = %s")
                params.append(str(name))
            request += " and (" + " or ".join(clauses) + ")"

        curs = self.base.performRequest(request, params)
        quantities = [res[1] for res in curs]

        if not quantities:
            # fatal: report at error level so the reason for exiting is seen
            logger.error("No quantity matches " + str(names))
            logger.error("Quantities declared in the database are \n" +
                         "\n".join(self.listQuantities()))
            sys.exit(-1)

        # keep the caller's ordering when every requested name is an exact
        # quantity name; otherwise (patterns, no filter) keep database order
        if names and all(n in quantities for n in names):
            quantities = names

        results = []
        for key in quantities:
            q = self.getScalarQuantity(key, additional_request)
            if q is not None:
                results.append([key, q])
                logger.debug("got Quantity " + str(key) + " : " +
                             str(q.shape[0]) + " values")
        return results

    def getLastStep(self):
        """
        Return ``(max step, max computed_at)`` over the four measurement
        tables for this run, or None when the query yields no row.
        """
        tables = ("scalar_integer", "scalar_real",
                  "vector_integer", "vector_real")
        subqueries = [
            ("SELECT max(step) as max,max(computed_at) as time "
             "from {0}.{1} where run_id = %s").format(self.base.schema, table)
            for table in tables]
        request = ("SELECT max(b.max),max(b.time) from (\n" +
                   "\nunion\n".join(subqueries) + ") as b")

        # one bound run id per sub-query
        curs = self.base.performRequest(request, [self.id] * len(tables))
        item = curs.fetchone()
        if item is None:
            return None
        return item[0], item[1]

    def getQuantityID(self, name, is_integer=None, is_vector=None):
        """
        Return ``(id, is_integer, is_vector)`` of quantity ``name``.

        Raises an Exception when the quantity is unknown, or when
        ``is_integer`` / ``is_vector`` is given and differs from the
        stored flag.
        """
        request = ("SELECT id,is_integer,is_vector FROM {0}.quantities "
                   "WHERE (name) = (%s)").format(self.base.schema)
        curs = self.base.performRequest(request, [name])
        item = curs.fetchone()
        if item is None:
            raise Exception("unknown quantity \"" + name + "\"")

        if (is_integer is not None) and (not is_integer == item[1]):
            raise Exception("quantity \"" + name + "\" has is_integer = " +
                            str(item[1]))
        if (is_vector is not None) and (not is_vector == item[2]):
            raise Exception("quantity \"" + name + "\" has is_vector = " +
                            str(item[2]))
        return item[0], item[1], item[2]

    def getScalarQuantity(self, name, additional_request=None):
        """
        Return the ``(step, measurement)`` rows of scalar quantity ``name``.

        ``additional_request`` is a list of raw SQL conditions (a single
        string is also accepted) and-ed to the query; it is inserted
        verbatim and must come from trusted code.
        Returns a numpy array ordered by step, or None when there is no
        row.  Raises an Exception when the quantity is a vector.
        """
        quantity_id, is_integer, is_vector = self.getQuantityID(name)
        if is_vector:
            raise Exception("Quantity " + name + " is not scalar")

        request = ("SELECT step,measurement from {0}.{1} "
                   "WHERE (run_id,quantity_id) = (%s,%s)").format(
                       self.base.schema,
                       self._measurementTable("scalar", is_integer))

        if additional_request:
            if isinstance(additional_request, str):
                # joining a bare string would split it per character
                additional_request = [additional_request]
            request += " and " + " and ".join(additional_request)
        request += " ORDER BY step"

        curs = self.base.performRequest(request, [self.id, quantity_id])
        fetch = curs.fetchall()
        if not fetch:
            return None
        return np.array([(val[0], val[1]) for val in fetch])

    def getVectorQuantity(self, name, step):
        """
        Return the measurement of vector quantity ``name`` at ``step`` as a
        numpy array, or None when there is no such row.

        Raises an Exception when the quantity is flagged as non-vector.
        """
        quantity_id, is_integer, is_vector = self.getQuantityID(name)
        if is_vector == False:  # noqa: E712
            raise Exception("Quantity " + name + " is not vectorial")

        request = ("SELECT measurement from {0}.{1} "
                   "WHERE (run_id,quantity_id,step) = (%s,%s,%s)").format(
                       self.base.schema,
                       self._measurementTable("vector", is_integer))
        curs = self.base.performRequest(request,
                                        [self.id, quantity_id, step])
        fetch = curs.fetchone()
        if fetch:
            return np.array(fetch[0])
        return None

    def getAllVectorQuantity(self, name):
        """
        Return ``(steps, measurements)`` numpy arrays for vector quantity
        ``name``, ordered by step, or ``[None, None]`` when there is no row.
        """
        quantity_id, is_integer, is_vector = self.getQuantityID(
            name, is_vector=True)

        request = ("SELECT step,measurement from {0}.{1} "
                   "WHERE (run_id,quantity_id) = (%s,%s) "
                   "order by step").format(
                       self.base.schema,
                       self._measurementTable("vector", is_integer))
        curs = self.base.performRequest(request, [self.id, quantity_id])
        fetch = curs.fetchall()
        if not fetch:
            return [None, None]

        matres = np.array([val[1] for val in fetch])
        stepres = np.array([val[0] for val in fetch])
        return (stepres, matres)

    ################################################################
    # quantities: updates
    ################################################################

    def pushVectorQuantity(self, vec, step, name, is_integer,
                           description=None):
        """
        Insert vector ``vec`` as the value of quantity ``name`` at ``step``
        and commit.  The quantity is declared through
        ``self.base.pushQuantity`` when ``getQuantityID`` fails.
        """
        logger.debug('pushing {0}'.format(name))
        quantity_id, is_integer = self._resolveQuantityID(
            name, is_integer, True, description)

        array = list(vec)
        request = ("INSERT INTO {0}.{1} "
                   "(run_id,quantity_id,measurement,step) "
                   "VALUES (%s,%s,%s,%s)").format(
                       self.base.schema,
                       self._measurementTable("vector", is_integer))
        self.base.performRequest(request,
                                 [self.id, quantity_id, array, step])
        logger.debug('ready to commit')
        self.base.commit()
        logger.debug('commited')

    def pushScalarQuantity(self, val, step, name, is_integer,
                           description=None):
        """
        Insert scalar ``val`` as the value of quantity ``name`` at ``step``
        and commit.  The quantity is declared through
        ``self.base.pushQuantity`` when ``getQuantityID`` fails.
        """
        logger.debug('pushing {0}'.format(name))
        quantity_id, is_integer = self._resolveQuantityID(
            name, is_integer, False, description)

        request = ("INSERT INTO {0}.{1} "
                   "(run_id,quantity_id,measurement,step) "
                   "VALUES (%s,%s,%s,%s)").format(
                       self.base.schema,
                       self._measurementTable("scalar", is_integer))
        self.base.performRequest(request, [self.id, quantity_id, val, step])
        logger.debug('ready to commit')
        self.base.commit()
        logger.debug('commited')

    def deleteData(self):
        """
        Delete every measurement of this run from the four measurement
        tables.  No commit is issued here.
        """
        for table in ("scalar_real", "scalar_integer",
                      "vector_real", "vector_integer"):
            request = "DELETE FROM {0}.{1} WHERE run_id = %s".format(
                self.base.schema, table)
            self.base.performRequest(request, [self.id])
################################################################

Event Timeline