diff --git a/sausage-api b/sausage-api old mode 100644 new mode 100755 index ecf2187..27abc67 --- a/sausage-api +++ b/sausage-api @@ -1,96 +1,105 @@ #!/usr/bin/python3 # © All rights reserved. ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE, # Switzerland # SCITAS - Scientific IT and Application Support, 2021 # See the LICENSE.txt file for more details. import falcon import gunicorn.app.base import multiprocessing import time import threading import requests import logging +from datetime import date from sys import exit from os import getpid from sausage.middleware import Sausage from sausage.readconf import ReadConf class UpdateDb(object): def __init__(self): server_opt = ReadConf() self.server = server_opt.bind_addr + ":" + server_opt.bind_port self.reponse = {} # create logger self.logger = logging.getLogger('UpdateDb') self.logger.setLevel(logging.DEBUG) # create console handler and set level to debug lconf = logging.StreamHandler() lconf.setLevel(logging.DEBUG) formatter = logging.Formatter( '[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') lconf.setFormatter(formatter) self.logger.addHandler(lconf) self.exe = threading.Thread(target=self.infinite) self.exe.daemon = True self.exe.start() def infinite(self): init = 0 + purgedb = 1 pid = getpid() try: while True: + if int(date.today().day) == 1 and purgedb == 1: + self.response = requests.get( + 'http://' + self.server + '/database/purge') + purgedb = 0 + if int(date.today().day) > 1: + purgedb = 1 + if init == 1: timeout = 180 else: timeout = 3 init = 1 # Hack to catch sigint in the self thread try: time.sleep(timeout) except (KeyboardInterrupt, SystemExit): exit(0) # End of hack self.response = requests.get( 'http://' + self.server + '/database/update') req_state = self.response.json() if req_state['return'] == 'Database connection error': self.logger.warning(req_state['return']) else: self.logger.info('Update Database with pid ' + str(pid)) except Exception: self.logger.error('Error updating records in database') exit(1) class GunicornApp(gunicorn.app.base.BaseApplication): def __init__(self, app, options=None): self.options = options or {} self.application = app super().__init__() def load_config(self): config = {key: value for key, value in self.options.items() if key in self.cfg.settings and value is not None} for key, value in config.items(): self.cfg.set(key.lower(), value) def load(self): return self.application if __name__ == '__main__': sausage_api = Sausage() server_opt = ReadConf() options = { 'bind': '%s:%s' % (server_opt.bind_addr, server_opt.bind_port), 'workers': multiprocessing.cpu_count(), } updtdb = UpdateDb() GunicornApp(sausage_api, options).run() diff --git a/sausage/datamanager.py b/sausage/datamanager.py index 68c8a5a..108e88a 100644 --- a/sausage/datamanager.py +++ b/sausage/datamanager.py @@ -1,66 +1,82 @@ # © All rights reserved. ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE, # Switzerland # SCITAS - Scientific IT and Application Support, 2021 # See the LICENSE.txt file for more details. import falcon from sausage.esoperations import ESOp from sausage.dboperations import DBOp from sausage.esquery import ESQuery from sausage.printer import PrintJson from sausage.loadconf import LoadConf from sausage.readconf import ReadConf class DataManager(object): def __init__(self): self.entities = ["account", "user"] newconf = LoadConf() self.currencies = newconf.currencies server_opt = ReadConf() self.es_server = server_opt.es_server def on_get_update(self, req, resp): response = {"operation": "update database", "return": "OK"} resp.status = falcon.HTTP_200 for entity in self.entities: for currency in self.currencies: + try: oqueries = ESQuery(currency, entity) self.doc = oqueries.query query = ESOp(self.es_server) esdata = query.esget("slurm", self.doc) try: newdb = DBOp() newdb.populate(esdata, entity, currency) except BaseException: response = { "operation": "update database", "return": "Database connection error"} resp.media = response except BaseException: response = { "operation": "getting data", "return": "Elasticsearch Service Unavailable"} resp.media = response resp.media = response def on_get_show(self, req, resp): resp.status = falcon.HTTP_200 response = {} for entity in self.entities: try: newdb = DBOp() table_data = newdb.show_table(entity) if table_data: pjson = PrintJson(entity, table_data) response[entity] = pjson.response resp.media = response except BaseException: response = { "operation": "show table", "return": "Database connection error"} resp.media = response + + def on_get_purge(self, req, resp): + resp.status = falcon.HTTP_200 + response = {"operation": "purge database", "return": "OK"} + + for entity in self.entities: + try: + newdb = DBOp() + newdb.purge_table(entity) + resp.media = response + except BaseException: + response = { + "operation": "purge database", + "return": "Database connection error"} + resp.media = response diff --git a/sausage/dboperations.py b/sausage/dboperations.py index dc967e3..d815a0b 100644 --- a/sausage/dboperations.py +++ b/sausage/dboperations.py @@ -1,101 +1,109 @@ # © All rights reserved. ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE, # Switzerland # SCITAS - Scientific IT and Application Support, 2021 # See the LICENSE.txt file for more details. import sqlite3 from datetime import datetime class DBOp: def __init__(self, db=sqlite3.connect( "file:/dev/shm/sausagedb?cache=shared", uri=True)): self.database = db self.cursor = self.database.cursor() self.cursor.execute('''CREATE TABLE IF NOT EXISTS account (key text PRIMARY KEY, name text NOT NULL, time float, chf float,\ co2 float, queue float)''') self.cursor.execute( '''CREATE UNIQUE INDEX IF NOT EXISTS idx_key ON account (key)''') self.cursor.execute('''CREATE TABLE IF NOT EXISTS user (key text PRIMARY KEY, name text NOT NULL, account text NOT NULL,\ time float, chf float, co2 float, queue float)''') self.cursor.execute( '''CREATE UNIQUE INDEX IF NOT EXISTS idx_key ON user (key)''') self.database.commit() def valid_date(self, date): try: if datetime.strptime(date, "%Y-%m-%d"): return True except ValueError: return False def populate(self, esdata, entity, currency): for record in esdata['aggregations']['cluster']['buckets']: cluster = record['key'] for acct in record['account']['buckets']: unit = acct['key'] self.cursor.execute( "INSERT OR IGNORE INTO account (key, name) VALUES(?,?)", (cluster + '-' + unit, unit, )) if entity == "account": consumption = acct['cost']['value'] if currency == "chf": self.cursor.execute( "UPDATE account SET chf = ? WHERE key = ?", (consumption, cluster + '-' + unit,)) elif currency == "co2": self.cursor.execute( "UPDATE account SET co2 = ? WHERE key = ?", (consumption, cluster + '-' + unit,)) elif currency == "time": self.cursor.execute( "UPDATE account SET time = ? WHERE key = ?", (consumption, cluster + '-' + unit,)) else: return False self.database.commit() elif entity == "user": for usr in acct['user']['buckets']: person = usr["key"] consumption = usr['cost']['value'] key = cluster + '-' + unit + '-' + person self.cursor.execute( "INSERT OR IGNORE INTO user (key, name, account) VALUES(?,?,?)", (key, person, unit, )) if currency == "chf": self.cursor.execute( "UPDATE user SET chf = ? WHERE key = ?", (consumption, key,)) elif currency == "co2": self.cursor.execute( "UPDATE user SET co2 = ? WHERE key = ?", (consumption, key,)) elif currency == "time": self.cursor.execute( "UPDATE user SET time = ? WHERE key = ?", (consumption, key,)) else: return False self.database.commit() else: return False def get_account(self, account): self.cursor.execute("SELECT * FROM account WHERE name=?", (account,)) return self.cursor.fetchall() def get_user(self, username, account): if account == "null": self.cursor.execute("SELECT * FROM user WHERE name=?", (username,)) else: - self.cursor.execute( - "SELECT * FROM user WHERE name=? AND account=?", (username, account,)) + if username == "all": + self.cursor.execute("SELECT * FROM user WHERE account=?", (account,)) + else: + self.cursor.execute( + "SELECT * FROM user WHERE name=? AND account=?", (username, account,)) return self.cursor.fetchall() def show_table(self, table): query = "SELECT * FROM " + table + " ORDER BY name ASC" return self.cursor.execute(query).fetchall() + def purge_table(self, table): + query = "DELETE FROM " + table + self.cursor.execute(query) + self.database.commit() + def destroy(self): self.cursor.close() self.database.close() diff --git a/sausage/middleware.py b/sausage/middleware.py index 8ca55c8..c785310 100644 --- a/sausage/middleware.py +++ b/sausage/middleware.py @@ -1,38 +1,39 @@ # © All rights reserved. ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE, # Switzerland # SCITAS - Scientific IT and Application Support, 2021 # See the LICENSE.txt file for more details. import falcon from sausage.slurmaccount import SlurmAccount from sausage.datamanager import DataManager from sausage.queryrange import QueryRange class Sausage(falcon.App): def __init__(self): super(Sausage, self).__init__() # Create our resources saccount = SlurmAccount() datamgr = DataManager() qrange = QueryRange() # Build routes self.add_route('/account/{account}', saccount, suffix="account") self.add_route( '/account/{account}/{username}', saccount, suffix="user") self.add_route('/user/{username}', saccount, suffix="user") self.add_route('/database/update', datamgr, suffix="update") self.add_route('/database/show', datamgr, suffix="show") + self.add_route('/database/purge', datamgr, suffix="purge") self.add_route('/range/{entity}/{start}/{end}/{item}', qrange) def start(self): """ A hook to when a Gunicorn worker calls run().""" pass def stop(self, signal): """ A hook to when a Gunicorn worker starts shutting down. """ pass diff --git a/sausage/printer.py b/sausage/printer.py index 88f634f..0d5e0ae 100644 --- a/sausage/printer.py +++ b/sausage/printer.py @@ -1,47 +1,53 @@ # © All rights reserved. ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE, # Switzerland # SCITAS - Scientific IT and Application Support, 2021 # See the LICENSE.txt file for more details. class PrintJson(object): def __init__(self, entity, data): self.entity = entity self.data = data self.response = {} self.set_scheme() def set_scheme(self): if self.entity == "account": for element in self.data: + if "name" not in self.response.keys(): + self.response.update({ + "name": element[1] + }) self.response.update({ - "name": element[1], element[0].split(sep='-')[0]: { "time": element[2], "chf": element[3], "co2": element[4] } }) + + + elif self.entity == "user": for element in self.data: if "name" not in self.response.keys(): self.response.update({ "name": element[1] }) if element[2] in self.response.keys(): self.response[element[2]].update({ element[0].split(sep='-')[0]: { "time": element[3], "chf": element[4], "co2": element[5] } }) else: self.response.update({ element[2]: { element[0].split(sep='-')[0]: { "time": element[3], "chf": element[4], "co2": element[5] } } })