Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F60914031
svn.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Fri, May 3, 08:38
Size
5 KB
Mime Type
text/x-python
Expires
Sun, May 5, 08:38 (2 d)
Engine
blob
Format
Raw Data
Handle
17439129
Attached To
rPHAPI Phabricator API scripts
svn.py
View Options
import
re
import
svn.remote
import
svn.common
import
logging
import
subprocess
from
...
import
dry_do
from
...
import
colored
from
...repo
import
RepoQuery
from
...utils
import
get_password
__author__
=
"Nicolas Richart"
__copyright__
=
"Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
\
"de Lausanne) - SCITAS (Scientific IT and Application "
\
"Support)"
__credits__
=
[
"Nicolas Richart"
]
__license__
=
"BSD"
__version__
=
"0.1"
__maintainer__
=
"Nicolas Richart"
__email__
=
"nicolas.richart@epfl.ch"
_logger
=
logging
.
getLogger
(
__name__
)
class
RepoSvnSync
(
RepoQuery
):
"""This class handles the common part on svn repositories, checkout,
retreiving tags/branches
"""
def
__enter__
(
self
):
super
()
.
__enter__
()
_logger
.
info
(
'Getting repo {0} [{1}] from svn server'
.
format
(
self
.
_in_repo
.
color_name
(
self
.
_name
),
self
.
_url
))
if
re
.
match
(
'^https://'
,
self
.
_url
):
self
.
_need_auth_source
=
True
else
:
self
.
_need_auth_source
=
False
_auth
=
{}
if
self
.
_need_auth_source
:
_auth
[
'username'
]
=
self
.
_username
_auth
[
'password'
]
=
get_password
(
self
.
_in_repo
.
backend_name
,
self
.
_in_repo
.
username
,
keyring
=
self
.
_keyring
)
if
self
.
_trust_cert
:
_auth
[
'trust_cert'
]
=
True
self
.
_svn_client
=
svn
.
remote
.
RemoteClient
(
self
.
_url
,
**
_auth
)
return
self
def
clone
(
self
):
pass
def
_list_path
(
self
,
path
):
try
:
_list
=
self
.
_svn_client
.
list
(
rel_path
=
path
)
except
svn
.
common
.
SvnException
:
_msg
=
'Failed to list path {1} from {0}'
\
.
format
(
self
.
_in_repo
.
color_name
(
self
.
_name
),
path
)
_logger
.
error
(
_msg
)
RuntimeError
(
_msg
)
_paths
=
[]
for
_file
in
_list
:
_path
=
_file
.
rstrip
(
'/'
)
_paths
.
append
(
_path
)
return
_paths
@property
def
branches
(
self
):
if
'branches'
not
in
self
.
_list_path
(
'.'
):
return
[]
_branches
=
self
.
_list_path
(
'branches'
)
return
_branches
@property
def
tags
(
self
):
if
'tags'
not
in
self
.
_list_path
(
'.'
):
return
[]
_tags
=
self
.
_list_path
(
'tags'
)
return
_tags
def
push
(
self
):
_msg
=
'Synchronizing {0} [{1}] with {2} [{3}]'
.
format
(
self
.
_in_repo
.
color_name
(
self
.
_name
),
colored
(
self
.
_in_repo
.
url
,
attrs
=
[
'bold'
]),
self
.
_out_repo
.
color_name
(
self
.
_out_repo
.
_name
),
colored
(
self
.
_out_repo
.
url
,
attrs
=
[
'bold'
]))
_logger
.
info
(
_msg
)
if
self
.
_dry_run
:
dry_do
(
_msg
)
return
self
.
_out_repo
.
join_the_dark_side
()
_auth_args
=
[
'--no-auth-cache'
]
if
self
.
_need_auth_source
:
_auth_args
=
[
'--source-username'
,
self
.
_username
,
'--source-password'
,
get_password
(
self
.
_in_repo
.
backend_name
,
self
.
_in_repo
.
username
,
keyring
=
self
.
_keyring
)]
if
self
.
_trust_cert
:
_auth_args
.
extend
([
'--source-trust-server-cert-failures'
,
'unknown-ca'
])
_outputs
=
'.svnsync_{stage}_{repo}.{type}'
_outputs_dict
=
{
'stage'
:
'init'
,
'repo'
:
self
.
_in_repo
.
_name
}
with
open
(
_outputs
.
format
(
type
=
'out'
,
**
_outputs_dict
),
"w"
)
as
out
,
\
open
(
_outputs
.
format
(
type
=
'err'
,
**
_outputs_dict
),
"w"
)
as
err
:
_init_args
=
[
'svnsync'
,
'init'
,
'--non-interactive'
]
_init_args
.
extend
(
_auth_args
)
_init_args
.
extend
([
self
.
_out_repo
.
url
,
self
.
_in_repo
.
url
])
_svn_sync_init
=
subprocess
.
Popen
(
_init_args
,
stdout
=
out
,
stderr
=
err
)
_ret_code
=
_svn_sync_init
.
wait
()
if
_ret_code
!=
0
:
_msg
=
'Failed to initialize the svn synchronization of {0} (Code {1})'
\
.
format
(
self
.
_in_repo
.
color_name
(
self
.
_name
),
_ret_code
)
_logger
.
error
(
_msg
)
raise
RuntimeError
(
_msg
)
_outputs_dict
[
'stage'
]
=
'sync'
with
open
(
_outputs
.
format
(
type
=
'out'
,
**
_outputs_dict
),
"w"
)
as
out
,
\
open
(
_outputs
.
format
(
type
=
'err'
,
**
_outputs_dict
),
"w"
)
as
err
:
_sync_args
=
[
'svnsync'
,
'sync'
,
'--non-interactive'
]
_sync_args
.
extend
(
_auth_args
)
_sync_args
.
append
(
self
.
_out_repo
.
url
)
_svn_sync
=
subprocess
.
Popen
(
_sync_args
,
stdout
=
out
,
stderr
=
err
)
_ret_code
=
_svn_sync
.
wait
()
if
_ret_code
!=
0
:
_msg
=
'Failed to synchronize {0}'
.
format
(
self
.
_in_repo
.
color_name
(
self
.
_name
))
_logger
.
error
(
_msg
)
raise
RuntimeError
(
_msg
)
def
__exit__
(
self
,
*
args
,
**
kwargs
):
self
.
_out_repo
.
join_the_good_side
()
super
()
.
__exit__
(
*
args
,
**
kwargs
)
Event Timeline
Log In to Comment