Page MenuHomec4science

esquery.py
No OneTemporary

File Metadata

Created
Sun, Sep 1, 15:32

esquery.py

'''
ESQuery module
'''
from datetime import date, timedelta
from elasticsearch_dsl import Q, A
from sausage.readconf import ReadConf
class ESQuery:
    '''
    Build an elasticsearch_dsl query and aggregation for one entity.

    After construction the instance exposes:

    * ``default_query`` -- bool query restricted to the configured clusters
      and to documents whose ``@end`` lies in ``[start, end)``, excluding
      the "build" and "debug" partitions and the "NODE_FAIL" state.
    * ``query`` -- ``default_query`` narrowed to the requested item.
    * ``aggs`` -- terms aggregation summing the carbon, money and time
      fields named in ``ReadConf.fields``.

    Recognised ``entity`` values are "acctbilling", "user", "user_account"
    and "account". Any other value leaves ``query`` and ``aggs`` unset.
    '''

    def __init__(self, entity, start=None, end=None, item=None, all_sub_item=False):
        '''
        Store the parameters, resolve the period and build the query.

        :param entity: kind of report, see the class docstring.
        :param start: first day of the period; an object with ``strftime``.
        :param end: exclusive last day of the period; same type as ``start``.
        :param item: value to filter on. For "user_account" it is indexed
            as ``item[0]`` (username) and ``item[1]`` (account).
        :param all_sub_item: for "account", also bucket results per user.
        '''
        self.clusters = ReadConf.clusters
        self.item = item
        self.entity = entity
        self.all_sub_item = all_sub_item
        self.start, self.end = self._resolve_period(start, end)
        period = {
            "@end": {
                "gte": f"{self.start}T00:00:00",
                "lt": f"{self.end}T00:00:00",
            }
        }
        self.default_query = Q(
            "bool",
            must=[
                Q("terms", cluster__keyword=self.clusters),
                Q("range", **period),
            ],
            must_not=[
                Q("terms", partition__keyword=["build", "debug"]),
                Q("term", state__keyword="NODE_FAIL"),
            ],
        )
        self.set_query()

    @staticmethod
    def _resolve_period(start, end):
        '''
        Return ``(start, end)`` as "YYYY-MM-DD" strings.

        Unless both bounds are given, the period defaults to the first day
        of the current month up to tomorrow.
        '''
        if not all((start, end)):
            # Read the clock once so both bounds refer to the same day.
            today = date.today()
            return str(today.replace(day=1)), str(today + timedelta(days=1))
        return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")

    def __str__(self):
        '''Return the string form of the current query.'''
        return str(self.query)

    def get_metrics(self, field):
        '''
        Return a terms aggregation on ``<field>.keyword`` with sum metrics.

        The metrics are named "carbon", "money" and "time"; the summed
        document fields are looked up in ``ReadConf.fields``.
        '''
        aggs = A("terms", field=f"{field}.keyword", size=1000)
        for name in ("carbon", "money", "time"):
            aggs.metric(name, "sum", field=ReadConf.fields[name])
        return aggs

    def set_query(self):
        '''
        Set ``self.query`` and ``self.aggs`` according to ``self.entity``.

        Unrecognised entities leave both attributes untouched.
        '''
        if self.entity == "acctbilling":
            self.query = self.default_query
            self.aggs = self.get_metrics("account")
        elif self.entity in ("user", "user_account"):
            if self.entity == "user_account":
                self.query = (
                    self.default_query
                    & Q("term", username__keyword=self.item[0])
                    & Q("term", account__keyword=self.item[1])
                )
            else:
                self.query = self.default_query & Q(
                    "term", username__keyword=self.item
                )
            # username -> account -> per-cluster metrics
            self.aggs = A("terms", field="username.keyword", size=1000)
            self.aggs.bucket(
                "account", A("terms", field="account.keyword", size=1000)
            ).bucket("metrics", self.get_metrics("cluster"))
        elif self.entity == "account":
            self.query = self.default_query & Q(
                "term", account__keyword=self.item
            )
            if self.all_sub_item:
                # account -> user -> per-cluster metrics
                self.aggs = A("terms", field="account.keyword", size=1000)
                self.aggs.bucket(
                    "user", A("terms", field="username.keyword", size=1000)
                ).bucket("metrics", self.get_metrics("cluster"))
            else:
                self.aggs = self.get_metrics("cluster")

Event Timeline