diff --git a/.gitconfig b/.gitconfig index 2401c25..0d7a2d4 100644 --- a/.gitconfig +++ b/.gitconfig @@ -1,3 +1,3 @@ [filter "token"] - smudge = ../.clean-api-key -smudge - clean = ../.clean-api-key -clean + smudge = ./.clean-api-key -smudge + clean = ./.clean-api-key -clean %f diff --git a/example.yaml b/example.yaml index 6f0cd51..1e39208 100644 --- a/example.yaml +++ b/example.yaml @@ -1,78 +1,78 @@ # ----------------------------------------------------------------------------- # Date fin du mois pour le script # Fin avril fin commit # Fin aout # ---------------------------------------------------------------------------- global: use_keyring: true in: backend: epfl username: richart out: - backend: c4science -# backend: phabricator -# username: richart-test -# host: https://scitassrv18.epfl.ch/api/ -# token: api-la3x6bw7xbna2hul54gltze5dj5g # API token obtain in the settings +# backend: c4science + backend: phabricator + username: richart-test + host: https://scitassrv18.epfl.ch/api/ + token: cli-n5dutb2wv26ivcpo66yvb3sbk64g # API token obtain in the settings groups: __all__: import-scheme: type: sub-project # or project project: test_import # only for sub-project type name: test_{original_name} scitas-ge-unit: import-scheme: type: project name: scitas-ge hpc-lsms: lsms-unit: import-scheme: name: lsms repositories: __all__: type: git import-scheme: type: same #| git | svn name: gms_{original_name} permissions: scheme: import #| project | user | static # project: name -> tag to add # view: | _author_ | _public_ | _users_ # push: | _author_ | _public_ | _users_ # edit: | _author_ | _public_ | _users_ iohelper: import-scheme: branches: [master] tags: [] permissions: view: _public_ push: _users_ edit: lsms toto: akantu: import-scheme: branches: [master] tags: [] test-interface: type: svn import-scheme: permissions: scheme: project project: scitas-ge view: _public_ # partial-import: /path # branches: [] # if not specified all are imported # tags: [] # if not specified all are imported diff --git a/getmystuph/backends/directories/ldap.py b/getmystuph/backends/directories/ldap.py index c0e31f7..c807c0a 100644 --- a/getmystuph/backends/directories/ldap.py +++ b/getmystuph/backends/directories/ldap.py @@ -1,162 +1,162 @@ # -*- coding: utf-8 -*- from ... import export from ...directory import Directory import ldap3 as ldap __author__ = "Nicolas Richart" __copyright__ = "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale " \ "de Lausanne) - SCITAS (Scientific IT and Application " \ "Support)" __credits__ = ["Nicolas Richart"] __license__ = "BSD" __version__ = "0.1" __maintainer__ = "Nicolas Richart" __email__ = "nicolas.richart@epfl.ch" @export class LDAPDirectory(Directory): def __init__(self, uri, *args, **kwargs): self.__ldap_basedn = kwargs.pop('basedn', '') self.__ldap_scope = kwargs.pop('scope', ldap.SUBTREE) self.__ldap_user_unique_id = kwargs.pop('uidNumber', 'uidNumber') self.__ldap_user_gecos = kwargs.pop('gecos', 'gecos') self.__ldap_user_id = kwargs.pop('uid', 'uid') self.__ldap_user_email = kwargs.pop('email', 'email') self.__ldap_user_filter = kwargs.pop('user_filter', '(&(objectClass=posixAccount)({attr}={value}))') # NOQA: ignore=E501 self.__ldap_user_group_attrs = kwargs.pop('user_group_attrs', 'memberOf') # NOQA: ignore=E501 self.__ldap_group_unique_id = kwargs.pop('gidNumber', 'gidNumber') self.__ldap_group_id = kwargs.pop('gid', 'cn') self.__ldap_group_filter = kwargs.pop('group_filter', '(&(objectClass=posixGroup)({attr}={value}))') # NOQA: ignore=E501 self.__ldap_group_member_filter = kwargs.pop('group_member_filter', 'uidNumber') # NOQA: ignore=E501 self.__ldap_group_user_attrs = kwargs.pop('group_user_attrs', 'memberUid') # NOQA: ignore=E501 super(LDAPDirectory, self).__init__(*args, **kwargs) self.__ldap_uri = uri self.__server = ldap.Server(self.__ldap_uri) self.__ldap = ldap.Connection(self.__server, auto_bind=True) def __get_one(self, fltr, attr): """get the first ldap entry of attribute (attr) for a given filter (fltr)""" return self.__get_all(fltr, attr)[0] def __get_one_attr(self, fltr, attr): """get the first ldap entry of attribute (attr) for a given filter (fltr)""" _res = self.__get_all(fltr, attr) if len(_res) != 0: return _res[0][attr].value return None def __get_all(self, fltr, attr): """get all the ldap attributes entries (attr) for a given filter (fltr)""" if type(attr) is not list: attrs = [attr] else: attrs = attr _res = self.__ldap.search(search_base=self.__ldap_basedn, search_scope=self.__ldap_scope, search_filter=fltr, attributes=attrs) if _res: return self.__ldap.entries else: return [] def is_valid_user(self, id): _res = self.__get_all( self.__ldap_user_filter.format( attr=self.__ldap_user_unique_id, value=id), self.__ldap_user_unique_id ) return len(_res) != 0 def is_valid_group(self, id): _res = self.__get_one( self.__ldap_user_filter.format( attr=self.__ldap_group_unique_id, value=id), self.__ldap_group_unique_id ) return len(_res) != 0 def get_users_from_group(self, id): _users = [] _members = self.__get_one_attr( self.__ldap_group_filter.format( attr=self.__ldap_group_unique_id, value=id), self.__ldap_group_user_attrs ) if self.__ldap_group_member_filter != self.__ldap_user_unique_id: for m in _members: _filter = \ self.__ldap_user_filter.format( attr=self.__ldap_group_member_filter, value=m) _id = self.__get_one_attr( _filter, self.__ldap_user_unique_id, ) if _id: _users.append(_id) else: for m in _members: if self.is_valid_user(m): _users.append(m) return _users def get_group_unique_id(self, name): return self.__get_one_attr( self.__ldap_group_filter.format( attr=self.__ldap_group_id, value=name), self.__ldap_group_unique_id) def get_user_unique_id(self, email): return self.__get_one_attr( self.__ldap_user_filter.format( attr=self.__ldap_user_email, value=email), self.__ldap_user_unique_id) - def get_user_unique_id_from_name(self, name): + def get_user_unique_id_from_login(self, name): return self.__get_one_attr( self.__ldap_user_filter.format( attr=self.__ldap_user_id, value=name), self.__ldap_user_unique_id) def get_group_name(self, id): return self.__get_one_attr( self.__ldap_group_filter.format( attr=self.__ldap_group_unique_id, value=id), self.__ldap_group_id) def get_user_name(self, id): return self.__get_one_attr( self.__ldap_user_filter.format( attr=self.__ldap_user_unique_id, value=id), self.__ldap_user_gecos) def get_user_email(self, id): return self.__get_one_attr( self.__ldap_user_filter.format( attr=self.__ldap_user_unique_id, value=id), self.__ldap_user_email) diff --git a/getmystuph/backends/directories/phabricator.py b/getmystuph/backends/directories/phabricator.py index 8774cbe..5e28cf0 100644 --- a/getmystuph/backends/directories/phabricator.py +++ b/getmystuph/backends/directories/phabricator.py @@ -1,165 +1,165 @@ import logging import re from ... import colored from ... import export from ... import dry_do from ... import Directory from ...utils import get_phabricator_instance from .. import color_phid __author__ = "Nicolas Richart" __copyright__ = "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale " \ "de Lausanne) - SCITAS (Scientific IT and Application " \ "Support)" __credits__ = ["Nicolas Richart"] __license__ = "BSD" __version__ = "0.1" __maintainer__ = "Nicolas Richart" __email__ = "nicolas.richart@epfl.ch" _logger = logging.getLogger(__name__) @export class PhabDirectory(Directory): def __init__(self, *args, host=None, username=None, token=None, **kwargs): super().__init__(**kwargs) self._phab = get_phabricator_instance(host=host, username=username, token=token) def _set_default_policy(self, phid): _default_policy = [{"type": "edit", "value": self.whoami}, {"type": "view", "value": "obj.project.members"}, {"type": "join", "value": "no-one"}] _msg = "Setting default policy for project {0} to {1}".format( color_phid(phid), ', '.join(["{0}: {1}".format(colored(m["type"], attrs=['bold']), color_phid(m['value'])) for m in _default_policy])) _logger.debug(_msg) if not self._dry_run: self._phab.project.edit(transactions=_default_policy, objectIdentifier=phid) else: dry_do(_msg) def color_name(self, name, **kwargs): regex = re.compile(r'PHID-([A-Z]{4})-.+') match = regex.match(name) if match: return color_phid(name) else: return super().color_name(name, **kwargs) def is_valid_user(self, id): return self.get_user_name(id) is not None def is_valid_group(self, id): return self.get_group_name(id) is not None def get_users_from_group(self, id): _res = self._phab.project.search(constraints={'phids': [id]}, attachments={'members': True}) if _res['data']: return [member['phid'] for member in _res['data'][0]['attachments']['members']['members']] return [] def get_group_unique_id(self, name): _res = self._phab.project.query(names=[name]) if _res['data']: return list(_res['data'].keys())[0] return None def get_user_unique_id(self, email): _res = self._phab.user.query(emails=[email]) if _res: return _res[0]['phid'] return None - def get_user_unique_id_from_name(self, name): + def get_user_unique_id_from_login(self, name): _res = self._phab.user.query(emails=[name]) if _res: return _res[0]['phid'] return None def get_group_name(self, gid): _res = self._phab.project.query(phids=[gid]) if _res['data']: return _res[0]['data'][gid]['name'] return None def get_user_name(self, uid): _res = self._phab.user.query(phids=[uid]) if _res: return _res[0]['realName'] return None def get_user_email(self, uid): raise RuntimeError("This information is not accessible") def create_group(self, name, members=[]): _unique_members = list(set(members)) _msg = 'Creating group {0} with members [{1}]'.format( self.color_name(name, type='group'), ', '.join([color_phid(_id) for _id in _unique_members])) _logger.debug(_msg) if not self._dry_run: _res = self._phab.project.create(name=name, members=_unique_members) _phid = _res['phid'] else: _phid = "PHID-PROJ-notarealproject" self._set_default_policy(_phid) return _phid def set_group_users(self, gid, uids): _unique_uids = list(set(uids)) _msg = 'Setting users {0} as members of group {1}'.format( ', '.join([color_phid(_id) for _id in _unique_uids]), color_phid(gid)) _logger.debug(_msg) transactions = [{"type": "members.set", "value": _unique_uids}] if not self._dry_run: self._phab.project.edit(transactions=transactions, objectIdentifier=gid) else: dry_do(_msg) def create_subgroup(self, name, pgid, members=[]): _unique_members = list(set(members)) _msg = 'Creating group {0} as a subgroup of {1} with members {2}'.format( colored(name, 'red', attrs=['bold']), colored(pgid, attrs=['bold']), ', '.join([color_phid(_id) for _id in _unique_members])) _logger.debug(_msg) transactions = [{"type": "parent", "value": pgid}, {"type": "name", "value": name}] if members is not None: transactions.append({"type": "members.set", "value": _unique_members}) if not self._dry_run: _res = self._phab.project.edit(transactions=transactions) _phid = _res['object']['phid'] else: _phid = 'PHID-PROJ-notarealproject' self._set_default_policy(_phid) return _phid @property def whoami(self): me = self._phab.user.whoami() return me['phid'] diff --git a/getmystuph/directory.py b/getmystuph/directory.py index 1ec158f..82aa2b9 100644 --- a/getmystuph/directory.py +++ b/getmystuph/directory.py @@ -1,92 +1,92 @@ # -*- coding: utf-8 -*- import copy import logging from . import export from . import colored from . import color_code from .backends import _get_class __author__ = "Nicolas Richart" __copyright__ = "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale " \ "de Lausanne) - SCITAS (Scientific IT and Application " \ "Support)" __credits__ = ["Nicolas Richart"] __license__ = "BSD" __version__ = "0.1" __maintainer__ = "Nicolas Richart" __email__ = "nicolas.richart@epfl.ch" _logger = logging.getLogger(__name__) @export class Directory(object): def __new__(cls, **kwargs): """ Factory constructor depending on the chosen backend """ option = copy.copy(kwargs) backend = option.pop('backend', None) _class = _get_class('directory', backend) return super(Directory, cls).__new__(_class) def __init__(self, **kwargs): opts = copy.copy(kwargs) self._username = opts.pop('username', None) self._backend_name = opts.pop("backend", None) self._dry_run = opts.pop("dry_run", False) def color_name(self, name, type=None): if type is None: return colored(name, attrs=['bold']) else: return colored(name, color_code[type], attrs=['bold']) @property def backend_name(self): return self._backend_name def is_valid_user(self, id): return False def is_valid_group(self, id): return False def get_users_from_group(self, id): return [] def get_group_unique_id(self, name): return None def get_user_unique_id(self, email): return None - def get_user_unique_id_from_name(self, name): + def get_user_unique_id_from_login(self, name): return None def get_group_name(self, id): return None def get_user_name(self, id): return None def get_user_email(self, id): return None def search_users(self, list_emails): _res = {} for _mail in list_emails: _res[_mail] = self.get_user_unique_id(_mail) return _res def create_group(self, name): raise PermissionError('Groups cannot be created in this directory') def set_group_users(self, gid, uids): raise PermissionError('Groups cannot be modified in this directory') @property def whoami(self): return self.get_user_unique_id_from_name(self._username)