diff --git a/getmystuph/repo.py b/getmystuph/repo.py index 41fb792..9fe511e 100644 --- a/getmystuph/repo.py +++ b/getmystuph/repo.py @@ -1,252 +1,261 @@ # -*- coding: utf-8 -*- import copy import logging -import os +import os, stat import tempfile +import platform from . import export from . import colored from . import color_code from .backends import _get_class from .directory import Directory __author__ = "Nicolas Richart" __copyright__ = "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale " \ "de Lausanne) - SCITAS (Scientific IT and Application " \ "Support)" __credits__ = ["Nicolas Richart"] __license__ = "BSD" __version__ = "0.1" __maintainer__ = "Nicolas Richart" __email__ = "nicolas.richart@epfl.ch" _logger = logging.getLogger(__name__) @export class Repo(object): '''Interface class to define for your backend''' VIEW = 0x1 PUSH = 0x2 EDIT = 0x4 _repo_backends = dict() def __new__(cls, *args, **kwargs): """ Factory constructor depending on the chosen backend """ option = copy.copy(kwargs) backend = option.pop('backend', None) repo_type = option.pop('type', 'git') _class = _get_class(repo_type, backend) return super(Repo, cls).__new__(_class) def __init__(self, name, *args, **kwargs): self._name = name self._colored_name = self.color_name(name) options = copy.copy(kwargs) self._username = options.pop('username', None) self._type = options.pop('type', None) self._dry_run = options.pop("dry_run", False) self._backend_name = options.pop("backend", None) self._directory = options.pop('directory', None) if self._directory is None: self._directory = Directory(type='directory', backend=self._backend_name, username=self._username, dry_run=self._dry_run, **options) def enable(self): pass @property def backend_name(self): return self._backend_name @property def repo_type(self): return self._type def color_name(self, name): return colored(name, color_code['repo'], attrs=['bold']) @property def directory(self): return self._directory class Permissions(object): def __init__(self, repo): self.__groups = [] self.__users = [] self.__anonymous = False self.__repo = repo @property def groups(self): return self.__groups @groups.setter def groups(self, groups_perms): self.__groups = copy.copy(groups_perms) def add_group(self, group_id, perm): self.__groups.append({'id': group_id, 'perm': perm}) def add_user(self, user_id, perm): self.__users.append({'id': user_id, 'perm': perm}) def remove_permission(self, perm): _lists = {'group': self.__groups, 'user': self.__users} for _type in _lists.keys(): for _entity in _lists[_type]: _entity['perm'] = \ _entity['perm'] ^ (_entity['perm'] & perm) def user_perm(self, _id): for _user in self.__users: if _user['id'] == _id: return _user['perm'] return 0 @property def users(self): return self.__users @users.setter def users(self, users_perms): self.__users = copy.copy(users_perms) @property def anonymous(self): return self.__anonymous @anonymous.setter def anonymous(self, anonymous): self.__anonymous = anonymous @property def all_users(self): _users = [u['id'] for u in self.__users] _directory = self.__repo.directory for g in self._groups: _users.extend(_directory.get_users_from_group(g['id'])) return set(_users) def __repr__(self): return ''.format( self.__groups, self.__users, self.__anonymous) def set_permissions(self, permissions): pass @property def permissions(self): ''' Returns a dictionary of permissions of the form: {'groups': [{'id': id, 'perm': perm, ...}, ...], 'users': [{'id': id, 'perm': perm, ...}, ...], 'anonymous': True/False} perm should be read, write, admin, or None ''' return self.Permissions(self) @property def name(self): return self._name @property def url(self): return self._url @property def username(self): return self._username def get_query(self): if self._type == 'git': from .repo_backends import RepoGit return RepoGit(self) else: raise RuntimeError( 'No backend for \'{0}\' implemented yet'.format(self._type)) class RepoQuery(object): class debug_mktemp: def __init__(self, name): self._path = os.path.join(os.environ['TMPDIR'], 'getmystuph', name) try: os.makedirs(self._path) except FileExistsError: pass @property def name(self): return self._path def cleanup(self): pass def __init__(self, repo, **kwargs): self._in_repo = repo self._name = repo.name self._url = repo.url self._username = repo.username self._dry_run = kwargs.pop('dry_run', False) self._keyring = kwargs.pop('keyring', None) self._out_repo = kwargs.pop('out_repo', None) self._user_db = kwargs.pop('user_db', None) self._stage_path = None def __enter__(self): if self._stage_path is None: self._create_stage() return self def _create_stage(self): self._stage_path = tempfile.TemporaryDirectory( prefix=self._name + '-') # self._stage_path = RepoQuery.debug_mktemp(self._name) _logger.debug('Creating stage folder {0} for repo {1}'.format( colored(self.working_dir, attrs=['bold']), self._in_repo.color_name(self._name))) def __exit__(self, *arg, **kwargs): _logger.debug('Cleaning staged folder {0}'.format( colored(self.working_dir, attrs=['bold']))) + if platform.system() == 'Windows': + # Fix TemporaryDirectory permissions on Windows + for root, dirs, files in os.walk(self._stage_path.name): + for _dir in dirs: + os.chmod(os.path.join(root, _dir), stat.S_IWRITE) + for _file in files: + os.chmod(os.path.join(root, _file), stat.S_IWRITE) + self._stage_path.cleanup() def join_the_dark_side(self): '''Allow nasty changes if required''' pass def join_the_good_side(self): '''Disallow nasty changes if required''' pass @property def tags(self): return [] @property def branches(self): return [] @property def working_dir(self): return self._stage_path.name def push(self): pass