Page MenuHomec4science

repo_importer.py
No OneTemporary

File Metadata

Created
Sat, Oct 12, 07:55

repo_importer.py

# -*- coding: utf-8 -*-
import logging
import copy
from .. import export
from .. import colored
from .. import Repo
from . import Importer
from . import GroupImporter
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Authorship and licensing metadata.
__author__ = "Nicolas Richart"
__maintainer__ = "Nicolas Richart"
__email__ = "nicolas.richart@epfl.ch"
__credits__ = ["Nicolas Richart"]
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__license__ = "BSD"
__version__ = "0.1"
@export
class RepoImporter(Importer):
    """Importer that recreates a repository on the output backend.

    ``transfer`` opens the repository on the input backend, creates its
    counterpart on the output backend and sets the permissions of the
    new repository according to the ``permissions`` entry of the import
    scheme.
    """

    # Passed to the base ``Importer`` as the default import scheme.
    __default_import_scheme = {'type': 'git',
                               'permissions': {'scheme': 'import'}}

    def __init__(self, name, config, **kwargs):
        """Set up the importer and the group importer it relies on.

        Args:
            name: name of this importer, forwarded to ``Importer``.
            config: configuration mapping, forwarded to ``Importer``.
            **kwargs: forwarded to ``Importer``.  A non-``None``
                ``group_importer`` entry is used to transfer groups;
                otherwise a ``GroupImporter`` named ``'__all__'`` is
                created with an empty configuration.
        """
        super().__init__(name, config,
                         self.__default_import_scheme, **kwargs)

        _group_importer = kwargs.get('group_importer')
        if _group_importer is not None:
            self._group_importer = _group_importer
        else:
            # NOTE(review): this reads ``self._users_db`` whereas
            # ``transfer`` reads ``self._user_db``; the base class is
            # not visible here -- confirm which attribute it defines.
            self._group_importer = GroupImporter(
                '__all__', {},
                keyring=self._keyring,
                dry_run=self._dry_run,
                backend_in=self._backend_in,
                backend_out=self._backend_out,
                user_db=self._users_db)

        _logger.info(
            'Initializing importer for Repo {0}'
            ' with configuration: {1}'.format(self._colored_name,
                                              self._config))

    def transfer(self, name):
        """Create the output repository for *name* and set its permissions.

        The permissions depend on ``permissions['scheme']`` of the
        import scheme:

        * ``'import'``: replicate the permissions of the input repo,
          mapping groups and users through the user database;
        * ``'project'``: give every permission to the group named by
          ``permissions['project']``;
        * ``'user'``: give every permission to
          ``'obj.repository.author'``;
        * ``'static'``: requires explicit ``'edit'``, ``'view'`` and
          ``'push'`` entries.

        Args:
            name: name of the repository on the input backend.

        Raises:
            RuntimeError: if the ``'project'`` scheme has no
                ``'project'`` entry, or the ``'static'`` scheme lacks
                one of ``'edit'``, ``'view'`` or ``'push'``.
        """
        _colored_name = colored(name, 'red', attrs=['bold'])
        _logger.info('Locking for repo: {0} ({1})'.format(
            _colored_name, self._colored_name))

        # Shallow copy so that popping 'name' below does not alter the
        # stored configuration.
        _import_scheme = copy.copy(self._config['import-scheme'])
        _logger.debug(' --> repo info {0}'.format(
            colored(self._config, attrs=['bold'])))

        _in_type = self._config.get(
            'type', self.__default_import_scheme['type'])
        _in_repo = Repo(name=name,
                        keyring=self._keyring,
                        dry_run=self._dry_run,
                        type=_in_type,
                        **self._backend_in)

        # 'same' means: keep the type of the input repository.
        _out_type = _import_scheme['type']
        if _out_type == 'same':
            _out_type = _in_repo.repo_type

        _out_name = _import_scheme.pop('name', self._name).format(
            original_name=self._name)
        _out_repo = Repo(name=_out_name,
                         keyring=self._keyring,
                         dry_run=self._dry_run,
                         type=_out_type,
                         **self._backend_out)
        _out_repo.create()

        _permissions_scheme = _import_scheme['permissions']
        _scheme = _permissions_scheme['scheme']
        _perms = {'edit': [],
                  'view': [],
                  'push': []}

        if _scheme == 'import':
            _in_perms = _in_repo.permissions
            _logger.debug("Replicating permissions {0}".format(_in_perms))

            def _add_to_perms(_perms, ug_perm, _id):
                # EDIT also grants push and view, PUSH also grants view.
                if ug_perm == Repo.EDIT:
                    _perms['edit'].append(_id)
                    _perms['push'].append(_id)
                    _perms['view'].append(_id)
                if ug_perm == Repo.PUSH:
                    _perms['push'].append(_id)
                    _perms['view'].append(_id)
                if ug_perm == Repo.VIEW:
                    _perms['view'].append(_id)

            for _group in _in_perms.groups:
                _in_gid = _group['id']
                _out_gid = self._user_db.group(_in_gid)
                if _out_gid is None:
                    # Group unknown to the user database: try to
                    # transfer it through the group importer.
                    _group_name = _in_repo.directory.get_group_name(
                        _in_gid)
                    if _group_name is not None:
                        _out_gid = self._group_importer.transfer(
                            _group_name)

                if _out_gid is not None:
                    _add_to_perms(_perms, _group['perm'], _out_gid)
                else:
                    _logger.error(
                        ' Could not find {0} in directory {1}'.format(
                            _in_gid,
                            self._user_db.directory.backend_name))

            for _user in _in_perms.users:
                _in_id = _user['id']
                _out_id = self._user_db.user(_in_id)
                if _out_id is None:
                    # User unknown to the user database: add it from
                    # the input directory and look it up again.
                    self._user_db.add_users([_in_id], _in_repo.directory)
                    _out_id = self._user_db.user(_in_id)

                if _out_id is not None:
                    _add_to_perms(_perms, _user['perm'], _out_id)
                else:
                    _logger.error(
                        ' Could not find {0} in directory {1}'.format(
                            colored(_in_id, attrs=['bold']),
                            self._user_db.directory.backend_name))

            if _in_perms.anonymous:
                _perms['view'] = ['public']
        elif _scheme == 'project':
            if 'project' in _permissions_scheme:
                _project = _permissions_scheme['project']
                _gid = self._user_db.directory.get_group_unique_id(
                    _project)
                if _gid is not None:
                    _perms['edit'] = [_gid]
                    _perms['view'] = [_gid]
                    _perms['push'] = [_gid]
                else:
                    # Report the unresolved group instead of silently
                    # leaving the permission lists empty.
                    _logger.error(
                        ' Could not find {0} in directory {1}'.format(
                            colored(_project, attrs=['bold']),
                            self._user_db.directory.backend_name))
            else:
                # Format the whole message: calling .format() on the
                # last fragment of a '+' chain leaves the placeholders
                # of the earlier fragments unfilled.
                _msg = ('You should specify a project name in the '
                        'permissions of repo {0} to be able to use the '
                        '\'project\' import scheme').format(_colored_name)
                _logger.error(_msg)
                raise RuntimeError(_msg)
        elif _scheme == 'user':
            _perms['edit'] = ['obj.repository.author']
            _perms['view'] = ['obj.repository.author']
            _perms['push'] = ['obj.repository.author']
        elif _scheme == 'static':
            for _perm_type in ['edit', 'view', 'push']:
                if _perm_type not in _permissions_scheme:
                    _msg = ('You should specify a \'{0}\' in the '
                            'permissions of repo {1} to be able to use '
                            'the \'static\' import scheme').format(
                                _perm_type, _colored_name)
                    _logger.error(_msg)
                    raise RuntimeError(_msg)

        # Explicit 'edit'/'view'/'push' entries replace the lists
        # computed above; the value 'user' stands for the repo author.
        # NOTE(review): applied for every scheme, not only 'static' --
        # confirm this matches the intended nesting.
        for _perm_type in ['edit', 'view', 'push']:
            if _perm_type in _permissions_scheme:
                _perm = _permissions_scheme[_perm_type]
                if _perm == 'user':
                    _perm = 'obj.repository.author'
                _perms[_perm_type] = [_perm]

        _out_repo.set_permissions(**_perms)

Event Timeline