Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F63082940
repo.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Fri, May 17, 16:11
Size
7 KB
Mime Type
text/x-python
Expires
Sun, May 19, 16:11 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17690789
Attached To
rPHAPI Phabricator API scripts
repo.py
View Options
# -*- coding: utf-8 -*-
import
copy
import
logging
import
os
,
stat
import
tempfile
import
platform
from
.
import
export
from
.
import
colored
from
.
import
color_code
from
.backends
import
_get_class
from
.directory
import
Directory
# Package metadata for this module.
__author__ = "Nicolas Richart"
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__credits__ = ["Nicolas Richart"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Nicolas Richart"
__email__ = "nicolas.richart@epfl.ch"

# Module-level logger, named after the importing module path.
_logger = logging.getLogger(__name__)
@export
class Repo(object):
    """Generic repository interface that concrete backends build upon.

    Calling ``Repo(...)`` goes through :meth:`__new__`, which asks
    ``_get_class`` for the class matching the ``type`` and ``backend``
    keyword arguments and allocates an instance of that class.
    """

    # Bit flags; presumably the values combined in the ``perm`` entries
    # handled by ``Permissions`` (confirm against the backends).
    VIEW = 0x1
    PUSH = 0x2
    EDIT = 0x4

    _repo_backends = {}

    def __new__(cls, *args, **kwargs):
        """Allocate an instance of the class selected for the backend.

        Only two keyword arguments are inspected here: ``type`` (defaults
        to ``'git'``) and ``backend`` (defaults to ``None``).  Both are
        handed to ``_get_class`` and the class it returns is instantiated.
        """
        concrete = _get_class(kwargs.get('type', 'git'),
                              kwargs.get('backend'))
        return super(Repo, cls).__new__(concrete)

    def __init__(self, name, *args, **kwargs):
        """Store the repository settings and make sure a directory exists.

        :param name: repository name; a colored variant is cached as well.
        :param kwargs: the keys ``username``, ``type``, ``dry_run``,
            ``backend``, ``trust_cert``, ``user_db`` and ``directory`` are
            consumed here.  When no ``directory`` is supplied a
            ``Directory`` is created and receives every remaining keyword
            argument.
        """
        self._name = name
        self._colored_name = self.color_name(name)

        # Work on a copy so the keys consumed below are not forwarded to
        # ``Directory``.
        remaining = copy.copy(kwargs)
        self._username = remaining.pop('username', None)
        self._type = remaining.pop('type', None)
        self._dry_run = remaining.pop("dry_run", False)
        self._backend_name = remaining.pop("backend", None)
        self._trust_cert = remaining.pop('trust_cert', False)
        self._user_db = remaining.pop('user_db', None)

        directory = remaining.pop('directory', None)
        if directory is None:
            directory = Directory(type='directory',
                                  backend=self._backend_name,
                                  username=self._username,
                                  dry_run=self._dry_run,
                                  **remaining)
        self._directory = directory

    def enable(self):
        """Do nothing in the base class; a hook for backends."""
        pass

    @property
    def backend_name(self):
        """Value of the ``backend`` keyword given at construction."""
        return self._backend_name

    @property
    def repo_type(self):
        """Value of the ``type`` keyword given at construction."""
        return self._type

    def color_name(self, name):
        """Return ``name`` rendered bold in the ``'repo'`` color."""
        return colored(name, color_code['repo'], attrs=['bold'])

    @property
    def directory(self):
        """Directory object supplied to, or created by, the constructor."""
        return self._directory

    class Permissions(object):
        """Group, user and anonymous permissions attached to a repository.

        Group and user permissions are kept as lists of
        ``{'id': ..., 'perm': ...}`` dictionaries where ``perm`` is treated
        as a bit mask.
        """

        def __init__(self, repo):
            """Start with no groups, no users and anonymous access off."""
            self.__groups = []
            self.__users = []
            self.__anonymous = False
            self.__repo = repo

        @property
        def groups(self):
            """The internal list of group permission entries."""
            return self.__groups

        @groups.setter
        def groups(self, groups_perms):
            # A shallow copy: the container is new, the entries are shared.
            self.__groups = copy.copy(groups_perms)

        def add_group(self, group_id, perm):
            """Append a permission entry for the group ``group_id``."""
            entry = {'id': group_id, 'perm': perm}
            self.__groups.append(entry)

        def add_user(self, user_id, perm):
            """Append a permission entry for the user ``user_id``."""
            entry = {'id': user_id, 'perm': perm}
            self.__users.append(entry)

        def remove_permission(self, perm):
            """Clear the bits of ``perm`` on every group and user entry."""
            for entries in (self.__groups, self.__users):
                for entity in entries:
                    # XOR with the overlap drops exactly the shared bits.
                    overlap = entity['perm'] & perm
                    entity['perm'] = entity['perm'] ^ overlap

        def user_perm(self, _id):
            """Return the ``perm`` of the first user ``_id``, or 0 if absent."""
            matches = (entry['perm']
                       for entry in self.__users
                       if entry['id'] == _id)
            return next(matches, 0)

        @property
        def users(self):
            """The internal list of user permission entries."""
            return self.__users

        @users.setter
        def users(self, users_perms):
            # A shallow copy: the container is new, the entries are shared.
            self.__users = copy.copy(users_perms)

        @property
        def anonymous(self):
            """The anonymous flag (``False`` until set)."""
            return self.__anonymous

        @anonymous.setter
        def anonymous(self, anonymous):
            self.__anonymous = anonymous

        def all_users(self, perm_filter=0):
            """Return the set of user ids, group members included.

            :param perm_filter: bit mask; 0 selects every entry, otherwise
                only entries sharing at least one bit with it are kept.
            :return: a ``set`` holding the ids of the selected users plus
                whatever the repository directory's
                ``get_users_from_group`` yields for each selected group.
            """
            def selected(entry):
                return perm_filter == 0 or bool(entry['perm'] & perm_filter)

            members = {entry['id']
                       for entry in self.__users
                       if selected(entry)}

            directory = self.__repo.directory
            for group in self.__groups:
                if selected(group):
                    members.update(
                        directory.get_users_from_group(group['id']))
            return members

        def __repr__(self):
            template = '<groups: {0}, users: {1}, anonymous: {2}>'
            return template.format(self.__groups,
                                   self.__users,
                                   self.__anonymous)

    def set_permissions(self, permissions):
        """Do nothing in the base class; a hook for backends."""
        pass

    @property
    def permissions(self):
        '''
        Returns a new, empty ``Permissions`` object bound to this repository.

        The object exposes:
          groups:    [{'id': id, 'perm': perm, ...}, ...]
          users:     [{'id': id, 'perm': perm, ...}, ...]
          anonymous: True/False
        '''
        return self.Permissions(self)

    @property
    def name(self):
        """Repository name given to the constructor."""
        return self._name

    @property
    def url(self):
        """Value of ``self._url``; the attribute is not set in this class."""
        return self._url

    @property
    def username(self):
        """Value of the ``username`` keyword given at construction."""
        return self._username

    def get_query(self):
        """Return a ``RepoGit`` query object for this repository.

        :raises RuntimeError: when the repository type is not ``'git'``.
        """
        if self._type != 'git':
            raise RuntimeError(
                'No backend for \'{0}\' implemented yet'.format(self._type))

        # Imported here rather than at module level, as in the original.
        from .repo_backends import RepoGit
        return RepoGit(self)
class RepoQuery(object):
    """Context manager working on a repository through a staging folder.

    Entering the context creates a temporary staging directory (available
    as :attr:`working_dir`); leaving it removes that directory.  The
    methods and properties that do nothing or return empty lists are hooks
    for the concrete backends.
    """

    class debug_mktemp:
        """Persistent replacement for ``tempfile.TemporaryDirectory``.

        It offers the same ``name``/``cleanup`` surface but ``cleanup``
        removes nothing, so the staged files can be inspected afterwards.
        """

        def __init__(self, name):
            """Create ``<tmp>/getmystuph/<name>`` if it does not exist yet.

            ``<tmp>`` is ``$TMPDIR`` when that variable is set and the
            platform temporary directory otherwise, so the helper also
            works where ``TMPDIR`` is undefined.
            """
            base = os.environ.get('TMPDIR')
            if base is None:
                base = tempfile.gettempdir()
            self._path = os.path.join(base, 'getmystuph', name)
            os.makedirs(self._path, exist_ok=True)

        @property
        def name(self):
            """Path of the staging folder."""
            return self._path

        def cleanup(self):
            """Keep the folder on purpose."""
            pass

    def __init__(self, repo, **kwargs):
        """Copy what is needed from ``repo`` and read the options.

        :param repo: object providing ``name``, ``url``, ``username`` and
            ``color_name``.
        :param kwargs: optional ``dry_run`` (default ``False``),
            ``keyring``, ``out_repo``, ``user_db`` (default ``None``) and
            ``trust_cert`` (default ``False``).
        """
        self._in_repo = repo
        self._name = repo.name
        self._url = repo.url
        self._username = repo.username
        self._dry_run = kwargs.pop('dry_run', False)
        self._keyring = kwargs.pop('keyring', None)
        self._out_repo = kwargs.pop('out_repo', None)
        self._user_db = kwargs.pop('user_db', None)
        self._trust_cert = kwargs.pop('trust_cert', False)
        # Set by _create_stage(); None means no staging folder exists yet.
        self._stage_path = None

    def __enter__(self):
        """Create the staging folder if there is none and return ``self``."""
        if self._stage_path is None:
            self._create_stage()
        return self

    def _create_stage(self):
        """Create the temporary staging folder and log its location."""
        # For debugging, RepoQuery.debug_mktemp(self._name) can be used
        # here instead to keep the staged files on disk.
        self._stage_path = tempfile.TemporaryDirectory(
            prefix=self._name + '-')

        _logger.debug('Creating stage folder {0} for repo {1}'.format(
            colored(self.working_dir, attrs=['bold']),
            self._in_repo.color_name(self._name)))

    def __exit__(self, *arg, **kwargs):
        """Remove the staging folder; exceptions are not suppressed."""
        if self._stage_path is None:
            # The context was never entered: there is nothing to clean.
            return

        _logger.debug('Cleaning staged folder {0}'.format(
            colored(self.working_dir, attrs=['bold'])))

        if platform.system() == 'Windows':
            # Fix TemporaryDirectory permissions on Windows: make every
            # entry writable before the folder is removed.
            for root, dirs, files in os.walk(self._stage_path.name):
                for entry in dirs + files:
                    os.chmod(os.path.join(root, entry), stat.S_IWRITE)

        self._stage_path.cleanup()

    def join_the_dark_side(self):
        '''Allow nasty changes if required'''
        pass

    def join_the_good_side(self):
        '''Disallow nasty changes if required'''
        pass

    @property
    def tags(self):
        """Empty list in the base class."""
        return []

    @property
    def branches(self):
        """Empty list in the base class."""
        return []

    @property
    def working_dir(self):
        """Path of the staging folder; only valid once it was created."""
        return self._stage_path.name

    def push(self):
        """Do nothing in the base class; a hook for backends."""
        pass
Event Timeline
Log In to Comment