diff --git a/job_submit_fx/elastic.lua b/job_submit_fx/elastic.lua index 944d24d..f4d9511 100644 --- a/job_submit_fx/elastic.lua +++ b/job_submit_fx/elastic.lua @@ -1,136 +1,178 @@ local elastic = {} local f = require("job_submit_fx/functions") -- now local now = os.date('*t') -- http headers local http_headers = { ["authorization"] = "ApiKey "..ELK_API_KEY } -local function get_http_headers () - return {["authorization"] = "ApiKey "..ELK_API_KEY} -end - --- search query +-- search queries -- sum of cost of all job of the current month from an item (account or username) -local post_data = [[ +local post_data_completed = [[ { "query":{ "bool": { "must": [ { "range": { "@end": { "gte": "%s", "lt": "%s" - } } }, { "term": { "%s.keyword": "%s" } } ], "must_not": [ { "term": { "state.keyword": "NODE_FAIL" } }, { "terms": { "partition.keyword": %s } } ] } }, "aggs": { "my_agg": { "sum": { "field": "cost_chf" } } }, "from": 0, "size": 0 } ]] +-- sum of cost of all job in squeue index in state RUNNING or PENDING from an item (account or username) +local post_data_queued = [[ +{ + "query":{ + "bool": { + "must": [ + { + "terms": { + "state.keyword" : ["RUNNING", "PENDING"] + } + }, + { + "term": { + "%s.keyword": "%s" + } + } + ], + "must_not": [ + { + "terms": { + "partition.keyword": %s + } + } + ] + } + }, + "aggs": { + "my_agg": { + "sum": { + "field": "cost_chf" + } + } + }, + "from": 0, + "size": 0 + +} +]] + -- get cost multiplier function elastic.get_cost_multiplier(account, cluster) local cost_multiplier = 1 url = f.sprintf("%s/%s/_doc/%s", ELK_URL, ELK_INDEX_ACCOUNT_PRICING, account) pricing_table = f.http_get(url, http_headers) if pricing_table then url = f.sprintf("%s/%s/_doc/%s", ELK_URL, ELK_INDEX_PRICING_COST, pricing_table._source.pricing) cost_multiplier_table = f.http_get(url, http_headers) if cost_multiplier_table then if cost_multiplier_table._source.clusters[cluster] then cost_multiplier = cost_multiplier_table._source.clusters[cluster] else slurm.log_info("elastic::: pricing %s without cluster %s entry in %s index",pricing_table._source.pricing, cluster, ELK_INDEX_PRICING_COST) end else slurm.log_info("elastic::: pricing %s unknow in %s index",pricing_table._source.pricing,ELK_INDEX_PRICING_COST) end else slurm.log_info("elastic::: acccount %s without pricing",account) end return cost_multiplier end -- get sum of cost of an account or an username - function elastic.get_sum_cost(index, term, value, estimate) + function elastic.get_sum_cost(term, value, estimate) -- assert(term == "account" or term == "username", slurm.log_info("term must be 'account' or 'username'")) - date_min = f.format_date(now.year, now.month,1) - date_max = f.format_date(now.year, now.month+1,1) - request_data = f.sprintf(post_data, date_min, date_max, term, value, FREE_PARTS) + if (estimate) then + request_data = f.sprintf(post_data_queued, term, value, FREE_PARTS) + index = ELK_INDEX_SLURM_QUEUED + else + date_min = f.format_date(now.year, now.month,1) + date_max = f.format_date(now.year, now.month+1,1) + request_data = f.sprintf(post_data_completed, date_min, date_max, term, value, FREE_PARTS) + index = ELK_INDEX_SLURM_COMPLETED + end url = f.sprintf("%s/%s/_search", ELK_URL, index) local temp = f.http_get(url, http_headers, request_data) local ret = tonumber(temp.aggregations.my_agg.value) or 0 if (estimate) then slurm.log_user("In addition, you will consume up to %.2f CHF with your %s %s (based on queued jobs)",ret,term,value) else slurm.log_user("You have consumed %.2f CHF with your %s %s",ret,term,value) end return ret end -- get capping of an account or an username function elastic.get_capping(term, value) -- assert(term == "account" or term == "username", slurm.log_info("term must be 'account' or 'username'")) url = f.sprintf("%s/%s/_doc/%s_%s", ELK_URL, ELK_INDEX_CAPPING, term, value) value_table = f.http_get(url, http_headers) if value_table then ret = value_table._source.value end return (ret or 0) end -- check capping of an account or an username function elastic.check_capping(term, value, estimate) local ret = false local capping = elastic.get_capping(term, value) - local sum_cost_c = elastic.get_sum_cost(ELK_INDEX_SLURM_COMP, term, value, false) - local sum_cost_s = elastic.get_sum_cost(ELK_INDEX_SLURM_SQUEUE, term, value, true) + local sum_cost_c = elastic.get_sum_cost(term, value, false) + local sum_cost_s = elastic.get_sum_cost(term, value, true) local sum_cost = sum_cost_c + sum_cost_s slurm.log_info("elastic::: %s %s have consumed %.2f / %.2f", term, value, sum_cost_c, sum_cost_s) if capping == 0 then return false end slurm.log_info("elastic::: %s %s has a capping of %s and have consumed %.2f", term, value,capping, sum_cost) if (sum_cost > capping) then slurm.log_user("You have reached the limit (%d CHF) of your monthly consumption for your %s %s",capping, term, value) ret = true elseif ((sum_cost+estimate) > capping) then slurm.log_user("You will reach the limit (%d CHF) of your monthly consumption for your %s %s by submitting this job",capping, term, value) ret = true end return ret end + +return elastic