diff --git a/getmystuph/backends/epfl/repo.py b/getmystuph/backends/epfl/repo.py index 9ffb60c..a0489ca 100644 --- a/getmystuph/backends/epfl/repo.py +++ b/getmystuph/backends/epfl/repo.py @@ -1,245 +1,241 @@ # -*- coding: utf-8 -*- from bs4 import BeautifulSoup import re import copy import logging from ... import export from ... import colored from ... import color_code from ...repo import Repo from .tequila import TequilaGet __author__ = "Nicolas Richart" __copyright__ = "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale " \ "de Lausanne) - SCITAS (Scientific IT and Application " \ "Support)" __credits__ = ["Nicolas Richart"] __license__ = "BSD" __version__ = "0.1" __maintainer__ = "Nicolas Richart" __email__ = "nicolas.richart@epfl.ch" _logger = logging.getLogger(__name__) class RepoEPFL(Repo): ''' Description of a repostitory on {svn git}.epfl.ch ''' _LIST_REPOS = '{root}/repository/my.go' _MANAGE_REPO = '{root}/repository/manage.go?id={id}' _PERMISSION_URL = '{root}/objectRole/list.go?objectId={id}' _REPO_REGEX = '/polyrepo/private/repository/manage\.go\?id=([0-9]+)' _PERMISSIONS = {'Reader': Repo.VIEW, 'Contributor': Repo.PUSH + Repo.VIEW, 'Administrator': Repo.EDIT + Repo.PUSH + Repo.VIEW} _repo_list_cache = {} def __init__(self, name, *args, **kwargs): super().__init__(name, *args, **kwargs) option = copy.copy(kwargs) self._id = option.pop('id', None) self.__tequila_ctx = self._tequila_ctx(**kwargs) self._permissions = None if self._id is None: if self._type in self._repo_list_cache: cache = self._repo_list_cache[self._type] if name in cache: self._id = cache[name] _logger.debug('repo_id {0} for {1} was found in' ' repositories cache'.format( colored(self._id, color_code['repo']), self._colored_name)) else: _logger.debug('No repo_id provided for {0}'.format( self._colored_name)) self.list_repositories( tequila_ctx=self.__tequila_ctx, list_perm=False) cache = self._repo_list_cache[self._type] if name in cache: self._id = cache[name] _logger.debug('repo_id {0} for {1} was found in' ' repositories list'.format( colored(self._id, color_code['repo']), self._colored_name)) if self._id is None: _msg = 'The repo {0} was not found in' \ ' your list of repositories'.format( self._colored_name) _logger.error(_msg) raise RuntimeError(_msg) @property def permissions(self): '''Get the group and user permissions on the repository''' if self._permissions is not None: pass self._permissions = Repo.Permissions(self) _logger.info('Retrieving list of permissions' + ' for repositories {0}'.format(self._colored_name)) _html_resp = None try: _html_resp = self.__tequila_ctx.get( self._PERMISSION_URL.format(root=self._ROOT_URL, id=self._id)) except: return self._permissions _html_soup = BeautifulSoup(_html_resp.text, 'html.parser') _anonymous_perm = _html_soup.find( 'input', {'id': 'anonymousAccess'}).has_attr('checked') _logger.debug(' anonymous access: {0}'.format(_anonymous_perm)) self._permissions.anonymous = _anonymous_perm _group_regex = re.compile('([US][0-9]+)') _list_soup = _html_soup.find( 'form', {'name': 'lister'}).find_all('tr') for _tr in _list_soup: _tds = _tr.find_all('td') if not _tds: continue _perm_txt = _tds[-2].text.strip() _perm = self._PERMISSIONS[_perm_txt] _id_td = _tds[-1] _ug_id = _id_td.text _is_group = _group_regex.match(_ug_id) - _name = '' - if _is_group: - _perm_type = 'group' - if _logger.getEffectiveLevel() == logging.DEBUG: - _name = self.directory.get_group_name(_ug_id) - else: - _perm_type = 'user' - if _logger.getEffectiveLevel() == logging.DEBUG: - _name = self.directory.get_user_name(_ug_id) + _name = None + _perm_type = 'group' if _is_group else 'user' + _name = getattr( + self._user_db, + 'get_{0}_name'.format(_perm_type))(_ug_id, self.directory) if _name is None: _name = _ug_id _logger.warning('{0} {1} unknown'.format( _perm_type, self.directory.color_name(_ug_id))) else: getattr( self._permissions, 'add_{0}'.format(_perm_type))(_ug_id, _perm) _logger.debug(' {0}: {1} [{2}] -> {3} [{4}]'.format( _perm_type, self.directory.color_name(_name, type=_perm_type), self.directory.color_name(_ug_id), _perm_txt.lower(), _perm)) return self._permissions @classmethod def _tequila_ctx(cls, tequila_ctx=None, **kwargs): if tequila_ctx is None: return TequilaGet( cls._LIST_REPOS.format(root=cls._ROOT_URL), **kwargs) else: return tequila_ctx @classmethod def list_repositories(cls, list_perm=False, **kwargs): _type = cls._repo_type _logger.info("Retrieving the {0} list of repositories".format( _type)) _repos = [] _extra_info = {} tequila_ctx = kwargs.pop('tequila_ctx', None) _tequila_ctx = cls._tequila_ctx(tequila_ctx=tequila_ctx, **kwargs) _html_resp = _tequila_ctx.get( cls._LIST_REPOS.format(root=cls._ROOT_URL) ) _html_soup = BeautifulSoup(_html_resp.text, 'html.parser') _list_soup = _html_soup.find('tbody') _id_regex = re.compile(cls._REPO_REGEX) for _link in _list_soup.find_all('a'): _repo = _link.get_text() _repos.append(_repo) _repo_link = _link.get('href') _match = _id_regex.match(_repo_link) if _match: _id = _match.group(1) _extra_info[_repo] = {'id': _id} if list_perm and ('directory' in kwargs): _repo_epfl = Repo(name=_repo, type=_type, id=_id, tequila_ctx=_tequila_ctx, **kwargs) _perms = _repo_epfl.permissions _perm = _perms.user_perm( kwargs['directory'].whoami ) _extra_info[_repo]['perm'] = _perm _logger.debug(' List of repositories:') for _repo in _repos: _logger.debug( ' [{1}] {0} - {2}'.format( colored(_repo, color_code['repo'], attrs=['bold']), colored('{:>5}'.format(_extra_info[_repo]['id']), color_code['repo']), _extra_info[_repo])) if _type not in cls._repo_list_cache: cls._repo_list_cache[_type] = dict() cache = cls._repo_list_cache[_type] for name, info in _extra_info.items(): if 'id' in info: cache[name] = info['id'] return (_repos, _extra_info) @export class RepoGitEPFL(RepoEPFL): _ROOT_URL = 'https://git.epfl.ch/polyrepo/private' _repo_type = 'git' def __init__(self, name, *args, **kwargs): super().__init__(name, *args, **kwargs) self._url = 'https://{0}@git.epfl.ch/repo/{1}.git'.format( self._username, name) @export class RepoSvnEPFL(RepoEPFL): _ROOT_URL = 'https://svn.epfl.ch/polyrepo/private' _repo_type = 'svn' def __init__(self, name, *args, **kwargs): super().__init__(name, *args, **kwargs) self._url = 'https://{0}@svn.epfl.ch/svn/{1}'.format( self._username, name) diff --git a/getmystuph/importers/import_user_db.py b/getmystuph/importers/import_user_db.py index 968460a..7671252 100644 --- a/getmystuph/importers/import_user_db.py +++ b/getmystuph/importers/import_user_db.py @@ -1,279 +1,290 @@ # -*- coding: utf-8 -*- import logging import yaml from .. import export from .. import colored from .group_importer import GroupImporter __author__ = "Nicolas Richart" __copyright__ = "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale " \ "de Lausanne) - SCITAS (Scientific IT and Application " \ "Support)" __credits__ = ["Nicolas Richart"] __license__ = "BSD" __version__ = "0.1" __maintainer__ = "Nicolas Richart" __email__ = "nicolas.richart@epfl.ch" _logger = logging.getLogger(__name__) @export class ImportUserDB: def __init__(self, in_directory, out_directory, **kwargs): self._in_directory = in_directory self._out_directory = out_directory self._users = {} self._imported_groups = {} self._default_importer_config = {} self._cache_file = kwargs.pop('cache_file', None) if self._cache_file is not None: self._populate() self._keyring = kwargs.pop('keyring', None) self._dry_run = kwargs.pop('dry_run', False) def _populate(self): try: with open(self._cache_file, 'r+') as _cache_file: _data = yaml.load(_cache_file) if 'users' in _data: self._users = _data['users'] if 'groups' in _data: self._imported_groups = _data['groups'] except FileNotFoundError: pass def save(self): if self._cache_file is None: return if self._dry_run: return with open(self._cache_file, 'w') as _cache_file: yaml.dump( {'users': self._users, 'groups': self._imported_groups}, _cache_file, default_flow_style=False) def import_group(self, _id, _imported_id, _in_name, _out_name): self._imported_groups[_id] = {'out_id': _imported_id, 'in_name': _in_name, 'out_name': _out_name} def add_users(self, users): for _user in users: if _user in self._users and 'oid' in self._users[_user]: _logger.debug( 'User {0} ({1}) from {2} already in cache [{3} - {4}]' .format( self._in_directory.color_name( self._users[_user]['name'], type='user'), self._in_directory.color_name(_user), self._in_directory.backend_name, self._out_directory.color_name( self._users[_user]['oid']), self._out_directory.backend_name)) continue _user_info = {'id': _user} try: _mail = self._in_directory.get_user_email(_user) _user_info['email'] = _mail _user_info['name'] = self._in_directory.get_user_name(_user) _user_info['in_login'] = self._in_directory.get_user_login( _user) _out_id = self._out_directory.get_user_unique_id(_mail) if _out_id is not None: _logger.debug( 'Found {0} ({1}) in {2} as {3} in {4}'.format( self._in_directory.color_name(_user_info['name'], type='user'), self._in_directory.color_name(_user), self._in_directory.backend_name, self._out_directory.color_name(_out_id), self._out_directory.backend_name)) _user_info['oid'] = _out_id self._users[_user] = _user_info else: _logger.debug( 'Did not find {0} ({1}) from {2} in {3}'.format( self._in_directory.color_name(_user_info['name'], type='user'), self._in_directory.color_name(_user), self._in_directory.backend_name, self._out_directory.backend_name)) except: _logger.warning( "The user {0} does not exists in {1}".format( self._in_directory.color_name(_user), colored(self._in_directory.backend_name, attrs=['bold']))) def get_user_oids(self, users_ids): return [self._users[_user]['oid'] for _user in users_ids if (_user in self._users) and ('oid' in self._users[_user])] def is_id_in_cache(self, _id): return _id in self._users def get_user_name(self, _id, directory=None): if _id in self._users and 'name' in self._users[_id]: return self._users[_id]['name'] else: return directory.get_user_name(_id) + def get_group_name(self, _id, directory=None): + return directory.get_user_name(_id) + @property def default_importer(self): return self._default_importer_config @default_importer.setter def default_importer(self, importer_config): self._default_importer_config = importer_config @property def users(self): return self._users @property def directory(self): return self._out_directory def get_importer(self, name): if name not in self._default_importer_config: name = '__all__' importer = GroupImporter( name, self._default_importer_config[name], backend_in={'directory': self._in_directory}, backend_out={'directory': self._out_directory}, user_db=self, keyring=self._keyring, dry_run=self._dry_run) return importer @property def in_directory(self): return self._in_directory def group(self, _id, create=False): if _id in self._imported_groups: _logger.debug('Found group {0} in cache: {1}'.format( self._in_directory.color_name(_id), self._out_directory.color_name( self._imported_groups[_id]['out_name']))) return self._imported_groups[_id]['out_id'] else: _name = self._in_directory.get_group_name(_id) if _name is not None: _importer = self.get_importer(_name) _t_name = _importer.transfered_name(_name) _out = self.get_group_unique_id( _t_name, create=create, in_name=_name) if _out is None: _logger.error( ' Could not find {1} ({0}) in directory {2} {3}' .format( self._in_directory.color_name(_id), self._in_directory.color_name( _t_name, type='group'), self._out_directory.backend_name, create)) return _out else: _logger.error( ' Could not find {0} in directory {1}'.format( _id, self._in_directory.backend_name)) return None def group_by_in_name(self, _name): for _id, data in self._imported_groups.items(): if data['in_name'] == _name: return data['out_id'] return None def group_by_out_name(self, _name): for _id, data in self._imported_groups.items(): if data['out_name'] == _name: return data['out_id'] return None def get_group_unique_id(self, _name, create=False, in_name=None, members=set(), transfer_options={}): _gid = self.group_by_out_name(_name) if _gid is not None: return _gid _gid = self._out_directory.get_group_unique_id(_name) if _gid is not None: _logger.debug("Found group {0} in {1}: {2}".format( self._out_directory.color_name(_name), self._out_directory.backend_name, self._out_directory.color_name(_gid) )) if members: members.update(self._out_directory.get_users_from_group(_gid)) self._out_directory.set_group_users(_gid, members) return _gid if not create: return None if in_name is None: in_name = _name _logger.debug("Group {0} not in {1}, try to create it".format( self._out_directory.color_name(_name), self._out_directory.backend_name )) _importer = self.get_importer(in_name) _gid = _importer.transfer(in_name, **transfer_options) if members: self._out_directory.set_group_users(_gid, members) return _gid def user_by_in_login(self, login): for _id, data in self._users.items(): if data['in_login'] == login: return data _id = self._in_directory.get_user_unique_id_from_login(login) if _id is not None: _name = self._in_directory.get_user_name(_id) _email = self._in_directory.get_user_email(_id) self._users[_id] = { 'name': _name, 'email': _email, 'in_login': login } return self._users[_id] return None def user(self, _id, **kwargs): if _id in self._users and 'oid' in self._users[_id]: _logger.debug('Found user {0} in cache: {1}'.format( self._in_directory.color_name(_id), self._out_directory.color_name(self._users[_id]['oid']))) return self._users[_id]['oid'] + + if _id in self._users and 'email' in self._users[_id]: + _logger.debug('Partially found user {0} in cache: {1}'.format( + self._in_directory.color_name(_id), + self._out_directory.color_name(self._users[_id]['email']))) + _email = self._users[_id]['email'] else: - if 'email' in self._users: - _logger.debug('Partially found user {0} in cache: {1}'.format( - self._in_directory.color_name(_id), - self._out_directory.color_name(self._users[_id]['oid']))) + _email = self._in_directory.get_user_email(_id) - _email = self._users['email'] - else: - _email = self._in_directory.get_user_email(_id) - if _email is not None: - return self._out_directory.get_user_unique_id(_email) - return None + _oid = None + if _email is not None: + _oid = self._out_directory.get_user_unique_id(_email) + + if _oid: + _logger.debug('Found a match for user {0}: {1}'.format( + self._in_directory.color_name(_email), + self._out_directory.color_name(_oid))) + self._users[_id]['oid'] = _oid + + return _oid