diff --git a/INSTALL.md b/INSTALL.md index 321fa3d..8f78da7 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -1,38 +1,44 @@ # Docker Build the image : ``` docker build -t sausage-app . ``` +To add a cacert file to verify your ES cluster API : + +``` +docker build -t sausage-app . --build-arg caurl= +``` + # Singularity Build a docker image and then build a singularity image with it : ``` docker build -t sausage-app . singularity build -F sausage-app.sif docker-daemon://sausage-app ``` OR Build a docker image, save it and build a singularity image with the archive : ``` docker build -t sausage-app . docker save sausage-app -o sausage-app.tar singularity build -F sausage-app.sif docker-archive://sausage-app.tar ``` Note: usefull if you do not have docker and singularity on the same system. # RPM TODO # Python ``` pip3 install . mkir /etc/sausage cp src/etc/* /etc/sausage ``` diff --git a/src/sausage/appargs.py b/src/sausage/appargs.py index 74a8610..3dbfdcf 100644 --- a/src/sausage/appargs.py +++ b/src/sausage/appargs.py @@ -1,170 +1,173 @@ """ Arguments parsing module """ import argparse import getpass import sys import grp from os import getgroups from sausage.functions import valid_date, valid_period from sausage.readconf import ReadConf -ReadConf() class AppArgs(object): def __init__(self, cost=False): self.response = {} self.parser = argparse.ArgumentParser( prog="Sausage", description="SCITAS Account Usage.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) self.add_default_args() if cost: self.add_args_cost() else: self.add_args() AppArgs.verbose = self.args.verbose def add_default_args(self): self.parser.add_argument( "-v", "--verbose", help="Verbose", action="store_true", ) return def add_args(self): self.parser.add_argument( "-u", "--user", help="If not provided whoami is considered" ) self.parser.add_argument( "-a", "--all", help="all users from an account are printed", action="store_true", ) self.parser.add_argument( "-A", "--account", help="Prints account consumption per cluster" ) self.parser.add_argument( "-s", "--start", help="Start date - format YYYY-MM-DD", type=valid_date ) self.parser.add_argument( "-e", "--end", help="End date - format YYYY-MM-DD", type=valid_date ) self.parser.add_argument( "-c", "--carbon", help="Prints the carbon footprint per cluster", action="store_true", ) self.parser.add_argument( "-b", "--billing", help="Displays the billing period - format YYYY-MM or YYYY", type=valid_period, ) self.parser.add_argument( "-x", "--csv", help="Print result in csv style", action="store_true", default=False, ) self.args = self.parser.parse_args() if self.args.billing: listofgroups = [grp.getgrgid(g).gr_name for g in getgroups()] if ReadConf.billinggrp not in listofgroups: self.parser.error( "--billing is only available for users in " + ReadConf.billinggrp + " group" ) if ( self.args.user or self.args.account or self.args.all or self.args.start or self.args.end or self.args.carbon ): self.parser.error( "--billing is not compatible with any other option") if self.args.start and self.args.end is None: self.parser.error("range requires both dates (--start and --end)") if self.args.end: if self.args.start is None: self.parser.error( "range requires both dates (--start and --end)") if self.args.end < self.args.start: self.parser.error("start date must be earlier than end date") if self.args.all and self.args.account is None: self.parser.error( "the option --all requires a valid account (--all and --account)" ) if self.args.all and self.args.user: self.parser.error( "--all option is not compatible with --user option") if len(sys.argv) <= 1 or ( ( ( all(v is not None for v in [ self.args.start, self.args.end]) or any(v is not None for v in [self.args.carbon, self.args.verbose]) ) and all(v is None for v in [self.args.account, self.args.user]) ) ): self.args.user = getpass.getuser() AppArgs.csv = self.args.csv AppArgs.response = { "user": self.args.user, "account": self.args.account, "all": self.args.all, "start": self.args.start, "end": self.args.end, "carbon": self.args.carbon, "billing": self.args.billing, } def add_args_cost(self): self.parser.add_argument( "-N", "--nodes", help="number of (min) nodes on which to run", default=1 ) self.parser.add_argument( "-n", "--nbtasks", help="number of tasks to run", default=1 ) self.parser.add_argument( "-t", "--time", help="time limit in minutes", default=1 ) self.parser.add_argument( "-p", "--partition", help="partition", default="parallel" ) self.parser.add_argument( "-g", "--gres", help="required generic resources per node", default="gpu:0" ) self.parser.add_argument( "-a", "--array", help="job array index values") self.parser.add_argument( "--ntasks-per-node", help="number of tasks to invoke on each node", ) self.parser.add_argument( "-c", "--cpus-per-task", help="number of cpus required per task", default=1 ) + self.parser.add_argument( + "--capping", + help="check if estimation cost surpass the limit for the account", + action="store_true", + ) self.args = self.parser.parse_args() - print(self.args) AppArgs.response = self.args return diff --git a/src/sausage/cost_estimation.py b/src/sausage/cost_estimation.py index ce76f5a..1095143 100644 --- a/src/sausage/cost_estimation.py +++ b/src/sausage/cost_estimation.py @@ -1,99 +1,120 @@ ''' Estimation cost module ''' -from sys import version_info +from sys import version_info, exit import re from sausage.readconf import ReadConf from sausage.appargs import AppArgs # from sausage.esclient import ESClient from sausage.debug import debug from sausage.logging import setup_logging +ReadConf(cost=True) AppArgs(cost=True) -ReadConf() @debug(timer=AppArgs.verbose) @debug(prof=AppArgs.verbose) -def estimation(): +def estimation() -> None: assert version_info >= (3, 8) logger = setup_logging(ReadConf.debug, AppArgs.verbose) logger.debug("START") # client = ESClient() # print(client.get_estimation()) - args = vars(AppArgs.response) - args.pop("verbose", None) - print(_estimation(**args)) + if AppArgs.response.capping: + est = _estimation(**vars(AppArgs.response)) + exit(0) if get_capping(est,"plop") else exit(42) + + else: + print(_estimation(**vars(AppArgs.response))) logger.debug("END") +def str2int(val: str) -> int: + return int(float(val)) -def is_zero(val): +def is_zero(val: int) -> bool: ''' Check default zero in a slurmctld context ''' infisixteen = 2 ** 16 - 2 infithirtytwo = 2 ** 32 - 2 - ret = True - if val == infithirtytwo or val == infisixteen or val is None: - ret = False + ret = False + if val == infithirtytwo or val == infisixteen or val is None or val == 0: + ret = True + print(f'val={val}, ret={ret}') return ret +def get_capping(est: int, account: str) -> bool: + return False + +def get_cost_multi(account: str, partition: str, cluster: str)-> float: + return 1 +""" +if used with job_submit the arguments order has to +the same has in lua script +""" def _estimation( - partition, - array, - nodes=0, - nbtasks=0, - time=0, - ntasks_per_node=0, - gres=0, - cpus_per_task=0, -): + account="root", + partition="default", + array_inx="1", + gres="", + min_cpus="1", + min_nodes="1", + ntasks_per_node="0", + time_limit="0", + capping="False" +) -> int: + # for key, value in kwargs.items(): + # print(f"{key} == {value}") + # DEFAULT (TODO) defaultwtime = 1 corespernode = 20 gpuspernode = 2 est = 0 - cgpu_cost = 1 + cgpu_cost = get_cost_multi(account, partition, ReadConf.cluster) # time - if is_zero(time): + time_limit = str2int(time_limit) + print(f'time is zero {is_zero(time_limit)} {time_limit}') + if is_zero(time_limit): timeinsec = defaultwtime else: - timeinsec = int(time) * 60 + timeinsec = time_limit * 60 # cpu - if is_zero(cpus_per_task): + min_cpus = str2int(min_cpus) + if is_zero(min_cpus): cpus = corespernode else: - cpus = cpus_per_task + cpus = min_cpus # (g|c)putres gputmp = re.match(r'gpu:(\d+)', gres) gpus = int(gputmp.group(1)) if gputmp else 0 tres = cpus if gpus == 0 else gpus # nodes - nodes = int(nodes) + nodes = str2int(min_nodes) if is_zero(nodes): if gpus != 0: - nodes = cpus / corespernode - elif gpus == 0: nodes = gpus / gpuspernode else: - nodes = 0 + nodes = cpus / corespernode # array - if array is None: - arrayindex = 1 + if array_inx is None: + array = 1 else: - arrayindex = array.split("-") if array else [0, 0] - arrayindex = abs(int(arrayindex[1]) - int(arrayindex[0])) + 1 + arrayindex = array_inx.split("-") if array_inx else [0, 0] + array = abs(int(arrayindex[1]) - int(arrayindex[0])) + 1 - est = nodes * tres * cgpu_cost * timeinsec * arrayindex + print(f'n{nodes} t{tres} c{cgpu_cost} t{timeinsec} a{array}') + est = nodes * tres * cgpu_cost * timeinsec * array return est diff --git a/src/sausage/esclient.py b/src/sausage/esclient.py index 3a40512..cbda213 100644 --- a/src/sausage/esclient.py +++ b/src/sausage/esclient.py @@ -1,149 +1,149 @@ ''' ESClient module ''' import warnings import logging from elasticsearch import Elasticsearch, exceptions as es_e from elasticsearch_dsl import Search from dateutil.relativedelta import relativedelta from datetime import date, datetime from sausage.esquery import ESQuery from sausage.appargs import AppArgs from sausage.readconf import ReadConf class ESClient(): def __init__(self): self.log = logging.getLogger(f"sausage.{self.__class__.__name__}") self.args = AppArgs.response self.item = None self.search_h = Elasticsearch( hosts=ReadConf.hosts, timeout=10, api_key=ReadConf.apikey ) self.search = Search( using=self.search_h, index=ReadConf.indexes, ) self.total = 0 self.result = None self.type = None self.entity = None self.query = None def set_result(self): self.set_query() self.search = self.search.query(self.query.query) self.search.aggs.bucket(self.entity, self.query.aggs) self.total = {"time": 0, "count": 0, "carbon": 0, "money": 0} ''' WARNING : To catch a warning when anonymous user as no privilege to check health of ES Cluster. We will have to check it with another way. ''' with warnings.catch_warnings(record=True) as w: try: resp = self.search[:0].execute() - except (es_e.ConnectionError, es_e.AuthenticationException) as excpt: + except (es_e.ConnectionError, es_e.AuthenticationException) as expt: self.log.error("Unable to connect : %s", excpt) return if resp.hits.total.value == 0: self.result = None return result = { self.entity: self.item, "type": self.type, "start": self.query.start, "end": self.query.end, } # self.total = dict(result, **self.total) result["metrics"] = [] # if self.item: # result["item"] = self.item if self.entity == "user" or ( self.entity == "account" and self.args["all"]): sub_entity = "account" if self.entity == "user" else "user" sub_res = getattr(resp.aggregations, self.entity)[0] sub_res = getattr(sub_res, sub_entity) for tmp in sub_res: result["metrics"].extend( self.get_metrics(tmp.metrics, "cluster", sub_entity, tmp.key) ) # sort by sub group (user or account) result["metrics"] = sorted( result["metrics"], key=lambda d: d[sub_entity], ) else: sub_res = getattr(resp.aggregations, self.entity) if self.type == "acctbilling": entity = "account" else: entity = "cluster" result["metrics"].extend(self.get_metrics( sub_res, entity, None, self.item)) # for display purpose we change the end date # to 'end - 1 day' tmp_date = datetime.strptime(result["end"], '%Y-%m-%d') tmp_date = tmp_date + relativedelta(days=-1) result["end"] = tmp_date.strftime("%Y-%m-%d") self.log.debug("Result : %s", result) self.result = result def get_metrics(self, metrics, entity, sub_entity=None, sub_item=None): result = [] for metric in metrics: temp = { entity: metric.key, "count": metric.doc_count, "money": round(metric.money.value, 2), "carbon": round(metric.carbon.value, 2), "time": round(metric.time.value, 2), } for k in ["count", "money", "carbon", "time"]: self.total[k] = round(temp[k] + self.total[k], 2) if sub_entity: temp[sub_entity] = sub_item result.append(temp) result = sorted(result, key=lambda d: d[entity]) return result def set_query(self): start = self.args["start"] end = self.args["end"] if self.args["billing"] is not None: self.type = "acctbilling" self.entity = self.type start = self.args["billing"][0] end = self.args["billing"][1] elif self.args["user"] is not None and self.args["account"] is not None: self.type = "user" self.entity = "user_account" self.item = [self.args["user"], self.args["account"]] elif self.args["user"] is not None: self.entity = "user" self.item = self.args["user"] self.type = "user" elif self.args["account"] is not None: self.entity = "account" self.item = self.args["account"] self.type = "account_all" if self.args["all"] else "account" else: print(self.args) # TODO raise "NotImplemented" self.query = ESQuery( entity=self.entity, start=start, end=end, item=self.item, all_sub_item=self.args["all"], ) if self.entity == "user_account": self.entity = "user" self.item = self.item[0] diff --git a/src/sausage/readconf.py b/src/sausage/readconf.py index a2eb048..75ba8d4 100644 --- a/src/sausage/readconf.py +++ b/src/sausage/readconf.py @@ -1,27 +1,42 @@ ''' Configuration file module ''' import configparser +import errno +import os +conf_path = "/etc/sausage/sausage.cfg" + +class SectionMissing(Exception): + def __init__(self, section): + self.message = f"Configuration file have to contain {section} section" + def __str__(self): + return self.message class ReadConf(object): - def __init__(self): + def __init__(self, cost=False) -> None: self.options = {} self.cfg_parser = configparser.ConfigParser() - self.cfg_parser.read("/etc/sausage/sausage.cfg") - ReadConf.cluster = self.cfg_parser["default"].get("cluster") - ReadConf.hosts = self.cfg_parser["server"].get("urls").split(",") + if len(self.cfg_parser.read(conf_path)) != 1: + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), conf_path) + if "default" not in self.cfg_parser: + raise(SectionMissing("default")) + ReadConf.cluster = self.cfg_parser["default"].get("cluster", "cluster") ReadConf.clusters = [ i.replace(" ", "") - for i in self.cfg_parser["default"].get("clusters").split(",") + for i in self.cfg_parser["default"].get("clusters","").split(",") ] - ReadConf.indexes = self.cfg_parser["server"].get("index") - ReadConf.fields = dict(self.cfg_parser["fields"]) ReadConf.debug = self.cfg_parser["default"].getboolean("log_devel") - ReadConf.apikey = tuple( - map(str, self.cfg_parser["server"].get("apikey").split(",")) - ) - if len(ReadConf.apikey) != 2: - ReadConf.apikey = tuple((None, None)) - ReadConf.billinggrp = self.cfg_parser["default"].get( - "billing_group", "root") + if not cost: + ReadConf.hosts = self.cfg_parser["server"].get("urls","").split(",") + ReadConf.indexes = self.cfg_parser["server"].get("index") + + ReadConf.fields = dict(self.cfg_parser["fields"]) + ReadConf.apikey = tuple( + map(str, self.cfg_parser["server"].get("apikey").split(",")) + ) + if len(ReadConf.apikey) != 2: + ReadConf.apikey = tuple((None, None)) + + ReadConf.billinggrp = self.cfg_parser["default"].get( + "billing_group", "root")