diff --git a/sausage/esclient.py b/sausage/esclient.py index 68e0418..6508ab3 100644 --- a/sausage/esclient.py +++ b/sausage/esclient.py @@ -1,141 +1,139 @@ from elasticsearch import Elasticsearch, exceptions as es_e from elasticsearch_dsl import Search from sausage.esquery import ESQuery import warnings import logging import json class ESClient(Elasticsearch): def __init__(self, hosts, index, conf): self.log = logging.getLogger(f"sausage.{self.__class__.__name__}") self.conf = conf self.index = index self.item = None self.set_query() self.es = Search(using=Elasticsearch(hosts=hosts, timeout=10), index=index) self.es = self.es.query(self.query.query) self.es.aggs.bucket(self.entity, self.query.aggs) self.log.debug(f"Query : {json.dumps(self.es.to_dict())}") self.total = {"time": 0, "count": 0, "carbon": 0, "money": 0} def set_result(self): # WARNING : # To catch a warning when anonymous user as no privilege to check health of ES Cluster # We will have to check it w/ another way with warnings.catch_warnings(record=True) as w: try: resp = self.es[:0].execute() except es_e.ConnectionError as es: self.result = None self.log.error(f"Unable to connect : {es}") return if resp is resp.hits.total.value == 0: self.result = None return - debug(f"Response : {json.dumps(resp.to_dict())}") result = { self.entity: self.item, "type": self.type, "start": self.query.start, "end": self.query.end, } # self.total = dict(result, **self.total) result["metrics"] = [] # if self.item: # result["item"] = self.item if self.entity == "user" or (self.entity == "account" and self.conf["all"]): sub_entity = "account" if self.entity == "user" else "user" sub_res = getattr(resp.aggregations, self.entity)[0] sub_res = getattr(sub_res, sub_entity) for a in sub_res: result["metrics"].extend( self.get_metrics(a.metrics, "cluster", sub_entity, a.key) ) # sort by sub group (user or account) result["metrics"] = sorted( result["metrics"], key=lambda d: d[sub_entity], ) else: sub_res = getattr(resp.aggregations, self.entity) if self.type == "acctbilling": entity = "account" else: entity = "cluster" result["metrics"].extend(self.get_metrics(sub_res, entity, None, self.item)) - debug(f"Result : {result}") self.result = result def get_metrics(self, metrics, entity, sub_entity=None, sub_item=None): result = [] # result = [ # { # sub_entity: sub_item, # entity: b.key, # "count": b.doc_count, # "money": round(b.money.value, 2), # "carbon": round(b.carbon.value, 2), # "time": round(b.time.value, 2), # } # for b in metrics # ] for b in metrics: temp = { entity: b.key, "count": b.doc_count, "money": round(b.money.value, 2), "carbon": round(b.carbon.value, 2), "time": round(b.time.value, 2), } for k in ["count", "money", "carbon", "time"]: self.total[k] = round(temp[k] + self.total[k], 2) if sub_entity: temp[sub_entity] = sub_item result.append(temp) result = sorted(result, key=lambda d: d[entity]) return result def set_query(self): start = self.conf["start"] end = self.conf["end"] if self.conf["billing"] is not None: self.type = "acctbilling" self.entity = self.type start = self.conf["billing"] elif self.conf["user"] is not None and self.conf["account"] is not None: self.type = "user" self.entity = "user_account" self.item = [self.conf["user"], self.conf["account"]] elif self.conf["user"] is not None: self.entity = "user" self.item = self.conf["user"] self.type = "user" elif self.conf["account"] is not None: self.entity = "account" self.item = self.conf["account"] self.type = "account_all" if self.conf["all"] else "account" else: # TODO print("ZZZZ") self.query = ESQuery( entity=self.entity, start=start, end=end, item=self.item, all_sub_item=self.conf["all"], ) if self.entity == "user_account": self.entity = "user" self.item = self.item[0] # match list(self.conf.items()): # case [('billing',date)] : # self.query = ESQuery("acctforbilling") # print(date) # case _ : # print('ZZZZ')