Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F62152271
esclient.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, May 11, 06:27
Size
4 KB
Mime Type
text/x-python
Expires
Mon, May 13, 06:27 (2 d)
Engine
blob
Format
Raw Data
Handle
17611185
Attached To
R11599 sausage-binary
esclient.py
View Options
from
elasticsearch
import
Elasticsearch
from
elasticsearch_dsl
import
Search
from
sausage.esquery
import
ESQuery
from
decimal
import
*
import
warnings
import
json
class ESClient(Elasticsearch):
    """Run one aggregated accounting query against Elasticsearch.

    The kind of report is derived from ``conf`` by :meth:`set_query`; the
    query itself is executed by :meth:`set_result`, which stores its outcome
    in ``self.result`` and the running sums in ``self.total``.

    Attributes set by this class:
        conf:   the configuration mapping passed to the constructor.
        index:  the index name passed to the constructor.
        type:   report type ("acctbilling", "user", "account", "account_all").
        entity: name of the top-level aggregation ("acctbilling", "user",
                "account").
        item:   the selected user or account name, or None for billing.
        query:  the ``ESQuery`` built from the values above.
        es:     the ``elasticsearch_dsl.Search`` object that gets executed.
        total:  sums of "time", "count", "carbon" and "money" over all
                metric rows produced by the last :meth:`set_result` call.
        result: report dict (or None when nothing matched); only defined
                after :meth:`set_result` has been called.

    NOTE(review): the base-class ``Elasticsearch.__init__`` is never called;
    requests go through a separate ``Elasticsearch`` instance handed to
    ``Search``. ``self.index`` also appears to shadow the client's
    ``index()`` method. Confirm whether the inheritance is needed at all.
    """

    def __init__(self, hosts, index, conf):
        """Prepare the search for ``index`` on ``hosts`` according to ``conf``.

        Args:
            hosts: forwarded unchanged to ``Elasticsearch(hosts=...)``.
            index: index name forwarded to ``Search(index=...)``.
            conf: mapping read with the keys "start", "end", "billing",
                "user", "account" and "all" (see :meth:`set_query`).

        Raises:
            ValueError: if ``conf`` selects no billing date, user or account.
        """
        self.conf = conf
        self.index = index
        self.item = None

        # Defines self.type, self.entity, self.item and self.query, all of
        # which are needed to assemble the search below.
        self.set_query()

        search = Search(using=Elasticsearch(hosts=hosts), index=index)
        self.es = search.query(self.query.query)
        self.es.aggs.bucket(self.entity, self.query.aggs)

        self.total = self._empty_total()

    @staticmethod
    def _empty_total():
        """Return a fresh, zeroed totals dictionary."""
        return {"time": 0, "count": 0, "carbon": 0, "money": 0}

    def set_result(self):
        """Execute the search and store the report in ``self.result``.

        ``self.result`` becomes None when the query matches no document.
        Otherwise it is a dict holding the selected item (under the entity
        name), "type", "start", "end" and a "metrics" list of rows built by
        :meth:`get_metrics`. ``self.total`` is recomputed from scratch on
        every call.
        """
        # Start from zero so that calling set_result() more than once does
        # not add the same rows to the totals again.
        self.total = self._empty_total()

        # WARNING:
        # Catches the warning raised when an anonymous user has no privilege
        # to check the health of the ES cluster.
        # TODO: this will have to be checked another way.
        with warnings.catch_warnings(record=True):
            # [:0] asks for aggregations only, no hit documents.
            resp = self.es[:0].execute()

        if resp.hits.total.value == 0:
            self.result = None
            return

        # print(json.dumps(resp.to_dict(), indent=2))
        result = {
            self.entity: self.item,
            "type": self.type,
            "start": self.query.start,
            "end": self.query.end,
        }
        # self.total = dict(result, **self.total)
        result["metrics"] = []
        # if self.item:
        #     result["item"] = self.item

        by_sub_entity = self.entity == "user" or (
            self.entity == "account" and self.conf["all"]
        )
        if by_sub_entity:
            # A user report is broken down per account, and an "all" account
            # report per user.
            sub_entity = "account" if self.entity == "user" else "user"
            first_bucket = getattr(resp.aggregations, self.entity)[0]
            sub_buckets = getattr(first_bucket, sub_entity)
            for bucket in sub_buckets:
                result["metrics"].extend(
                    self.get_metrics(bucket.metrics, "cluster", sub_entity, bucket.key)
                )
            # Sort by sub group (user or account); the sort is stable, so the
            # per-cluster order produced by get_metrics() is kept inside
            # each group.
            result["metrics"] = sorted(
                result["metrics"],
                key=lambda row: row[sub_entity],
            )
        else:
            buckets = getattr(resp.aggregations, self.entity)
            entity = "account" if self.type == "acctbilling" else "cluster"
            # sub_entity is None here, so self.item is not added to the rows.
            result["metrics"].extend(
                self.get_metrics(buckets, entity, None, self.item)
            )

        self.result = result

    def get_metrics(self, metrics, entity, sub_entity=None, sub_item=None):
        """Convert aggregation buckets into metric rows, sorted by ``entity``.

        Each bucket is expected to expose ``key``, ``doc_count`` and
        ``money``/``carbon``/``time`` sub-aggregations with a ``value``.

        Side effect: every row's "count", "money", "carbon" and "time" are
        added to ``self.total`` (rounded to 2 decimals after each addition).

        Args:
            metrics: iterable of buckets.
            entity: key under which the bucket key is stored in each row.
            sub_entity: optional extra key added to each row.
            sub_item: value stored under ``sub_entity`` when it is given.

        Returns:
            A list of dicts, one per bucket, sorted by their ``entity`` value.
        """
        rows = []
        for bucket in metrics:
            row = {
                entity: bucket.key,
                "count": bucket.doc_count,
                "money": round(bucket.money.value, 2),
                "carbon": round(bucket.carbon.value, 2),
                "time": round(bucket.time.value, 2),
            }
            for key in ("count", "money", "carbon", "time"):
                self.total[key] = round(row[key] + self.total[key], 2)
            if sub_entity:
                row[sub_entity] = sub_item
            rows.append(row)

        return sorted(rows, key=lambda row: row[entity])

    def set_query(self):
        """Derive the report type from ``self.conf`` and build ``self.query``.

        Selection order (first match wins):
            1. "billing" set: billing report; the billing value replaces
               the start date.
            2. "user" and "account" both set: user report restricted to one
               account.
            3. "user" set: user report.
            4. "account" set: account report ("account_all" when "all" is
               truthy).

        Raises:
            ValueError: if none of "billing", "user" or "account" is set.
        """
        conf = self.conf
        start = conf["start"]
        end = conf["end"]

        if conf["billing"] is not None:
            self.type = "acctbilling"
            self.entity = self.type
            start = conf["billing"]
        elif conf["user"] is not None and conf["account"] is not None:
            self.type = "user"
            self.entity = "user_account"
            self.item = [conf["user"], conf["account"]]
        elif conf["user"] is not None:
            self.entity = "user"
            self.item = conf["user"]
            self.type = "user"
        elif conf["account"] is not None:
            self.entity = "account"
            self.item = conf["account"]
            self.type = "account_all" if conf["all"] else "account"
        else:
            # Without a selection there is no entity/type to query for; fail
            # here with a clear message instead of crashing later on the
            # missing attributes.
            raise ValueError(
                "ESClient configuration must set one of "
                "'billing', 'user' or 'account'"
            )

        self.query = ESQuery(
            entity=self.entity,
            start=start,
            end=end,
            item=self.item,
            all_sub_item=conf["all"],
        )

        # "user_account" is only needed to build the query; from here on the
        # report is handled like a plain user report.
        if self.entity == "user_account":
            self.entity = "user"
            self.item = self.item[0]

        # Sketch of a possible pattern-matching rewrite:
        # match list(self.conf.items()):
        #     case [('billing', date)]:
        #         self.query = ESQuery("acctforbilling")
        #         print(date)
        #     case _:
        #         print('ZZZZ')
Event Timeline
Log In to Comment