Page MenuHomec4science

repo_importer.py
No OneTemporary

File Metadata

Created
Wed, May 22, 11:12

repo_importer.py

# -*- coding: utf-8 -*-
import logging
import copy
from .. import export
from .. import colored
from .. import color_code
from ..repo import Repo
from .importer import Importer
__author__ = "Nicolas Richart"
__copyright__ = "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale " \
"de Lausanne) - SCITAS (Scientific IT and Application " \
"Support)"
__credits__ = ["Nicolas Richart"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Nicolas Richart"
__email__ = "nicolas.richart@epfl.ch"
_logger = logging.getLogger(__name__)
@export
class RepoImporter(Importer):
__default_import_scheme = {'type': 'git',
'permissions': {'scheme': 'import'}}
def __init__(self, name, config, **kwargs):
super().__init__(name, config,
self.__default_import_scheme, **kwargs)
_logger.debug(
'Initializing importer for Repo {0}'
' with configuration: {1}'.format(self._colored_name,
self._config))
def transfer(self, name):
_colored_name = colored(name, color_code['repo'], attrs=['bold'])
_logger.info('Locking for repo: {0} ({1})'.format(_colored_name,
self._colored_name))
_import_scheme = copy.copy(self._config['import-scheme'])
_logger.debug(' --> repo info {0}'.format(colored(self._config,
attrs=['bold'])))
_config = copy.copy(self._config)
_type = _config.pop('type', self.__default_import_scheme['type'])
_in_repo = Repo(name=name,
keyring=self._keyring,
dry_run=self._dry_run,
type=_type,
**self._backend_in)
_type = _import_scheme['type']
if _type == 'same':
_type = _in_repo.repo_type
_name = _import_scheme.pop('name', name).format(
original_name=name)
_out_repo = Repo(name=_name,
keyring=self._keyring,
dry_run=self._dry_run,
type=_type,
**self._backend_out)
_queries = {'git': {'git': 'RepoGit', 'svn': 'RepoGitSvn'},
'svn': {'svn': 'RepoSvnSync'}}
_module = _out_repo.repo_type
if _module != _in_repo.repo_type:
_module = '_'.join([_module, _in_repo.repo_type])
_query_class = None
if _type in _queries and _in_repo.repo_type in _queries[_type]:
_class_name = _queries[_type][_in_repo.repo_type]
_module = __import__(
'getmystuph.backends.repos.{0}'.format(_module),
globals(),
locals(),
[_class_name], 0)
_query_class = getattr(_module, _class_name)
else:
_msg = 'Cannot import a {0} repo in a {1} repo'.format(
_type, _in_repo.repo_type)
_logger.error(_msg)
raise RuntimeError(_msg)
_permissions_scheme = _import_scheme['permissions']
_projects = []
if 'project' in _permissions_scheme:
if type(_permissions_scheme['project']) == list:
_projects = _permissions_scheme['project']
else:
_projects = [_permissions_scheme['project']]
_out_repo.create(projects=_projects)
_out_perms = Repo.Permissions(_out_repo)
if _permissions_scheme['scheme'] == 'import':
_in_perms = _in_repo.permissions
_logger.debug("Replicating permissions {0}".format(_in_perms))
for _type in {'group', 'user'}:
_perms_ug = getattr(_in_perms, '{0}s'.format(_type))
for _entity in _perms_ug:
_in_id = _entity['id']
_out_id = getattr(
self._user_db, _type)(_in_id, create=True)
if _type == 'user' and \
_out_id == self._user_db.directory.whoami:
_out_id = '_author_'
if _out_id is not None:
getattr(_out_perms,
'add_{0}'.format(_type))(_out_id,
_entity['perm'])
else:
_logger.warning(
'No permissions to replicate for repository {0}'.format(
_colored_name))
elif _permissions_scheme['scheme'] == 'project':
if 'project' in _permissions_scheme:
_gid = self._user_db.get_group_unique_id(
_permissions_scheme['project'])
if _gid is not None:
_out_perms.groups = [
{'id': _gid,
'perm': Repo.EDIT + Repo.PUSH + Repo.VIEW}]
else:
_msg = str(
'The project {0} you specified in the ' +
'permissions of repo {1} does not exists' +
' in {2}').format(
colored(
_permissions_scheme['project'],
color_code['group'],
attrs=['bold']),
_colored_name,
self._user_db.directory.backend_name)
_logger.error(_msg)
raise RuntimeError(_msg)
else:
_msg = 'You should specify a project name in the ' + \
'permissions of repo {0}' .format(_colored_name) + \
' to be able to use the \'project\' import scheme'
_logger.error(_msg)
raise RuntimeError(_msg)
elif _permissions_scheme['scheme'] == 'user':
_out_perms.groups = [
{'id': '_author_',
'perm': Repo.EDIT + Repo.PUSH + Repo.VIEW}]
elif _permissions_scheme['scheme'] == 'static':
for _perm_type in ['edit', 'view', 'push']:
if _perm_type not in _permissions_scheme:
_msg = 'You should specify a \'{0}\' in the ' + \
'permissions of repo {1} to be able to use the ' + \
'\'project\' import scheme'.format(_perm_type,
_colored_name)
_logger.error(_msg)
raise RuntimeError(_msg)
_equivalent = {'edit': Repo.EDIT,
'view': Repo.VIEW,
'push': Repo.PUSH}
for _perm_type in _equivalent.keys():
if _perm_type in _permissions_scheme:
_perm_list = _permissions_scheme[_perm_type]
if type(_perm_list) is not list:
_perm_list = [_perm_list]
if _equivalent[_perm_type] is Repo.VIEW:
_out_perms.anonymous = False
_out_perms.remove_permission(_equivalent[_perm_type])
for _entity in _perm_list:
if _entity == '_author_' or \
_entity == '_users_' or \
_entity == '_public_':
_out_perms.add_user(_entity, _equivalent[_perm_type])
else:
_id = self._user_db.directory.get_group_unique_id(
_entity)
if _id is not None:
_out_perms.add_group(_id,
_equivalent[_perm_type])
else:
_logger.error(
'The project {0} was not found in {1}'.format(
_entity,
self._user_db.directory.backend_name))
if _out_perms is not None:
_out_repo.set_permissions(_out_perms)
if _query_class is not None:
_out_repo.enable()
with _query_class(_in_repo,
out_repo=_out_repo,
dry_run=self._dry_run,
keyring=self._keyring,
user_db=self._user_db) as _clone:
_branches = _clone.branches
_tags = _clone.tags
for b in _branches:
_logger.debug("Branch: {0}".format(
colored(b, attrs=['bold'])))
for t in _tags:
_logger.debug("Tag: {0}".format(
colored(t, attrs=['bold'])))
_out_repo.wait_enabled()
_clone.push()

Event Timeline