Page MenuHomec4science

getmystuph_in_phab.py
No OneTemporary

File Metadata

Created
Sat, Apr 20, 04:46

getmystuph_in_phab.py

#!/usr/bin/env python3
import os
import argparse
import yaml
import keyring
import collections
import getmystuph
from getmystuph.backends import _get_class
import getmystuph.importers as getimp
from getmystuph.repo import Repo
import logging
import logging.config
_logger = logging.getLogger('getmystuph').getChild('main')
class PhabImporter(object):
"""
Parses a YAML configuration file:
"""
_config = None
def __init__(self, args):
self._config = yaml.load(args.config)
self._dry_run = args.dry_run
_logger.debug('ConfigurationParser: {0}'.format(self._config))
def check_section(sect, config):
if sect not in config:
_msg = 'The configuration does not contain ' \
'a \'{0}\' section'.format(sect)
_logger.error(_msg)
raise RuntimeError(_msg)
check_section('global', self._config)
check_section('in', self._config['global'])
check_section('backend', self._config['global']['in'])
check_section('out', self._config['global'])
check_section('backend', self._config['global']['out'])
self._in_backend = self._config['global']['in']['backend']
self._out_backend = self._config['global']['out']['backend']
self._keyring = None
if 'use_keyring' in self._config['global'] and \
self._config['global']['use_keyring']:
self._keyring = keyring.get_keyring()
self._imported_groups = {}
self._imported_repository = []
self._in_directory = \
getmystuph.Directory(dry_run=self._dry_run,
keyring=self._keyring,
**self._config['global']['in'])
self._config['global']['in']['directory'] = self._in_directory
self._out_directory = \
getmystuph.Directory(dry_run=self._dry_run,
keyring=self._keyring,
**self._config['global']['out'])
self._config['global']['out']['directory'] = self._out_directory
self._users_db = getimp.ImportUserDB(
self._in_directory, self._out_directory,
cache_file=self._config['global'].pop('user_db_cache', None),
keyring=self._keyring,
dry_run=self._dry_run)
self._trust_cert = self._config['global'].pop('trust_cert', False)
def __get_full_info(self, info, _type):
if info is None:
info = {}
def _merge_maps(_map, _default):
for key, value in _default.items():
if key not in _map:
_map[key] = value
elif type(_map[key]) is dict:
_map[key] = _merge_maps(_map[key], value)
return _map
if '__all__' in self._config[_type]:
_global_conf = self._config[_type]['__all__']
_merge_maps(info, _global_conf)
return info
def import_all(self):
methods = collections.OrderedDict(
[('groups', 'GroupImporter'),
('repositories', 'RepoImporter')])
if 'groups' in self._config and \
'__all__' in self._config['groups']:
_group_importer_config = self._config['groups']
else:
_group_importer_config = {'__all__': {}}
self._users_db.default_importer = _group_importer_config
for _type in methods.keys():
if _type in self._config:
all_info = self._config[_type]
if type(all_info) == list:
all_info = {key: {} for key in all_info}
for name, info in all_info.items():
if name == '__all__':
continue
try:
info = self.__get_full_info(info, _type)
importer = getattr(getimp, methods[_type])(
name, info,
backend_in=self._config['global']['in'],
backend_out=self._config['global']['out'],
user_db=self._users_db,
keyring=self._keyring,
dry_run=self._dry_run)
importer.transfer(name)
except RuntimeError as _e:
_logger.error('An error occurred while trying to'
' import the {0} {1}: {2}'.format(
_type, name, _e))
# mark it imported even if it fails to not import it again
# in all after
self._imported_repository.append(name)
if 'all' in self._config['global']:
if type(self._config['global']['all']) is not list:
self._config['global']['all'] = [self._config['global']['all']]
info = {}
if 'repositories' in self._config and \
'__all__' in self._config['repositories']:
info = self._config['repositories']['__all__']
for _type in self._config['global']['all']:
_repo_list, _extra_info = self.list_repositories(_type)
info['type'] = _type
for _repo in _repo_list:
if _repo in self._imported_repository:
_logger.info(
'Repository {0} already imported'.format(_repo))
continue
if 'perm' not in _extra_info[_repo]:
continue
if not (_extra_info[_repo]['perm'] & Repo.EDIT):
_logger.info(
'Not sufficient permissions to import {0}'.format(
_repo))
continue
importer = getattr(getimp, methods['repositories'])(
'__all__', info,
type=_type,
backend_in=self._config['global']['in'],
backend_out=self._config['global']['out'],
user_db=self._users_db,
keyring=self._keyring,
dry_run=self._dry_run)
importer.transfer(_repo)
self._users_db.save()
def list_repositories(self, _type):
_class = _get_class(_type, self._in_backend)
return _class.list_repositories(
list_perm=True,
user_db=self._users_db,
keyring=self._keyring,
dry_run=self._dry_run,
**self._config['global']['in'])
# Set up cli arguments
parser = argparse.ArgumentParser(
description='Import projects into c4science'
)
parser.add_argument(
'--config',
help='configuration file (YAML)',
type=argparse.FileType(),
required=True
)
parser.add_argument(
'--list',
help='List repositories',
action='store_true'
)
parser.add_argument(
'--dry-run',
help='Do not do the creations',
action='store_true'
)
parser.add_argument(
'-v', '--verbose',
help='Make the tool verbose',
action='store_true'
)
parser.add_argument(
'-d', '--debug',
help='Switch output to debug level',
action='store_true'
)
args = parser.parse_args()
_src_dir = os.path.dirname(os.path.realpath(__file__))
if args.debug:
with open(os.path.join(_src_dir,
'.logging-debug.conf'), 'r') as fh:
config = yaml.load(fh)
logging.config.dictConfig(config)
elif args.verbose:
with open(os.path.join(_src_dir,
'.logging.conf'), 'r') as fh:
config = yaml.load(fh)
logging.config.dictConfig(config)
else:
logging.basicConfig(level=logging.ERROR)
imp = PhabImporter(args)
try:
if args.list:
imp.list_repositories('git')
imp.list_repositories('svn')
else:
imp.import_all()
except KeyboardInterrupt:
pass
# LocalWords: keyring

Event Timeline