diff --git a/job-submit.spec b/job-submit.spec
index 75ec47c..75d9ee0 100644
--- a/job-submit.spec
+++ b/job-submit.spec
@@ -1,51 +1,51 @@
 %define fxdir /usr/share/lua/5.3/job_submit_fx
 %define name scitas-job-submit
 %define version 1.0.3
 %define release 1%{?dist}
 
 Name: %{name}
 Version: %{version}
 Release: %{release}
 License: GPLv3
 Summary: Collection of functions for job_submit.lua script
 URL: https://c4science.ch/source/scitas-job-submit
 Source0: %{name}-%{version}.tar.gz
 BuildRoot: %(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX)
 
-Requires: lua>=5.3 , lua-dbi
+Requires: lua >= 5.3 , lua-http, lua-json
 
 %description
 Collection of functions developed in LUA, integrated into the slurm job_submit.lua script.
 
 %prep
 %setup -q
 
 %build
 
 %install
 install -m 755 -d %{buildroot}%{fxdir}
 install -m 644 job_submit_fx/*.lua %{buildroot}%{fxdir}/
 install -m 755 -d %{buildroot}/etc/slurm/job_submit
 install -m 644 job_submit/* %{buildroot}/etc/slurm/job_submit
 install -m 644 job_submit.lua %{buildroot}/etc/slurm/job_submit.lua
 
 %files
 %config(noreplace) /etc/slurm/job_submit/*
 %{fxdir}
 /etc/slurm/job_submit.lua
 
 %clean
 rm -rf %{buildroot}
 
 %changelog
 * Wed May 25 2022 Nicolas COUDENE - 1.0.3
 - Add new function to check job description (array format for starter)
 
 * Mon Jan 10 2022 Antonio J. RUSSO - 1.0.2
 - Add new function to force users to specify an account
 
 * Wed Sep 15 2021 Antonio J. RUSSO - 1.0.1
 - Build and debug partitions become free
 
 * Mon Jul 26 2021 Antonio J. RUSSO - 1.0.0
 - Initial RPM release
diff --git a/job_submit.lua b/job_submit.lua
index b9fce5a..14d7c6b 100644
--- a/job_submit.lua
+++ b/job_submit.lua
@@ -1,92 +1,97 @@
 --########################################################################--
 --
 -- Load billing_cost_estimate parameters (can be overriden in rates_file)
 --
 --########################################################################--
 
 
 CONF_DIR = '/etc/slurm/job_submit'
 CONF_FILES = {'job_submit.conf', 'cluster.conf', 'rates.conf'}
 
 for index, file in ipairs(CONF_FILES) do
     filetoload = CONF_DIR.."/"..file
     file_fh = io.open(filetoload, "r")
     if file_fh == nil then
         slurm.log_info("slurm_job_modify: No readable %s found!", filetoload)
     else
         io.close(file_fh)
         dofile(filetoload)
     end
 end
 
 --- require fonctions verbose_mode, track_gres, scitas_cost, partition_setting ---
 
 if FX_VERBOSE then
     require('job_submit_fx/verbose_mode')
 end
 
 if FX_TRACK_GRES then
     require('job_submit_fx/track_gres')
 end
 
 if FX_SCITAS_COST then
     require('job_submit_fx/scitas_cost')
 end
 
 if FX_PARTITION then
     require('job_submit_fx/partition_setting')
 end
 
 if FX_FORCE_ACCOUNT then
     require('job_submit_fx/force_account')
 end
 
 require('job_submit_fx/validate_job')
 
 function slurm_job_submit(job_desc, part_list, submit_uid)
 
     --- Verbose mode ---
     --- To enable verbose mode, you must set the verbose_mode variable to 1
     if FX_VERBOSE then
         verbose_mode(job_desc)
     end
+
     if FX_PARTITION then
         local partition = partition_setting (job_desc, submit_uid, INFINITE, SEVENTY, PARALLEL_PARTITION, SERIAL_PARTITION)
         if job_desc.partition ~= partition then
             job_desc.partition = partition
-            slurm.log_info("slurm_job_modify: for user %u , setting partition: %s",
+            slurm.log_info("slurm_job_submit: for user %u , setting partition: %s",
                 submit_uid, partition)
         end
     end
 
     if FX_TRACK_GRES then
         status = track_gres(job_desc, submit_uid)
         if status ~= 0 then
             return status
         end
     end
 
-    if FX_SCITAS_COST then
-        scitas_cost(job_desc, submit_uid)
-    end
-
     if FX_FORCE_ACCOUNT then
-        force_account(job_desc)
+        local ret_force_account = force_account(job_desc)
+        if ret_force_account ~= slurm.SUCCESS then return ret_force_account end
     end
 
-    validate_job(job_desc)
+    -- Job validation
+    local ret_validate = validate_job(job_desc)
+    if ret_validate ~= slurm.SUCCESS then return ret_validate end
+
+    if FX_SCITAS_COST then
+        local ret_cost = scitas_cost(job_desc, submit_uid)
+        if ret_cost ~= slurm.SUCCESS then return ret_cost end
+    end
 
     return slurm.SUCCESS
 end
 
 
 -- The other required function
 function slurm_job_modify(job_desc, job_rec, part_list, modify_uid)
 
     return slurm.SUCCESS
 end
 
 slurm.log_info("job_submit_plugin initialized")
 
 return slurm.SUCCESS
diff --git a/job_submit_fx/billing_cost_estimate.lua b/job_submit_fx/billing_cost_estimate.lua
index 6a6690c..c3fcad7 100644
--- a/job_submit_fx/billing_cost_estimate.lua
+++ b/job_submit_fx/billing_cost_estimate.lua
@@ -1,159 +1,174 @@
 -- Function billing_cost_estimate to display job cost --
 
 function billing_cost_estimate (job_desc, submit_uid)
 
     -- Initializing local variables
     require('job_submit_fx/scitas_debug')
+    local el = require('job_submit_fx/elastic')
     local gputres = nil
     local gpu = nil
     local chf = nil
     local timeinsec = nil
     local nodetres = job_desc.min_nodes
     local cputres = job_desc.min_cpus
     local wtime = job_desc.time_limit
-    local partition = job_desc.partition
+    local partition = (job_desc.partition or DEFAULT_PARTITION)
     local ntaskpernode = job_desc.ntasks_per_node
     local costunit = "cpu"
-    local debugmode = scitas_debug(job_desc)
-    local infisixteen = 2^16 - 1
-    local infithirtytwo = slurm.INFINITE - 1
+    local debugmode = scitas_debug(job_desc.comment)
+    local noval16 = slurm.NO_VAL16
+    local noval = slurm.NO_VAL
     local arrayindex = 1
     local freeparts = {'build', 'debug'}
     local isfree = 0
+    local account = (job_desc.account or job_desc.default_account)
 
-    DBI = require('DBI')
-    dbd, err = DBI.Connect('SQLite3', '/etc/slurm/job_submit/pricing.db')
-    assert(dbd, err)
-    dbd:autocommit(true)
-    account = (job_desc.account or job_desc.default_account)
-    statement, err = dbd:prepare( 'select account,cluster,value from data where account=? and cluster=?' )
-    statement:execute(account, slurm.CLUSTER_NAME)
-    for row in statement:rows(true) do
-        slurm.log_info("estimation::: account = "..row['account'])
-        slurm.log_info("estimation::: cluster = "..row['cluster'])
-        slurm.log_info("estimation::: value = "..row['value'])
-        cost = row['value']
-    end
-
-    if partition == nil then
-        partition = DEFAULT_PARTITION
-    end
+    -- Cost multiplier (kept local so it cannot leak between two submissions)
+    local cost_m = el.get_cost_multiplier(account, slurm.CLUSTER_NAME)
+    slurm.log_info("estimation::: account = "..tostring(account))
+    slurm.log_info("estimation::: cluster = "..slurm.CLUSTER_NAME)
+    slurm.log_info("estimation::: cost multiplier = "..cost_m)
 
     -- Free partitions
     for i, fpname in ipairs(freeparts) do
         if partition == fpname then
             isfree = 1
         end
     end
 
     if job_desc.array_inx ~= nil then
         ak, av = string.match(job_desc.array_inx, "(.*)%-(.*)")
         if tonumber(ak) ~= nil and tonumber(av) ~= nil then
             arrayindex = tonumber(av) - tonumber(ak) + 1
         end
     end
 
     if job_desc.gres ~= nil then
         gputres = string.match(job_desc.gres, "gpu.[0-9]+")
     end
 
     -- Convert time in seconds
-    if wtime == nil or wtime == infithirtytwo or wtime == infisixteen then
+    if wtime == nil or wtime == noval or wtime == noval16 then
         timeinsec = DEFAULT_WTIME
     else
         timeinsec = wtime * 60
     end
 
     -- Update cost unit if user request GPUs
     if gputres ~= nil then
         gk, gv = string.match(gputres, "(.*)%:(.*)")
         gpu=tonumber(gv)
         if gpu == nil or gpu < 1 then
             costunit = nil
         else
             costunit = "gpu"
         end
     end
 
     --
     -- User does not define the number of nodes
     --
-    if nodetres == infithirtytwo or nodetres == infisixteen or nodetres == nil
+    if nodetres == noval or nodetres == noval16 or nodetres == nil
     then
         if string.match(partition, SERIAL_PARTITION) then
             node = 1
         else
             node = 0
         end
     else
         node = nodetres
     end
 
     -- CPUs
     -- First case: user does not define the number of CPUs
     --
-    if cputres == infithirtytwo or cputres == infisixteen or cputres == nil then
+    if cputres == noval or cputres == noval16 or cputres == nil then
         cpu = CORES_PER_NODE
     --
     -- Second case: user defines the number of CPUs
     --
     else
         cpu = cputres
     end
 
     --
     -- Special cases
     --
     if string.match(partition, PARALLEL_PARTITION) then
         cpu = CORES_PER_NODE
 
         if tonumber(cputres) >= tonumber(CORES_PER_NODE) and node == 0 then
-            if ntaskpernode == infithirtytwo or ntaskpernode == nil or ntaskpernode == infisixteen then
+            if ntaskpernode == noval or ntaskpernode == nil or ntaskpernode == noval16 then
                 node = math.ceil(tonumber(cputres)/tonumber(CORES_PER_NODE))
             else
                 node = math.ceil(tonumber(cputres)/tonumber(ntaskpernode))
             end
         end
     end
 
     --
     -- Calculating price
     --
     if isfree == 1 then
         chf = 0
     else
         if costunit == "cpu" then
-            if cpu == infithirtytwo or node == infithirtytwo then
-                slurm.log_info("billing::: cannot determine the values to calculate the price")
+            if cpu == noval or node == noval then
+                slurm.log_info("estimation::: cannot determine the values to calculate the price")
             else
-                chf = cpu * node * cost * timeinsec * arrayindex
+                chf = cpu * node * cost_m * timeinsec * arrayindex
             end
         elseif costunit == "gpu" then
             if node == 0 then
                 node = math.ceil(tonumber(gpu)/tonumber(GPUS_PER_NODE))
             end
 
-            chf = gpu * node * cost * timeinsec * arrayindex
+            chf = gpu * node * cost_m * timeinsec * arrayindex
         else
-            slurm.log_info("billing::: cannot determine the unit type")
+            slurm.log_info("estimation::: cannot determine the unit type")
         end
     end
 
 
     --
     -- Print the price --
     --
     if chf ~= nil then
         if debugmode == 1 then
-            slurm.log_user("billing::: cpu: "..(cpu or 'nil'))
-            slurm.log_user("billing::: gpu: "..(gpu or 'nil'))
-            slurm.log_user("billing::: node: "..(node or 'nil'))
-            slurm.log_user("billing::: timeinsec: "..(timeinsec or 'nil'))
+            slurm.log_user("estimation::: cpu: "..(cpu or 'nil'))
+            slurm.log_user("estimation::: gpu: "..(gpu or 'nil'))
+            slurm.log_user("estimation::: node: "..(node or 'nil'))
+            slurm.log_user("estimation::: timeinsec: "..(timeinsec or 'nil'))
         end
 
         slurm.log_user("The estimated cost of this job is CHF "..string.format("%.2f",chf))
-        slurm.log_info("billing::: cost "..submit_uid.."|"..chf)
+        slurm.log_info("estimation::: cost "..submit_uid.."|"..chf)
     else
-        slurm.log_info("billing::: cannot calculate the price")
+        slurm.log_info("estimation::: cannot calculate the price")
+    end
+
+    return (chf or 0)
+end
+
+
+-- Function billing_capping to enable blocking a job if a limit is reached --
+function billing_capping (job_desc, submit_uid)
+
+    local ret = slurm.SUCCESS
+    local enforce = (FX_ENFORCE_CAPPING or false)
+    local el = require('job_submit_fx/elastic')
+    local estimate = billing_cost_estimate(job_desc, submit_uid)
+    if estimate then
+        local account = (job_desc.account or job_desc.default_account)
+        local username = job_desc.user_name
+        local username_c = el.check_capping("username",username, estimate)
+        local account_c = el.check_capping("account",account, estimate)
+        if (account_c or username_c) and enforce then
+            ret = slurm.ERROR
+        end
     end
+
+    return ret
+
 end
+
diff --git a/job_submit_fx/elastic.lua b/job_submit_fx/elastic.lua
new file mode 100644
index 0000000..6dfae97
--- /dev/null
+++ b/job_submit_fx/elastic.lua
@@ -0,0 +1,136 @@
+local elastic = {}
+
+local f = require("job_submit_fx/functions")
+
+-- http headers
+local http_headers = {
+    ["authorization"] = "ApiKey "..ELK_API_KEY
+}
+
+-- search query
+-- sum of cost of all job of the current month from an item (account or username)
+local post_data = [[
+{
+  "query":{
+    "bool": {
+      "must": [
+        {
+          "range": {
+            "@end": {
+              "gte": "%s",
+              "lt": "%s"
+
+            }
+          }
+        },
+        {
+          "term": {
+            "%s.keyword": "%s"
+          }
+        }
+      ],
+      "must_not": [
+        {
+          "term": {
+            "state.keyword": "NODE_FAIL"
+          }
+        }
+      ]
+    }
+  },
+  "aggs": {
+    "my_agg": {
+      "sum": {
+        "field": "cost_chf"
+      }
+    }
+  },
+  "from": 0,
+  "size": 0
+
+}
+]]
+
+-- get cost multiplier
+-- returns the multiplier found for the account's pricing on this cluster,
+-- or 1 when any of the lookups gives nothing
+function elastic.get_cost_multiplier(account, cluster)
+    local cost_multiplier = 1
+    local url = f.sprintf("%s/%s/_doc/%s", ELK_URL, ELK_INDEX_ACCOUNT_PRICING, account)
+    local pricing_table = f.http_get(url, http_headers)
+    if pricing_table then
+        url = f.sprintf("%s/%s/_doc/%s", ELK_URL, ELK_INDEX_PRICING_COST, pricing_table._source.pricing)
+        local cost_multiplier_table = f.http_get(url, http_headers)
+        if cost_multiplier_table then
+            if cost_multiplier_table._source.clusters[cluster] then
+                cost_multiplier = cost_multiplier_table._source.clusters[cluster]
+            else
+                slurm.log_info("elastic::: pricing %s without cluster %s entry in %s index",pricing_table._source.pricing, cluster, ELK_INDEX_PRICING_COST)
+            end
+        else
+            slurm.log_info("elastic::: pricing %s unknown in %s index",pricing_table._source.pricing,ELK_INDEX_PRICING_COST)
+        end
+    else
+        slurm.log_info("elastic::: account %s without pricing",account)
+    end
+    return cost_multiplier
+end
+
+-- get sum of cost of an account or an username
+-- returns 0 when the search gives no usable aggregation
+function elastic.get_sum_cost(term, value)
+    -- assert(term == "account" or term == "username", slurm.log_info("term must be 'account' or 'username'"))
+    -- read the date on every call: require() caches this module, so a date
+    -- captured at load time would keep pointing at the month of the first load
+    local now = os.date('*t')
+    local date_min = f.format_date(now.year, now.month,1)
+    -- month+1 is 13 in December; os.time normalises it to January of next year
+    local date_max = f.format_date(now.year, now.month+1,1)
+    local request_data = f.sprintf(post_data, date_min, date_max, term, value)
+    local url = f.sprintf("%s/%s/_search", ELK_URL, ELK_INDEX_SLURM)
+    local temp = f.http_get(url, http_headers, request_data)
+    local ret = 0
+    -- http_get returns nil when the status is not 200
+    if temp and temp.aggregations and temp.aggregations.my_agg then
+        ret = tonumber(temp.aggregations.my_agg.value) or 0
+    else
+        slurm.log_info("elastic::: cannot get the consumption of %s %s", term, value)
+    end
+    slurm.log_user("You have consumed %.2f CHF with your %s %s",ret,term,value)
+    return ret
+end
+
+-- get capping of an account or an username
+-- returns 0 when no numeric capping is found
+function elastic.get_capping(term, value)
+    -- assert(term == "account" or term == "username", slurm.log_info("term must be 'account' or 'username'"))
+    local url = f.sprintf("%s/%s/_doc/%s_%s", ELK_URL, ELK_INDEX_CAPPING, term, value)
+    local value_table = f.http_get(url, http_headers)
+    -- ret must be local: as a global it kept the result of the previous lookup
+    local ret = nil
+    if value_table and value_table._source then
+        ret = tonumber(value_table._source.value)
+    end
+    return (ret or 0)
+end
+
+-- check capping of an account or an username
+-- returns true when the capping is already exceeded or would be with estimate
+function elastic.check_capping(term, value, estimate)
+    local ret=false
+    local capping = elastic.get_capping(term, value)
+    local sum_cost = elastic.get_sum_cost(term, value)
+    if capping == 0 then return false end
+    slurm.log_info("elastic::: %s %s has a capping of %s and have consumed %.2f", term, value,capping, sum_cost)
+    if (sum_cost > capping) then
+        -- %.2f and not %d: in Lua 5.3 %d raises an error on a non-integer capping
+        slurm.log_user("You have reached the limit (%.2f CHF) of your monthly consumption for your %s %s",capping, term, value)
+        ret = true
+    elseif ((sum_cost+estimate) > capping) then
+        slurm.log_user("You will reach the limit (%.2f CHF) of your monthly consumption for your %s %s by submitting this job",capping, term, value)
+        ret = true
+    end
+    return ret
+end
+
+return elastic
diff --git a/job_submit_fx/force_account.lua b/job_submit_fx/force_account.lua
index 0d76d63..7a9ec40 100644
--- a/job_submit_fx/force_account.lua
+++ b/job_submit_fx/force_account.lua
@@ -1,7 +1,9 @@
 function force_account(job_desc)
+    local ret = slurm.SUCCESS
     if job_desc.account == nil then
         slurm.log_info("User "..job_desc.user_id.." did not specify a valid account.")
         slurm.log_user("WARNING: You must specify a valid account")
---        return slurm.ESLURM_INVALID_ACCOUNT
+        ret = 2045 -- ESLURM_INVALID_ACCOUNT
     end
+    return ret
 end
diff --git a/job_submit_fx/functions.lua b/job_submit_fx/functions.lua
new file mode 100644
index 0000000..4584dc1
--- /dev/null
+++ b/job_submit_fx/functions.lua
@@ -0,0 +1,124 @@
+local f = {}
+
+-- local curl = require("cURL")
+local json = require("json")
+local http_h = require("http.headers")
+local http_r = require("http.request")
+
+-- printf equivalent
+function f.printf(s,...)
+    return io.write(s:format(...))
+end
+
+-- sprintf equivalent
+function f.sprintf(s,...)
+    return s:format(...)
+end
+
+-- format date with %Y-%m-%dT00:00:00 format
+function f.format_date(year,month,day)
+    return os.date('%Y-%m-%dT%H:%M:%S',os.time({year=year,month=month,day=day,hour=0,min=0,sec=0}))
+end
+
+-- print table
+function f.dump_table(t,size)
+    if not t then return print(nil) end
+    if not size then size = 0 end
+    for key, value in pairs(t) do
+        if size ~= 0 then
+            for i=1,size do io.write('=') end
+            io.write(' ')
+        end
+        if type(value) == 'table' then
+            print(key)
+            f.dump_table(value,size+2)
+        else
+            print(key, value)
+        end
+    end
+end
+
+-- http get
+function f.http_get(url, headers, data)
+    local ret = nil
+    local req = http_r.new_from_uri(url)
+    for name, value in pairs(headers) do
+        req.headers:append(name,value)
+    end
+    req.headers:upsert("content-type", "application/json")
+    if data ~= nil then
+        req.headers:upsert(":method", "GET")
+        req:set_body(data)
+    end
+    -- slurm.log_user("url : "..url)
+    local headers_r, stream = assert(req:go())
+    local body = assert(stream:get_body_as_string())
+    if headers_r:get ":status" == "200" then
+        ret = json.decode(body)
+    end
+    return ret
+end
+
+function f.log_user_and_debug(fmt, ...)
+    -- forward the varargs directly: the global unpack() used before is not
+    -- available by default in Lua 5.3 (it became table.unpack)
+    slurm.log_user(fmt, ...)
+    slurm.log_debug(fmt, ...)
+end
+
+
+
+-- -- collect function for curl
+-- function collect(chunk)
+--   if chunk ~= nil then
+--     res = res .. chunk
+--   end
+--   return true
+-- end
+
+-- -- curl get
+-- function f.curl_get(url, headers)
+--   res = ''
+--   ret = nil
+--   curl.easy{
+--     url = url,
+--     httpheader = headers,
+--     writefunction = collect
+--     --writefunction = io.stderr -- use io.stderr:write()
+--   }
+--   :perform()
+--   :close()
+--   if res ~= nil then
+--     decode = json.decode(res)
+--   end
+--   if decode ~= nil then
+--     if decode.found then
+--       ret = decode
+--     end
+--   end
+--   return ret
+-- end
+
+-- -- curl post
+-- function f.curl_post(url, headers, data)
+--   res = ""
+--   ret = nil
+--   curl.easy{
+--     url = url,
+--     post = true,
+--     httpheader = headers,
+--     postfields = data,
+--     writefunction = collect
+--   }
+--   :perform()
+--   :close()
+--   if res ~= nil then
+--     decode = json.decode(res)
+--   end
+--   if not decode.time_out and decode ~= nil or decode.status == 200 then
+--     ret = decode
+--   end
+--   return ret
+-- end
+
+return f
diff --git a/job_submit_fx/scitas_cost.lua b/job_submit_fx/scitas_cost.lua
index 81402de..95c8a80 100644
--- a/job_submit_fx/scitas_cost.lua
+++ b/job_submit_fx/scitas_cost.lua
@@ -1,12 +1,17 @@
 function scitas_cost (job_desc, submit_uid)
     if job_desc.comment ~= nil then
-        if string.match(job_desc.comment, "scitas.cost") then
+        if string.match(job_desc.comment, "scitas.capping") then
             require('job_submit_fx/billing_cost_estimate')
-            billing_cost_estimate(job_desc, submit_uid)
+            return billing_capping(job_desc, submit_uid)
+        elseif string.match(job_desc.comment, "scitas.cost") then
+            require('job_submit_fx/billing_cost_estimate')
+            -- billing_cost_estimate returns a cost in CHF, not a status code
+            billing_cost_estimate(job_desc, submit_uid)
+            return slurm.SUCCESS
         else
-            return 0
+            return slurm.SUCCESS
         end
     else
-        return 0
+        return slurm.SUCCESS
     end
 end
diff --git a/job_submit_fx/scitas_debug.lua b/job_submit_fx/scitas_debug.lua
index 7081d48..e5b8e77 100644
--- a/job_submit_fx/scitas_debug.lua
+++ b/job_submit_fx/scitas_debug.lua
@@ -1,10 +1,10 @@
-function scitas_debug (job_desc)
+function scitas_debug (comment)
     slurm.log_info("billing::: Scitas Debug")
-    if job_desc.comment ~= nil then
-        if string.match(job_desc.comment, "scitas.debug") then
+    if comment ~= nil then
+        if string.match(comment, "scitas.debug") then
             return 1
         else
             return 0
         end
     end
 end
diff --git a/job_submit_fx/validate_job.lua b/job_submit_fx/validate_job.lua
index 6e5bd5a..0eb284e 100644
--- a/job_submit_fx/validate_job.lua
+++ b/job_submit_fx/validate_job.lua
@@ -1,12 +1,14 @@
 -- function to validate job options
 function validate_job (job_desc)
+    local ret = slurm.SUCCESS
     if job_desc.array_inx ~= nil then
         ak, av = string.match(job_desc.array_inx, "^(%d+)-(%d+)$")
         if ak == nil or av == nil then
             slurm.log_user('--array/-a option have to be a range of values with a "-" separator, ex.: 0-14')
             slurm.log_error("slurm_job_submit: invalid array index: %s",job_desc.array_inx)
             -- 2081 == ESLURM_INVALID_ARRAY
-            return 2081
+            ret = 2081
         end
     end
+    return ret
 end