diff --git a/src/pusher.cc b/src/pusher.cc index c20add3..218ce36 100644 --- a/src/pusher.cc +++ b/src/pusher.cc @@ -1,217 +1,227 @@ /* author : Guillaume ANCIAUX (anciaux@labri.fr, g.anciaux@laposte.net) author : Till JUNGE */ /* -------------------------------------------------------------------------- */ #include "pusher.hh" #include "sql_pusher.hh" #include "sql_state_updater.hh" #include #include /* -------------------------------------------------------------------------- */ namespace BlackDynamite { Pusher::Pusher() { this->connection_counter++; this->run_id = 0; this->sql_mode = false; } /* -------------------------------------------------------------------------- */ Pusher::~Pusher() { this->connection_counter--; if (this->connection_counter == 0) { delete this->sql_connection; this->sql_connection = NULL; } } /* -------------------------------------------------------------------------- */ inline std::string Pusher::request_increment(std::string keyword, std::string parameter) { if (!keyword.empty()) return keyword + "=" + parameter + " "; return ""; } /* -------------------------------------------------------------------------- */ void Pusher::init() { char * ptr = NULL; ptr = getenv("USER");if (!ptr) FATAL("undefined env var"); std::string user = ptr; ptr = getenv("DBNAME"); std::string dbname; if (!ptr) dbname = user; else dbname = ptr; ptr = getenv("HOST");if (!ptr) FATAL("undefined env var"); std::string host = ptr; ptr = getenv("SCHEMA");if (!ptr) FATAL("undefined env var"); std::string schema = ptr; ptr = getenv("RUN_ID");if (!ptr) FATAL("undefined env var"); UInt run_id = atoi(ptr); // ptr = getenv("HOSTADDR");if (!ptr) FATAL("undefined env var"); // std::string hostaddr = ptr; // ptr = getenv("PORT");if (!ptr) FATAL("undefined env var"); // std::string port = ptr; // ptr = getenv("CONNECT_TIMEOUT");if (!ptr) FATAL("undefined env var"); // std::string connect_timeout = ptr; // ptr = getenv("SLMODE");if (!ptr) FATAL("undefined env var"); // std::string slmode = ptr; // ptr = getenv("SERVICE");if (!ptr) FATAL("undefined env var"); // std::string service = ptr; // init(dbname,user,host,hostaddr,port,connect_timeout,slmode,service); init(dbname,user,host,schema,run_id); } /* -------------------------------------------------------------------------- */ void Pusher::init(const std::string & dbname, const std::string & user, const std::string & host, const std::string & schema, UInt run_id //const std::string & hostaddr, //const std::string & port, //const std::string & connect_timeout, //const std::string & slmode, //const std::string & service ) { std::string options(""); if (this->sql_connection == NULL) { options += request_increment("dbname" , dbname ); options += request_increment("user" , user ); options += request_increment("host" , host ); // options += request_increment("hostaddr" , hostaddr ); // options += request_increment("port" , port ); //options += request_increment("connect_timeout", connect_timeout); //options += request_increment("slmode" , slmode ); //options += request_increment("service" , service ); try { this->sql_connection = new pqxx::connection(options); this->sql_connection->activate(); } catch (std::runtime_error & e) { FATAL ("Database connection failed : " << e.what()); } catch (std::exception & e) { FATAL (e.what()); } catch (...) { FATAL ("Unknown exception during connection to database"); } } this->sql_schema = schema; if (this->sql_schema.empty()) { this->scalar_integer.tablename = "scalar_integer"; this->scalar_real.tablename = "scalar_real"; this->vector_real.tablename = "vector_real"; this->vector_integer.tablename = "vector_integer"; this->quantities_tablename = "quantities"; } else { this->scalar_integer.tablename = this->sql_schema + "." + "scalar_integer"; this->scalar_real.tablename = this->sql_schema + "." + "scalar_real"; this->vector_real.tablename = this->sql_schema + "." + "vector_real"; this->vector_integer.tablename = this->sql_schema + "." + "vector_integer"; this->quantities_tablename = this->sql_schema + ".quantities"; } this->run_id = run_id; if (this->run_id <1) { FATAL("The run id required for the sql dumps has not been specified"); } this->scalar_integer.run_id = this->run_id; this->scalar_real.run_id = this->run_id; this->vector_real.run_id = this->run_id; this->vector_integer.run_id = this->run_id; - this->sql_connection->perform(SqlStateUpdater(this->sql_schema, this->run_id)); + state_updater = new SqlStateUpdater(this->sql_schema, this->run_id); + state_updater->changeState(STARTED); + this->sql_connection->perform(*state_updater); + } + + + /* -------------------------------------------------------------------------- */ + + void Pusher::endRun(){ + state_updater->changeState(ENDED); + this->sql_connection->perform(*state_updater); } /* -------------------------------------------------------------------------- */ void Pusher::push(const int & value, const UInt& quantity_id, const UInt& step) { this->scalar_integer.value = &value; this->scalar_integer.run_id = this->run_id; this->scalar_integer.quantity_id = quantity_id; this->scalar_integer.step = step; this->sql_connection->perform(SqlPusher(this->scalar_integer)); } /* -------------------------------------------------------------------------- */ void Pusher::push(const Real & value, const UInt& quantity_id, const UInt& step) { this->scalar_real.value = &value; this->scalar_real.run_id = this->run_id; this->scalar_real.quantity_id = quantity_id; this->scalar_real.step = step; this->sql_connection->perform(SqlPusher(this->scalar_real)); } /* -------------------------------------------------------------------------- */ void Pusher::push(const std::vector & values, const UInt& quantity_id, const UInt& step) { this->vector_real.value = &values; this->vector_real.run_id = this->run_id; this->vector_real.quantity_id = quantity_id; this->vector_real.step = step; this->sql_connection->perform(SqlPusher(this->vector_real)); } /* -------------------------------------------------------------------------- */ void Pusher::push(const std::vector & values, const UInt& quantity_id, const UInt& step) { this->vector_integer.value = &values; this->vector_integer.run_id = this->run_id; this->vector_integer.quantity_id = quantity_id; this->vector_integer.step = step; this->sql_connection->perform(SqlPusher(this->vector_integer)); } /* -------------------------------------------------------------------------- */ UInt Pusher::getQuantityID(const std::string & name) { UInt return_id = 0; std::stringstream command; command << "SELECT id FROM " << this->quantities_tablename << " WHERE name = '" << pqxx::to_string(name) << "';"; pqxx::transaction transaction(*this->sql_connection); try { pqxx::result result= transaction.exec(command); if (result.size() == 0) { FATAL("There is no quantity named '" << name << " defined in the database. ( The failed query was " << command.str() << ")"); } return_id = result.at(0).at(0).as(UInt(0)); if (return_id == 0) { FATAL("There appears to be a problem, query for quantity id of " << name); } } catch (std::runtime_error & e) { FATAL("Connection failed with " << e.what()); } catch (std::exception & e) { FATAL("exception occured when executing sql request: "<< command.str() << std::endl << e.what()); } catch (...) { FATAL("Unknown error occured while trying to query quantity_id for " << name); } return return_id; } /* -------------------------------------------------------------------------- */ UInt Pusher::connection_counter = 0; pqxx::connection * Pusher::sql_connection = NULL; /* -------------------------------------------------------------------------- */ } diff --git a/src/pusher.hh b/src/pusher.hh index ec22a85..513feb0 100644 --- a/src/pusher.hh +++ b/src/pusher.hh @@ -1,173 +1,185 @@ /* author : Guillaume ANCIAUX (anciaux@labri.fr, g.anciaux@laposte.net) author : Till JUNGE */ /* -------------------------------------------------------------------------- */ #ifndef __BLACK_DYNAMITE_PUSHER_HH__ #define __BLACK_DYNAMITE_PUSHER_HH__ /* -------------------------------------------------------------------------- */ #include #include #include #include /* -------------------------------------------------------------------------- */ typedef unsigned int UInt; typedef double Real; /* -------------------------------------------------------------------------- */ #ifndef FATAL #define FATAL(x) {std::cerr << x << std::endl ; exit(EXIT_FAILURE);} #endif /* -------------------------------------------------------------------------- */ namespace BlackDynamite { class Pusher { /* ------------------------------------------------------------------------ */ /* Typedefs */ /* ------------------------------------------------------------------------ */ // See chapter 10 section "Designing transactor<>-based Applications" in Douglas: "PostgreSQL" (0-672-32756-2 // this object represents a transaction template struct transaction_input { std::string tablename; UInt run_id; UInt quantity_id; UInt step; const type * value; }; // transaction for non scalar types template struct transaction_input { std::string tablename; UInt run_id; UInt quantity_id; UInt step; const std::vector * value; }; // pqxx transaction type typedef pqxx::robusttransaction transaction; // obect to really do the push to database template class SqlPusher; // obect to really do the push to database class SqlStateUpdater; + + enum RunState { + UNDEF = 0, + STARTED = 1, + ENDED = 2 + }; /* ------------------------------------------------------------------------ */ /* Constructors/Destructors */ /* ------------------------------------------------------------------------ */ public: Pusher(); virtual ~Pusher(); /* ------------------------------------------------------------------------ */ /* Methods */ /* ------------------------------------------------------------------------ */ //! initialisation method void init(); //! initialisation method void init(const std::string & dbname, const std::string & user, const std::string & host, const std::string & schema, UInt run_id // const std::string & hostaddr, // const std::string & port, // const std::string & connect_timeout, // const std::string & slmode, // const std::string & service ); + + //! function to be called to update the state of the job at the end of the run + void endRun(); + //! template method that pushes a value associated with a quantity at a particular timestep template inline void push(valuetype value, const std::string& quantity, const UInt& step); //! template method that pushes an int associated with a quantity at a particular timestep void push(const int & value, const UInt& quantity_id, const UInt& step); //! template method that pushes a real associated with a quantity at a particular timestep void push(const Real & value, const UInt& quantity_id, const UInt& step); //! template method that pushes an array of int associated with a quantity at a particular timestep void push(const std::vector & values, const UInt& quantity_id, const UInt& step); //! template method that pushes an array of real associated with a quantity at a particular timestep void push(const std::vector & values, const UInt& quantity_id, const UInt& step); //! return the integer id of a quantity identified with a string UInt getQuantityID(const std::string & name); private: //! simple function to concatenate the keyword and parameter for forging a request inline std::string request_increment(std::string keyword, std::string parameter); + /* ------------------------------------------------------------------------ */ /* Class Members */ /* ------------------------------------------------------------------------ */ protected: UInt run_id; transaction_input scalar_integer; transaction_input scalar_real; transaction_input vector_integer; transaction_input vector_real; //standard postgresql connection parameters // std::string sql_dbname, sql_user, sql_host, sql_password, sql_hostaddr, sql_port, sql_connect_timeout, sql_slmode, sql_service; std::string sql_schema; std::string quantities_tablename; static pqxx::connection * sql_connection; static UInt connection_counter; std::map quantity_ids; bool sql_mode; + SqlStateUpdater * state_updater; }; /* -------------------------------------------------------------------------- */ template inline void Pusher::push(valuetype value, const std::string& quantity, const UInt& step){ if (!this->quantity_ids.count(quantity)) this->quantity_ids[quantity] = getQuantityID(quantity); UInt quantity_id = this->quantity_ids[quantity]; this->push(value, quantity_id, step); } /* -------------------------------------------------------------------------- */ template inline std::string nan_avoider(const num& val) { return pqxx::to_string(val); } /* -------------------------------------------------------------------------- */ template <> inline std::string nan_avoider(const double& val) { if (isnan(val)) { return "'nan'::double precision"; } else { std::stringstream val_rep; val_rep.precision(16); val_rep << std::scientific << val; return val_rep.str(); } } /* -------------------------------------------------------------------------- */ } #endif /* __BLACK_DYNAMITE_PUSHER_HH__ */ diff --git a/src/sql_state_updater.hh b/src/sql_state_updater.hh index 3466e96..fa36d70 100644 --- a/src/sql_state_updater.hh +++ b/src/sql_state_updater.hh @@ -1,59 +1,86 @@ #ifndef __AKANTU_SQL_STATE_UPDATER_HH__ #define __AKANTU_SQL_STATE_UPDATER_HH__ /* -------------------------------------------------------------------------- */ #include "pusher.hh" #include /* -------------------------------------------------------------------------- */ namespace BlackDynamite { class Pusher::SqlStateUpdater : public pqxx::transactor { /* ------------------------------------------------------------------------ */ /* Constructors/Destructors */ /* ------------------------------------------------------------------------ */ public: SqlStateUpdater(const std::string & tablename, UInt run_id): pqxx::transactor("Update state of table" + tablename), - tablename(tablename), run_id(run_id) + tablename(tablename), run_id(run_id), state(UNDEF) {} /* ------------------------------------------------------------------------ */ /* Accessors */ /* ------------------------------------------------------------------------ */ void operator()(transaction & T); + void changeState(RunState st){ + state = st; + } + protected: + std::string makeRequest(const std::string & field,bool value); + + /* ------------------------------------------------------------------------ */ /* Class Members */ /* ------------------------------------------------------------------------ */ protected: std::string tablename; UInt run_id; - + + RunState state; + }; /* -------------------------------------------------------------------------- */ + std::string Pusher::SqlStateUpdater::makeRequest(const std::string & field,bool value){ + std::stringstream command; + command << "UPDATE " << pqxx::to_string(tablename) << ".runs" + << " SET " << pqxx::to_string(field) + << " = " << pqxx::to_string(value) + << " WHERE id = " << pqxx::to_string(run_id) + << ";"; + return command.str(); + } void Pusher::SqlStateUpdater::operator()(transaction & trans) { - std::stringstream command; - command << "UPDATE " << pqxx::to_string(tablename) << ".runs" - << " SET has_started = true WHERE id = " << pqxx::to_string(run_id) - << ";"; + std::string command; + std::cout << "state is " << state << " " << this << std::endl ; + switch (state){ + case STARTED: command = this->makeRequest("has_started",true);break; + case ENDED: command = this->makeRequest("has_finished",true);break; + default: FATAL("this should not append"); + } + + std::cout << command << std::endl; + // command << "UPDATE " << pqxx::to_string(tablename) << ".runs" + // << " SET has_started = true WHERE id = " << pqxx::to_string(run_id) + // << ";"; try { trans.exec(command); } catch (std::runtime_error & e) { FATAL("Failed to execute query: " - << command.str() << std::endl << " with message: " << e.what()); + << command << std::endl << " with message: " << e.what()); } + std::cout << "state is " << state << " " << this << std::endl ; } - + } #endif /* __AKANTU_SQL_STATE_UPDATER_HH__ */