def addConfigFiles(self, file_list, regex_params=None):
    """Register the files of *file_list* as configuration files of this run.

    Each file name is handed to ``conffile.addFile`` together with the
    parameter names of this run and of a freshly prepared job, and the
    returned object is appended to ``self.configfiles`` unless an object
    with the same id is already there.

    file_list     -- iterable of file names
    regex_params  -- forwarded unchanged to ``conffile.addFile``

    Returns ``self.configfiles`` (the same list object, updated in place).
    """
    self.prepare()

    myjob = job.Job(self.base)
    myjob.prepare()

    # keys() is not guaranteed to be a list, so build the list explicitly
    params_list = list(self.types.keys()) + list(myjob.types.keys())

    # ids already attached; updated while adding so that a file given
    # twice in file_list is attached only once
    known_ids = set(f.id for f in self.configfiles)

    for fname in file_list:
        conf = conffile.addFile(fname, self.base,
                                regex_params=regex_params,
                                params=params_list)
        # an object without id cannot be compared: keep it, as before
        if conf.id is None or conf.id not in known_ids:
            known_ids.add(conf.id)
            self.configfiles.append(conf)

    return self.configfiles
def getScalarQuantities(self, names, additional_request=None):
    """Fetch the values of the scalar quantities selected by *names*.

    names              -- a quantity name or a list/tuple of names. A name
                          starting with ``~`` is compared with the SQL
                          ``~`` operator, any other name with ``=``.
                          A false value (None, empty list) selects every
                          scalar quantity.
    additional_request -- forwarded unchanged to ``getScalarQuantity``

    Returns a list of ``[name, values]`` pairs; quantities for which
    ``getScalarQuantity`` returns None are left out. When every requested
    name exists verbatim, the result follows the order of *names*,
    otherwise the order returned by the database.

    Calls ``sys.exit(-1)`` when no quantity matches.
    """
    request = ("SELECT id,name FROM {0}.quantities "
               "WHERE (is_vector) = (false)").format(self.base.schema)
    params = []

    if names:
        if not isinstance(names, (list, tuple)):
            names = [names]

        clauses = []
        for name in names:
            op = " = "
            similar_op_match = re.match(r"\s*(~)\s*(.*)", name)
            if similar_op_match:
                op = " ~ "
                name = similar_op_match.group(2)
            clauses.append(" name " + op + "%s")
            params.append(str(name))
        request += " and (" + " or".join(clauses) + ")"

    curs = self.base.performRequest(request, params)
    quantities = [res[1] for res in curs]

    if not quantities:
        print("No quantity matches " + str(names))
        print("Quantities declared in the database are \n" +
              "\n".join(self.listQuantities()))
        sys.exit(-1)

    # keep the caller's ordering only when every name was found verbatim
    # (pattern names such as "~ e.*" never are)
    if names and all(n in quantities for n in names):
        quantities = list(names)

    results = []
    for key in quantities:
        q = self.getScalarQuantity(key, additional_request)
        if q is not None:
            results.append([key, q])
            print("got Quantity " + str(key) + " : " +
                  str(q.shape[0]) + " values")
    return results
def getScalarQuantity(self, name, additional_request=None):
    """Return the recorded values of the scalar quantity *name* for this run.

    name               -- name of the quantity
    additional_request -- optional list of SQL conditions; they are joined
                          with " and " and appended verbatim to the WHERE
                          clause, so they must come from trusted code

    Returns a numpy array with one ``(step, measurement)`` row per stored
    value, ordered by step, or None when nothing is stored.

    Raises Exception when the quantity is vectorial (and whatever
    ``getQuantityID`` raises for the given name).
    """
    quantity_id, is_integer, is_vector = self.getQuantityID(name)

    if is_vector == True:
        raise Exception("Quantity " + name + " is not scalar")

    # schema and table are identifiers and cannot be bound as parameters
    table = "scalar_real" if is_integer == False else "scalar_integer"
    request = ("SELECT step,measurement from {0}.{1} "
               "WHERE (run_id,quantity_id) = (%s,%s)").format(
                   self.base.schema, table)
    # one parameter per placeholder (the original sent the unrelated
    # quantity name to a statement without any placeholder)
    params = [self.id, quantity_id]

    if additional_request:
        request += " and " + " and ".join(additional_request)
    request += " ORDER BY step"

    curs = self.base.performRequest(request, params)
    fetch = curs.fetchall()
    if not fetch:
        return None
    return np.array([(val[0], val[1]) for val in fetch])
def pushScalarQuantity(self, val, step, name, is_integer, description=None):
    """Store one scalar measurement of quantity *name* for this run.

    val         -- the value to store
    step        -- step the value belongs to
    name        -- name of the quantity
    is_integer  -- True for an integer quantity, False for a real one
    description -- passed to ``self.base.pushQuantity`` when the quantity
                   has to be declared

    When ``getQuantityID`` raises, the error is printed and the quantity is
    declared through ``self.base.pushQuantity`` with typecode "int" or
    "float". The value is then inserted into ``scalar_integer`` or
    ``scalar_real`` of the current schema.
    """
    try:
        quantity_id, is_integer, _ = self.getQuantityID(
            name, is_integer=is_integer, is_vector=False)
    except Exception as e:
        # best effort kept from the original: report and declare the quantity
        print(e)
        typecode = "float" if is_integer == False else "int"
        quantity_id = self.base.pushQuantity(name, typecode, description)

    # scalar tables: the vector_* names and the loop over the undefined
    # `vec` were left over from pushVectorQuantity
    table = "scalar_real" if is_integer == False else "scalar_integer"
    request = ("INSERT INTO {0}.{1} (run_id,quantity_id,measurement,step) "
               "VALUES (%s,%s,%s,%s)").format(self.base.schema, table)
    self.base.performRequest(request, [self.id, quantity_id, val, step])
def launch(run, params):
    """Write the SGE submission script of *run* and optionally submit it.

    run    -- run object; must provide ``getExecFile``, item access,
              ``id``, ``update`` and ``replaceBlackDynamiteVariables``
    params -- dictionary of options. "walltime" is mandatory; "email",
              "nproc", "sge_option" and "module" are optional. The script
              is submitted with qsub only when ``params["truerun"]`` is
              true; a missing "truerun" key means no submission.

    The SGE header is prepended to the exec file content and the result is
    written to the exec file name. The run state is set to "SGE submit"
    and the run is updated.

    Raises Exception when "walltime" is missing.
    """
    _exec = run.getExecFile()

    if "walltime" not in params:
        raise Exception("walltime not set for this job {0}".format(run.id))

    head = """#!/bin/bash
#$ -S /bin/sh
#$ -cwd
#$ -j y
#$ -notify
#$ -l walltime={0}

export BLACKDYNAMITE_HOST=__BLACKDYNAMITE__dbhost__
export BLACKDYNAMITE_SCHEMA=__BLACKDYNAMITE__study__
export BLACKDYNAMITE_RUN_ID=__BLACKDYNAMITE__run_id__
export BLACKDYNAMITE_USER=$USER
""".format(params["walltime"])

    if "email" in params:
        head += "#$ -m eas -M {0}\n".format(params["email"])

    # ':' is replaced in the whole "-N" line, as in the original
    sge_head_name = "#$ -N " + "run" + str(run.id) + "-" + run["run_name"] + "\n"
    head += sge_head_name.replace(":", "_")

    run["state"] = "SGE submit"
    if "nproc" in params:
        run["nproc"] = params["nproc"]

    # pick the parallel environment from the divisibility of nproc
    nproc = run["nproc"]
    if nproc % 12 == 0 and nproc % 8 == 0:
        head += "#$ -pe orte* " + str(nproc) + "\n"
    elif nproc % 12 == 0:
        head += "#$ -pe orte12 " + str(nproc) + "\n"
    elif nproc % 8 == 0:
        head += "#$ -pe orte8 " + str(nproc) + "\n"

    for option in params.get("sge_option", []):
        head += "#$ {0}\n".format(option)

    for module in params.get("module", []):
        head += "module load {0}\n".format(module)

    run.update()

    # NOTE(review): the handlers read $SCHEMA, $HOST and $RUN_ID while the
    # header exports BLACKDYNAMITE_SCHEMA/_HOST/_RUN_ID -- confirm that the
    # exec script defines the short names, otherwise the state update fails.
    head += """

on_kill()
{
updateRuns.py --study=$SCHEMA --host=$HOST --run_constraints \"id = $RUN_ID\" --updates \"state = SGE killed\" --truerun
exit 0
}

on_stop()
{
updateRuns.py --study=$SCHEMA --host=$HOST --run_constraints \"id = $RUN_ID\" --updates \"state = SGE stopped\" --truerun
exit 0
}

# Execute function on_die() receiving TERM signal
#
trap on_stop SIGUSR1
trap on_kill SIGUSR2

"""

    _exec["file"] = run.replaceBlackDynamiteVariables(head) + _exec["file"]

    # the context manager closes the file even when the write fails
    with open(_exec["filename"], 'w') as f:
        f.write(_exec["file"])

    print("execute qsub ./" + _exec["filename"])
    print("in dir ")
    subprocess.call("pwd")

    if params.get("truerun") == True:
        # argument list instead of a shell string: the file name is never
        # interpreted by a shell
        ret = subprocess.call(["qsub", _exec["filename"]])
        if ret != 0:
            print("qsub failed with exit status " + str(ret))
-------------------------------------------------------------------------- */ #ifndef FATAL #define FATAL(x) {std::cerr << x << std::endl ; exit(EXIT_FAILURE);} #endif /* -------------------------------------------------------------------------- */ namespace BlackDynamite { typedef unsigned int UInt; typedef double Real; class Pusher { /* ------------------------------------------------------------------------ */ /* Typedefs */ /* ------------------------------------------------------------------------ */ // See chapter 10 section "Designing transactor<>-based Applications" in Douglas: "PostgreSQL" (0-672-32756-2 // this object represents a transaction template struct transaction_input { std::string tablename; UInt run_id; UInt quantity_id; UInt step; const type * value; }; // transaction for non scalar types template struct transaction_input { std::string tablename; UInt run_id; UInt quantity_id; UInt step; const std::vector * value; }; // pqxx transaction type typedef pqxx::robusttransaction transaction; // obect to really do the push to database template class SqlPusher; // obect to really do the push to database class SqlStateUpdater; enum RunState { UNDEF = 0, STARTED = 1, ENDED = 2 }; /* ------------------------------------------------------------------------ */ /* Constructors/Destructors */ /* ------------------------------------------------------------------------ */ public: Pusher(); virtual ~Pusher(); /* ------------------------------------------------------------------------ */ /* Methods */ /* ------------------------------------------------------------------------ */ //! say wether the pusher was initialized or not bool isInitialized(); //! initialisation method void init(); //! 
initialisation method void init(const std::string & dbname, const std::string & user, const std::string & host, const std::string & schema, UInt run_id // const std::string & hostaddr, // const std::string & port, // const std::string & connect_timeout, // const std::string & slmode, // const std::string & service ); //! function to be called to update the state of the job at the end of the run void endRun(); //! template method that pushes a value associated with a quantity at a particular timestep template inline void push(valuetype value, const std::string& quantity, const UInt& step); //! template method that pushes an int associated with a quantity at a particular timestep void push(const int & value, const UInt& quantity_id, const UInt& step); //! template method that pushes a real associated with a quantity at a particular timestep void push(const Real & value, const UInt& quantity_id, const UInt& step); //! template method that pushes an array of int associated with a quantity at a particular timestep void push(const std::vector & values, const UInt& quantity_id, const UInt& step); //! template method that pushes an array of real associated with a quantity at a particular timestep void push(const std::vector & values, const UInt& quantity_id, const UInt& step); //! template method that pushes a string to a given run column template void push(const type & value, const std::string & field); //! return the integer id of a quantity identified with a string template UInt getQuantityID(const std::string & name); template inline UInt createQuantityID(const std::string & name); template inline std::string createQuantityIDMakeRequest(const std::string & name); private: //! 
simple function to concatenate the keyword and parameter for forging a request inline std::string request_increment(std::string keyword, std::string parameter); /* ------------------------------------------------------------------------ */ /* Class Members */ /* ------------------------------------------------------------------------ */ protected: UInt run_id; transaction_input scalar_integer; transaction_input scalar_real; transaction_input vector_integer; transaction_input vector_real; //standard postgresql connection parameters // std::string sql_dbname, sql_user, sql_host, sql_password, sql_hostaddr, sql_port, sql_connect_timeout, sql_slmode, sql_service; std::string sql_schema; std::string quantities_tablename; static pqxx::connection * sql_connection; static UInt connection_counter; std::map quantity_ids; bool sql_mode; SqlStateUpdater * state_updater; //! initialisation method bool is_initialized; }; /* -------------------------------------------------------------------------- */ // transaction for non scalar types template <> struct Pusher::transaction_input { std::string tablename; UInt run_id; std::string field_id; std::string value; }; /* -------------------------------------------------------------------------- */ template inline void Pusher::push(valuetype value, const std::string& quantity, const UInt& step){ if (!this->quantity_ids.count(quantity)) this->quantity_ids[quantity] = getQuantityID(quantity); UInt quantity_id = this->quantity_ids[quantity]; this->push(value, quantity_id, step); } /* -------------------------------------------------------------------------- */ template UInt Pusher::getQuantityID(const std::string & name) { UInt return_id = 0; std::stringstream command; command << "SELECT id FROM " << this->quantities_tablename << " WHERE name = '" << pqxx::to_string(name) << "';"; pqxx::transaction transaction(*this->sql_connection); pqxx::result result; try { result = transaction.exec(command); transaction.commit(); } catch 
(std::runtime_error & e) { FATAL("Connection failed with " << e.what()); } catch (std::exception & e) { FATAL("exception occured when executing sql request: "<< command.str() << std::endl << e.what()); } catch (...) { FATAL("Unknown error occured while trying to query quantity_id for " << name); } if (result.size() == 0) { return this->createQuantityID(name); FATAL("There is no quantity named '" << name << " defined in the database. ( The failed query was " << command.str() << ")"); } return_id = result.at(0).at(0).as(UInt(0)); if (return_id == 0) { FATAL("There appears to be a problem, query for quantity id of " << name); } return return_id; } /* -------------------------------------------------------------------------- */ template<> inline std::string Pusher::createQuantityIDMakeRequest >(const std::string & name) { std::stringstream command; command << "INSERT into " << this->quantities_tablename << " (name,is_integer,is_vector) VALUES ('" << pqxx::to_string(name) << "' ," << pqxx::to_string("true") << "," << pqxx::to_string("true") << ") RETURNING id;"; return command.str(); } /* -------------------------------------------------------------------------- */ template<> inline std::string Pusher::createQuantityIDMakeRequest >(const std::string & name) { std::stringstream command; command << "INSERT into " << this->quantities_tablename << " (name,is_integer,is_vector) VALUES ('" << pqxx::to_string(name) << "' ," << pqxx::to_string("true") << "," << pqxx::to_string("true") << ") RETURNING id;"; return command.str(); } /* -------------------------------------------------------------------------- */ template<> inline std::string Pusher::createQuantityIDMakeRequest(const std::string & name){ std::stringstream command; command << "INSERT into " << this->quantities_tablename << " (name,is_integer,is_vector) VALUES ('" << pqxx::to_string(name) << "' ," << pqxx::to_string("false") << "," << pqxx::to_string("false") << ") RETURNING id;"; return command.str(); } /* 
-------------------------------------------------------------------------- */ template inline UInt Pusher::createQuantityID(const std::string & name) { std::string command = createQuantityIDMakeRequest(name); std::cerr << "command: " << command << std::endl; pqxx::transaction transaction(*this->sql_connection); pqxx::result result; try { result = transaction.exec(command); transaction.commit(); } catch (std::runtime_error & e) { FATAL("Connection failed with " << e.what()); } catch (std::exception & e) { FATAL("exception occured when executing sql request: "<< command << std::endl << e.what()); } catch (...) { FATAL("Unknown error occured while trying to query quantity_id for " << name); } if (result.size() == 0) { FATAL("Could not insert quantity named '" << name << " ( The failed query was " << command << ")"); } UInt return_id = 0; return_id = result.at(0).at(0).as(UInt(0)); if (return_id == 0) FATAL("There appears to be a problem, query for quantity id of " << name); return return_id; } /* -------------------------------------------------------------------------- */ template inline std::string nan_avoider(const num& val) { return pqxx::to_string(val); } /* -------------------------------------------------------------------------- */ template <> inline std::string nan_avoider(const double& val) { if (val != val) { return "'nan'::double precision"; } else { + /* Postgres has its own range definitions + + */ + double fval = fabs(val); + if (fval <= 1e-307) { + return "0.0::double precision"; + } else if (fval >=1e308) { + if (val > 0.) { + return "'infinity'::double precision"; + } else { + return "'-infinity'::double precision"; + } + } std::stringstream val_rep; val_rep.precision(16); val_rep << std::scientific << val; return val_rep.str(); } } /* -------------------------------------------------------------------------- */ } #endif /* __BLACK_DYNAMITE_PUSHER_HH__ */