#! /usr/bin/env python3
"""Copy per-account pricing from Elasticsearch into a local SQLite table.

Two Elasticsearch indexes are read: one maps a pricing identifier to a
``clusters`` mapping of ``cluster -> value``, the other maps an account to a
pricing identifier.  They are joined on the ``pricing`` field and the
resulting ``(account, cluster, value)`` rows replace the contents of the
SQLite table described by ``sql_table``.
"""

import sys
import os
import logging
import sqlite3
from logging.handlers import SysLogHandler

from elasticsearch import Elasticsearch

log_level = logging.INFO
host = "https://scitastaloa.epfl.ch:9200"
# NOTE(review): credentials are hard-coded in the source; consider loading
# them from the environment or a protected configuration file.
api_key = ["XXXXXXXXXXXXXXXXXXXX", "XXXXXXXXXXXXXXXXXXXXXX"]
indexes = {
    "slurm_pricing_cost": ".slurm_pricing_cost",
    "slurm_account_pricing": ".slurm_account_pricing",
}
# Field shared by both indexes and used as the join key.
pivot_idx = 'pricing'
sqlite_db = '.slurm.db'
sql_table = {
    'name': 'data',
    'fields': {
        'account': 'text',
        'cluster': 'text',
        'value': 'real',
    },
}

# Maximum number of documents requested from each index in one search.
_SEARCH_SIZE = 1000


def _fetch_sources(es, index):
    """Return the ``_source`` of every hit of a match-all search on *index*.

    At most ``_SEARCH_SIZE`` documents are requested; a warning is logged when
    that many come back, because the index may then hold more documents than
    were retrieved.
    """
    resp = es.search(index=index, query={"match_all": {}}, size=_SEARCH_SIZE)
    hits = resp['hits']['hits']
    if len(hits) >= _SEARCH_SIZE:
        logging.warning(
            "index %s returned %d hits, the request limit; "
            "results may be truncated",
            index, len(hits),
        )
    return [hit['_source'] for hit in hits]


def create_data():
    """Build the rows to store by joining the two Elasticsearch indexes.

    Returns:
        A list of ``(account, cluster, value)`` tuples: one per cluster of the
        pricing each account refers to.

    Raises:
        KeyError: if a document lacks an expected field, or an account refers
            to a pricing that is absent from the pricing-cost index.
    """
    es = Elasticsearch(hosts=host, timeout=10, api_key=api_key)

    # pricing identifier -> [(cluster, value), ...]
    clusters_by_pricing = {
        source[pivot_idx]: list(source['clusters'].items())
        for source in _fetch_sources(es, indexes["slurm_pricing_cost"])
    }

    data = []
    for source in _fetch_sources(es, indexes["slurm_account_pricing"]):
        pivot = source[pivot_idx]
        try:
            costs = clusters_by_pricing[pivot]
        except KeyError:
            # Still fail fast, but say which account/pricing is inconsistent
            # instead of surfacing a bare key.
            raise KeyError(
                f"account {source.get('account')!r} references pricing "
                f"{pivot!r}, which has no entry in "
                f"{indexes['slurm_pricing_cost']}"
            ) from None
        data.extend(
            (source['account'], cluster, value) for cluster, value in costs
        )
    return data


def sqlite_init(cur):
    """Drop the table named in ``sql_table`` (if present) and recreate it.

    Args:
        cur: an open ``sqlite3`` cursor.
    """
    table = sql_table['name']
    columns = ",".join(
        f"{name} {kind}" for name, kind in sql_table['fields'].items()
    )
    # IF EXISTS tolerates a missing table without also hiding unrelated
    # OperationalErrors, which a blanket except would swallow.
    cur.execute(f"DROP TABLE IF EXISTS {table}")
    cur.execute(f"CREATE TABLE {table} ({columns})")


def insert_data():
    """Fetch the rows from Elasticsearch and rewrite the SQLite table.

    The data is fetched before the database is opened, so a failed
    Elasticsearch query leaves the existing database file untouched.
    """
    data = create_data()
    con = sqlite3.connect(sqlite_db)
    try:
        # The connection context manager commits on success and rolls back on
        # error; it does not close the connection, hence the finally clause.
        with con:
            cur = con.cursor()
            sqlite_init(cur)
            cur.executemany(
                f"INSERT INTO {sql_table['name']} VALUES(?, ?, ?)", data
            )
    finally:
        con.close()


def main():
    """Configure syslog logging, then refresh the SQLite table."""
    # Explicit check rather than assert, which is stripped under ``python -O``.
    if sys.version_info < (3, 6):
        raise RuntimeError("Python 3.6 or newer is required")

    pid = os.getpid()
    level = log_level
    fmt = f"[%(levelname)s] ES {pid} %(name)s - %(message)s"

    # create a syslog handler
    handler = SysLogHandler(address="/dev/log")
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(fmt))

    # add syslog handler to root
    logging.basicConfig(level=level, format=fmt, handlers=[handler])
    logging.getLogger("elasticsearch").setLevel(logging.WARNING)
    logging.getLogger("elastic_transport").setLevel(logging.WARNING)

    try:
        insert_data()
    except Exception:
        # Record the failure in syslog too, then re-raise so the traceback
        # and non-zero exit status are unchanged.
        logging.exception("Failed to refresh %s", sqlite_db)
        raise
    logging.info("Done")


if __name__ == "__main__":
    main()