diff --git a/invenio/modules/workflows/engine.py b/invenio/modules/workflows/engine.py index 9a95a4e8e..b1b49c573 100644 --- a/invenio/modules/workflows/engine.py +++ b/invenio/modules/workflows/engine.py @@ -1,462 +1,467 @@ # -*- coding: utf-8 -*- ## This file is part of Invenio. ## Copyright (C) 2012, 2013, 2014 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. from __future__ import absolute_import import sys from six.moves import cPickle from six import iteritems, reraise from uuid import uuid1 as new_uuid import base64 from workflow.engine import (GenericWorkflowEngine, ContinueNextToken, HaltProcessing, StopProcessing, JumpTokenBack, JumpTokenForward, WorkflowError) from invenio.config import CFG_DEVEL_SITE from .models import (Workflow, BibWorkflowObject, BibWorkflowEngineLog, ObjectVersion) -from .utils import (dictproperty, - get_workflow_definition) +from .utils import dictproperty from .logger import (get_logger, BibWorkflowLogHandler) -from .errors import WorkflowHalt +from .errors import (WorkflowHalt, + WorkflowDefinitionError) DEBUG = CFG_DEVEL_SITE > 0 class WorkflowStatus(object): NEW, RUNNING, HALTED, ERROR, COMPLETED = range(5) class BibWorkflowEngine(GenericWorkflowEngine): """ Subclass of GenericWorkflowEngine representing a workflow in the workflows module. 
Adds a SQLAlchemy database model to save workflow states and workflow data. Overrides key functions in GenericWorkflowEngine to implement logging and certain workarounds for storing data before/after task calls (This part will be revisited in the future). :param name: :param uuid: :param curr_obj: :param workflow_object: :param id_user: :param module_name: :param kwargs: """ def __init__(self, name=None, uuid=None, curr_obj=0, workflow_object=None, id_user=0, module_name="Unknown", **kwargs): super(BibWorkflowEngine, self).__init__() self.db_obj = None if isinstance(workflow_object, Workflow): self.db_obj = workflow_object else: # If uuid is defined we try to get the db object from DB. if uuid is not None: self.db_obj = \ Workflow.get(Workflow.uuid == uuid).first() else: uuid = new_uuid() if self.db_obj is None: self.db_obj = Workflow(name=name, id_user=id_user, current_object=curr_obj, module_name=module_name, uuid=uuid) self.save(status=WorkflowStatus.NEW) if self.db_obj.uuid not in self.log.name: db_handler_obj = BibWorkflowLogHandler(BibWorkflowEngineLog, "uuid") self.log = get_logger(logger_name="workflow.%s" % self.db_obj.uuid, db_handler_obj=db_handler_obj, obj=self) self.set_workflow_by_name(self.db_obj.name) self.set_extra_data_params(**kwargs) def get_extra_data(self): """ Main method to retrieve data saved to the object. """ return cPickle.loads(base64.b64decode(self.db_obj._extra_data)) def set_extra_data(self, value): """ Main method to update data saved to the object. 
def __setstate__(self, state):
    """Restore the engine from the dictionary produced by ``__getstate__``.

    ``__getstate__`` removes the logger from the pickled state, so a new
    logger is created here once the remaining attributes are restored.

    :param state: instance dictionary as returned by ``__getstate__``
    :raises cPickle.PickleError: if the restored object pointer lies
        beyond the restored list of objects
    """
    # Install the state first: a freshly unpickled instance carries no
    # instance attributes, so reading self._objects / self._i before this
    # point cannot see the pickled values.
    self.__dict__ = state

    if len(self._objects) < self._i[0]:
        raise cPickle.PickleError("The workflow instance "
                                  "inconsistent state, "
                                  "too few objects")

    db_handler_obj = BibWorkflowLogHandler(BibWorkflowEngineLog, "uuid")
    # ``uuid`` is a read-only property backed by ``db_obj``; it is never a
    # key of ``state``. Name the logger the same way ``__init__`` does.
    self.log = get_logger(logger_name="workflow.%s" % self.db_obj.uuid,
                          db_handler_obj=db_handler_obj,
                          obj=self)
def restart(self, obj, task):
    """Restart the workflow engine after it was deserialized.

    Adjusts the internal pointers in ``self._i`` (object position first,
    task position last) and then calls ``self.process`` on the stored
    objects.

    :param obj: object to start from; one of ``'prev'``, ``'current'``
        or ``'next'``
    :param task: task to start from; one of ``'prev'``, ``'current'``
        or ``'next'``
    :raises Exception: if ``obj`` or ``task`` is not one of the accepted
        values; the pointers are left untouched in that case
    """
    self.log.debug("Restarting workflow from %s object and %s task" %
                   (str(obj), str(task),))

    # Resolve both shifts before mutating anything, so that an invalid
    # ``task`` cannot leave the object pointer half-updated.

    # The processing loop increments the object pointer *before* taking
    # an object, hence "current" needs -1 and "previous" needs -2.
    if obj == 'prev':
        # TODO: check if there is any object there
        object_shift = -2
    elif obj == 'current':
        object_shift = -1
    elif obj == 'next':
        object_shift = 0
    else:
        raise Exception('Unknown start point for object: %s' % obj)

    if task == 'prev':
        task_shift = -1
    elif task == 'current':
        # restart the same task again
        task_shift = 0
    elif task == 'next':
        task_shift = 1
    else:
        # Report the offending *task* value (was wrongly reporting obj).
        raise Exception('Unknown start point for task: %s' % task)

    self._i[0] += object_shift
    self._i[1][-1] += task_shift

    self.process(self._objects)
    self._unpickled = False
Default processing factory, will process objects in order :param objects: list of objects (passed in by self.process()) :keyword cls: engine object itself, because this method may be implemented by the standalone function, we pass the self also as a cls argument As the WFE proceeds, it increments the internal counter, the first position is the number of the element. This pointer increases before the object is taken 2nd pos is reserved for the array that points to the task position. The number there points to the task that is currently executed; when error happens, it will be there unchanged. The pointer is updated after the task finished running. """ self.before_processing(objects, self) i = self._i # negative index not allowed, -1 is special while len(objects) - 1 > i[0] >= -1: i[0] += 1 obj = objects[i[0]] obj.save(version=ObjectVersion.RUNNING, id_workflow=self.db_obj.uuid) callbacks = self.callback_chooser(obj, self) if callbacks: try: self.run_callbacks(callbacks, objects, obj) except StopProcessing: if DEBUG: msg = "Processing was stopped: '%s' (object: %s)" % \ (str(callbacks), repr(obj)) self.log.debug(msg) obj.log.debug(msg) break except JumpTokenBack as step: if step.args[0] > 0: raise WorkflowError("JumpTokenBack cannot" " be positive number") if DEBUG: self.log.debug('Warning, we go back [%s] objects' % step.args[0]) i[0] = max(-1, i[0] - 1 + step.args[0]) i[1] = [0] # reset the callbacks pointer except JumpTokenForward as step: if step.args[0] < 0: raise WorkflowError("JumpTokenForward cannot" " be negative number") if DEBUG: self.log.debug('We skip [%s] objects' % step.args[0]) i[0] = min(len(objects), i[0] - 1 + step.args[0]) i[1] = [0] # reset the callbacks pointer except ContinueNextToken: if DEBUG: self.log.debug('Stop processing for this object, ' 'continue with next') i[1] = [0] # reset the callbacks pointer continue except HaltProcessing as e: self.increase_counter_halted() extra_data = obj.get_extra_data() obj.set_extra_data(extra_data) if 
def get_current_taskname(self):
    """Return the name of the task/step currently being executed.

    The callbacks returned by ``self.getCallbacks()`` are walked along
    the indices returned by ``self.getCurrTaskId()``.

    :return: the task function's name; the ``str()`` of the list when the
        position points at a list of callbacks (e.g. inside operator
        functions such as IF_ELSE); ``None`` when there are no callbacks
    """
    callback_list = self.getCallbacks()
    if not callback_list:
        return None

    for i in self.getCurrTaskId():
        callback_list = callback_list[i]

    if isinstance(callback_list, list):
        # With operator functions such as IF_ELSE the final value is not
        # a function but a list; use its string form.
        return str(callback_list)

    # ``func_name`` exists on Python 2 functions only, ``__name__`` works
    # on both. Callables without a name fall back to their string form.
    return getattr(callback_list, "__name__", str(callback_list))
def set_extra_data_params(self, **kwargs):
    """Merge the given keyword arguments into the engine's extra data.

    Existing keys are overwritten by the supplied values; the merged
    dictionary is written back with ``self.set_extra_data``.

    :param kwargs: key/value pairs to store in the extra data
    """
    # Start from an empty dict when nothing (or a falsy value) is stored.
    tmp = self.get_extra_data() or {}
    tmp.update(kwargs)
    self.set_extra_data(tmp)
def session_manager(orig_func):
    """Decorator to wrap a function with DB session handling.

    The wrapped function is called, then the session is committed and the
    function's result returned. If anything raises, the session is rolled
    back and the exception is re-raised unchanged.

    :param orig_func: function taking ``self`` (or ``cls``) as its first
        argument
    :return: the wrapping function, carrying the name and docstring of
        ``orig_func``
    """
    # Local import: keeps this definition self-contained.
    from functools import wraps

    @wraps(orig_func)
    def new_func(self, *a, **k):
        try:
            resp = orig_func(self, *a, **k)
            db.session.commit()
            return resp
        except:  # deliberately broad: always roll back, then re-raise
            db.session.rollback()
            raise
    return new_func
default=get_default_extra_data()) status = db.Column(db.Integer, default=0, nullable=False) current_object = db.Column(db.Integer, default="0", nullable=False) objects = db.relationship("BibWorkflowObject", backref="bwlWORKFLOW") counter_initial = db.Column(db.Integer, default=0, nullable=False) counter_halted = db.Column(db.Integer, default=0, nullable=False) counter_error = db.Column(db.Integer, default=0, nullable=False) counter_finished = db.Column(db.Integer, default=0, nullable=False) module_name = db.Column(db.String(64), nullable=False) def __repr__(self): return "<Workflow(name: %s, module: %s, cre: %s, mod: %s," \ "id_user: %s, status: %s)>" % \ (str(self.name), str(self.module_name), str(self.created), str(self.modified), str(self.id_user), str(self.status)) def __str__(self): return """Workflow: Uuid: %s Name: %s User id: %s Module name: %s Created: %s Modified: %s Status: %s Current object: %s Counters: initial=%s, halted=%s, error=%s, finished=%s Extra data: %s""" % (str(self.uuid), str(self.name), str(self.id_user), str(self.module_name), str(self.created), str(self.modified), str(self.status), str(self.current_object), str(self.counter_initial), str(self.counter_halted), str(self.counter_error), str(self.counter_finished), str(self._extra_data),) @classmethod def get(cls, *criteria, **filters): """ A wrapper for the filter and filter_by functions of sqlalchemy. Define a dict with which columns should be filtered by which values. e.g. Workflow.get(uuid=uuid) Workflow.get(Workflow.uuid != uuid) The function supports also "hybrid" arguments. e.g. 
Workflow.get(Workflow.module_name != 'i_hate_this_module', user_id=user_id) look up also sqalchemy BaseQuery's filter and filter_by documentation """ return cls.query.filter(*criteria).filter_by(**filters) @classmethod def get_status(cls, uuid=None): """ Returns the status of the workflow """ return cls.get(Workflow.uuid == uuid).one().status @classmethod def get_most_recent(cls, *criteria, **filters): """ Returns the most recently modified workflow. """ most_recent = cls.get(*criteria, **filters). \ order_by(desc(Workflow.modified)).first() if most_recent is None: raise NoResultFound else: return most_recent @classmethod def get_objects(cls, uuid=None): """ Returns the objects of the workflow """ return cls.get(Workflow.uuid == uuid).one().objects def get_extra_data(self, user_id=0, uuid=None, key=None, getter=None): """Returns a json of the column extra_data or if any of the other arguments are defined, a specific value. You can define either the key or the getter function. :param key: the key to access the desirable value :param getter: a callable that takes a dict as param and returns a value """ extra_data = Workflow.get(Workflow.id_user == self.id_user, Workflow.uuid == self.uuid).one()._extra_data extra_data = cPickle.loads(base64.b64decode(extra_data)) if key: return extra_data[key] elif callable(getter): return getter(extra_data) elif not key: return extra_data def set_extra_data(self, user_id=0, uuid=None, key=None, value=None, setter=None): """Modifies the json of the column extra_data or if any of the other arguments are defined, a specific value. You can define either the key, value or the setter function. 
:param key: the key to access the desirable value :param value: the new value :param setter: a callable that takes a dict as param and modifies it """ extra_data = Workflow.get(Workflow.id_user == user_id, Workflow.uuid == uuid).one()._extra_data extra_data = cPickle.loads(base64.b64decode(extra_data)) if key is not None and value is not None: extra_data[key] = value elif callable(setter): setter(extra_data) Workflow.get(Workflow.uuid == self.uuid).update( {'_extra_data': base64.b64encode(cPickle.dumps(extra_data))} ) @classmethod @session_manager def delete(cls, uuid=None): cls.get(Workflow.uuid == uuid).delete() @session_manager def save(self, status): """ Save object to persistent storage. :param status: """ self.modified = datetime.now() if status is not None: self.status = status db.session.add(self) class BibWorkflowObject(db.Model): # db table definition __tablename__ = "bwlOBJECT" id = db.Column(db.Integer, primary_key=True) # Our internal data column. Default is encoded dict. _data = db.Column(db.LargeBinary, nullable=False, default=get_default_data()) _extra_data = db.Column(db.LargeBinary, nullable=False, default=get_default_extra_data()) id_workflow = db.Column(db.String(36), db.ForeignKey("bwlWORKFLOW.uuid"), nullable=True) version = db.Column(db.Integer(3), default=ObjectVersion.INITIAL, nullable=False) id_parent = db.Column(db.Integer, db.ForeignKey("bwlOBJECT.id"), default=None) child_objects = db.relationship("BibWorkflowObject", remote_side=[id_parent]) created = db.Column(db.DateTime, default=datetime.now, nullable=False) modified = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now, nullable=False) status = db.Column(db.String(255), default="", nullable=False) data_type = db.Column(db.String(150), default="", nullable=True) uri = db.Column(db.String(500), default="") id_user = db.Column(db.Integer, default=0, nullable=False) child_logs = db.relationship("BibWorkflowObjectLog") workflow = db.relationship( Workflow, 
def __ne__(self, other):
    """Return the negation of ``__eq__``.

    :param other: object to compare with
    :return: ``True``/``False`` when ``__eq__`` can decide, otherwise
        ``NotImplemented`` so Python can try the reflected comparison
    """
    result = self.__eq__(other)
    if result is NotImplemented:
        # ``not NotImplemented`` would evaluate to False and claim that
        # the object is "not different" from an unrelated type.
        return NotImplemented
    return not result
""" task_name = self.extra_data["_last_task_name"] res_obj = WorkflowsTaskResult(task_name, name, result) self.extra_data["_tasks_results"].append(res_obj) def add_widget(self, widget, message): - """ - Assign a widget to this object for an action to be taken - in holdingpen. The widget is reffered to by a string with + """Assign a widget to this object for an action to be taken + in holding-pen. The widget is referred to by a string with the filename minus extension. Ex: approval_widget. A message is also needed to tell the user the action required. """ extra_data = self.get_extra_data() extra_data["_widget"] = widget extra_data["_message"] = message self.set_extra_data(extra_data) def get_widget(self): - """ - Retrive the currently assigned widget, if any. - """ + """Retrieve the currently assigned widget, if any. + :return: name of widget assigned as string, or None + """ try: return self.get_extra_data()["_widget"] except KeyError: # No widget return None def remove_widget(self): - """ - Removes the currently assigned widget. - """ + """Removes the currently assigned widget.""" extra_data = self.get_extra_data() extra_data["_widget"] = None extra_data["_message"] = "" self.set_extra_data(extra_data) def start_workflow(self, workflow_name, **kwargs): """Run the workflow specified on the object. Will start a new workflow execution for the object using workflows.api. :param workflow_name: name of workflow to run :type str """ from .api import start self.save() return start(workflow_name, data=[self], **kwargs) def continue_workflow(self, start_point="continue_next", **kwargs): """Run the workflow specified on the object. Will continue a previous execution for the object using workflows.api. :param start_point: where should the workflow start from? 
def save_to_file(self, directory=None,
                 prefix="workflow_object_data_", suffix=".obj"):
    """Save the value returned by ``self.get_data()`` to a new file.

    The file is created with ``tempfile.mkstemp``. Text data is written
    UTF-8 encoded; byte strings are written as they are.

    :param directory: directory to create the file in; defaults to
        ``cfg['CFG_TMPSHAREDIR']``
    :param prefix: file name prefix
    :param suffix: file name suffix
    :return: path to the saved file
    """
    if directory is None:
        directory = cfg['CFG_TMPSHAREDIR']

    data = self.get_data()
    if isinstance(data, type(u"")):
        # A binary file needs bytes; encode text explicitly instead of
        # relying on an implicit (ASCII) conversion.
        data = data.encode("utf-8")

    tmp_fd, filename = tempfile.mkstemp(dir=directory,
                                        prefix=prefix,
                                        suffix=suffix)
    # The file object owns the descriptor: it is closed even when the
    # write raises, and the whole payload is written (os.write may stop
    # short).
    with os.fdopen(tmp_fd, "wb") as handle:
        handle.write(data)
    return filename
""" from invenio.modules.records.api import Record from invenio.modules.formatter.engine import format_record data = self.get_data() if formatter: # A seperate formatter is supplied return formatter(data) if isinstance(data, dict): # Dicts are cool on its own, but maybe its SmartJson (record) try: new_dict_representation = Record(data) data = new_dict_representation.legacy_export_as_marc() except (TypeError, KeyError): # Maybe not, submission? return data if isinstance(data, string_types): # Its a string type, lets try to convert if format: # We can try formatter! # If already XML, format_record does not like it. if format != 'xm': try: return format_record(recID=None, of=format, xml_record=data) except TypeError: # Wrong kind of type pass else: # So, XML then from xml.dom.minidom import parseString try: pretty_data = parseString(data) return pretty_data.toprettyxml() except TypeError: # Probably not proper XML string then return "Data cannot be parsed: %s" % (data,) except Exception: # Some other parsing error pass # Just return raw string return data # Not any of the above types. How juicy! return data @session_manager def save(self, version=None, task_counter=None, id_workflow=None): """ Save object to persistent storage. :param version: :param task_counter: :param id_workflow: """ if task_counter is not None: self.log.debug("Saving task counter: %s" % (task_counter,)) extra_data = self.get_extra_data() extra_data["_task_counter"] = task_counter self.set_extra_data(extra_data) if version is not None: self.version = version if version in (ObjectVersion.FINAL, ObjectVersion.HALTED): redis_create_search_entry(self) if id_workflow is not None: self.id_workflow = id_workflow db.session.add(self) if self.id is not None: self.log.debug("Saving object: %s" % (self.id or "new",)) @classmethod def get(cls, *criteria, **filters): """ A wrapper for the filter and filter_by functions of sqlalchemy. Define a dict with which columns should be filtered by which values. e.g. 
Workflow.get(uuid=uuid) Workflow.get(Workflow.uuid != uuid) The function supports also "hybrid" arguments. e.g. Workflow.get(Workflow.module_name != 'i_hate_this_module', user_id=user_id) look up also sqalchemy BaseQuery's filter and filter_by documentation """ return cls.query.filter(*criteria).filter_by(**filters) @classmethod @session_manager def delete(cls, oid): cls.get(BibWorkflowObject.id == oid).delete() @classmethod @session_manager def create_object(cls, **kwargs): """Create a new Workflow Object with given content.""" obj = BibWorkflowObject(**kwargs) db.session.add(obj) return obj @classmethod @session_manager def create_object_revision(cls, old_obj, version, **kwargs): """Add a new revision to a Workflow Object.""" # Create new object and copy it obj = BibWorkflowObject(**kwargs) obj.copy(old_obj) # Overwrite some changes obj.id_parent = old_obj.id obj.version = version for key, value in iteritems(kwargs): setattr(obj, key, value) db.session.add(obj) return obj class BibWorkflowObjectLog(db.Model): """ This class represent a record of a log emit by an object into the database the object must be saved before using this class. Indeed it needs the id of the object into the database. 
def __repr__(self):
    """Return a constructor-like representation of the object log entry.

    :return: string listing ``log_type``, ``created``, ``message`` and
        ``id_object``
    """
    # str.join takes ONE iterable; the fields therefore go into a tuple.
    # Each value is wrapped in a 1-tuple so that a tuple-valued attribute
    # cannot be mistaken for format arguments.
    fields = (
        "log_type='%s'" % (self.log_type,),
        "created='%s'" % (self.created,),
        "message='%s'" % (self.message,),
        "id_object='%s'" % (self.id_object,),
    )
    return "BibWorkflowObjectLog(%s)" % (", ".join(fields),)
def __repr__(self):
    """Return a constructor-like representation of the engine log entry.

    :return: string listing ``log_type``, ``created``, ``message`` and
        ``id_object``
    """
    # str.join takes ONE iterable; the fields therefore go into a tuple.
    # Each value is wrapped in a 1-tuple so that a tuple-valued attribute
    # cannot be mistaken for format arguments.
    fields = (
        "log_type='%s'" % (self.log_type,),
        "created='%s'" % (self.created,),
        "message='%s'" % (self.message,),
        "id_object='%s'" % (self.id_object,),
    )
    return "BibWorkflowEngineLog(%s)" % (", ".join(fields),)
""" most_recent = cls.get(*criteria, **filters).order_by( desc(BibWorkflowEngineLog.created)).first() if most_recent is None: raise NoResultFound else: return most_recent @classmethod def delete(cls, uuid=None): cls.get(BibWorkflowEngineLog.id == uuid).delete() db.session.commit() __all__ = ['Workflow', 'BibWorkflowObject', 'BibWorkflowObjectLog', 'BibWorkflowEngineLog'] diff --git a/invenio/modules/workflows/templates/workflows/entry_details.html b/invenio/modules/workflows/templates/workflows/entry_details.html index ddc017c71..8ebbe863e 100644 --- a/invenio/modules/workflows/templates/workflows/entry_details.html +++ b/invenio/modules/workflows/templates/workflows/entry_details.html @@ -1,73 +1,73 @@ {# ## This file is part of Invenio. ## Copyright (C) 2014 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 
#} {% extends "workflows/admin_base.html" %} {% import 'workflows/utils.html' as utils %} {% js url_for('workflows.static', filename='js/workflows/entry_details.js'), '50-workflows' %} {% block javascript %} {{ super() }} <script type="text/javascript"> $(document).ready(function(){ WORKFLOWS_OBJECT_PREVIEW.bind_object_preview( "{{ url_for('workflows.entry_data_preview') }}", "{{ entry.id }}" ); }); </script> {% endblock javascript %} {% block workflowbody %} <div class="row"> <div class="col-md-12"> <h3 id="myModalLabel">{{ entry.id }}</h3> <p>Owner: <strong>{{ entry.id_user }}</strong> | Creation date: <strong>{{ entry.created }}</strong></p> </div> </div> <div class="row"> <div class="col-md-5"> Preview: <div class="well"> <div name="object_preview">{{- data_preview|safe -}}</div> </div> </div> <div class="col-md-3"> Workflow definition: <div class="well"> - {{ utils.function_display(workflow_func, entry.get_extra_data()['_task_counter']) }} + {{ utils.function_display(workflow_tasks, entry.get_extra_data()['_task_counter']) }} </div> </div> </div> <div class="row"> <div class="col-md-12"> Log: {% for log_object in log %} <pre>{{ log_object.message }}</pre><br/> {% endfor %} </div> </div> <div class="row"> <div class="col-md-12"> <a class="btn btn-primary" href="{{ url_for('holdingpen.details', objectid=entry.id)}}" >Open in HoldingPen <i class="glyphicon glyphicon-wrench"></i></a> </div> </div> {% endblock %} diff --git a/invenio/modules/workflows/templates/workflows/workflow_details.html b/invenio/modules/workflows/templates/workflows/workflow_details.html index b91fd207b..e00f214f4 100644 --- a/invenio/modules/workflows/templates/workflows/workflow_details.html +++ b/invenio/modules/workflows/templates/workflows/workflow_details.html @@ -1,61 +1,61 @@ {# ## This file is part of Invenio. ## Copyright (C) 2014 CERN. 
## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. #} {% extends "workflows/admin_base.html" %} {% import 'workflows/utils.html' as utils %} {% block workflowbody %} <div class="row"> <div class="col-md-12"> <h3 class="modal-title" id="myModalLabel">{{ workflow_metadata.name }}</h3> <p>Owner: <strong>{{ workflow_metadata.uid }}</strong> | Creation date: <strong>{{ workflow_metadata.created }}</strong> | Last modification date: <strong>{{ workflow_metadata.modified }}</strong></p> </div> </div> <div class="row"> <div class="col-md-5"> Workflow definition: <div class="well"> - {{ utils.function_display(workflow_func, [-1]) }} + {{ utils.function_display(workflow_tasks, [-1]) }} </div> Workflow status <div class="well"> <div id="workflow_preview"> <p>All objects count: {{ workflow_metadata.counter_initial }}</p> <p>Finished objects count: {{ workflow_metadata.counter_finished }}</p> <p>Halted objects count: {{ workflow_metadata.counter_halted }}</p> <p>Error objects count: {{ workflow_metadata.counter_error }}</p> </div> </div> </div> <div class="col-md-7"> Error message <div class="well"> <div class="workflow_message"> <p>{{ workflow_metadata.error_msg }}</p> </div> </div> </div> </div> <div class="row"> Log: <pre>{{ log }}</pre> </div> {% endblock %} diff --git a/invenio/modules/workflows/utils.py 
b/invenio/modules/workflows/utils.py index 90044bb1c..f22eb3fc3 100644 --- a/invenio/modules/workflows/utils.py +++ b/invenio/modules/workflows/utils.py @@ -1,284 +1,278 @@ # -*- coding: utf-8 -*- ## ## This file is part of Invenio. ## Copyright (C) 2012, 2013, 2014 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. from __future__ import print_function, absolute_import import re import redis -import traceback from six import iteritems from invenio.ext.logging import register_exception -from .errors import WorkflowDefinitionError - - REGEXP_RECORD = re.compile("<record.*?>(.*?)</record>", re.DOTALL) class BibWorkflowObjectIdContainer(object): """ This class is only made to be able to store a workflow ID and to retrieve easily the workflow from this ID. 
It is used maily to overide some problem with SQLAlchemy when we change of execution thread ( for example Celery ) """ def __init__(self, bibworkflowobject=None): if bibworkflowobject is not None: self.id = bibworkflowobject.id else: self.id = None def get_object(self): from invenio.modules.workflows.models import BibWorkflowObject if self.id is not None: return BibWorkflowObject.query.filter( BibWorkflowObject.id == self.id ).one() else: return None def from_dict(self, dict_to_process): self.id = dict_to_process[str(self.__class__)]["id"] return self def to_dict(self): return {str(self.__class__): self.__dict__} def __str__(self): return "BibWorkflowObject(%s)" % (str(self.id),) class WorkflowsTaskResult(object): """ Class to contain the current task results. """ def __init__(self, task_name, name, result): self.task_name = task_name self.name = name self.result = result def get_workflow_definition(name): """ Tries to load the given workflow from the system. """ from .registry import workflows - try: - return workflows[name] - except Exception as e: - raise WorkflowDefinitionError("Error with workflow '%s': %s\n%s" % - (name, str(e), traceback.format_exc()), - workflow_name=name) + if name in workflows: + return getattr(workflows[name], "workflow", None) + else: + return None def determineDataType(data): # If data is a dictionary and contains type key, # we can directly derive the data_type if isinstance(data, dict): if 'type' in data: data_type = data['type'] else: data_type = 'dict' else: # If data is not a dictionary, we try to guess MIME type # by using magic library try: from magic import Magic mime_checker = Magic(mime=True) data_type = mime_checker.from_buffer(data) # noqa except: register_exception( stream="warning", prefix="BibWorkflowObject.determineDataType:" " Impossible to resolve data type." 
) data_type = "" return data_type ## TODO special thanks to http://code.activestate.com/recipes/440514-dictproperty-properties-for-dictionary-attributes/ class dictproperty(object): class _proxy(object): def __init__(self, obj, fget, fset, fdel): self._obj = obj self._fget = fget self._fset = fset self._fdel = fdel def __getitem__(self, key): try: return self._fget(self._obj, key) except TypeError: print("can't read item") def __setitem__(self, key, value): try: self._fset(self._obj, key, value) except TypeError: print("can't set item %s: %s" % (str(key), str(value),)) def __delitem__(self, key): try: self._fdel(self._obj, key) except TypeError: print("can't delete item") def __init__(self, fget=None, fset=None, fdel=None, doc=None): self._fget = fget self._fset = fset self._fdel = fdel self.__doc__ = doc def __get__(self, obj, objtype=None): if obj is None: return self return self._proxy(obj, self._fget, self._fset, self._fdel) def redis_create_search_entry(bwobject): redis_server = set_up_redis() extra_data = bwobject.get_extra_data() #creates database entries to not loose key value pairs in redis for key, value in iteritems(extra_data["redis_search"]): redis_server.sadd("holdingpen_sort", str(key)) redis_server.sadd("holdingpen_sort:%s" % (str(key),), str(value)) redis_server.sadd("holdingpen_sort:%s:%s" % (str(key), str(value),), bwobject.id) redis_server.sadd("holdingpen_sort", "owner") redis_server.sadd("holdingpen_sort:owner", extra_data['owner']) redis_server.sadd("holdingpen_sort:owner:%s" % (extra_data['owner'],), bwobject.id) redis_server.sadd("holdingpen_sort:last_task_name:%s" % (extra_data['_last_task_name'],), bwobject.id) def filter_holdingpen_results(key, *args): """Function filters holdingpen entries by given key: value pair. 
It returns list of IDs.""" redis_server = set_up_redis() new_args = [] for a in args: new_args.append("holdingpen_sort:" + a) return redis_server.sinter("holdingpen_sort:" + key, *new_args) def get_redis_keys(key=None): redis_server = set_up_redis() if key: return list(redis_server.smembers("holdingpen_sort:%s" % (str(key),))) else: return list(redis_server.smembers("holdingpen_sort")) def get_redis_values(key): redis_server = set_up_redis() return redis_server.smembers("holdingpen_sort:%s" % (str(key),)) def set_up_redis(): """ Sets up the redis server for the saving of the HPContainers @return: Redis server object. """ from flask import current_app redis_server = redis.Redis.from_url( current_app.config.get('CACHE_REDIS_URL', 'redis://localhost:6379') ) return redis_server def empty_redis(): redis_server = set_up_redis() redis_server.flushall() def sort_bwolist(bwolist, iSortCol_0, sSortDir_0): if iSortCol_0 == 0: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.id, reverse=True) else: bwolist.sort(key=lambda x: x.id, reverse=False) elif iSortCol_0 == 1: pass # if sSortDir_0 == 'desc': # bwolist.sort(key=lambda x: x.id_user, reverse=True) # else: # bwolist.sort(key=lambda x: x.id_user, reverse=False) elif iSortCol_0 == 2: pass # if sSortDir_0 == 'desc': # bwolist.sort(key=lambda x: x.id_user, reverse=True) # else: # bwolist.sort(key=lambda x: x.id_user, reverse=False) elif iSortCol_0 == 3: pass # if sSortDir_0 == 'desc': # bwolist.sort(key=lambda x: x.id_user, reverse=True) # else: # bwolist.sort(key=lambda x: x.id_user, reverse=False) elif iSortCol_0 == 4: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.id_workflow, reverse=True) else: bwolist.sort(key=lambda x: x.id_workflow, reverse=False) elif iSortCol_0 == 5: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.id_user, reverse=True) else: bwolist.sort(key=lambda x: x.id_user, reverse=False) elif iSortCol_0 == 6: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.created, reverse=True) else: 
bwolist.sort(key=lambda x: x.created, reverse=False) elif iSortCol_0 == 7: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.version, reverse=True) else: bwolist.sort(key=lambda x: x.version, reverse=False) elif iSortCol_0 == 8: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.version, reverse=True) else: bwolist.sort(key=lambda x: x.version, reverse=False) elif iSortCol_0 == 9: if sSortDir_0 == 'desc': bwolist.sort(key=lambda x: x.version, reverse=True) else: bwolist.sort(key=lambda x: x.version, reverse=False) return bwolist def parse_bwids(bwolist): import ast return list(ast.literal_eval(bwolist)) diff --git a/invenio/modules/workflows/views/admin.py b/invenio/modules/workflows/views/admin.py index 6851aae12..181a56358 100644 --- a/invenio/modules/workflows/views/admin.py +++ b/invenio/modules/workflows/views/admin.py @@ -1,177 +1,175 @@ # -*- coding: utf-8 -*- ## This file is part of Invenio. ## Copyright (C) 2012, 2013, 2014 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """ invenio.modules.workflows.views.admin ------------------------------------- Workflows admin area for an overview over available workflow definitions, workflows and objects created. WARNING: Currently not fully working. 
""" from __future__ import print_function from six import text_type from flask import Blueprint from flask.ext.login import login_required from invenio.base.i18n import _ from invenio.base.decorators import wash_arguments, templated from flask.ext.breadcrumbs import default_breadcrumb_root, register_breadcrumb from ..api import start_delayed from ..utils import (get_workflow_definition, get_redis_keys as utils_get_redis_keys, filter_holdingpen_results) from ..models import Workflow, BibWorkflowObject, BibWorkflowEngineLog from ..registry import workflows blueprint = Blueprint('workflows', __name__, url_prefix="/admin/workflows", template_folder='../templates', static_folder='../static') default_breadcrumb_root(blueprint, '.workflows') @blueprint.route('/', methods=['GET', 'POST']) @blueprint.route('/index', methods=['GET', 'POST']) @login_required @register_breadcrumb(blueprint, '.', _('Workflows')) @templated('workflows/index.html') def index(): """ Dispalys main interface of BibWorkflow. """ w = Workflow.query.all() filter_keys = utils_get_redis_keys() return dict(workflows=w, filter_keys=filter_keys) @blueprint.route('/details/<int:objectid>', methods=['GET', 'POST']) @register_breadcrumb(blueprint, '.details', _("Record Details")) @login_required @templated('workflows/entry_details.html') def details(objectid): """ Displays entry details. 
""" from flask import Markup from pprint import pformat of = "hd" bwobject = BibWorkflowObject.query.filter( BibWorkflowObject.id == objectid ).first() workflow_object = Workflow.query.filter( Workflow.uuid == bwobject.id_workflow ).first() - # Workflow class: workflow.workflow is the workflow list - workflow = get_workflow_definition(workflow_object.name) + workflow_tasks = get_workflow_definition(workflow_object.name) formatted_data = bwobject.get_formatted_data(of) if isinstance(formatted_data, dict): formatted_data = pformat(formatted_data) if of and of in ("xm", "xml", "marcxml"): data = Markup.escape(formatted_data) else: data = formatted_data engine_log = BibWorkflowEngineLog.query.filter( BibWorkflowEngineLog.id_object == workflow_object.uuid ) return dict(entry=bwobject, log=engine_log, data_preview=data, - workflow_func=workflow.workflow) + workflow_tasks=workflow_tasks) @blueprint.route('/workflow_details/<id_workflow>', methods=['GET', 'POST']) @register_breadcrumb(blueprint, '.workflow_details', _("Workflow Details")) @login_required @templated('workflows/workflow_details.html') def workflow_details(id_workflow): workflow_object = Workflow.query.filter( Workflow.uuid == id_workflow ).first() - # Workflow class: workflow.workflow is the workflow list - workflow = get_workflow_definition(workflow_object.name) + workflow_tasks = get_workflow_definition(workflow_object.name) return dict(workflow_metadata=workflow_object, log="", - workflow_func=workflow.workflow) + workflow_tasks=workflow_tasks) @blueprint.route('/workflows', methods=['GET', 'POST']) @login_required @templated('workflows/workflow_list.html') def show_workflows(): return dict(workflows=workflows) @blueprint.route('/run_workflow', methods=['GET', 'POST']) @login_required @wash_arguments({'workflow_name': (text_type, "")}) def run_workflow(workflow_name, data={"data": 10}): start_delayed(workflow_name, data) return "Workflow has been started." 
@blueprint.route('/entry_data_preview', methods=['GET', 'POST']) @login_required @wash_arguments({'oid': (int, 0), 'of': (text_type, 'default')}) def entry_data_preview(oid, of): workflow_object = BibWorkflowObject.query.filter(BibWorkflowObject.id == oid).first() return _entry_data_preview(workflow_object.get_data(), of) @blueprint.route('/get_redis_keys', methods=['GET', 'POST']) @login_required @wash_arguments({'key': (text_type, "")}) def get_redis_keys(key): keys = utils_get_redis_keys(str(key)) options = "" for key in keys: options += "<option>%s</option>" % (key,) return options @blueprint.route('/get_redis_values', methods=['GET', 'POST']) @login_required @wash_arguments({'key': (text_type, "")}) def get_redis_values(key): values = filter_holdingpen_results(key) return str(values) def _entry_data_preview(data, of='default'): if format == 'hd' or format == 'xm': from invenio.modules.formatter import format_record try: data['record'] = format_record(recID=None, of=of, xml_record=data['record']) return data['record'] except ValueError: print("This is not a XML string") return data diff --git a/invenio/modules/workflows/views/holdingpen.py b/invenio/modules/workflows/views/holdingpen.py index ec9f5dddb..2112a36c7 100644 --- a/invenio/modules/workflows/views/holdingpen.py +++ b/invenio/modules/workflows/views/holdingpen.py @@ -1,604 +1,604 @@ # -*- coding: utf-8 -*- ## This file is part of Invenio. ## Copyright (C) 2013, 2014 CERN. ## ## Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. 
## ## You should have received a copy of the GNU General Public License ## along with Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """ invenio.modules.workflows.views.holdingpen ------------------------------------------ Holding Pen is an overlay over all objects (BibWorkflowObject) that have run through a workflow (BibWorkflowEngine). This area is targeted to catalogers and super users for inspecting ingestion workflows and submissions/depositions. Note: Currently work-in-progress. """ import re from six import iteritems, text_type from flask import (render_template, Blueprint, request, current_app, jsonify, url_for, flash) from flask.ext.login import login_required from flask.ext.breadcrumbs import default_breadcrumb_root, register_breadcrumb from flask.ext.menu import register_menu from invenio.base.decorators import templated, wash_arguments from invenio.base.i18n import _ from invenio.utils.date import pretty_date from ..models import BibWorkflowObject, Workflow, ObjectVersion from ..registry import widgets from ..utils import (get_workflow_definition, sort_bwolist) from ..api import continue_oid_delayed, start blueprint = Blueprint('holdingpen', __name__, url_prefix="/admin/holdingpen", template_folder='../templates', static_folder='../static') default_breadcrumb_root(blueprint, '.holdingpen') REG_TD = re.compile("<td title=\"(.+?)\">(.+?)</td>", re.DOTALL) @blueprint.route('/', methods=['GET', 'POST']) @blueprint.route('/index', methods=['GET', 'POST']) @login_required @register_menu(blueprint, 'personalize.holdingpen', _('Your Pending Actions')) @register_breadcrumb(blueprint, '.', _('Holdingpen')) @templated('workflows/hp_index.html') def index(): """ Displays main interface of Holdingpen. 
Acts as a hub for catalogers (may be removed) """ # FIXME: Add user filtering bwolist = get_holdingpen_objects(version_showing=[ObjectVersion.HALTED]) widget_list = get_widget_list(bwolist) return dict(tasks=widget_list) @blueprint.route('/maintable', methods=['GET', 'POST']) @register_breadcrumb(blueprint, '.records', _('Records')) @login_required @templated('workflows/hp_maintable.html') def maintable(): """ Displays main table interface of Holdingpen. """ bwolist = get_holdingpen_objects() widget_list = get_widget_list(bwolist) widget_static = [] for name, widget in iteritems(widgets): if getattr(widget, "static", None): widget_static.extend(widget.static) return dict(bwolist=bwolist, widget_list=widget_list, widget_static=widget_static) @blueprint.route('/batch_widget', methods=['GET', 'POST']) @login_required @wash_arguments({'bwolist': (text_type, "")}) def batch_widget(bwolist): """ Renders widget accepting single or multiple records. """ from ..utils import parse_bwids bwolist = parse_bwids(bwolist) try: bwolist = map(int, bwolist) except ValueError: # Bad ID, we just pass for now pass objlist = [] workflow_func_list = [] w_metadata_list = [] info_list = [] widgetlist = [] bwo_parent_list = [] logtext_list = [] objlist = [BibWorkflowObject.query.get(i) for i in bwolist] for bwobject in objlist: extracted_data = extract_data(bwobject) bwo_parent_list.append(extracted_data['bwparent']) logtext_list.append(extracted_data['logtext']) info_list.append(extracted_data['info']) w_metadata_list.append(extracted_data['w_metadata']) workflow_func_list.append(extracted_data['workflow_func']) if bwobject.get_widget() not in widgetlist: widgetlist.append(bwobject.get_widget()) widget_form = widgets[widgetlist[0]] result = widget_form().render(objlist, bwo_parent_list, info_list, logtext_list, w_metadata_list, workflow_func_list) url, parameters = result return render_template(url, **parameters) @blueprint.route('/load_table', methods=['GET', 'POST']) @login_required 
@templated('workflows/hp_maintable.html') def load_table(): """ Function used for the passing of JSON data to the DataTable 1] First checks for what record version to show 2] then sorting direction, 3] then if the user searched for something and finally it builds the JSON to send. """ version_showing = [] req = request.json s_search = request.args.get('sSearch', None) if req is not None: if "final" in req: version_showing.append(ObjectVersion.FINAL) if "halted" in req: version_showing.append(ObjectVersion.HALTED) if "running" in req: version_showing.append(ObjectVersion.RUNNING) if "initial" in req: version_showing.append(ObjectVersion.INITIAL) current_app.config['VERSION_SHOWING'] = version_showing elif "VERSION_SHOWING" in current_app.config: version_showing = current_app.config.get('VERSION_SHOWING', []) try: i_sortcol_0 = request.args.get('iSortCol_0') s_sortdir_0 = request.args.get('sSortDir_0') i_display_start = int(request.args.get('iDisplayStart')) i_display_length = int(request.args.get('iDisplayLength')) sEcho = int(request.args.get('sEcho')) + 1 except: i_sortcol_0 = current_app.config.get('iSortCol_0', 0) s_sortdir_0 = current_app.config.get('sSortDir_0', None) i_display_start = current_app.config.get('iDisplayStart', 0) i_display_length = current_app.config.get('iDisplayLength', 10) sEcho = current_app.config.get('sEcho', 0) + 1 bwolist = get_holdingpen_objects(ssearch=s_search, version_showing=version_showing) if 'iSortCol_0' in current_app.config: i_sortcol_0 = int(i_sortcol_0) if i_sortcol_0 != current_app.config['iSortCol_0'] \ or s_sortdir_0 != current_app.config['sSortDir_0']: bwolist = sort_bwolist(bwolist, i_sortcol_0, s_sortdir_0) current_app.config['iDisplayStart'] = i_display_start current_app.config['iDisplayLength'] = i_display_length current_app.config['iSortCol_0'] = i_sortcol_0 current_app.config['sSortDir_0'] = s_sortdir_0 current_app.config['sEcho'] = sEcho table_data = { "aaData": [] } try: table_data['iTotalRecords'] = len(bwolist) 
table_data['iTotalDisplayRecords'] = len(bwolist) except: bwolist = get_holdingpen_objects(version_showing=version_showing) table_data['iTotalRecords'] = len(bwolist) table_data['iTotalDisplayRecords'] = len(bwolist) # This will be simplified once Redis is utilized. records_showing = 0 for bwo in bwolist[i_display_start:i_display_start + i_display_length]: widget_name = bwo.get_widget() widget = widgets.get(widget_name, None) # if widget != None and bwo.version in VERSION_SHOWING: records_showing += 1 mini_widget = getattr(widget, "mini_widget", None) record = bwo.get_data() if not isinstance(record, dict): record = {} extra_data = bwo.get_extra_data() category_list = record.get('subject_term', []) if isinstance(category_list, dict): category_list = [category_list] categories = ["%s (%s)" % (subject['term'], subject['scheme']) for subject in category_list] row = render_template('workflows/row_formatter.html', object=bwo, record=record, extra_data=extra_data, categories=categories, widget=widget, mini_widget=mini_widget, pretty_date=pretty_date) d = {} for key, value in REG_TD.findall(row): d[key] = value.strip() table_data['aaData'].append( [d['id'], d['checkbox'], d['title'], d['source'], d['category'], d['pretty_date'], d['version'], d['type'], d['details'], d['widget'] ] ) table_data['sEcho'] = sEcho table_data['iTotalRecords'] = len(bwolist) table_data['iTotalDisplayRecords'] = len(bwolist) return jsonify(table_data) @blueprint.route('/get_version_showing', methods=['GET', 'POST']) @login_required def get_version_showing(): """ Returns current version showing, saved in current_app.config """ try: return current_app.config['VERSION_SHOWING'] except KeyError: return None @blueprint.route('/details/<int:objectid>', methods=['GET', 'POST']) @register_breadcrumb(blueprint, '.details', _("Record Details")) @login_required def details(objectid): """ Displays info about the object, and presents the data of all available versions of the object. 
(Initial, Error, Final) """ of = "hd" bwobject = BibWorkflowObject.query.get(objectid) formatted_data = bwobject.get_formatted_data(of) extracted_data = extract_data(bwobject) try: edit_record_widget = widgets['edit_record_widget']() except KeyError: # Could not load edit_record_widget edit_record_widget = [] return render_template('workflows/hp_details.html', bwobject=bwobject, bwparent=extracted_data['bwparent'], info=extracted_data['info'], log=extracted_data['logtext'], data_preview=formatted_data, workflow_func=extracted_data['workflow_func'], workflow=extracted_data['w_metadata'], edit_record_widget=edit_record_widget) @blueprint.route('/restart_record', methods=['GET', 'POST']) @login_required @wash_arguments({'objectid': (int, 0)}) def restart_record(objectid, start_point='continue_next'): """ Restarts the initial object in its workflow """ bwobject = BibWorkflowObject.query.get(objectid) workflow = Workflow.query.filter( Workflow.uuid == bwobject.id_workflow).first() start(workflow.name, [bwobject.get_data()]) return 'Record Restarted' @blueprint.route('/continue_record', methods=['GET', 'POST']) @login_required @wash_arguments({'objectid': (int, 0)}) def continue_record(objectid): """ Restarts the initial object in its workflow """ continue_oid_delayed(oid=objectid, start_point='continue_next') return 'Record continued workflow' @blueprint.route('/restart_record_prev', methods=['GET', 'POST']) @login_required @wash_arguments({'objectid': (int, 0)}) def restart_record_prev(objectid): """ Restarts the initial object in its workflow from the current task """ continue_oid_delayed(oid=objectid, start_point="restart_task") return 'Record restarted current task' @blueprint.route('/delete', methods=['GET', 'POST']) @login_required @wash_arguments({'objectid': (int, 0)}) def delete_from_db(objectid): """ Deletes all available versions of the object from the db """ BibWorkflowObject.delete(objectid) return 'Record Deleted' @blueprint.route('/delete_multi', 
methods=['GET', 'POST']) @login_required @wash_arguments({'bwolist': (text_type, "")}) def delete_multi(bwolist): from ..utils import parse_bwids bwolist = parse_bwids(bwolist) for objectid in bwolist: delete_from_db(objectid) return 'Records Deleted' @blueprint.route('/action/<objectid>', methods=['GET', 'POST']) @register_breadcrumb(blueprint, '.widget', _("Widget")) @login_required def show_widget(objectid): """ Renders the widget assigned to a specific record """ bwobject = BibWorkflowObject.query.filter( BibWorkflowObject.id == objectid).first_or_404() widget = bwobject.get_widget() # FIXME: add case here if no widget widget_form = widgets[widget] extracted_data = extract_data(bwobject) result = widget_form().render([bwobject], [extracted_data['bwparent']], [extracted_data['info']], [extracted_data['logtext']], [extracted_data['w_metadata']], [extracted_data['workflow_func']]) url, parameters = result return render_template(url, **parameters) @blueprint.route('/resolve', methods=['GET', 'POST']) @login_required @wash_arguments({'objectid': (text_type, '-1'), 'widget': (text_type, 'default')}) def resolve_widget(objectid, widget): """ Resolves the action taken in a widget. Calls the run_widget function of the specific widget. """ widget_form = widgets[widget] widget_form().run_widget(objectid) return "Done" @blueprint.route('/resolve_edit', methods=['GET', 'POST']) @login_required @wash_arguments({'objectid': (text_type, '0'), 'form': (text_type, '')}) def resolve_edit(objectid, form): """ Performs the changes to the record made in the edit record widget. 
""" if request: edit_record(request.form) return 'Record Edited' @blueprint.route('/entry_data_preview', methods=['GET', 'POST']) @login_required @wash_arguments({'objectid': (text_type, '0'), 'of': (text_type, None)}) def entry_data_preview(objectid, of): """ Presents the data in a human readble form or in xml code """ from flask import Markup from pprint import pformat bwobject = BibWorkflowObject.query.get(int(objectid)) if not bwobject: flash("No object found for %s" % (objectid,)) return jsonify(data={}) formatted_data = bwobject.get_formatted_data(of) if isinstance(formatted_data, dict): formatted_data = pformat(formatted_data) if of and of in ("xm", "xml", "marcxml"): data = Markup.escape(formatted_data) else: data = formatted_data return jsonify(data=data) @blueprint.route('/get_context', methods=['GET', 'POST']) @login_required def get_context(): """ Returns a JSON structure with URL maps for Holding Pen. """ context = {} context['url_prefix'] = blueprint.url_prefix context['holdingpen'] = { "url_load": url_for('holdingpen.load_table'), "url_preview": url_for('holdingpen.entry_data_preview'), "url_restart_record": url_for('holdingpen.restart_record'), "url_restart_record_prev": url_for('holdingpen.restart_record_prev'), "url_continue_record": url_for('holdingpen.continue_record'), "url_resolve_edit": url_for('holdingpen.resolve_edit') } try: context['version_showing'] = current_app.config['VERSION_SHOWING'] except KeyError: context['version_showing'] = ObjectVersion.HALTED context['widgets'] = [name for name, widget in iteritems(widgets) if getattr(widget, "static", None)] return jsonify(context) def get_info(bwobject): """ Parses the hpobject and extracts its info to a dictionary """ info = {} if bwobject.get_extra_data()['owner'] != {}: info['owner'] = bwobject.get_extra_data()['owner'] else: info['owner'] = 'None' info['parent id'] = bwobject.id_parent info['workflow id'] = bwobject.id_workflow info['object id'] = bwobject.id info['widget'] = 
bwobject.get_widget() return info def extract_data(bwobject): """ Extracts metadata for BibWorkflowObject needed for rendering the Record's details and widget page. """ extracted_data = {} if bwobject.id_parent is not None: extracted_data['bwparent'] = \ BibWorkflowObject.query.get(bwobject.id_parent) else: extracted_data['bwparent'] = None # TODO: read the logstuff from the db extracted_data['loginfo'] = "" extracted_data['logtext'] = {} for log in extracted_data['loginfo']: extracted_data['logtext'][log.get_extra_data()['last_task_name']] = \ log.message extracted_data['info'] = get_info(bwobject) try: extracted_data['info']['widget'] = bwobject.get_widget() except (KeyError, AttributeError): pass extracted_data['w_metadata'] = \ Workflow.query.filter(Workflow.uuid == bwobject.id_workflow).first() extracted_data['workflow_func'] = \ - get_workflow_definition(extracted_data['w_metadata'].name).workflow + get_workflow_definition(extracted_data['w_metadata'].name) return extracted_data def edit_record(form): """ Will call the edit record widget resolve function """ for key in form.iterkeys(): # print '%s: %s' % (key, form[key]) pass def get_widget_list(object_list): """ Returns a dict of widget names mapped to the number of halted objects associated with that widget. """ widget_dict = {} found_widgets = [] # First get a list of all to count up later for bwo in object_list: widget_name = bwo.get_widget() if widget_name is not None: found_widgets.append(widget_name) # Get "real" widget name only once per widget for widget_name in set(found_widgets): if widget_name not in widgets: # Perhaps some old widget? Use stored name. 
widget_nicename = widget_name else: widget = widgets[widget_name] widget_nicename = getattr(widget, "__title__", widget_name) widget_dict[widget_nicename] = found_widgets.count(widget_name) return widget_dict def get_holdingpen_objects(isortcol_0=None, ssortdir_0=None, ssearch=None, version_showing=(ObjectVersion.HALTED,)): """ Looks for related BibWorkflowObject's for display in Holding Pen. Uses DataTable naming for filtering/sorting. Work in progress. """ if isortcol_0: isortcol_0 = int(isortcol_0) bwobject_list = BibWorkflowObject.query.filter( BibWorkflowObject.version.in_(version_showing) ).all() if ssearch and len(ssearch) < 2: bwobject_list_tmp = [] for bwo in bwobject_list: extra_data = bwo.get_extra_data() if bwo.id_parent == ssearch: bwobject_list_tmp.append(bwo) elif bwo.id_user == ssearch: bwobject_list_tmp.append(bwo) elif bwo.id_workflow == ssearch: bwobject_list_tmp.append(bwo) elif extra_data['_last_task_name'] == ssearch: bwobject_list_tmp.append(bwo) else: widget_name = bwo.get_widget() if widget_name: widget = widgets[widget_name] if ssearch in widget.__title__ or ssearch in widget_name: bwobject_list_tmp.append(bwo) bwobject_list = bwobject_list_tmp if isortcol_0 == -6: if ssortdir_0 == 'desc': bwobject_list.reverse() return bwobject_list