Page MenuHomec4science

phabricator.py
No OneTemporary

File Metadata

Created
Thu, May 9, 09:07

phabricator.py

# -*- coding: utf-8 -*-
import logging
import copy
import re
import time
from ... import export
from ... import dry_do
from ...repo import Repo
from ...utils import get_phabricator_instance
from .. import color_phid
# Module metadata; the author is also the only credited person and the
# maintainer, so the name is written once and reused.
__author__ = "Nicolas Richart"
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__credits__ = [__author__]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = __author__
__email__ = "nicolas.richart@epfl.ch"

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
@export
class PhabRepo(Repo):
    """Repository hosted on a Phabricator instance (Diffusion).

    On construction the Phabricator API is searched for a repository whose
    name equals ``self._name``; when one is found its PHID and numeric id are
    stored in ``self._phab_id`` and ``self._id``.

    NOTE(review): ``self._name``, ``self._colored_name``, ``self._dry_run``
    and ``self._directory`` are not set in this class; they are presumably
    provided by ``Repo`` -- confirm against the base class.
    """

    # Matches a Phabricator PHID and captures its four-letter type code.
    _PHID_RE = re.compile(r'PHID-([A-Z]{4})-.+')

    def __init__(self, *args, host=None, username=None, token=None, **kwargs):
        """Connect to Phabricator and look the repository up by name.

        Args:
            *args: accepted but not used nor forwarded to ``Repo``.
            host: base URL of the Phabricator instance (``http(s)://...``).
            username: user name, forwarded to ``Repo`` and to the API client.
            token: API token handed to ``get_phabricator_instance``.
            **kwargs: forwarded unchanged to ``Repo.__init__``; the optional
                ``repo_type`` entry (default ``'git'``) is also read here.

        Raises:
            RuntimeError: if ``host`` is missing or the server name cannot
                be extracted from it.
        """
        super().__init__(**kwargs, username=username)

        if host is None:
            # Fail with a clear message instead of an AttributeError on
            # ``None.rstrip`` further down.
            raise RuntimeError(
                'No host given for repo {0}'.format(self._colored_name))

        self._repo_type = kwargs.get('repo_type', 'git')

        _base_url = host.rstrip('/')
        self._host = '{0}/api/'.format(_base_url)

        # The server name is everything after the URL scheme.
        _match = re.match(r'https?://(.*)', _base_url)
        self._server = _match.group(1) if _match is not None else None
        if self._server is None:
            raise RuntimeError(
                'Cannot extract the server name for repo {0} from {1}'.format(
                    self._colored_name, host))

        self._phab = get_phabricator_instance(host=self._host,
                                              username=username,
                                              token=token)

        _data = self._phab.diffusion.repository.search(
            queryKey="all", constraints={"name": self._name})['data']

        self._phab_id = None
        for _repo in _data:
            # Only an exact name match identifies this repository.
            if _repo['fields']['name'] == self._name:
                self._phab_id = _repo['phid']
                self._id = _repo['id']
                _logger.debug('Repositories {0} has id {1}'.format(
                    self._colored_name,
                    self._phab_id))

        if self._phab_id is None:
            _logger.debug('Repositories {0} not in phabricator'.format(
                self._colored_name))

    def create(self):
        """Create the repository in Phabricator unless it already exists.

        An already existing repository is only reported through the logger.
        In dry-run mode nothing is sent to the server and placeholder
        identifiers are stored instead.
        """
        if self._phab_id is not None:
            _msg = 'The repository {0}:{1} already exists'.format(
                self._colored_name, self._phab_id)
            _logger.error(_msg)
            return

        if self._dry_run:
            self._phab_id = "PHID-REPO-notarealrepo"
        else:
            _data = self._phab.diffusion.repository.edit(
                transactions=[{'type': 'name',
                               'value': self._name},
                              {'type': 'vcs',
                               'value': self._repo_type}])
            self._creation_data = _data['object']
            self._phab_id = self._creation_data['phid']

        _msg = 'Created repository {0} id {1}'.format(
            self._colored_name, color_phid(self._phab_id))
        _logger.info(_msg)

        if self._dry_run:
            dry_do(_msg)
            self._id = 666  # placeholder id, no repository was created
        else:
            self._id = self._creation_data['id']

    @property
    def url(self):
        """Clone/checkout URL built from server name, numeric id and name.

        NOTE(review): reads ``self.repo_type`` (no underscore), presumably a
        property of ``Repo`` -- confirm. For any repository type other than
        ``'git'`` or ``'svn'`` no URL is computed here.
        """
        if self.repo_type == 'git':
            self._url = 'git@{0}:/diffusion/{1}/{2}.git'.format(
                self._server, self._id, self._name)
        elif self.repo_type == 'svn':
            # The template has three placeholders; the repository name was
            # missing from the arguments, which raised IndexError.
            self._url = 'svn+ssh://git@{0}/diffusion/{1}/{2}.git'.format(
                self._server, self._id, self._name)
        return self._url

    def enable(self):
        """Set the repository status to ``active`` (logged only in dry-run)."""
        _msg = 'Activating repository {0} [{1}]'.format(
            self._colored_name, color_phid(self._phab_id))
        _logger.info(_msg)
        if self._dry_run:
            dry_do(_msg)
            return

        self._phab.diffusion.repository.edit(
            transactions=[{'type': 'status',
                           'value': 'active'}],
            objectIdentifier=self._phab_id)

    def wait_enabled(self, timeout=3600):
        """Poll once per second until the repository is reported active.

        Args:
            timeout: maximum time to wait, in seconds. When it is exceeded
                the method logs a warning and returns without raising.

        Raises:
            RuntimeError: if the search by PHID does not return exactly one
                repository.
        """
        _msg = 'Checking if {0} [{1}] is activated'.format(
            self._colored_name, color_phid(self._phab_id))
        _logger.info(_msg)
        if self._dry_run:
            dry_do(_msg)
            return

        # Measure real elapsed time so that slow API calls count against
        # the timeout as well, not only the sleeps.
        _start = time.monotonic()
        while True:
            _data = self._phab.diffusion.repository.search(
                queryKey="all",
                constraints={"phids": [self._phab_id]})['data']
            if len(_data) != 1:
                raise RuntimeError(
                    'Cannot find the repo {0}'.format(self._colored_name))

            if _data[0]['fields']['status'] == 'active':
                return

            if time.monotonic() - _start > timeout:
                _logger.warning(
                    'Repository {0} [{1}] still not active after {2}s'.format(
                        self._colored_name, color_phid(self._phab_id),
                        timeout))
                return

            time.sleep(1)

    def _create_custom_policy(self, entities):
        """Create a policy allowing the given users/projects; return its PHID.

        Entries matching ``PHID-XXXX-...`` are sorted by their type code
        (only ``USER`` and ``PROJ`` are supported, any other code raises
        ``KeyError``); ``'obj.repository.author'`` is replaced by
        ``self._directory.whoami``. Other entries are ignored. In dry-run
        mode a placeholder PHID is returned and nothing is created.
        """
        _lists = {'PROJ': [],
                  'USER': []}
        for _entry in entities:
            _match = self._PHID_RE.match(_entry)
            if _match is not None:
                _lists[_match.group(1)].append(_entry)
            elif _entry == 'obj.repository.author':
                _lists['USER'].append(self._directory.whoami)

        _policy = [
            {"action": "allow",
             "rule": 'PhabricatorUsersPolicyRule',
             "value": _lists['USER']},
            {"action": "allow",
             "rule": 'PhabricatorProjectsPolicyRule',
             "value": _lists['PROJ']}]

        _msg = 'Creating policy for users [{0}] and projects [{1}]'.format(
            ', '.join([color_phid(_id) for _id in _lists['USER']]),
            ', '.join([color_phid(_id) for _id in _lists['PROJ']]))
        _logger.debug(_msg)

        if self._dry_run:
            dry_do(_msg)
            return 'PHID-PLCY-notapolicy'

        _data = self._phab.policy.create(objectType='REPO', default='deny',
                                         policy=_policy)
        return _data['phid']

    def set_permissions(self, permissions):
        """Apply view/edit/push policies to the repository.

        Args:
            permissions: object exposing ``groups`` and ``users`` (iterables
                of mappings with an ``'id'`` and a ``'perm'`` bit mask
                tested against ``Repo.EDIT``/``Repo.VIEW``/``Repo.PUSH``)
                and an ``anonymous`` flag that makes the repository
                publicly viewable.

        For each capability a single entity is used as the policy directly;
        several entities collapse to ``'public'`` or ``'users'`` when one of
        those is present, otherwise a custom policy is created. A capability
        nobody was granted is left untouched.
        """
        _equivalent = {'edit': Repo.EDIT,
                       'view': Repo.VIEW,
                       'push': Repo.PUSH}
        _phab_perms = {'edit': 'edit',
                       'view': 'view',
                       'push': 'policy.push'}
        _special_perms = {'_author_': 'obj.repository.author',
                          '_users_': 'users',
                          '_public_': 'public'}

        # Collect, per capability, the distinct entities that hold it.
        _perms = {'edit': set(),
                  'push': set(),
                  'view': set()}
        for _kind in ('group', 'user'):
            for _entity in getattr(permissions, '{0}s'.format(_kind)):
                _id = _special_perms.get(_entity['id'], _entity['id'])
                for _phab, _gen in _equivalent.items():
                    if _entity['perm'] & _gen:
                        _perms[_phab].add(_id)

        if permissions.anonymous:
            _perms['view'] = {'public'}

        for _type in ('push', 'view', 'edit'):
            _entities = list(_perms[_type])
            if not _entities:
                # Nothing was granted for this capability: there is no
                # policy to apply (indexing the empty list used to raise).
                _logger.debug(
                    'No \'{0}\' permissions to set for {1}'.format(
                        _type, self._colored_name))
                continue

            # Reduce the entities to one policy value. Every branch falls
            # through to the edit below, so 'public'/'users' are applied too
            # (they used to be skipped by an early ``continue``).
            if len(_entities) == 1:
                _value = _entities[0]
            elif 'public' in _entities:
                _value = 'public'
            elif 'users' in _entities:
                _value = 'users'
            else:
                _value = self._create_custom_policy(_entities)
                _logger.info('Replacing list {0} by policy {1}'.format(
                    _entities, _value))

            _msg = 'Setting \'{0}\' permissions for {1} to {2}:'.format(
                _type, self._colored_name,
                color_phid(_value))
            _logger.info(_msg)
            if self._dry_run:
                dry_do(_msg)
            else:
                self._phab.diffusion.repository.edit(
                    transactions=[{'type': _phab_perms[_type],
                                   'value': _value}],
                    objectIdentifier=self._phab_id)

Event Timeline