Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F84296239
repo.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Sep 21, 22:52
Size
6 KB
Mime Type
text/x-python
Expires
Mon, Sep 23, 22:52 (2 d)
Engine
blob
Format
Raw Data
Handle
20982577
Attached To
rPHAPI Phabricator API scripts
repo.py
View Options
# -*- coding: utf-8 -*-
import
copy
import
logging
import
os
import
tempfile
from
.
import
export
from
.
import
colored
from
.
import
color_code
from
.backends
import
_get_class
from
.directory
import
Directory
# Package metadata.
__author__ = "Nicolas Richart"
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__credits__ = [__author__]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = __author__
__email__ = "nicolas.richart@epfl.ch"

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
@export
class Repo(object):
    '''Interface class to define for your backend.

    Instantiating ``Repo`` goes through the factory in ``__new__``: the
    concrete class is looked up with ``_get_class`` from the ``type`` and
    ``backend`` keyword arguments.
    '''

    # Bit flags, combinable with bitwise operators.
    # NOTE(review): presumably these are the values stored under the 'perm'
    # key of Permissions entries -- confirm against the backends.
    VIEW = 0x1
    PUSH = 0x2
    EDIT = 0x4

    _repo_backends = dict()

    def __new__(cls, *args, **kwargs):
        """
        Factory constructor depending on the chosen backend.

        Reads the ``backend`` (default ``None``) and ``type`` (default
        ``'git'``) keyword arguments and allocates an instance of the class
        returned by ``_get_class(type, backend)``.
        """
        # Work on a copy so the caller's kwargs are left untouched.
        option = copy.copy(kwargs)
        backend = option.pop('backend', None)
        repo_type = option.pop('type', 'git')
        _class = _get_class(repo_type, backend)
        # NOTE(review): Python only runs __init__ automatically when the
        # returned object is an instance of ``cls``; this assumes _get_class
        # returns a Repo subclass -- confirm.
        return super(Repo, cls).__new__(_class)

    def __init__(self, name, *args, **kwargs):
        """
        Store the repository name and the common options.

        Recognised keyword arguments: ``username``, ``type``, ``dry_run``
        (default ``False``), ``backend`` and ``directory``. When no
        ``directory`` is given, a ``Directory`` is built from the backend,
        username and dry_run settings plus every remaining keyword argument.
        """
        self._name = name
        self._colored_name = self.color_name(name)

        options = copy.copy(kwargs)
        self._username = options.pop('username', None)
        # NOTE(review): __new__ defaults 'type' to 'git' while this defaults
        # to None, so repo_type may be None for a git-backed instance --
        # confirm whether that is intended.
        self._type = options.pop('type', None)
        self._dry_run = options.pop("dry_run", False)
        self._backend_name = options.pop("backend", None)
        self._directory = options.pop('directory', None)

        if self._directory is None:
            self._directory = Directory(type='directory',
                                        backend=self._backend_name,
                                        username=self._username,
                                        dry_run=self._dry_run,
                                        **options)

    def enable(self):
        """Hook with no default behaviour."""
        pass

    @property
    def backend_name(self):
        """Value of the ``backend`` keyword argument (``None`` if absent)."""
        return self._backend_name

    @property
    def repo_type(self):
        """Value of the ``type`` keyword argument (``None`` if absent)."""
        return self._type

    def color_name(self, name):
        """Return ``name`` formatted in bold with the 'repo' color code."""
        return colored(name, color_code['repo'], attrs=['bold'])

    @property
    def directory(self):
        """Directory object given at construction or built by ``__init__``."""
        return self._directory

    class Permissions(object):
        """
        Container for the group, user and anonymous permissions of a repo.

        Groups and users are lists of dictionaries holding at least the
        keys ``'id'`` and ``'perm'``.
        """

        def __init__(self, repo):
            self.__groups = []
            self.__users = []
            self.__anonymous = False
            self.__repo = repo

        @property
        def groups(self):
            """List of group entries (``{'id': ..., 'perm': ...}``)."""
            return self.__groups

        @groups.setter
        def groups(self, groups_perms):
            # Shallow copy: the list is private, the entries stay shared.
            self.__groups = copy.copy(groups_perms)

        def add_group(self, group_id, perm):
            """Append a group entry with the given id and permission."""
            self.__groups.append({'id': group_id, 'perm': perm})

        def add_user(self, user_id, perm):
            """Append a user entry with the given id and permission."""
            self.__users.append({'id': user_id, 'perm': perm})

        def remove_permission(self, perm):
            """Clear the bits of ``perm`` from every group and user entry."""
            for _entities in (self.__groups, self.__users):
                for _entity in _entities:
                    # x ^ (x & perm) clears exactly the bits set in perm.
                    _entity['perm'] = \
                        _entity['perm'] ^ (_entity['perm'] & perm)

        def user_perm(self, _id):
            """
            Return the permission of the first user entry whose id equals
            ``_id``, or ``0`` when there is no such entry.
            """
            for _user in self.__users:
                if _user['id'] == _id:
                    return _user['perm']
            return 0

        @property
        def users(self):
            """List of user entries (``{'id': ..., 'perm': ...}``)."""
            return self.__users

        @users.setter
        def users(self, users_perms):
            # Shallow copy: the list is private, the entries stay shared.
            self.__users = copy.copy(users_perms)

        @property
        def anonymous(self):
            """Anonymous access flag (``False`` by default)."""
            return self.__anonymous

        @anonymous.setter
        def anonymous(self, anonymous):
            self.__anonymous = anonymous

        @property
        def all_users(self):
            """
            Set of user ids: the ids listed directly plus those returned by
            the repo directory's ``get_users_from_group`` for each group.
            """
            _users = {u['id'] for u in self.__users}
            _directory = self.__repo.directory
            # The group list is stored name-mangled as ``self.__groups``;
            # there is no ``self._groups`` attribute.
            for g in self.__groups:
                _users.update(_directory.get_users_from_group(g['id']))
            return _users

        def __repr__(self):
            return '<groups: {0}, users: {1}, anonymous: {2}>'.format(
                self.__groups, self.__users, self.__anonymous)

    def set_permissions(self, permissions):
        """Hook with no default behaviour."""
        pass

    @property
    def permissions(self):
        '''
        Returns a new, empty Permissions object bound to this repo.

        The permissions it carries have the form:
        groups: [{'id': id, 'perm': perm, ...}, ...],
        users: [{'id': id, 'perm': perm, ...}, ...],
        anonymous: True/False

        perm should be read, write, admin, or None
        '''
        return self.Permissions(self)

    @property
    def name(self):
        """Name given at construction."""
        return self._name

    @property
    def url(self):
        """
        Repository URL.

        ``_url`` is not assigned anywhere in this class; reading this
        property raises AttributeError unless it has been set elsewhere
        (presumably by the backend subclass -- confirm).
        """
        return self._url

    @property
    def username(self):
        """Value of the ``username`` keyword argument (``None`` if absent)."""
        return self._username

    def get_query(self):
        """
        Return a query object for this repository.

        Only the ``'git'`` type is handled, through ``RepoGit``.

        Raises:
            RuntimeError: for any other repository type.
        """
        if self._type == 'git':
            # Imported here rather than at module level, as in the original.
            from .repo_backends import RepoGit
            return RepoGit(self)
        else:
            raise RuntimeError(
                'No backend for \'{0}\' implemented yet'.format(self._type))
class RepoQuery(object):
    """
    Context manager giving access to a temporary stage folder for a repo.

    Entering the context creates the stage folder if there is none;
    leaving it removes the folder. ``tags``, ``branches`` and
    ``add_remote`` are empty defaults.
    """

    class debug_mktemp:
        """
        Debugging replacement for ``tempfile.TemporaryDirectory``.

        Creates ``<tmp>/getmystuph/<name>`` and never deletes it. It exposes
        the same ``name`` / ``cleanup()`` interface used by ``RepoQuery``.
        """

        def __init__(self, name):
            # Honour TMPDIR as before, but fall back to the platform
            # default instead of raising KeyError when it is not set.
            _base = os.environ.get('TMPDIR') or tempfile.gettempdir()
            self._path = os.path.join(_base, 'getmystuph', name)
            os.makedirs(self._path, exist_ok=True)

        @property
        def name(self):
            """Path of the folder."""
            return self._path

        def cleanup(self):
            """Do nothing: the folder is kept."""
            pass

    def __init__(self, repo, **kwargs):
        """
        Args:
            repo: object providing ``name``, ``url``, ``username`` and
                ``color_name()``.
            **kwargs: ``dry_run`` (default ``False``), ``keyring``,
                ``out_repo`` and ``user_db`` (default ``None``).
        """
        self._in_repo = repo
        self._name = repo.name
        self._url = repo.url
        self._username = repo.username

        self._dry_run = kwargs.pop('dry_run', False)
        self._keyring = kwargs.pop('keyring', None)
        self._out_repo = kwargs.pop('out_repo', None)
        self._user_db = kwargs.pop('user_db', None)

        # Set by _create_stage(), reset to None by __exit__().
        self._stage_path = None

    def __enter__(self):
        if self._stage_path is None:
            self._create_stage()
        return self

    def _create_stage(self):
        """Create the temporary stage folder, prefixed with the repo name."""
        self._stage_path = tempfile.TemporaryDirectory(
            prefix=self._name + '-')
        # For debugging, a persistent folder can be used instead:
        # self._stage_path = RepoQuery.debug_mktemp(self._name)

        _logger.debug('Creating stage folder {0} for repo {1}'.format(
            colored(self.working_dir, attrs=['bold']),
            self._in_repo.color_name(self._name)))

    def __exit__(self, *arg, **kwargs):
        """
        Remove the stage folder, if any.

        Returns ``None``, so an exception raised inside the ``with`` block
        keeps propagating.
        """
        if self._stage_path is None:
            # No stage was created: nothing to clean.
            return

        _logger.debug('Cleaning staged folder {0}'.format(
            colored(self.working_dir, attrs=['bold'])))
        try:
            self._stage_path.cleanup()
        finally:
            # Forget the removed folder so that entering the context again
            # creates a fresh stage instead of reusing a deleted path.
            self._stage_path = None

    def add_remote(self, out_repo):
        """Hook with no default behaviour."""
        pass

    @property
    def tags(self):
        """Empty list by default."""
        return []

    @property
    def branches(self):
        """Empty list by default."""
        return []

    @property
    def working_dir(self):
        """
        Path of the stage folder.

        Raises AttributeError when no stage folder exists (before entering
        or after leaving the context).
        """
        return self._stage_path.name
Event Timeline
Log In to Comment