Page MenuHomec4science

base_zeo.py
No OneTemporary

File Metadata

Created
Sun, Jun 9, 00:21

base_zeo.py

#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
################################################################
from . import bdparser
from . import bdlogging
from . import base
from . import conffile_zeo
from . import zeoobject
from . import lowercase_btree
from .constraints_zeo import ZEOconstraints
################################################################
import re
import os
import subprocess
import ZEO
import ZODB
import sys
from BTrees.OOBTree import OOSet
from . import job
from . import run_zeo
import transaction
################################################################
# Only the backend class is exported by ``from ... import *``.
__all__ = ["BaseZEO"]
# Rebind the module-level name ``print`` to bdlogging.invalidPrint.
# NOTE(review): presumably meant to forbid bare print() calls in favour of
# the logger -- confirm against bdlogging.
print = bdlogging.invalidPrint
# Module logger, obtained from the project's logging helper.
logger = bdlogging.getLogger(__name__)
# Short alias for the BTree flavour used for every container built below.
BTree = lowercase_btree.LowerCaseBTree
################################################################
def _transaction(foo):
def _protected_transaction(*args, **kwargs):
for attempt in transaction.manager.attempts():
with attempt:
foo(*args, **kwargs)
return _protected_transaction
class BaseZEO(base.AbstractBase):
    """Study database backend stored in ZODB and accessed through ZEO.

    Everything lives under ``root.schemas``: one entry per schema, each
    holding the containers ``'Jobs'``, ``'Runs'``, ``'ConfFiles'``,
    the set ``'Quantities'``, the integer counters ``'Jobs_counter'`` and
    ``'Runs_counter'`` and (once ``createBase`` ran) the descriptors
    ``'job_desc'`` and ``'run_desc'``.
    """

    # Last instance constructed (overwritten by every __init__ call).
    singleton_base = None

    def __init__(self, truerun=False, creation=False, read_only=False,
                 **kwargs):
        """Connect to the ZEO server, starting ``runzeo`` if needed.

        Args:
            truerun: forwarded to the parent constructor.
            creation: forwarded to the parent constructor.
            read_only: passed to ``ZEO.connection``.
            **kwargs: must provide ``host`` as ``zeo://<database file>``;
                all of them are also forwarded to the parent constructor.

        Raises:
            RuntimeError: if ``host`` does not use the ``zeo`` protocol.
            SystemExit: if the server cannot be started or reached.
        """
        BaseZEO.singleton_base = self

        # Concrete classes used by this backend.
        self.Job = job.JobZEO
        self.Run = run_zeo.RunZEO
        self.ConfFile = conffile_zeo.ConfFile
        self.BDconstraints = ZEOconstraints

        zeo_params = ["host"]
        connection_params = bdparser.filterParams(zeo_params, kwargs)
        logger.info('connection arguments: {0}'.format(connection_params))

        self.filename = connection_params['host']
        filename_split = self.filename.split('://')
        if filename_split[0] != 'zeo':
            raise RuntimeError(
                f"wrong protocol with this database: {type(self)}")
        self.filename = filename_split[1]

        # The server socket is expected next to the database file.
        dirname = os.path.dirname(self.filename)
        socket_name = os.path.join(dirname, 'zeo.socket')

        if not os.path.exists(socket_name):
            # No socket yet: spawn the server. An argument list (no shell)
            # keeps paths containing spaces or shell metacharacters intact.
            # NOTE(review): stdout/stderr are piped but never read in this
            # class -- confirm something drains them.
            try:
                self.process = subprocess.Popen(
                    ["runzeo", "-f", self.filename, "-a", socket_name],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            except OSError as e:
                # Same outcome as a failed connection below.
                logger.error(
                    "Could not start the ZEO server (runzeo):\n" + str(e))
                sys.exit(-1)

        try:
            self.connection = ZEO.connection(
                socket_name, read_only=read_only, server_sync=True)
            self.root = self.connection.root
            logger.debug('connected to base')
        except Exception as e:
            logger.error(
                "Connection failed: check your connection settings:\n" +
                str(e))
            sys.exit(-1)

        assert(isinstance(self.connection, ZODB.Connection.Connection))

        self.dbhost = kwargs.get("host", "localhost")
        super().__init__(connection=self.connection, truerun=truerun,
                         creation=creation, **kwargs)

    def getSchemaList(self, filter_names=True):
        """Return the known schema names.

        ``root.schemas`` is created (empty) when it does not exist yet.

        Args:
            filter_names: when ``True``, a leading ``"<user>_"`` prefix is
                stripped from the names and a list is returned; otherwise
                the ``root.schemas`` container itself is returned.
        """
        try:
            schemas = self.root.schemas
        except AttributeError:
            self.root.schemas = BTree(key_string='study_')
            schemas = self.root.schemas

        if filter_names is not True:
            return schemas

        # Escape the user name so regex metacharacters in it (e.g. '.')
        # are matched literally.
        prefix = re.compile('{0}_(.+)'.format(re.escape(self.user)))
        filtered_schemas = []
        for s in schemas:
            m = prefix.match(s)
            if m:
                s = m.group(1)
            # NOTE(review): nesting reconstructed from a source that lost
            # its indentation; names without the prefix are kept as-is --
            # confirm this is the intended behaviour.
            filtered_schemas.append(s)
        return filtered_schemas

    def getStudySize(self, study):
        """Return ``{'size', 'nruns', 'njobs'}`` for the schema *study*.

        The three values are ``'????'`` when the queries fail with
        ``psycopg2.ProgrammingError``.

        NOTE(review): this method issues PostgreSQL queries through a
        DB-API cursor, while ``self.connection`` comes from
        ``ZEO.connection`` and ``psycopg2`` is not among the imports
        visible in this file -- confirm it is still usable with this
        backend.
        """
        curs = self.connection.cursor()
        try:
            logger.info(study)
            curs.execute("""
select sz from (SELECT SUM(pg_total_relation_size(quote_ident(schemaname)
|| '.' || quote_ident(tablename)))::BIGINT
FROM pg_tables WHERE schemaname = '{0}') as sz
""".format(study))
            size = curs.fetchone()[0]
            curs.execute("""
select pg_size_pretty(cast({0} as bigint))
""".format(size))
            size = curs.fetchone()[0]
            curs.execute("""
select count({0}.runs.id) from {0}.runs
""".format(study))
            nruns = curs.fetchone()[0]
            curs.execute("""
select count({0}.jobs.id) from {0}.jobs
""".format(study))
            njobs = curs.fetchone()[0]
        except psycopg2.ProgrammingError:
            self.connection.rollback()
            # Reset every value: some of them may never have been
            # assigned, which used to make the return statement fail.
            size = nruns = njobs = '????'
        return {'size': size, 'nruns': nruns, 'njobs': njobs}

    def createSchema(self, params=None):
        """Create (or re-create) the empty containers of ``self.schema``.

        When the schema already exists the user is asked, through
        ``bdparser.validate_question``, whether it may be dropped; the
        program exits if the answer is not positive.

        Args:
            params: options handed to ``bdparser.validate_question``;
                defaults to ``{"yes": False}``.
        """
        if params is None:
            # Fresh dict per call instead of a shared mutable default.
            params = {"yes": False}

        if not hasattr(self.root, 'schemas'):
            self.root.schemas = BTree(key_string='study_')

        if self.schema in self.root.schemas:
            validated = bdparser.validate_question(
                "Are you sure you want to drop the schema named '" +
                self.schema + "'", params, False)
            if validated is True:
                del self.root.schemas[self.schema]
            else:
                logger.debug("creation canceled: exit program")
                sys.exit(-1)

        self.root.schemas[self.schema] = BTree()
        study = self.root.schemas[self.schema]
        study['Quantities'] = OOSet()
        study['Jobs'] = BTree(key_string='job_')
        study['Runs'] = BTree(key_string='run_')
        study['ConfFiles'] = BTree(key_string='file_')
        study['Jobs_counter'] = 0
        study['Runs_counter'] = 0

    def prepare(self, obj, descriptor):
        """Copy the types stored under *descriptor* into ``obj.types``.

        Does nothing when no schema exists yet or when *descriptor* is not
        an entry of the current schema.
        """
        if not hasattr(self.root, 'schemas'):
            return
        if descriptor in self.root.schemas[self.schema]:
            desc = self.root.schemas[self.schema][descriptor]
            for t in desc.types.keys():
                obj.types[t] = desc.types[t]
                if t not in obj:
                    # NOTE(review): this sets an attribute literally named
                    # "t"; `obj[t] = None` may have been intended. Kept
                    # unchanged -- confirm against ZEOObject.
                    obj.t = None

    def createBase(self, job_desc, run_desc, quantities=None, **kwargs):
        """Create the schema and store the job/run descriptors in it.

        Args:
            job_desc: stored under the ``'job_desc'`` entry of the schema.
            run_desc: stored under the ``'run_desc'`` entry of the schema.
            quantities: optional mapping ``name -> type``; each pair is
                passed to ``self.pushQuantity``.
            **kwargs: handed (as a dict) to ``createSchema``.
        """
        self.createSchema(kwargs)
        self.root.schemas[self.schema]['job_desc'] = job_desc
        self.root.schemas[self.schema]['run_desc'] = run_desc
        for qname, qtype in (quantities or {}).items():
            self.pushQuantity(qname, qtype)
        if self.truerun:
            self.commit()

    def _get_jobs(self):
        """Return the ``'Jobs'`` container of the current schema."""
        return self.root.schemas[self.schema]['Jobs']

    @property
    def quantities(self):
        """The ``'Quantities'`` entry of the current schema."""
        return self.root.schemas[self.schema]['Quantities']

    @quantities.setter
    def quantities(self, value):
        self.root.schemas[self.schema]['Quantities'] = value

    @property
    def jobs_counter(self):
        """The ``'Jobs_counter'`` entry; ``insert`` uses it as next job id."""
        return self.root.schemas[self.schema]['Jobs_counter']

    @jobs_counter.setter
    def jobs_counter(self, val):
        self.root.schemas[self.schema]['Jobs_counter'] = val

    def _get_runs(self):
        """Return the ``'Runs'`` container of the current schema."""
        return self.root.schemas[self.schema]['Runs']

    @property
    def runs_counter(self):
        """The ``'Runs_counter'`` entry; ``insert`` uses it as next run id."""
        return self.root.schemas[self.schema]['Runs_counter']

    @runs_counter.setter
    def runs_counter(self, val):
        self.root.schemas[self.schema]['Runs_counter'] = val

    def _get_conffiles(self):
        """Return the ``'ConfFiles'`` container of the current schema."""
        return self.root.schemas[self.schema]['ConfFiles']

    def select(self, _types, constraints=None, sort_by=None):
        """Return the stored objects matching *constraints*.

        Args:
            _types: a Job/Run/ConfFile class, an instance of one of them,
                or a list whose first element is one of those.
            constraints: handed to ``ZEOconstraints``.
            sort_by: must be ``None`` or a string. It is only validated;
                no sorting is performed by this method.

        Returns:
            A list of matching objects. When runs are selected each item
            is a ``[run, job]`` pair, the job being looked up through
            ``run.job_id``.

        Raises:
            RuntimeError: for an unsupported type or a bad *sort_by*.
        """
        if not isinstance(_types, list):
            _types = [_types]
        _type = _types[0]
        if isinstance(_type, zeoobject.ZEOObject):
            _type = type(_type)

        selecting_runs = _type == self.Run
        if _type == self.Job:
            obj_container = self._get_jobs()
        elif selecting_runs:
            obj_container = self._get_runs()
        elif _type == self.ConfFile:
            obj_container = self._get_conffiles()
        else:
            # Report the offending type, not the type of the wrapping list.
            raise RuntimeError(f'{_type}')

        if (sort_by is not None) and (not isinstance(sort_by, str)):
            raise RuntimeError(
                'sort_by argument is not correct: {0}'.format(sort_by))

        const = ZEOconstraints(self, constraints)
        condition = const.getMatchingCondition()

        # Looked up once; only needed to pair each run with its job.
        jobs = self._get_jobs() if selecting_runs else None

        obj_list = []
        for key, obj in obj_container.items():
            obj.base = self
            objs = [obj]
            if selecting_runs:
                j = jobs[obj.job_id]
                j.base = self
                objs.append(j)
            if condition(objs):
                obj_list.append(objs[0] if len(objs) == 1 else objs)
        return obj_list

    @_transaction
    def insert(self, zeoobject):
        """Store a copy of *zeoobject* in the container matching its type.

        Jobs get ``jobs_counter`` as id. Runs are copied first, then get
        ``runs_counter`` as id and the state ``'CREATED'``. Configuration
        files are stored under the id they already carry.

        Raises:
            RuntimeError: if *zeoobject* is not a Job, Run or ConfFile.
        """
        if isinstance(zeoobject, self.Job):
            objs = self._get_jobs()
            zeoobject.id = self.jobs_counter
            self.jobs_counter += 1
        elif isinstance(zeoobject, self.Run):
            objs = self._get_runs()
            zeoobject = zeoobject.copy()
            zeoobject["id"] = self.runs_counter
            zeoobject["state"] = 'CREATED'
            self.runs_counter += 1
        elif isinstance(zeoobject, self.ConfFile):
            objs = self._get_conffiles()
        else:
            # Explicit error instead of an UnboundLocalError on `objs`;
            # mirrors setObjectItemTypes.
            raise RuntimeError(f'{type(zeoobject)}')
        objs[zeoobject.id] = zeoobject.copy()

    def setObjectItemTypes(self, zeoobject):
        """Point ``zeoobject.types`` at the stored job/run descriptor types.

        Raises:
            RuntimeError: if *zeoobject* is neither a Job nor a Run.
        """
        if isinstance(zeoobject, self.Job):
            zeoobject.types = self.root.schemas[self.schema]['job_desc'].types
        elif isinstance(zeoobject, self.Run):
            zeoobject.types = self.root.schemas[self.schema]['run_desc'].types
        else:
            raise RuntimeError(f'{type(zeoobject)}')

    def commit(self):
        """Commit the current transaction."""
        # `transaction` is already imported at module level.
        transaction.commit()
################################################################

Event Timeline