diff --git a/sausage/esquery.py b/sausage/esquery.py index 2495e6d..99dfb05 100644 --- a/sausage/esquery.py +++ b/sausage/esquery.py @@ -1,298 +1,370 @@ # © All rights reserved. ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE, # Switzerland # SCITAS - Scientific IT and Application Support, 2021 # See the LICENSE.txt file for more details. from datetime import date from sausage.loadconf import LoadConf class ESQuery(object): def __init__(self, index, entity, start=None, end=None, item=None): newconf = LoadConf() self.factor = getattr(newconf, index) self.clusters = newconf.clusters if not all((start, end)): self.start = str(date.today().replace(day=1)) self.end = str(date.today()) self.set_query(entity) else: if start <= end: self.start = start self.end = end self.set_range(entity, item) def set_query(self, entity): if entity == "account": self.query = { "size": 0, "query": { "bool": { "must": [ { "range": { "@end": { "gte": self.start + "T00:00:00", "lte": self.end + "T23:59:59" } } }, { "terms": { "cluster.keyword": self.clusters } } ], "must_not": [ { "terms": { "partition.keyword": ["build", "debug"] } }, { "term": { "state.keyword": "NODE_FAIL" } } ] } }, "aggs": { "cluster": { "terms": { "field": "cluster.keyword", "size": 10 }, "aggs": { "account": { "terms": { "field": "account.keyword", "size": 1000 }, "aggs": { "cost": { "sum": { "script": { "id": "cost_calculation", "params": {"factors": self.factor} } } } } } } } } } elif entity == "user": self.query = { "size": 0, "query": { "bool": { "must": [ { "range": { "@end": { "gte": self.start + "T00:00:00", "lte": self.end + "T23:59:59" } } }, { "terms": { "cluster.keyword": self.clusters } } ], "must_not": [ { "terms": { "partition.keyword": ["build", "debug"] } }, { "term": { "state.keyword": "NODE_FAIL" } } ] } }, "aggs": { "cluster": { "terms": { "field": "cluster.keyword", "size": 10 }, "aggs": { "account": { "terms": { "field": "account.keyword", "size": 1000 }, "aggs": { "user": { "terms": { "field": "username.keyword", "size": 1000 }, "aggs": { "cost": { "sum": { "script": { "id": "cost_calculation", "params": { "factors": self.factor } } } } } } } } } } } } def set_range(self, entity, item): if entity == "account": esfield = "account.keyword" self.query = { "size": 0, "query": { "bool": { "must": [ { "range": { "@end": { "gte": self.start + "T00:00:00", "lte": self.end + "T23:59:59" } } }, { "terms": { "cluster.keyword": self.clusters } }, { "term": { esfield: { "value": item } } } ], "must_not": [ { "terms": { "partition.keyword": ["build", "debug"] } }, { "term": { "state.keyword": "NODE_FAIL" } } ] } }, "aggs": { "cluster": { "terms": { "field": "cluster.keyword", "size": 10 }, "aggs": { "cost": { "sum": { "script": { "id": "cost_calculation", "params": { "factors": self.factor } } } } } } } } elif entity == "user": esfield = "username.keyword" self.query = { "size": 0, "query": { "bool": { "must": [ { "range": { "@end": { "gte": self.start + "T00:00:00", "lte": self.end + "T23:59:59" } } }, { "terms": { "cluster.keyword": self.clusters } }, { "term": { esfield: { "value": item } } } ], "must_not": [ { "terms": { "partition.keyword": ["build", "debug"] } }, { "term": { "state.keyword": "NODE_FAIL" } } ] } }, "aggs": { "account": { "terms": { "field": "account.keyword", "size": 10 }, "aggs": { "cluster": { "terms": { "field": "cluster.keyword", "size": 10 }, "aggs": { "cost": { "sum": { "script": { "id": "cost_calculation", "params": { "factors": self.factor } } } } } } } } } } + elif entity == "all": + esfield = "account.keyword" + self.query = { + "size": 0, + "query": { + "bool": { + "must": [ + { + "range": { + "@end": { + "gte": self.start + "T00:00:00", + "lte": self.end + "T23:59:59" + } + } + }, + { + "terms": { + "cluster.keyword": self.clusters + } + }, + { + "term": { + esfield: { + "value": item + } + } + } + ], + "must_not": [ + { + "terms": { + "partition.keyword": ["build", "debug"] + } + }, + { + "term": { + "state.keyword": "NODE_FAIL" + } + } + ] + } + }, + "aggs": { + "user": { + "terms": { + "field": "username.keyword", + "size": 500 + }, + "aggs": { + "cluster": { + "terms": { + "field": "cluster.keyword", + "size": 10 + }, + "aggs": { + "cost": { + "sum": { + "script": { + "id": "cost_calculation", + "params": { + "factors": self.factor + } + } + } + } + } + } + } + } + } + } + diff --git a/sausage/queryrange.py b/sausage/queryrange.py index 685bac3..526954c 100644 --- a/sausage/queryrange.py +++ b/sausage/queryrange.py @@ -1,88 +1,106 @@ # © All rights reserved. ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE, # Switzerland # SCITAS - Scientific IT and Application Support, 2021 # See the LICENSE.txt file for more details. import falcon from datetime import datetime from sausage.esoperations import ESOp from sausage.esquery import ESQuery from sausage.loadconf import LoadConf from sausage.readconf import ReadConf class QueryRange(object): def __init__(self): self.stamp = "" newconf = LoadConf() self.currencies = newconf.currencies server_opt = ReadConf() self.es_server = server_opt.es_server def get_es_data(self, entity, start, end, item): self.cache.update({ item + '-' + self.stamp: { "name": item, "timestamp": self.timestamp } }) for currency in self.currencies: oqueries = ESQuery(currency, entity, start, end, item) self.doc = oqueries.query query = ESOp(self.es_server) esdata = query.esget("slurm", self.doc) if entity == "account": for cluster in esdata['aggregations']['cluster']['buckets']: if self.cache[item + '-' + self.stamp].get(cluster["key"]) is None: self.cache[item + '-' + self.stamp].update({ cluster["key"]: { } }) self.cache[item + '-' + self.stamp][cluster["key"] ][currency] = cluster["cost"]["value"] + elif entity == "all": + for user in esdata['aggregations']['user']['buckets']: + if self.cache[item + '-' + + self.stamp].get(user["key"]) is None: + self.cache[item + '-' + self.stamp].update({ + user["key"]: { + } + }) + for cluster in user['cluster']['buckets']: + if self.cache[item + '-' + self.stamp][user["key"] + ].get(cluster["key"]) is None: + self.cache[item + '-' + self.stamp][user["key"]].update({ + cluster["key"]: { + } + }) + self.cache[item + '-' + self.stamp][user["key"] + ][cluster["key"]][currency] = cluster["cost"]["value"] + elif entity == "user": for account in esdata['aggregations']['account']['buckets']: if self.cache[item + '-' + self.stamp].get(account["key"]) is None: self.cache[item + '-' + self.stamp].update({ account["key"]: { } }) for cluster in account['cluster']['buckets']: if self.cache[item + '-' + self.stamp][account["key"] ].get(cluster["key"]) is None: self.cache[item + '-' + self.stamp][account["key"]].update({ cluster["key"]: { } }) self.cache[item + '-' + self.stamp][account["key"] ][cluster["key"]][currency] = cluster["cost"]["value"] def on_get(self, req, resp, entity, start, end, item): try: self.stamp = start.replace('-', '') + end.replace('-', '') - if entity == "account" or entity == "user": + if entity == "account" or entity == "user" or entity == "all": self.timestamp = int(datetime.timestamp(datetime.now())) if not hasattr(self, 'cache'): self.cache = {} if self.cache.get(item + '-' + self.stamp) is None: esdata = self.get_es_data(entity, start, end, item) else: lifetime = self.timestamp - \ self.cache[item + '-' + self.stamp]["timestamp"] if lifetime > 10: esdata = self.get_es_data(entity, start, end, item) response = self.cache[item + '-' + self.stamp] else: response = {} except BaseException: response = {} resp.status = falcon.HTTP_200 resp.media = response