diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9cdeb21..c4fe389 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,23 +1,23 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(BlackDynamite) # LIBMULTISCALE version number. An even minor number corresponds to releases. SET(BLACKDYNAMITE_MAJOR_VERSION 1) SET(BLACKDYNAMITE_MINOR_VERSION 0) SET(BLACKDYNAMITE_BUILD_VERSION 0) SET(BLACKDYNAMITE_VERSION "${BLACKDYNAMITE_MAJOR_VERSION}.${BLACKDYNAMITE_MINOR_VERSION}.${BLACKDYNAMITE_BUILD_VERSION}" ) #=============================================================================== # Project includes #=============================================================================== set(SOURCES pusher.cc ) -ADD_EXECUTABLE(test ${SOURCES}) -TARGET_LINK_LIBRARIES(test "-lpqxx") \ No newline at end of file +ADD_LIBRARY(BlackDynamite ${SOURCES}) +TARGET_LINK_LIBRARIES(BlackDynamite "-lpqxx") \ No newline at end of file diff --git a/src/build_tables.sql b/src/build_tables.sql index 3a333df..e6b056e 100644 --- a/src/build_tables.sql +++ b/src/build_tables.sql @@ -1,94 +1,93 @@ -- quantities CREATE TABLE SCHEMAS_IDENTIFIER.quantities ( id SERIAL PRIMARY KEY, --id, standard name TEXT NOT NULL, -- human readable name idendifying the quantity - mean_window INTEGER, -- size of the averaging window. NULL corresponds to no averaging (i.e. mean_window=NULL corresponds to mean_window=1) is_integer BOOLEAN NOT NULL, --whether the quantity is an integer quantity is_vector BOOLEAN NOT NULL, --whether the quantity is vectorial or scalar description TEXT , --general description the quantity (like units) - UNIQUE (name, mean_window)); + UNIQUE (name)); -- integer scalar values CREATE TABLE SCHEMAS_IDENTIFIER.scalar_integer ( id SERIAL PRIMARY KEY, --id, standard run_id INTEGER NOT NULL, --referencing SCHEMAS_IDENTIFIER.runs quantity_id INTEGER NOT NULL, --referencing SCHEMAS_IDENTIFIER.quantities measurement INTEGER NOT NULL, -- measured value step INTEGER NOT NULL, --step at which this measurement has been taken computed_at TIMESTAMP NOT NULL, --time when the measurement has been added to the database. this column might be dropped later on FOREIGN KEY (run_id) REFERENCES SCHEMAS_IDENTIFIER.runs, FOREIGN KEY (quantity_id) REFERENCES SCHEMAS_IDENTIFIER.quantities); -- real scalar values CREATE TABLE SCHEMAS_IDENTIFIER.scalar_real ( id SERIAL PRIMARY KEY, --id, standard run_id INTEGER NOT NULL, --referencing SCHEMAS_IDENTIFIER.runs quantity_id INTEGER NOT NULL, --referencing SCHEMAS_IDENTIFIER.quantities measurement DOUBLE PRECISION NOT NULL, -- measured value step INTEGER NOT NULL, --step at which this measurement has been taken computed_at TIMESTAMP NOT NULL, --time when the measurement has been added to the database. this column might be dropped later on FOREIGN KEY (run_id) REFERENCES SCHEMAS_IDENTIFIER.runs, FOREIGN KEY (quantity_id) REFERENCES SCHEMAS_IDENTIFIER.quantities); -- real vector values CREATE TABLE SCHEMAS_IDENTIFIER.vector_real ( id SERIAL PRIMARY KEY, --id, standard run_id INTEGER NOT NULL, --referencing SCHEMAS_IDENTIFIER.runs quantity_id INTEGER NOT NULL, --referencing SCHEMAS_IDENTIFIER.quantities measurement DOUBLE PRECISION[] NOT NULL, -- measured value step INTEGER NOT NULL, --step at which this measurement has been taken computed_at TIMESTAMP NOT NULL, --time when the measurement has been added to the database. this column might be dropped later on FOREIGN KEY (run_id) REFERENCES SCHEMAS_IDENTIFIER.runs, FOREIGN KEY (quantity_id) REFERENCES SCHEMAS_IDENTIFIER.quantities); -- int vector values CREATE TABLE SCHEMAS_IDENTIFIER.vector_integer ( id SERIAL PRIMARY KEY, --id, standard run_id INTEGER NOT NULL, --referencing SCHEMAS_IDENTIFIER.runs quantity_id INTEGER NOT NULL, --referencing SCHEMAS_IDENTIFIER.quantities measurement INTEGER [] NOT NULL, -- measured value step INTEGER NOT NULL, --step at which this measurement has been taken computed_at TIMESTAMP NOT NULL, --time when the measurement has been added to the database. this column might be dropped later on FOREIGN KEY (run_id) REFERENCES SCHEMAS_IDENTIFIER.runs, FOREIGN KEY (quantity_id) REFERENCES SCHEMAS_IDENTIFIER.quantities); -- create functions for the different triggers --create run_inserter CREATE FUNCTION SCHEMAS_IDENTIFIER.run_inserter() RETURNS TRIGGER AS $run_inserter$ BEGIN NEW.has_started := FALSE; NEW.has_finished := FALSE; NEW.run_name := 'run' || NEW.id; RETURN NEW; END; $run_inserter$ LANGUAGE 'PLPGSQL' VOLATILE; CREATE TRIGGER run_inserter BEFORE INSERT ON SCHEMAS_IDENTIFIER.runs FOR EACH ROW EXECUTE PROCEDURE SCHEMAS_IDENTIFIER.run_inserter(); --create measurement inserter CREATE FUNCTION SCHEMAS_IDENTIFIER.measurement_inserter() RETURNS TRIGGER AS $measurement_inserter$ BEGIN NEW.computed_at := CURRENT_TIMESTAMP; RETURN NEW; END; $measurement_inserter$ LANGUAGE 'PLPGSQL' VOLATILE; CREATE TRIGGER int_scal_insert BEFORE INSERT ON SCHEMAS_IDENTIFIER.scalar_integer FOR EACH ROW EXECUTE PROCEDURE SCHEMAS_IDENTIFIER.measurement_inserter(); CREATE TRIGGER int_vect_insert BEFORE INSERT ON SCHEMAS_IDENTIFIER.vector_integer FOR EACH ROW EXECUTE PROCEDURE SCHEMAS_IDENTIFIER.measurement_inserter(); CREATE TRIGGER real_scal_insert BEFORE INSERT ON SCHEMAS_IDENTIFIER.scalar_real FOR EACH ROW EXECUTE PROCEDURE SCHEMAS_IDENTIFIER.measurement_inserter(); CREATE TRIGGER real_vect_insert BEFORE INSERT ON SCHEMAS_IDENTIFIER.vector_real FOR EACH ROW EXECUTE PROCEDURE SCHEMAS_IDENTIFIER.measurement_inserter(); diff --git a/src/pusher.cc b/src/pusher.cc index 764f127..309fa13 100644 --- a/src/pusher.cc +++ b/src/pusher.cc @@ -1,173 +1,209 @@ /* author : Guillaume ANCIAUX (anciaux@labri.fr, g.anciaux@laposte.net) author : Till JUNGE */ /* -------------------------------------------------------------------------- */ #include "pusher.hh" #include "sql_pusher.hh" #include #include /* -------------------------------------------------------------------------- */ namespace BlackDynamite { -Pusher::Pusher() { - this->connection_counter++; - this->run_id = 0; - this->sql_mode = false; -} + Pusher::Pusher() { + this->connection_counter++; + this->run_id = 0; + this->sql_mode = false; + } -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ -Pusher::~Pusher() { - this->connection_counter--; - if (this->connection_counter == 0) { - delete this->sql_connection; - this->sql_connection = NULL; + Pusher::~Pusher() { + this->connection_counter--; + if (this->connection_counter == 0) { + delete this->sql_connection; + this->sql_connection = NULL; + } } -} -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ inline std::string Pusher::request_increment(std::string keyword, std::string parameter) { if (!keyword.empty()) return keyword + "=" + parameter + " "; return ""; } -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ + void Pusher::init() { + char * ptr = NULL; + ptr = getenv("DBNAME");if (!ptr) FATAL("undefined env var"); + std::string dbname = ptr; + ptr = getenv("USER");if (!ptr) FATAL("undefined env var"); + std::string user = ptr; + ptr = getenv("HOST");if (!ptr) FATAL("undefined env var"); + std::string host = ptr; + ptr = getenv("SCHEMA");if (!ptr) FATAL("undefined env var"); + std::string schema = ptr; + ptr = getenv("RUN_ID");if (!ptr) FATAL("undefined env var"); + UInt run_id = atoi(ptr); + + + // ptr = getenv("HOSTADDR");if (!ptr) FATAL("undefined env var"); + // std::string hostaddr = ptr; + // ptr = getenv("PORT");if (!ptr) FATAL("undefined env var"); + // std::string port = ptr; + // ptr = getenv("CONNECT_TIMEOUT");if (!ptr) FATAL("undefined env var"); + // std::string connect_timeout = ptr; + // ptr = getenv("SLMODE");if (!ptr) FATAL("undefined env var"); + // std::string slmode = ptr; + // ptr = getenv("SERVICE");if (!ptr) FATAL("undefined env var"); + // std::string service = ptr; + + // init(dbname,user,host,hostaddr,port,connect_timeout,slmode,service); + init(dbname,user,host,schema,run_id); + } + /* -------------------------------------------------------------------------- */ + + void Pusher::init(const std::string & dbname, const std::string & user, const std::string & host, - const std::string & hostaddr, - const std::string & port, - const std::string & connect_timeout, - const std::string & slmode, - const std::string & service) { + const std::string & schema, + UInt run_id + //const std::string & hostaddr, + //const std::string & port, + //const std::string & connect_timeout, + //const std::string & slmode, + //const std::string & service + ) { std::string options(""); if (this->sql_connection == NULL) { options += request_increment("dbname" , dbname ); options += request_increment("user" , user ); options += request_increment("host" , host ); - options += request_increment("hostaddr" , hostaddr ); - options += request_increment("port" , port ); - options += request_increment("connect_timeout", connect_timeout); - options += request_increment("slmode" , slmode ); - options += request_increment("service" , service ); + // options += request_increment("hostaddr" , hostaddr ); + // options += request_increment("port" , port ); + //options += request_increment("connect_timeout", connect_timeout); + //options += request_increment("slmode" , slmode ); + //options += request_increment("service" , service ); try { this->sql_connection = new pqxx::connection(options); this->sql_connection->activate(); } catch (std::runtime_error & e) { FATAL ("Database connection failed : " << e.what()); } catch (std::exception & e) { - FATAL (e.what()); - } catch (...) { - FATAL ("Unknown exception during connection to database"); + FATAL (e.what()); + } catch (...) { + FATAL ("Unknown exception during connection to database"); + } } - } - if (this->sql_schema.empty()) { - this->scalar_integer.tablename = this->sql_scalar_integer_name; - this->scalar_float.tablename = this->sql_scalar_float_name; - this->vector_float.tablename = this->sql_vector_float_name; - this->quantities_tablename = "quantities"; - } else { - this->scalar_integer.tablename = this->sql_schema + "." + this->sql_scalar_integer_name; - this->scalar_float.tablename = this->sql_schema + "." + this->sql_scalar_float_name; - this->vector_float.tablename = this->sql_schema + "." + this->sql_vector_float_name; - this->quantities_tablename = this->sql_schema + ".quantities"; - } - if (this->run_id <1) { - FATAL("The run id required for the sql dumps has not been specified"); + this->sql_schema = schema; + if (this->sql_schema.empty()) { + this->scalar_integer.tablename = "scalar_integer"; + this->scalar_real.tablename = "scalar_real"; + this->vector_real.tablename = "vector_real"; + this->quantities_tablename = "quantities"; + } else { + this->scalar_integer.tablename = this->sql_schema + "." + "scalar_integer"; + this->scalar_real.tablename = this->sql_schema + "." + "scalar_real"; + this->vector_real.tablename = this->sql_schema + "." + "vector_real"; + this->quantities_tablename = this->sql_schema + ".quantities"; + } + this->run_id = run_id; + if (this->run_id <1) { + FATAL("The run id required for the sql dumps has not been specified"); + } + this->scalar_integer.run_id = this->run_id; + this->scalar_real.run_id = this->run_id; + this->vector_real.run_id = this->run_id; } - this->scalar_integer.run_id = this->run_id; - this->scalar_float.run_id = this->run_id; - this->vector_float.run_id = this->run_id; -} -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ -void Pusher::push(const int & value, const UInt& quantity_id, const UInt& step) { - this->scalar_integer.value = &value; - this->scalar_integer.run_id = this->run_id; - this->scalar_integer.quantity_id = quantity_id; - this->scalar_integer.step = step; - this->sql_connection->perform(SqlPusher(this->scalar_integer)); -} + void Pusher::push(const int & value, const UInt& quantity_id, const UInt& step) { + this->scalar_integer.value = &value; + this->scalar_integer.run_id = this->run_id; + this->scalar_integer.quantity_id = quantity_id; + this->scalar_integer.step = step; + this->sql_connection->perform(SqlPusher(this->scalar_integer)); + } -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ -void Pusher::push(const Real & value, const UInt& quantity_id, const UInt& step) { - this->scalar_float.value = &value; - this->scalar_float.run_id = this->run_id; - this->scalar_float.quantity_id = quantity_id; - this->scalar_float.step = step; - this->sql_connection->perform(SqlPusher(this->scalar_float)); -} + void Pusher::push(const Real & value, const UInt& quantity_id, const UInt& step) { + this->scalar_real.value = &value; + this->scalar_real.run_id = this->run_id; + this->scalar_real.quantity_id = quantity_id; + this->scalar_real.step = step; + this->sql_connection->perform(SqlPusher(this->scalar_real)); + } -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ void Pusher::push(const std::vector & values, const UInt& quantity_id, const UInt& step) { - this->vector_float.value = &values; - this->vector_float.run_id = this->run_id; - this->vector_float.quantity_id = quantity_id; - this->vector_float.step = step; - this->sql_connection->perform(SqlPusher(this->vector_float)); + this->vector_real.value = &values; + this->vector_real.run_id = this->run_id; + this->vector_real.quantity_id = quantity_id; + this->vector_real.step = step; + this->sql_connection->perform(SqlPusher(this->vector_real)); } -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ void Pusher::push(const std::vector & values, const UInt& quantity_id, const UInt& step) { this->vector_integer.value = &values; this->vector_integer.run_id = this->run_id; this->vector_integer.quantity_id = quantity_id; this->vector_integer.step = step; this->sql_connection->perform(SqlPusher(this->vector_integer)); } -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ UInt Pusher::getQuantityID(const std::string & name) { UInt return_id = 0; std::stringstream command; command << "SELECT id FROM " << this->quantities_tablename << " WHERE name = '" << pqxx::to_string(name) << "';"; pqxx::transaction transaction(*this->sql_connection); try { pqxx::result result= transaction.exec(command); if (result.size() == 0) { FATAL("There is no quantity named '" << name << " defined in the database. ( The failed query was " << command.str() << ")"); } return_id = result.at(0).at(0).as(UInt(0)); if (return_id == 0) { FATAL("There appears to be a problem, query for quantity id of " << name); } } catch (std::runtime_error & e) { FATAL("Connection failed with " << e.what()); } catch (std::exception & e) { FATAL("exception occured when executing sql request: "<< command.str() << std::endl << e.what()); } catch (...) { FATAL("Unknown error occured while trying to query quantity_id for " << name); } return return_id; } -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ -UInt Pusher::connection_counter = 0; + UInt Pusher::connection_counter = 0; pqxx::connection * Pusher::sql_connection = NULL; -/* -------------------------------------------------------------------------- */ + /* -------------------------------------------------------------------------- */ } diff --git a/src/pusher.hh b/src/pusher.hh index 3cb68ed..17a1ea6 100644 --- a/src/pusher.hh +++ b/src/pusher.hh @@ -1,161 +1,170 @@ /* author : Guillaume ANCIAUX (anciaux@labri.fr, g.anciaux@laposte.net) author : Till JUNGE */ /* -------------------------------------------------------------------------- */ #ifndef __BLACK_DYNAMITE_PUSHER_HH__ #define __BLACK_DYNAMITE_PUSHER_HH__ /* -------------------------------------------------------------------------- */ #include #include #include #include /* -------------------------------------------------------------------------- */ typedef unsigned int UInt; typedef double Real; /* -------------------------------------------------------------------------- */ +#ifndef FATAL #define FATAL(x) {std::cerr << x << std::endl ; exit(EXIT_FAILURE);} +#endif /* -------------------------------------------------------------------------- */ namespace BlackDynamite { class Pusher { /* ------------------------------------------------------------------------ */ /* Typedefs */ /* ------------------------------------------------------------------------ */ // See chapter 10 section "Designing transactor<>-based Applications" in Douglas: "PostgreSQL" (0-672-32756-2 // this object represents a transaction template struct transaction_input { std::string tablename; UInt run_id; UInt quantity_id; UInt step; const type * value; }; // transaction for non scalar types template struct transaction_input { std::string tablename; UInt run_id; UInt quantity_id; UInt step; const std::vector * value; }; // pqxx transaction type typedef pqxx::robusttransaction transaction; // obect to really do the push to database template class SqlPusher; /* ------------------------------------------------------------------------ */ /* Constructors/Destructors */ /* ------------------------------------------------------------------------ */ public: Pusher(); virtual ~Pusher(); /* ------------------------------------------------------------------------ */ /* Methods */ /* ------------------------------------------------------------------------ */ + //! initialisation method + void init(); + //! initialisation method void init(const std::string & dbname, const std::string & user, const std::string & host, - const std::string & hostaddr, - const std::string & port, - const std::string & connect_timeout, - const std::string & slmode, - const std::string & service); + const std::string & schema, + UInt run_id + // const std::string & hostaddr, + // const std::string & port, + // const std::string & connect_timeout, + // const std::string & slmode, + // const std::string & service + ); //! template method that pushes a value associated with a quantity at a particular timestep template inline void push(valuetype value, const std::string& quantity, const UInt& step); //! template method that pushes an int associated with a quantity at a particular timestep void push(const int & value, const UInt& quantity_id, const UInt& step); //! template method that pushes a real associated with a quantity at a particular timestep void push(const Real & value, const UInt& quantity_id, const UInt& step); //! template method that pushes an array of int associated with a quantity at a particular timestep void push(const std::vector & values, const UInt& quantity_id, const UInt& step); //! template method that pushes an array of real associated with a quantity at a particular timestep void push(const std::vector & values, const UInt& quantity_id, const UInt& step); //! return the integer id of a quantity identified with a string UInt getQuantityID(const std::string & name); private: //! simple function to concatenate the keyword and parameter for forging a request inline std::string request_increment(std::string keyword, std::string parameter); /* ------------------------------------------------------------------------ */ /* Class Members */ /* ------------------------------------------------------------------------ */ protected: UInt run_id; transaction_input scalar_integer; - transaction_input scalar_float; + transaction_input scalar_real; transaction_input vector_integer; - transaction_input vector_float; + transaction_input vector_real; //standard postgresql connection parameters // std::string sql_dbname, sql_user, sql_host, sql_password, sql_hostaddr, sql_port, sql_connect_timeout, sql_slmode, sql_service; std::string sql_schema; - std::string sql_scalar_integer_name, sql_scalar_float_name, sql_vector_float_name; std::string quantities_tablename; static pqxx::connection * sql_connection; static UInt connection_counter; std::map quantity_ids; bool sql_mode; }; /* -------------------------------------------------------------------------- */ template inline void Pusher::push(valuetype value, const std::string& quantity, const UInt& step){ + if (!this->quantity_ids.count(quantity)) + this->quantity_ids[quantity] = getQuantityID(quantity); UInt quantity_id = this->quantity_ids[quantity]; this->push(value, quantity_id, step); } /* -------------------------------------------------------------------------- */ template inline std::string nan_avoider(const num& val) { return pqxx::to_string(val); } /* -------------------------------------------------------------------------- */ template <> inline std::string nan_avoider(const double& val) { if (isnan(val)) { return "'nan'::double precision"; } else { std::stringstream val_rep; val_rep.precision(16); val_rep << std::scientific << val; return val_rep.str(); } } /* -------------------------------------------------------------------------- */ } #endif /* __BLACK_DYNAMITE_PUSHER_HH__ */