Page MenuHomec4science

esclient.py
No OneTemporary

File Metadata

Created
Sat, May 11, 06:27

esclient.py

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from sausage.esquery import ESQuery
from decimal import *
import warnings
import json
class ESClient(Elasticsearch):
    """Run one accounting aggregation query and format its result.

    The query is derived from ``conf`` (see :meth:`set_query`), executed
    through an ``elasticsearch_dsl.Search`` object, and turned into a plain
    dictionary by :meth:`set_result`.

    Attributes set by this class:
        conf: the configuration mapping passed to the constructor.
        index: the index name passed to the constructor.
        type: query type chosen by :meth:`set_query` ("acctbilling",
            "user", "account" or "account_all").
        entity: name of the top-level aggregation read from the response.
        item: value being filtered on (a user, an account, or ``None``).
        query: the ``ESQuery`` built by :meth:`set_query`.
        es: the ``Search`` object holding the query and its aggregation.
        total: running sums of the metrics returned by :meth:`get_metrics`.
        result: output of :meth:`set_result` (only set once it has run).

    NOTE(review): the ``Elasticsearch`` base initialiser is never called; a
    separate client is created for the ``Search`` object instead, so
    inherited client methods are presumably unusable on this instance.
    Confirm whether the inheritance is intentional.
    """

    # Metric fields that are summed into ``self.total``.
    _METRIC_KEYS = ("count", "money", "carbon", "time")

    def __init__(self, hosts, index, conf):
        """Build the search for ``conf`` against ``index`` on ``hosts``.

        Args:
            hosts: forwarded unchanged to ``Elasticsearch(hosts=...)``.
            index: index name given to ``Search``.
            conf: mapping read with the keys "start", "end", "billing",
                "user", "account" and "all".

        Raises:
            ValueError: if ``conf`` selects no billing date, user or account.
        """
        self.conf = conf
        self.index = index
        self.item = None
        # set_query() defines self.type, self.entity, self.item, self.query.
        self.set_query()
        search = Search(using=Elasticsearch(hosts=hosts), index=index)
        self.es = search.query(self.query.query)
        # ``aggs.bucket`` registers the aggregation on the search in place.
        self.es.aggs.bucket(self.entity, self.query.aggs)
        self.total = dict.fromkeys(self._METRIC_KEYS, 0)

    def set_result(self):
        """Execute the search and store the formatted outcome in ``self.result``.

        ``self.result`` is ``None`` when the search matches no document.
        Otherwise it is a dict with the keys ``self.entity``, "type",
        "start", "end" and "metrics" (a list of rows built by
        :meth:`get_metrics`). ``self.total`` is recomputed on every call.
        """
        # Restart the totals so that calling this method more than once does
        # not add the same buckets to ``self.total`` again.
        self.total = dict.fromkeys(self._METRIC_KEYS, 0)

        # Warnings emitted while querying are recorded instead of shown: an
        # anonymous user has no privilege to check the health of the ES
        # cluster, which triggers a warning. This should eventually be
        # checked another way.
        with warnings.catch_warnings(record=True):
            # ``[:0]`` requests no hits; only the aggregations are needed.
            resp = self.es[:0].execute()

        if resp.hits.total.value == 0:
            self.result = None
            return

        result = {
            self.entity: self.item,
            "type": self.type,
            "start": self.query.start,
            "end": self.query.end,
            "metrics": [],
        }

        nested = self.entity == "user" or (
            self.entity == "account" and self.conf["all"]
        )
        if nested:
            # The response nests the other entity below the first bucket of
            # the top-level aggregation: accounts below a user, or users
            # below an account.
            sub_entity = "account" if self.entity == "user" else "user"
            top_bucket = getattr(resp.aggregations, self.entity)[0]
            for sub_bucket in getattr(top_bucket, sub_entity):
                result["metrics"].extend(
                    self.get_metrics(
                        sub_bucket.metrics, "cluster", sub_entity, sub_bucket.key
                    )
                )
            # Sort by sub group (user or account). The sort is stable, so the
            # per-group ordering produced by get_metrics() is kept.
            result["metrics"].sort(key=lambda row: row[sub_entity])
        else:
            buckets = getattr(resp.aggregations, self.entity)
            entity = "account" if self.type == "acctbilling" else "cluster"
            result["metrics"].extend(
                self.get_metrics(buckets, entity, None, self.item)
            )

        self.result = result

    def get_metrics(self, metrics, entity, sub_entity=None, sub_item=None):
        """Convert aggregation buckets into metric rows and update the totals.

        Args:
            metrics: iterable of buckets; each one is read through ``key``,
                ``doc_count``, ``money.value``, ``carbon.value`` and
                ``time.value``.
            entity: key under which each bucket's ``key`` is stored.
            sub_entity: optional extra key added to every row.
            sub_item: value stored under ``sub_entity`` when it is given.

        Returns:
            A list of dicts sorted by their ``entity`` value. Money, carbon
            and time are rounded to two decimals.

        Side effects:
            Adds every row's metrics to ``self.total``.
        """
        rows = []
        for bucket in metrics:
            row = {
                entity: bucket.key,
                "count": bucket.doc_count,
                "money": round(bucket.money.value, 2),
                "carbon": round(bucket.carbon.value, 2),
                "time": round(bucket.time.value, 2),
            }
            for key in self._METRIC_KEYS:
                # Round after each addition, as the per-row values are.
                self.total[key] = round(row[key] + self.total[key], 2)
            if sub_entity:
                row[sub_entity] = sub_item
            rows.append(row)
        rows.sort(key=lambda row: row[entity])
        return rows

    def set_query(self):
        """Choose the query type from ``self.conf`` and build ``self.query``.

        Precedence: a billing date, then user together with account, then
        user alone, then account alone. For a billing query the billing
        value replaces ``conf["start"]`` as the start passed to ``ESQuery``.

        Raises:
            ValueError: if none of "billing", "user" or "account" is set.
        """
        conf = self.conf
        start = conf["start"]
        end = conf["end"]

        if conf["billing"] is not None:
            self.type = "acctbilling"
            self.entity = self.type
            start = conf["billing"]
        elif conf["user"] is not None and conf["account"] is not None:
            self.type = "user"
            self.entity = "user_account"
            self.item = [conf["user"], conf["account"]]
        elif conf["user"] is not None:
            self.type = "user"
            self.entity = "user"
            self.item = conf["user"]
        elif conf["account"] is not None:
            self.type = "account_all" if conf["all"] else "account"
            self.entity = "account"
            self.item = conf["account"]
        else:
            # Without a selector there is no entity to query; fail here with
            # a clear message rather than later on a missing attribute.
            raise ValueError(
                "conf must set at least one of 'billing', 'user' or 'account'"
            )

        self.query = ESQuery(
            entity=self.entity,
            start=start,
            end=end,
            item=self.item,
            all_sub_item=conf["all"],
        )

        if self.entity == "user_account":
            # The combined query is reported as a plain user query: keep only
            # the user part for reading and labelling the response.
            self.entity = "user"
            self.item = self.item[0]

Event Timeline