Page MenuHomec4science

repo.py
No OneTemporary

File Metadata

Created
Fri, May 17, 16:11
# -*- coding: utf-8 -*-
import copy
import logging
import os, stat
import tempfile
import platform
from . import export
from . import colored
from . import color_code
from .backends import _get_class
from .directory import Directory
# Package metadata.
__author__ = "Nicolas Richart"
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__credits__ = ["Nicolas Richart"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Nicolas Richart"
__email__ = "nicolas.richart@epfl.ch"

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
@export
class Repo(object):
    """Base repository class that backends are expected to specialise.

    Calling ``Repo(...)`` does not necessarily build a plain ``Repo``:
    ``__new__`` asks ``_get_class`` for the class matching the ``type`` and
    ``backend`` keywords and allocates an instance of that class.
    """

    # Bit flags. Presumably the values stored under 'perm' in Permissions
    # entries -- confirm against the backends.
    VIEW = 0x1
    PUSH = 0x2
    EDIT = 0x4

    # Not referenced anywhere else in this class.
    _repo_backends = {}

    def __new__(cls, *args, **kwargs):
        """Allocate an instance of the class selected by ``_get_class``.

        Only the ``backend`` (default ``None``) and ``type`` (default
        ``'git'``) keywords are looked at; ``kwargs`` is not modified.
        """
        backend = kwargs.get('backend', None)
        repo_type = kwargs.get('type', 'git')
        concrete = _get_class(repo_type, backend)
        return super(Repo, cls).__new__(concrete)

    def __init__(self, name, *args, **kwargs):
        """Store the repository name and the recognised keyword options.

        Recognised keywords: ``username``, ``type``, ``dry_run``,
        ``backend``, ``trust_cert``, ``user_db`` and ``directory``. When no
        ``directory`` is supplied a ``Directory`` is built from the backend
        name, the username, the dry-run flag and every keyword that was not
        consumed above.
        """
        self._name = name
        self._colored_name = self.color_name(name)

        # Work on a copy so the caller's mapping is never altered.
        remaining = dict(kwargs)
        self._username = remaining.pop('username', None)
        self._type = remaining.pop('type', None)
        self._dry_run = remaining.pop("dry_run", False)
        self._backend_name = remaining.pop("backend", None)
        self._trust_cert = remaining.pop('trust_cert', False)
        self._user_db = remaining.pop('user_db', None)
        self._directory = remaining.pop('directory', None)

        if self._directory is not None:
            return

        self._directory = Directory(
            type='directory',
            backend=self._backend_name,
            username=self._username,
            dry_run=self._dry_run,
            **remaining)

    def enable(self):
        """Do nothing in the base class."""
        pass

    @property
    def backend_name(self):
        """Value of the ``backend`` keyword given at construction."""
        return self._backend_name

    @property
    def repo_type(self):
        """Value of the ``type`` keyword given at construction."""
        return self._type

    def color_name(self, name):
        """Return ``name`` rendered in bold with the 'repo' colour code."""
        return colored(name, color_code['repo'], attrs=['bold'])

    @property
    def directory(self):
        """Directory object supplied to, or created by, the constructor."""
        return self._directory

    class Permissions(object):
        """Group, user and anonymous access settings attached to a repo.

        Groups and users are kept as lists of ``{'id': ..., 'perm': ...}``
        dictionaries; ``anonymous`` is a single value.
        """

        def __init__(self, repo):
            self.__repo = repo
            self.__groups = []
            self.__users = []
            self.__anonymous = False

        @property
        def groups(self):
            """List of group entries."""
            return self.__groups

        @groups.setter
        def groups(self, groups_perms):
            # Shallow copy: the entry dictionaries stay shared.
            self.__groups = copy.copy(groups_perms)

        @property
        def users(self):
            """List of user entries."""
            return self.__users

        @users.setter
        def users(self, users_perms):
            # Shallow copy: the entry dictionaries stay shared.
            self.__users = copy.copy(users_perms)

        @property
        def anonymous(self):
            """Anonymous-access setting (``False`` initially)."""
            return self.__anonymous

        @anonymous.setter
        def anonymous(self, anonymous):
            self.__anonymous = anonymous

        def add_group(self, group_id, perm):
            """Append a group entry with the given id and permission."""
            self.__groups.append({'id': group_id, 'perm': perm})

        def add_user(self, user_id, perm):
            """Append a user entry with the given id and permission."""
            self.__users.append({'id': user_id, 'perm': perm})

        def remove_permission(self, perm):
            """Clear the bits of ``perm`` in every group and user entry."""
            for entries in (self.__groups, self.__users):
                for entry in entries:
                    current = entry['perm']
                    # XOR with the overlap drops exactly the shared bits.
                    entry['perm'] = current ^ (current & perm)

        def user_perm(self, _id):
            """Return the permission of the first user with id ``_id``, else 0."""
            return next(
                (user['perm'] for user in self.__users if user['id'] == _id),
                0)

        def all_users(self, perm_filter=0):
            """Return the set of user ids, expanding groups into their members.

            With ``perm_filter`` equal to 0 every entry is taken; otherwise
            only entries whose permission shares a bit with ``perm_filter``.
            Group members come from the repo directory's
            ``get_users_from_group``.
            """
            def selected(entry):
                return (perm_filter == 0) or (entry['perm'] & perm_filter)

            collected = [user['id'] for user in self.__users
                         if selected(user)]

            directory = self.__repo.directory
            for group in self.__groups:
                if selected(group):
                    collected.extend(
                        directory.get_users_from_group(group['id']))
            return set(collected)

        def __repr__(self):
            template = '<groups: {0}, users: {1}, anonymous: {2}>'
            return template.format(
                self.__groups, self.__users, self.__anonymous)

    def set_permissions(self, permissions):
        """Do nothing in the base class."""
        pass

    @property
    def permissions(self):
        """Return a new, empty ``Permissions`` object bound to this repo.

        The object exposes ``groups`` and ``users`` as lists of
        ``{'id': id, 'perm': perm}`` dictionaries and an ``anonymous``
        value. A fresh object is created on every access.
        """
        return self.Permissions(self)

    @property
    def name(self):
        """Repository name given to the constructor."""
        return self._name

    @property
    def url(self):
        """Value of ``self._url``.

        NOTE(review): ``_url`` is not assigned anywhere in this class;
        presumably subclasses set it -- confirm.
        """
        return self._url

    @property
    def username(self):
        """Value of the ``username`` keyword given at construction."""
        return self._username

    def get_query(self):
        """Return a ``RepoGit`` wrapping this repo.

        Raises:
            RuntimeError: if the repository type is anything but ``'git'``.
        """
        if self._type != 'git':
            raise RuntimeError(
                "No backend for '{0}' implemented yet".format(self._type))

        # Imported here, not at module level, as in the original code.
        from .repo_backends import RepoGit
        return RepoGit(self)
class RepoQuery(object):
    """Context manager giving a repository a temporary staging directory.

    Entering the context creates the stage (a ``tempfile.TemporaryDirectory``
    by default); leaving it removes the stage again. ``tags``, ``branches``,
    ``push`` and the ``join_the_*_side`` hooks are empty defaults.
    """

    class debug_mktemp:
        """Debug stand-in for ``tempfile.TemporaryDirectory`` that is kept.

        The directory is ``<base>/getmystuph/<name>`` where ``<base>`` is
        ``$TMPDIR`` when that variable is set and non-empty, and
        ``tempfile.gettempdir()`` otherwise. ``cleanup`` deliberately does
        nothing so the content can be inspected afterwards.
        """

        def __init__(self, name):
            # Fall back to the platform temp dir instead of raising KeyError
            # when TMPDIR is not defined (e.g. on Windows).
            base = os.environ.get('TMPDIR') or tempfile.gettempdir()
            self._path = os.path.join(base, 'getmystuph', name)
            # An already existing directory is fine; a regular file sitting
            # at that path still raises FileExistsError.
            os.makedirs(self._path, exist_ok=True)

        @property
        def name(self):
            """Path of the directory."""
            return self._path

        def cleanup(self):
            """Keep the directory; nothing is removed."""
            pass

    def __init__(self, repo, **kwargs):
        """Copy name, url and username from ``repo`` and store the options.

        Recognised keywords: ``dry_run`` (default ``False``), ``keyring``,
        ``out_repo``, ``user_db`` (default ``None``) and ``trust_cert``
        (default ``False``).
        """
        self._in_repo = repo
        self._name = repo.name
        self._url = repo.url
        self._username = repo.username
        self._dry_run = kwargs.pop('dry_run', False)
        self._keyring = kwargs.pop('keyring', None)
        self._out_repo = kwargs.pop('out_repo', None)
        self._user_db = kwargs.pop('user_db', None)
        self._trust_cert = kwargs.pop('trust_cert', False)
        self._stage_path = None
        # True once __exit__ has cleaned the current stage.
        self._stage_released = False

    def __enter__(self):
        """Make sure a usable stage exists and return ``self``.

        A stage is created when there is none yet, or when the previous one
        was already cleaned by ``__exit__``; otherwise re-entering the
        context would hand out a directory that no longer exists.
        """
        # getattr: tolerate instances whose __init__ did not set the flag.
        released = getattr(self, '_stage_released', False)
        if self._stage_path is None or released:
            self._create_stage()
            # Reset here so overriding _create_stage needs no extra care.
            self._stage_released = False
        return self

    def _create_stage(self):
        """Create the temporary stage directory and log its location."""
        self._stage_path = tempfile.TemporaryDirectory(
            prefix=self._name + '-')
        # Debugging aid: swap in the persistent directory instead.
        # self._stage_path = RepoQuery.debug_mktemp(self._name)
        _logger.debug('Creating stage folder {0} for repo {1}'.format(
            colored(self.working_dir, attrs=['bold']),
            self._in_repo.color_name(self._name)))

    def __exit__(self, *arg, **kwargs):
        """Remove the stage directory.

        Returns ``None``, so an exception raised inside the ``with`` body
        keeps propagating. ``working_dir`` still reports the (now removed)
        path afterwards.
        """
        _logger.debug('Cleaning staged folder {0}'.format(
            colored(self.working_dir, attrs=['bold'])))
        try:
            if platform.system() == 'Windows':
                # Fix TemporaryDirectory permissions on Windows
                for root, dirs, files in os.walk(self._stage_path.name):
                    for entry in dirs + files:
                        os.chmod(os.path.join(root, entry), stat.S_IWRITE)
            self._stage_path.cleanup()
        finally:
            # Whatever happened, this stage must not be reused.
            self._stage_released = True

    def join_the_dark_side(self):
        '''Allow nasty changes if required'''
        pass

    def join_the_good_side(self):
        '''Disallow nasty changes if required'''
        pass

    @property
    def tags(self):
        """Empty list in the base class."""
        return []

    @property
    def branches(self):
        """Empty list in the base class."""
        return []

    @property
    def working_dir(self):
        """Path of the current stage directory.

        Raises ``AttributeError`` while no stage has been created yet.
        """
        return self._stage_path.name

    def push(self):
        """Do nothing in the base class."""
        pass

Event Timeline