diff --git a/bin/enterRun.py b/bin/enterRun.py index 4cbe842..03f0784 100755 --- a/bin/enterRun.py +++ b/bin/enterRun.py @@ -1,58 +1,58 @@ #!/usr/bin/env python import BlackDynamite as BD import subprocess,os,sys ################################################################ parser = BD.BDParser() parser.register_params(group="getRunInfo", params={"run_id":int, "order":str}, help={"run_id":"Select a run_id for switching to it"}) params = parser.parseBDParameters() mybase = BD.Base(**params) if 'run_id' in params: params['run_constraints'] = ['id = {0}'.format(params['run_id'])] try: del params['job_constraints'] except: pass runSelector = BD.RunSelector(mybase) run_list = runSelector.selectRuns(params,params,quiet=True) mybase.close() if (len(run_list) == 0): print ("no run found") sys.exit(1) run,job = run_list[0] run_id = run['id'] separator = '-'*30 print separator print "JOB INFO" print separator print job print separator print "RUN INFO" print separator print run print separator print "LOGGING TO '{0}'".format(run['machine_name']) print separator if run['state'] == 'CREATED': print "Cannot enter run: not yet started" sys.exit(-1) -bashrc_filename = os.path.join('/tmp','bashrc.run{0}'.format(run_id)) +bashrc_filename = os.path.join('/tmp','bashrc.user{0}.study{1}.run{2}'.format(params['user'],params['study'],run_id)) bashrc = open(bashrc_filename,'w') bashrc.write('export PS1="\\u@\\h:<{0}|RUN-{1}> $ "\n'.format(params['study'],run_id)) bashrc.write('cd {0}\n'.format(run['run_path'])) bashrc.write('echo ' + separator) bashrc.close() subprocess.call('scp -q {0} {1}:{0}'.format(bashrc_filename,run['machine_name']),shell=True) command_login = 'ssh -X -A -t {0} "bash --rcfile {1} -i"'.format(run['machine_name'],bashrc_filename) #print command_login subprocess.call(command_login,shell=True) diff --git a/python/BlackDynamite/run.py b/python/BlackDynamite/run.py index 86f5ee0..847005c 100755 --- a/python/BlackDynamite/run.py +++ b/python/BlackDynamite/run.py @@ -1,367 +1,367 @@ #!/usr/bin/env python from __future__ import print_function __all__ = [ "Run" ] import job import runconfig import conffile import base import bdparser as bdp import sqlobject import sys import re import numpy as np import datetime ################################################################ import bdlogging,logging print = bdlogging.invalidPrint logger = logging.getLogger(__name__) ################################################################ class Run(sqlobject.SQLObject): """ """ def getJob(self): return self.base.getJobFromID(self.entries["job_id"]) def start(self): self.entries['state'] = 'START' logger.debug ('starting run') self.update() logger.debug ('update done') self.base.commit() logger.debug ('commited') def finish(self): self.entries['state'] = 'FINISHED' logger.debug ('finish run') self.update() logger.debug ('update done') self.base.commit() logger.debug ('commited') def attachToJob(self,job): self["job_id"] = job.id self.base.insert(self) self.addConfigFile(self.execfile) for cnffile in self.configfiles: self.addConfigFile(cnffile) def getExecFile(self): return self.getUpdatedConfigFile(self.entries["exec"]) def setExecFile(self, file_name): # check if the file is already in the config files for f in self.configfiles: if f.entries["filename"] == file_name: self.execfile = f self.entries["exec"] = f.id return f.id # the file is not in the current config files so it as to be added conf = conffile.addFile(file_name, self.base, regex_params = regex_params, params = params_list) self.configfiles.append(conf) self.execfile = conf self.entries["exec"] = conf.id return conf.id def addConfigFiles(self,file_list,regex_params=None): if not type(file_list) == list: file_list = [file_list] self.prepare() params_list = self.types.keys() myjob = job.Job(self.base) myjob.prepare() params_list += myjob.types.keys() #logger.debug (regex_params) file_ids = [f.id for f in self.configfiles] files_to_add = [conffile.addFile(fname,self.base, regex_params=regex_params, params=params_list) for fname in file_list] for f in files_to_add: if (f.id not in file_ids): self.configfiles.append(f) return self.configfiles def addConfigFile(self,configfile): myrun_config = runconfig.RunConfig(self.base) myrun_config.prepare() myrun_config.attachToRun(self) myrun_config.addConfigFile(configfile) self.base.insert(myrun_config) def getConfigFiles(self): # myjob = job.Job(self.base) # myjob["id"] = self.entries["job_id"] # myjob = self.getMatchedObjectList()[0] runconf = runconfig.RunConfig(self.base) runconf["run_id"] = self.id runconf_list = runconf.getMatchedObjectList() conffiles = [self.getUpdatedConfigFile(f["configfile_id"]) for f in runconf_list] return conffiles def getConfigFile(self,file_id): runconf = runconfig.RunConfig(self.base) conf = conffile.ConfFile(self.base) conf.prepare() conf["id"] = file_id conf = conf.getMatchedObjectList()[0] return conf def replaceBlackDynamiteVariables(self,text): myjob = job.Job(self.base) myjob.prepare() myjob["id"] = self.entries["job_id"] myjob = myjob.getMatchedObjectList()[0] for key,val in myjob.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__",str(val)) if ((not tmp == text) and val is None): raise Exception( "unset job parameter " + key) text = tmp for key,val in self.entries.iteritems(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__",str(val)) if ((not tmp == text) and val is None): logger.debug(self.entries) raise Exception( "unset run parameter " + key) text = tmp text = text.replace("__BLACKDYNAMITE__dbhost__",self.base.dbhost) text = text.replace("__BLACKDYNAMITE__study__",self.base.schema) text = text.replace("__BLACKDYNAMITE__run_id__",str(self.id)) return text def getUpdatedConfigFile(self,file_id): conf = self.getConfigFile(file_id) # myjob = job.Job(self.base) # myjob.prepare() # myjob["id"] = self.entries["job_id"] # myjob = myjob.getMatchedObjectList()[0] # for key,val in myjob.entries.iteritems(): # tmp = conf["file"].replace("__BLACKDYNAMITE__" + key + "__",str(val)) # if ((not tmp == conf["file"]) and val is None): # raise Exception( "unset job parameter " + key) # conf["file"] = tmp # # for key,val in self.entries.iteritems(): # tmp = conf["file"].replace("__BLACKDYNAMITE__" + key + "__",str(val)) # if ((not tmp == conf["file"]) and not val): # raise Exception( "unset run parameter " + key) # conf["file"] = tmp # # conf["file"] = conf["file"].replace("__BLACKDYNAMITE__dbhost__",self.base.dbhost) # conf["file"] = conf["file"].replace("__BLACKDYNAMITE__study__",self.base.schema) # conf["file"] = conf["file"].replace("__BLACKDYNAMITE__run_id__",str(self.id)) conf["file"] = self.replaceBlackDynamiteVariables(conf["file"]) return conf def listQuantities(self): - request = "SELECT id,name FROM {0}.quantities WHERE (is_vector) = (false)".format(self.base.schema) + request = "SELECT id,name FROM {0}.quantities".format(self.base.schema) curs = self.base.performRequest(request) all_quantities = [res[1] for res in curs] return all_quantities def getScalarQuantities(self,names,additional_request=None): request = "SELECT id,name FROM {0}.quantities WHERE (is_vector) = (false)".format(self.base.schema) params = [] if (names): if (not type(names) == list): names = [names] request += " and (" for name in names: similar_op_match = re.match(r"\s*(~)\s*(.*)", name) op = " = " if (similar_op_match): op = " ~ " name = similar_op_match.group(2) request += " name " + op + "%s or" params.append(str(name)) request = request[:len(request)-3] + ")" # logger.debug (request) # logger.debug (params) curs = self.base.performRequest(request,params) quantities = [res[1] for res in curs] if (len(quantities) == 0): logger.debug ("No quantity matches " + str(names)) logger.debug ("Quantities declared in the database are \n" + "\n".join([res for res in self.listQuantities()])) sys.exit(-1) return None try: quant_indexes = [quantities.index(n) for n in names] quantities = names except: quant_indexes = None # logger.debug (quant) results = [] for key in quantities: q = self.getScalarQuantity(key,additional_request) if (q is not None): results.append([key,q]) logger.debug ("got Quantity " + str(key) + " : " + str(q.shape[0]) + " values") return results def getLastStep(self): request = """SELECT max(b.max),max(b.time) from ( SELECT max(step) as max,max(computed_at) as time from {0}.scalar_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.scalar_real where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_integer where run_id = {1} union SELECT max(step) as max,max(computed_at) as time from {0}.vector_real where run_id = {1}) as b""".format(self.base.schema,self.id) # logger.debug (request) curs = self.base.performRequest(request,[]) item = curs.fetchone() if (item is not None): return item[0],item[1] def getQuantityID(self,name, is_integer = None, is_vector = None): request = "SELECT id,is_integer,is_vector FROM {0}.quantities WHERE (name) = (%s)".format(self.base.schema) curs = self.base.performRequest(request,[name]) item = curs.fetchone() if (item is None): raise Exception("unknown quantity \"" + name + "\"") if ((is_integer is not None) and (not is_integer == item[1])): raise Exception("quantity \"" + name + "\" has is_integer = " + str(item[1])) if ((is_vector is not None) and (not is_vector == item[2])): raise Exception("quantity \"" + name + "\" has is_vector = " + str(item[2])) return item[0],item[1],item[2] def getScalarQuantity(self,name,additional_request=None): quantity_id,is_integer,is_vector = self.getQuantityID(name) if (is_vector == True): raise Exception("Quantity " + name + " is not scalar") request = "SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3})".format(self.base.schema,"scalar_real" if (is_integer == False) else "scalar_integer",self.id,quantity_id) if (additional_request): request += " and " + " and ".join(additional_request) request += " ORDER BY step" curs = self.base.performRequest(request,[name]) fetch = curs.fetchall() if (not fetch): return None return (np.array([(val[0],val[1]) for val in fetch])) def getVectorQuantity(self,name, step): quantity_id,is_integer,is_vector = self.getQuantityID(name) if (is_vector == False): raise Exception("Quantity " + name + " is not vectorial") request = "SELECT measurement from {0}.{1} WHERE (run_id,quantity_id,step) = ({2},{3},{4})".format(self.base.schema,"vector_real" if (is_integer == False) else "vector_integer",self.id,quantity_id,step) curs = self.base.performRequest(request,[name]) fetch = curs.fetchone() if (fetch): return np.array(fetch[0]) return None def pushVectorQuantity(self,vec,step,name, is_integer, description = None): logger.debug ('pushing {0}'.format(name)) try: quantity_id,is_integer,is_vector = self.getQuantityID(name,is_integer = is_integer, is_vector = True) except Exception as e: typecode = "int" if is_integer == False: typecode = "float" typecode += ".vector" quantity_id = self.base.pushQuantity(name,typecode,description) array = [i for i in vec] if (is_integer == True): array_format = ",".join(["{:d}".format(i) for i in vec]) request = "INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s)".format(self.base.schema,"vector_real" if (is_integer == False) else "vector_integer") curs = self.base.performRequest(request,[self.id,quantity_id,array,step]) logger.debug ('ready to commit') self.base.commit() logger.debug ('commited') def pushScalarQuantity(self,val,step, name, is_integer, description = None): logger.debug ('pushing {0}'.format(name)) try: quantity_id,is_integer,is_vector = self.getQuantityID(name,is_integer = is_integer, is_vector = False) except Exception as e: typecode = "int" if is_integer == False: typecode = "float" quantity_id = self.base.pushQuantity(name,typecode,description) request = "INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) VALUES (%s,%s,%s,%s)".format(self.base.schema,"scalar_real" if (is_integer == False) else "scalar_integer") curs = self.base.performRequest(request,[self.id,quantity_id,val,step]) logger.debug ('ready to commit') self.base.commit() logger.debug ('commited') def getAllVectorQuantity(self,name): quantity_id,is_integer,is_vector = self.getQuantityID(name,is_vector = True) request = "SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) order by step".format(self.base.schema,"vector_real" if (is_integer == False) else "vector_integer",self.id,quantity_id) curs = self.base.performRequest(request,[name]) fetch = curs.fetchall() if (not fetch): return [None,None] matres = np.array([val[1] for val in fetch]) stepres = np.array([val[0] for val in fetch]) return (stepres,matres) def deleteData(self): request,params = "DELETE FROM {0}.scalar_real WHERE run_id={1}".format(self.base.schema,self.id),[] self.base.performRequest(request,params) request,params = "DELETE FROM {0}.scalar_integer WHERE run_id={1}".format(self.base.schema,self.id),[] self.base.performRequest(request,params) request,params = "DELETE FROM {0}.vector_real WHERE run_id={1}".format(self.base.schema,self.id),[] self.base.performRequest(request,params) request,params = "DELETE FROM {0}.vector_integer WHERE run_id={1}".format(self.base.schema,self.id),[] self.base.performRequest(request,params) def __init__ (self,base): sqlobject.SQLObject.__init__(self,base) self.table_name = "runs" self.foreign_keys["job_id"] = "jobs" self.types["machine_name"] = str self.types["run_path"] = str self.allowNull["run_path"] = True self.types["job_id"] = int self.types["nproc"] = int self.types["run_name"] = str self.types["wait_id"] = int self.allowNull["wait_id"] = True self.types["start_time"] = datetime.datetime self.allowNull["start_time"] = True self.types["state"] = str self.allowNull["state"] = True self.execfile = None self.configfiles = [] self.types["exec"] = str ################################################################ diff --git a/src/sql_query.hh b/src/sql_query.hh index 3f4ef12..2f501a5 100644 --- a/src/sql_query.hh +++ b/src/sql_query.hh @@ -1,290 +1,290 @@ /* author : Nicolas RICHART author : Till JUNGE */ #ifndef __BLACKDYNAMITE_SQL_QUERY__ #define __BLACKDYNAMITE_SQL_QUERY__ #include #include #include #include #include namespace BlackDynamite { /* -------------------------------------------------------------------------- */ template inline std::string nan_avoider(const num& val) { return pqxx::to_string(val); } /* -------------------------------------------------------------------------- */ template <> inline std::string nan_avoider(const double& val) { if (val != val) { #if PQXX_VERSION_MAJOR < 4 return "('nan')::double precision"; #else return "NaN"; #endif } else { /* Postgres has its own range definitions */ double fval = std::abs(val); if (fval <= 1e-307) { #if PQXX_VERSION_MAJOR < 4 return "(0.0)::double precision"; #else return "0."; #endif } else if (fval >=1e308) { if (val > 0.) { #if PQXX_VERSION_MAJOR < 4 return "('infinity')::double precision"; #else return "Infinity"; #endif } else { #if PQXX_VERSION_MAJOR < 4 return "('-infinity')::double precision"; #else return "-Infinity"; #endif } } std::stringstream val_rep; val_rep.precision(16); val_rep << std::scientific << val; return val_rep.str(); } } /** * Simple query handler as a secured transaction */ template< class transaction = pqxx::transaction > class SQLQuery : public pqxx::transactor { public: SQLQuery(const std::string & sql_query) : sql_query(sql_query) { } protected: SQLQuery() {} public: - virtual void operator() (transaction & trans) throw { + virtual void operator() (transaction & trans) throw (){ UInt retry = 10000; bool success = false; std::string message; std::runtime_error * error; while (retry != 0){ try { this->result = this->execute(trans); retry = 0; success = true; } catch (std::runtime_error & e) { --retry; message = e.what(); error = &e; } } if (!success){ std::cerr << "Failed to execute query after " << retry << " retries " << sql_query << std::endl << " with message: " << message; throw *error; } } protected: virtual pqxx::result execute(pqxx::transaction_base & trans) { return trans.exec(sql_query); } private: std::string sql_query; pqxx::result result; }; /* ------------------------------------------------------------------------ */ /** * Simple prepared query this query rely on sql queries previously prepared, * the SQLQueryPrepareHelper is done to help in this process */ template, typename... Arguments> class SQLPreparedQuery : public SQLQuery { public: SQLPreparedQuery(const std::string & query_name, Arguments... parameters) : query_name(query_name), parameters(std::make_tuple(parameters...)) { } /* ---------------------------------------------------------------------- */ /// execute a prepared function protected: virtual pqxx::result execute(pqxx::transaction_base & trans) { pqxx::prepare::invocation invoc = trans.prepared(query_name); return this->prepared<0, Arguments...>(invoc).exec(); } private: template inline pqxx::prepare::invocation & prepared(pqxx::prepare::invocation & invoc) { return invoc; } template inline pqxx::prepare::invocation & prepared(pqxx::prepare::invocation & invoc) { return this->prepared(this->addParameter(invoc, std::get(this->parameters))); } template inline pqxx::prepare::invocation & addParameter(pqxx::prepare::invocation & invoc, const T & param) { return invoc(param); } template struct NanAvoiderIT { const std::string operator()(typename std::vector::iterator & it) { return nan_avoider(*it); } }; template inline pqxx::prepare::invocation & addParameter(pqxx::prepare::invocation & invoc, const std::vector & param) { std::string join = nan_avoider(param[0]); for(auto it = param.begin() + 1; it != param.end(); ++it) { join += ", "; join += nan_avoider(*it); } return invoc("{" + join + "}"); // return invoc("ARRAY [" + pqxx::separated_list(",", param.begin(), param.end(), NanAvoiderIT*() + "]"); } private: std::string query_name; std::tuple parameters; }; /* ------------------------------------------------------------------------ */ #if PQXX_VERSION_MAJOR < 4 /* ---------------------------------------------------------------------- */ // Convertion functions /// Function to convert a c++ type in a SQL type for the preparation template struct InternalPrepareHelper { }; /// convert std::string to "varchar" template<> struct InternalPrepareHelper { static const pqxx::prepare::declaration & addArgumentType(const pqxx::prepare::declaration & decl) { return decl("varchar", pqxx::prepare::treat_string); } }; /// convert double to "real" template<> struct InternalPrepareHelper { static const pqxx::prepare::declaration & addArgumentType(const pqxx::prepare::declaration & decl) { return decl("double precision"); } }; /// convert std::vector to "double precision[]" template<> struct InternalPrepareHelper> { static const pqxx::prepare::declaration & addArgumentType(const pqxx::prepare::declaration & decl) { return decl("double precision[]"); } }; /// convert std::vector to "interger[]" template<> struct InternalPrepareHelper> { static const pqxx::prepare::declaration & addArgumentType(const pqxx::prepare::declaration & decl) { return decl("integer[]"); } }; /// convert std::vector to "interger[]" template<> struct InternalPrepareHelper> { static const pqxx::prepare::declaration & addArgumentType(const pqxx::prepare::declaration & decl) { return decl("integer[]"); } }; /// convert int to "interger" template<> struct InternalPrepareHelper { static const pqxx::prepare::declaration & addArgumentType(const pqxx::prepare::declaration & decl) { return decl("integer"); } }; /// convert unsigned int to "interger" template<> struct InternalPrepareHelper { static const pqxx::prepare::declaration & addArgumentType(const pqxx::prepare::declaration & decl) { return decl("integer"); } }; /// convert bool to "boolean" template<> struct InternalPrepareHelper { static const pqxx::prepare::declaration & addArgumentType(const pqxx::prepare::declaration & decl) { return decl("boolean"); } }; #endif /** * SQLQueryPrepareHelper */ class SQLQueryPrepareHelper { /* ---------------------------------------------------------------------- */ // preparation function public: /// prepare the request in the connection static void prepare(pqxx::connection_base & connection, const std::string & request_name, const std::string & sql_request) { connection.prepare(request_name, sql_request); } /// prepare the request in the connection template static void prepare(pqxx::connection_base & connection, const std::string & request_name, const std::string & sql_request) { #if PQXX_VERSION_MAJOR < 4 pqxx::prepare::declaration decl = connection.prepare(request_name, sql_request); prepare(decl); #else connection.prepare(request_name, sql_request); #endif } static void unprepare(pqxx::connection_base & connection, const std::string & request_name) { connection.unprepare(request_name); } #if PQXX_VERSION_MAJOR < 4 private: template static const pqxx::prepare::declaration & prepare(const pqxx::prepare::declaration & decl) { return decl; } template static const pqxx::prepare::declaration & prepare(const pqxx::prepare::declaration & decl) { return prepare(InternalPrepareHelper::addArgumentType(decl)); } #endif }; } #endif //__BLACKDYNAMITE_SQL_QUERY__