diff --git a/job_submit_fx/elastic.lua b/job_submit_fx/elastic.lua index a7c9a5a..f97aadd 100644 --- a/job_submit_fx/elastic.lua +++ b/job_submit_fx/elastic.lua @@ -1,179 +1,179 @@ local elastic = {} -- now local now = os.date('*t') -- http headers local http_headers = {} if ELK_API_KEY then local http_headers = { ["authorization"] = "ApiKey "..ELK_API_KEY } end -- search queries -- sum of cost of all job of the current month from an item (account or username) local post_data_completed = [[ { "query":{ "bool": { "must": [ { "range": { "@end": { "gte": "%s", "lt": "%s" } } }, { "term": { "%s.keyword": "%s" } } ], "must_not": [ { "term": { "state.keyword": "NODE_FAIL" } }, { "terms": { "partition.keyword": %s } } ] } }, "aggs": { "my_agg": { "sum": { "field": "cost_chf" } } }, "from": 0, "size": 0 } ]] -- sum of cost of all job in squeue index in state RUNNING or PENDING from an item (account or username) local post_data_queued = [[ { "query":{ "bool": { "must": [ { "terms": { "state.keyword" : ["RUNNING", "PENDING"] } }, { "term": { "%s.keyword": "%s" } } ], "must_not": [ { "terms": { "partition.keyword": %s } } ] } }, "aggs": { "my_agg": { "sum": { "field": "cost_chf" } } }, "from": 0, "size": 0 } ]] -- get cost multiplier function elastic.get_cost_multiplier(account, cluster) - local cost_multiplier = 1 + local cost_multiplier = 0 url = f.sprintf("%s/%s/_doc/%s", ELK_URL, ELK_INDEX_ACCOUNT_PRICING, account) pricing_table = f.http_get(url, http_headers) if pricing_table then url = f.sprintf("%s/%s/_doc/%s", ELK_URL, ELK_INDEX_PRICING_COST, pricing_table._source.pricing) cost_multiplier_table = f.http_get(url, http_headers) if cost_multiplier_table then if cost_multiplier_table._source.clusters[cluster] then cost_multiplier = cost_multiplier_table._source.clusters[cluster] else slurm.log_info("elastic::: pricing %s without cluster %s entry in %s index",pricing_table._source.pricing, cluster, ELK_INDEX_PRICING_COST) end else slurm.log_info("elastic::: pricing %s unknow in %s index",pricing_table._source.pricing,ELK_INDEX_PRICING_COST) end else slurm.log_info("elastic::: acccount %s without pricing",account) end return cost_multiplier end -- get sum of cost of an account or an username function elastic.get_sum_cost(term, value, estimate) -- assert(term == "account" or term == "username", slurm.log_info("term must be 'account' or 'username'")) if (estimate) then request_data = f.sprintf(post_data_queued, term, value, FREE_PARTS) index = ELK_INDEX_SLURM_QUEUED else date_min = f.format_date(now.year, now.month,1) date_max = f.format_date(now.year, now.month+1,1) request_data = f.sprintf(post_data_completed, date_min, date_max, term, value, FREE_PARTS) index = ELK_INDEX_SLURM_COMPLETED end url = f.sprintf("%s/%s/_search", ELK_URL, index) local temp = f.http_get(url, http_headers, request_data) local ret = tonumber(temp.aggregations.my_agg.value) or 0 if (estimate) then slurm.log_user("In addition, you will consume up to %.2f CHF with your %s %s (based on queued jobs)",ret,term,value) else slurm.log_user("You have consumed %.2f CHF with your %s %s",ret,term,value) end return ret end -- get capping of an account or an username function elastic.get_capping(term, value) -- assert(term == "account" or term == "username", slurm.log_info("term must be 'account' or 'username'")) url = f.sprintf("%s/%s/_doc/%s_%s", ELK_URL, ELK_INDEX_CAPPING, term, value) value_table = f.http_get(url, http_headers) if value_table then ret = value_table._source.value end return (ret or 0) end -- check capping of an account or an username function elastic.check_capping(term, value, estimate) local ret = false local capping = elastic.get_capping(term, value) local sum_cost_c = elastic.get_sum_cost(term, value, false) local sum_cost_s = elastic.get_sum_cost(term, value, true) local sum_cost = sum_cost_c + sum_cost_s slurm.log_info("elastic::: %s %s have consumed %.2f / %.2f", term, value, sum_cost_c, sum_cost_s) if capping == 0 then return false end slurm.log_info("elastic::: %s %s has a capping of %s and have consumed %.2f", term, value,capping, sum_cost) if (sum_cost > capping) then slurm.log_user("You have reached the limit (%d CHF) of your monthly consumption for your %s %s",capping, term, value) ret = true elseif ((sum_cost+estimate) > capping) then slurm.log_user("You will reach the limit (%d CHF) of your monthly consumption for your %s %s by submitting this job",capping, term, value) ret = true end return ret end return elastic