Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F92249779
models.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Mon, Nov 18, 18:29
Size
32 KB
Mime Type
text/x-python
Expires
Wed, Nov 20, 18:29 (2 d)
Engine
blob
Format
Raw Data
Handle
22404292
Attached To
R3600 invenio-infoscience
models.py
View Options
# -*- coding: utf-8 -*-
# This file is part of Invenio.
# Copyright (C) 2012, 2013, 2014, 2015 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""Models for BibWorkflow Objects."""
import
base64
import
logging
import
os
import
tempfile
from
datetime
import
datetime
from
invenio.base.globals
import
cfg
from
invenio.base.utils
import
classproperty
from
invenio.ext.logging
import
deprecated
from
invenio.ext.sqlalchemy
import
db
from
invenio.ext.sqlalchemy.utils
import
session_manager
from
six
import
callable
,
iteritems
from
six.moves
import
cPickle
from
sqlalchemy
import
desc
from
sqlalchemy.orm.exc
import
NoResultFound
from
.logger
import
BibWorkflowLogHandler
,
get_logger
class ObjectVersion(object):
    """Specify the different versions possible.

    Each constant is an integer version code.  ``MAPPING`` associates a
    human readable label with each of these codes.
    """

    INITIAL = 0
    COMPLETED = 1
    HALTED = 2
    RUNNING = 3
    WAITING = 4
    ERROR = 5

    MAPPING = {
        "New": 0,
        "Done": 1,
        "Need action": 2,
        "In process": 3,
        "Waiting": 4,
        "Error": 5,
    }

    @classproperty
    @deprecated("Please use ObjectVersion.COMPLETED "
                "instead of ObjectVersion.FINAL")
    def FINAL(cls):
        """Return ``COMPLETED``; kept only as a deprecated alias."""
        return cls.COMPLETED

    @classmethod
    def name_from_version(cls, version):
        """Return the label associated with a version code.

        :param version: version code to look up (e.g. ``ObjectVersion.HALTED``)
        :return: the matching key of ``MAPPING``, or None when no label
            is mapped to ``version``.
        """
        # Search the items directly instead of indexing keys()/values():
        # on Python 3 those are views which support neither indexing
        # nor ``.index()``.
        for name, code in cls.MAPPING.items():
            if code == version:
                return name
        return None
def get_default_data():
    """Return the base64 representation of the data default value.

    The default value is an empty dict; it is pickled first and the
    resulting pickle is then base64 encoded.
    """
    pickled_default = cPickle.dumps({})
    return base64.b64encode(pickled_default)
def get_default_extra_data():
    """Return the base64 representation of the extra_data default value.

    The default is a dict holding the initial (empty) bookkeeping keys;
    it is pickled first and the resulting pickle is then base64 encoded.
    """
    pristine_extra_data = {
        "_tasks_results": {},
        "owner": {},
        "_task_counter": {},
        "_error_msg": None,
        "_last_task_name": "",
        "latest_object": -1,
        "_action": None,
        "redis_search": {},
        "source": "",
        "_task_history": [],
    }
    pickled_default = cPickle.dumps(pristine_extra_data)
    return base64.b64encode(pickled_default)
class Workflow(db.Model):
    """Represents a workflow instance.

    Used by BibWorkflowEngine to store the state of the workflow.

    The ``_extra_data`` column holds a base64 encoded pickle of a dict;
    use :meth:`get_extra_data` / :meth:`set_extra_data` to access it.
    """

    __tablename__ = "bwlWORKFLOW"

    uuid = db.Column(db.String(36), primary_key=True, nullable=False)
    name = db.Column(db.String(255), default="Default workflow",
                     nullable=False)
    created = db.Column(db.DateTime, default=datetime.now, nullable=False)
    modified = db.Column(db.DateTime, default=datetime.now,
                         onupdate=datetime.now, nullable=False)
    id_user = db.Column(db.Integer, default=0, nullable=False)
    _extra_data = db.Column(db.LargeBinary,
                            nullable=False,
                            default=get_default_extra_data())
    status = db.Column(db.Integer, default=0, nullable=False)
    # Integer column: use an integer default (was the string "0").
    current_object = db.Column(db.Integer, default=0, nullable=False)
    objects = db.relationship("BibWorkflowObject",
                              backref='bwlWORKFLOW',
                              cascade="all, delete, delete-orphan")
    counter_initial = db.Column(db.Integer, default=0, nullable=False)
    counter_halted = db.Column(db.Integer, default=0, nullable=False)
    counter_error = db.Column(db.Integer, default=0, nullable=False)
    counter_finished = db.Column(db.Integer, default=0, nullable=False)
    module_name = db.Column(db.String(64), nullable=False)

    child_logs = db.relationship("BibWorkflowEngineLog",
                                 backref='bwlWORKFLOW',
                                 cascade="all, delete, delete-orphan")

    def __repr__(self):
        """Represent a workflow object."""
        return "<Workflow(name: %s, module: %s, cre: %s, mod: %s," \
               "id_user: %s, status: %s)>" % \
            (str(self.name), str(self.module_name), str(self.created),
             str(self.modified), str(self.id_user), str(self.status))

    def __str__(self):
        """Print a workflow object."""
        # NOTE(review): line breaks are reproduced as displayed in the
        # source listing; any leading indentation inside the original
        # literal could not be recovered - confirm against the repository.
        template = (
            "Workflow:\n"
            "Uuid: %s\n"
            "Name: %s\n"
            "User id: %s\n"
            "Module name: %s\n"
            "Created: %s\n"
            "Modified: %s\n"
            "Status: %s\n"
            "Current object: %s\n"
            "Counters: initial=%s, halted=%s, error=%s, finished=%s\n"
            "Extra data: %s"
        )
        return template % (
            str(self.uuid),
            str(self.name),
            str(self.id_user),
            str(self.module_name),
            str(self.created),
            str(self.modified),
            str(self.status),
            str(self.current_object),
            str(self.counter_initial),
            str(self.counter_halted),
            str(self.counter_error),
            str(self.counter_finished),
            str(self._extra_data),
        )

    @classmethod
    def get(cls, *criteria, **filters):
        """Wrapper to get a specified object.

        A wrapper for the filter and filter_by functions of sqlalchemy.
        Define a dict with which columns should be filtered by which values.

        .. code-block:: python

            Workflow.get(uuid=uuid)
            Workflow.get(Workflow.uuid != uuid)

        The function supports also "hybrid" arguments.

        .. code-block:: python

            Workflow.get(Workflow.module_name != 'i_hate_this_module',
                         user_id=user_id)

        See also SQLAlchemy BaseQuery's filter and filter_by documentation.

        :return: the (not yet executed) query object.
        """
        return cls.query.filter(*criteria).filter_by(**filters)

    @classmethod
    def get_status(cls, uuid=None):
        """Return the status of the workflow identified by ``uuid``.

        The lookup uses ``.one()``, which raises when the number of
        matching rows is not exactly one.
        """
        return cls.get(Workflow.uuid == uuid).one().status

    @classmethod
    def get_most_recent(cls, *criteria, **filters):
        """Return the most recently modified workflow.

        :raises NoResultFound: when no workflow matches.
        """
        most_recent = cls.get(*criteria, **filters).order_by(
            desc(Workflow.modified)).first()
        if most_recent is None:
            raise NoResultFound
        return most_recent

    @classmethod
    def get_objects(cls, uuid=None):
        """Return the objects of the workflow identified by ``uuid``.

        The lookup uses ``.one()``, which raises when the number of
        matching rows is not exactly one.
        """
        return cls.get(Workflow.uuid == uuid).one().objects

    def get_extra_data(self, user_id=0, uuid=None, key=None, getter=None):
        """Get the extra_data for the object.

        Returns the unpickled content of the extra_data column or,
        if ``key`` or ``getter`` is given, a specific value.

        You can define either the key or the getter function.

        :param user_id: unused; the row is looked up with ``self.id_user``.
        :param uuid: unused; the row is looked up with ``self.uuid``.
        :param key: the key to access the desirable value
        :param getter: a callable that takes a dict as param and returns
            a value
        """
        stored = Workflow.get(Workflow.id_user == self.id_user,
                              Workflow.uuid == self.uuid).one()._extra_data
        # NOTE: pickle must never be fed untrusted input; this is only
        # acceptable as long as the column is written exclusively by
        # this application.
        extra_data = cPickle.loads(base64.b64decode(stored))

        if key:
            return extra_data[key]
        if callable(getter):
            return getter(extra_data)
        return extra_data

    def set_extra_data(self, user_id=0, uuid=None,
                       key=None, value=None, setter=None):
        """Replace extra_data.

        Loads the stored extra_data, changes it and writes it back to the
        row identified by ``self.uuid``.

        You can define either the key, value or the setter function.

        :param user_id: user id used to look up the current extra_data;
            falls back to ``self.id_user`` when left at its default (0).
        :param uuid: workflow uuid used to look up the current extra_data;
            falls back to ``self.uuid`` when None.
        :param key: the key to access the desirable value
        :param value: the new value; ignored when None (a None value
            cannot be stored through ``key``).
        :param setter: a callable that takes a dict as param and modifies it
        """
        # Without these fallbacks a call relying on the defaults would
        # search for ``uuid == None`` and fail in ``.one()``, although
        # the write below always targets this very workflow.
        if uuid is None:
            uuid = self.uuid
        if not user_id:
            user_id = self.id_user

        stored = Workflow.get(Workflow.id_user == user_id,
                              Workflow.uuid == uuid).one()._extra_data
        # NOTE: see get_extra_data() regarding the use of pickle.
        extra_data = cPickle.loads(base64.b64decode(stored))

        if key is not None and value is not None:
            extra_data[key] = value
        elif callable(setter):
            setter(extra_data)

        Workflow.get(Workflow.uuid == self.uuid).update(
            {'_extra_data': base64.b64encode(cPickle.dumps(extra_data))}
        )

    @classmethod
    @session_manager
    def delete(cls, uuid=None):
        """Delete the workflow identified by ``uuid``."""
        cls.get(Workflow.uuid == uuid).delete()

    @session_manager
    def save(self, status=None):
        """Save object to persistent storage.

        :param status: new status to store; the current status is kept
            when None.
        """
        self.modified = datetime.now()
        if status is not None:
            self.status = status
        db.session.add(self)
class BibWorkflowObject(db.Model):
    """Data model for wrapping data being run in the workflows.

    Main object being passed around in the workflows module
    when using the workflows API.

    It can be instantiated like this:

    .. code-block:: python

        obj = BibWorkflowObject()
        obj.save()

    Or, like this:

    .. code-block:: python

        obj = BibWorkflowObject.create_object()

    BibWorkflowObject provides some handy functions such as:

    .. code-block:: python

        obj.set_data("<xml ..... />")
        obj.get_data() == "<xml ..... />"

        obj.set_extra_data({"param": value})
        obj.get_extra_data() == {"param": value}

        obj.add_task_result("myresult", {"result": 1})

    Then to finally save the object

    .. code-block:: python

        obj.save()

    Now you can for example run it in a workflow:

    .. code-block:: python

        obj.start_workflow("sample_workflow")
    """

    # db table definition
    __tablename__ = "bwlOBJECT"

    id = db.Column(db.Integer, primary_key=True)

    # Our internal data column. Default is encoded dict.
    _data = db.Column(db.LargeBinary, nullable=False,
                      default=get_default_data())
    _extra_data = db.Column(db.LargeBinary, nullable=False,
                            default=get_default_extra_data())

    id_workflow = db.Column(db.String(36),
                            db.ForeignKey("bwlWORKFLOW.uuid"), nullable=True)
    version = db.Column(db.Integer(3),
                        default=ObjectVersion.INITIAL, nullable=False)
    id_parent = db.Column(db.Integer, db.ForeignKey("bwlOBJECT.id"),
                          default=None)
    child_objects = db.relationship("BibWorkflowObject",
                                    remote_side=[id_parent])
    created = db.Column(db.DateTime, default=datetime.now, nullable=False)
    modified = db.Column(db.DateTime, default=datetime.now,
                         onupdate=datetime.now, nullable=False)
    status = db.Column(db.String(255), default="", nullable=False)
    data_type = db.Column(db.String(150), default="", nullable=True)
    uri = db.Column(db.String(500), default="")
    id_user = db.Column(db.Integer, default=0, nullable=False)

    child_logs = db.relationship("BibWorkflowObjectLog",
                                 backref='bibworkflowobject',
                                 cascade="all, delete, delete-orphan")

    workflow = db.relationship(
        Workflow, foreign_keys=[id_workflow], remote_side=Workflow.uuid,
    )

    # Lazily created logger, see the ``log`` property.
    _log = None

    @property
    def log(self):
        """Access logger object for this instance.

        The logger is created on first access and then reused.
        """
        if not self._log:
            db_handler_obj = BibWorkflowLogHandler(BibWorkflowObjectLog, "id")
            self._log = get_logger(logger_name="object.%s" % (self.id,),
                                   db_handler_obj=db_handler_obj,
                                   loglevel=logging.DEBUG,
                                   obj=self)
        return self._log

    def get_data(self):
        """Get data saved in the object.

        An instance whose ``_data`` is still None (column default not
        applied yet) yields the default value instead of failing.
        """
        encoded = self._data
        if encoded is None:
            encoded = get_default_data()
        # NOTE: pickle must never be fed untrusted input; this is only
        # acceptable as long as the column is written exclusively by
        # this application.
        return cPickle.loads(base64.b64decode(encoded))

    def set_data(self, value):
        """Save data to the object (pickled and base64 encoded)."""
        self._data = base64.b64encode(cPickle.dumps(value))

    def get_extra_data(self):
        """Get extra data saved to the object.

        An instance whose ``_extra_data`` is still None (column default
        not applied yet) yields the default value instead of failing.
        """
        encoded = self._extra_data
        if encoded is None:
            encoded = get_default_extra_data()
        # NOTE: see get_data() regarding the use of pickle.
        return cPickle.loads(base64.b64decode(encoded))

    def set_extra_data(self, value):
        """Save extra data to the object.

        :param value: what you want to replace extra_data with.
        :type value: dict
        """
        self._extra_data = base64.b64encode(cPickle.dumps(value))

    def _working_extra_data(self):
        """Return the attached ``extra_data`` attribute or the stored one.

        ``extra_data`` is not defined by this class; presumably it is
        attached by the workflow engine while the object is processed.
        The stored value is only unpickled when no attribute is attached.
        """
        try:
            return self.extra_data
        except AttributeError:
            return self.get_extra_data()

    def get_workflow_name(self):
        """Return the workflow name for this object.

        :return: the name, or None when no workflow is associated or the
            associated workflow cannot be found.
        """
        try:
            if self.id_workflow:
                return Workflow.query.get(self.id_workflow).name
        except AttributeError:
            # Workflow non-existent
            pass
        return None

    def update_task_history(self, last_task):
        """Append last task to task history.

        Tasks with a true ``branch`` or ``hide`` attribute are not
        recorded.  When no ``extra_data`` attribute is attached to the
        instance, the stored extra data is updated instead.
        """
        from .utils import get_func_info

        attached = hasattr(self, "extra_data")
        extra_data = self.extra_data if attached else self.get_extra_data()

        if "_task_history" not in extra_data:
            extra_data["_task_history"] = []

        skipped = (getattr(last_task, 'branch', False) or
                   getattr(last_task, 'hide', False))
        if not skipped:
            extra_data["_task_history"].append(get_func_info(last_task))

        if not attached:
            # The dict was loaded from the column: write it back so the
            # change is not lost.
            self.set_extra_data(extra_data)

    def get_formatted_data(self, of="hd"):
        """Get the formatted representation for this object.

        :param of: output format passed on to the workflow's formatter.
        :return: the formatted data, or "" when the object has no
            workflow name or the workflow/formatter cannot be found.
        """
        from .registry import workflows
        try:
            name = self.get_workflow_name()
            if not name:
                return ""
            workflow_definition = workflows[name]
            formatted_data = workflow_definition.formatter(self, of=of)
        except (KeyError, AttributeError):
            # Somehow the workflow or formatter does not exist
            from invenio.ext.logging import register_exception
            register_exception(alert_admin=True)
            formatted_data = ""
        return formatted_data

    def __repr__(self):
        """Represent a BibWorkflowObject."""
        return "<BibWorkflowObject(id = %s, data = %s, id_workflow = %s, " \
               "version = %s, id_parent = %s, created = %s, extra_data = %s)" \
               % (str(self.id), str(self.get_data()), str(self.id_workflow),
                  str(self.version), str(self.id_parent), str(self.created),
                  str(self.get_extra_data()))

    # NOTE(review): defining __eq__ without __hash__ makes instances
    # unhashable on Python 3 - confirm whether that matters here.
    def __eq__(self, other):
        """Enable equal operators on BibWorkflowObjects.

        Two objects are equal when data, extra data, workflow id, version
        and parent id match and this object's timestamps are datetimes.
        """
        if not isinstance(other, BibWorkflowObject):
            return NotImplemented
        return bool(
            self._data == other._data and
            self._extra_data == other._extra_data and
            self.id_workflow == other.id_workflow and
            self.version == other.version and
            self.id_parent == other.id_parent and
            isinstance(self.created, datetime) and
            isinstance(self.modified, datetime)
        )

    def __ne__(self, other):
        """Enable not-equal operators on BibWorkflowObjects."""
        equal = self.__eq__(other)
        if equal is NotImplemented:
            # Negating NotImplemented would claim the objects are equal.
            return equal
        return not equal

    def add_task_result(self, name, result,
                        template="workflows/results/default.html"):
        """Add a new task result defined by name.

        The name is the dictionary key used to group similar types of
        results as well as a possible label for the result.

        The result is a dictionary given as context to the template
        when rendered. The result given here is added to a list of results
        for this name.

        .. code-block:: python

            obj = BibWorkflowObject()  # or BibWorkflowObject.query.get(id)
            obj.add_task_result("foo", my_result, "path/to/template")

        :param name: The name of the task in human friendly way.
                     It is used as a key and label for the result rendering.
        :type name: string

        :param result: The result to store - passed to render_template().
        :type result: dict

        :param template: The location of the template to render the result.
        :type template: string
        """
        extra_data = self._working_extra_data()
        task_result = {
            "name": name,
            "result": result,
            "template": template
        }
        extra_data["_tasks_results"].setdefault(name, []).append(task_result)
        self.set_extra_data(extra_data)

    def update_task_results(self, name, results):
        """Update tasks results by name.

        The name is the dictionary key used to group similar types of
        results as well as a possible label for the result.

        This functions allows you to update (replace) the list of results
        associated with a name where each result is structured like this:

        .. code-block:: python

            task_result = {
               "name": "foo",
               "result": result,
               "template": template
            }
            obj = BibWorkflowObject()  # or BibWorkflowObject.query.get(id)
            obj.update_task_results("foo", [task_result])

        :param name: The name of the task in human friendly way.
                     It is used as a key and label for the result rendering.
        :type name: string

        :param results: List of results to store.
        :type results: list
        """
        extra_data = self._working_extra_data()
        extra_data["_tasks_results"][name] = results
        self.set_extra_data(extra_data)

    def get_tasks_results(self):
        """Return the complete set of tasks results.

        The result is given as a dictionary where each result is
        structured like:

        .. code-block:: python

            task_result = {
               "name": name,
               "result": result,
               "template": template
            }

        :return: dictionary of results as {name: [result, ..], ..}
        """
        return self.get_extra_data()["_tasks_results"]

    def set_action(self, action, message):
        """Set the action to be taken for this object.

        Assign an special "action" to this object to be taken
        in consideration in Holding Pen. The widget is referred to
        by a string with the filename minus extension.

        A message is also needed to tell the user the action
        required in a textual way.

        :param action: name of the action to add (i.e. "approval")
        :type action: string

        :param message: message to show to the user
        :type message: string
        """
        extra_data = self.get_extra_data()
        extra_data["_action"] = action
        extra_data["_message"] = message
        self.set_extra_data(extra_data)

    def get_action(self):
        """Retrieve the currently assigned action, if any.

        Extra data still using the old ``_widget`` key is migrated to
        ``_action`` on the fly.

        :return: name of action assigned as string, or None
        """
        extra_data = self.get_extra_data()
        try:
            return extra_data["_action"]
        except KeyError:
            # No action, try old _widget below
            pass

        if "_widget" in extra_data:
            import warnings

            warnings.warn("Widget's are now stored in '_action'",
                          DeprecationWarning)
            # Migrate to new naming
            extra_data["_action"] = extra_data['_widget']
            del extra_data["_widget"]
            self.set_extra_data(extra_data)
            return extra_data["_action"]
        return None

    def get_action_message(self):
        """Retrieve the message of the assigned action ("" if none)."""
        try:
            return self.get_extra_data()["_message"]
        except KeyError:
            # No message
            return ""

    def set_error_message(self, msg):
        """Set an error message."""
        extra_data = self.get_extra_data()
        extra_data["_error_msg"] = msg
        self.set_extra_data(extra_data)

    def reset_error_message(self):
        """Reset the error message (removes the stored entry)."""
        extra_data = self.get_extra_data()
        if "_error_msg" in extra_data:
            del extra_data["_error_msg"]
            self.set_extra_data(extra_data)

    def get_error_message(self):
        """Retrieve the error message, if any.

        A message stored under the old ``error_msg`` key is moved to
        ``_error_msg`` first.

        :return: the stored message, or "" when none is stored.
        """
        extra_data = self.get_extra_data()
        if "error_msg" in extra_data:
            # Backwards compatibility
            extra_data["_error_msg"] = extra_data.pop("error_msg")
            self.set_extra_data(extra_data)
        try:
            return extra_data["_error_msg"]
        except KeyError:
            # No message
            return ""

    def remove_action(self):
        """Remove the currently assigned action."""
        extra_data = self.get_extra_data()
        extra_data["_action"] = None
        extra_data["_message"] = ""
        if "_widget" in extra_data:
            del extra_data["_widget"]
        self.set_extra_data(extra_data)

    def start_workflow(self, workflow_name, delayed=False, **kwargs):
        """Run the workflow specified on the object.

        Will start workflows execution for the object using
        :py:func:`.api.start` (or :py:func:`.api.start_delayed`
        if `delayed=True`).  The object is saved first.

        :param workflow_name: name of workflow to run
        :type workflow_name: str

        :param delayed: should the workflow run asynchronously?
        :type delayed: bool

        :return: BibWorkflowEngine (or AsynchronousResultWrapper).
        """
        if delayed:
            from .api import start_delayed as start_func
        else:
            from .api import start as start_func
        self.save()
        return start_func(workflow_name, data=[self], **kwargs)

    def continue_workflow(self, start_point="continue_next",
                          delayed=False, **kwargs):
        """Continue the workflow for this object.

        Will continue a previous execution for the object using
        :py:func:`.api.continue_oid` (or :py:func:`.api.continue_oid_delayed`
        if `delayed=True`).  The object is saved first.

        The parameter `start_point` allows you to specify the point of where
        the workflow shall continue:

        * restart_prev: will restart from the previous task

        * continue_next: will continue to the next task

        * restart_task: will restart the current task

        :param start_point: where should the workflow start from?
        :type start_point: str

        :param delayed: should the workflow run asynchronously?
        :type delayed: bool

        :return: BibWorkflowEngine (or AsynchronousResultWrapper).
        :raises WorkflowAPIError: when no workflow is associated with
            the object.
        """
        from .errors import WorkflowAPIError

        self.save()
        if not self.id_workflow:
            raise WorkflowAPIError("No workflow associated with object: %r"
                                   % (repr(self),))
        if delayed:
            from .api import continue_oid_delayed as continue_func
        else:
            from .api import continue_oid as continue_func
        return continue_func(self.id, start_point, **kwargs)

    def change_status(self, message):
        """Change the status."""
        self.status = message

    def get_current_task(self):
        """Return the current task from the workflow engine for this object.

        :raises KeyError: when neither ``_task_counter`` nor the old
            ``task_counter`` key is stored.
        """
        extra_data = self.get_extra_data()
        try:
            return extra_data["_task_counter"]
        except KeyError:
            # Assume old version "task_counter"
            return extra_data["task_counter"]

    def save_to_file(self, directory=None,
                     prefix="workflow_object_data_", suffix=".obj"):
        """Save the contents of ``get_data()`` to a new temporary file.

        Warning: Currently assumes non-binary content.

        :param directory: target directory; ``cfg['CFG_TMPSHAREDIR']``
            when None.
        :param prefix: file name prefix passed to ``tempfile.mkstemp``.
        :param suffix: file name suffix passed to ``tempfile.mkstemp``.
        :return: path to saved file.
        """
        if directory is None:
            directory = cfg['CFG_TMPSHAREDIR']
        tmp_fd, filename = tempfile.mkstemp(dir=directory,
                                            prefix=prefix,
                                            suffix=suffix)
        try:
            content = self.get_data()
            if isinstance(content, type(u"")):
                # os.write() only takes bytes; encode text explicitly.
                content = content.encode("utf-8")
            os.write(tmp_fd, content)
        finally:
            # Never leak the descriptor, even when writing fails.
            os.close(tmp_fd)
        return filename

    def get_log(self, *criteria, **filters):
        """Return a list of log entries from BibWorkflowObjectLog.

        You can specify additional filters following the SQLAlchemy syntax.

        Get all the logs for the object:

        .. code-block:: python

            b = BibWorkflowObject.query.get(1)
            b.get_log()

        Get all the logs for the object labeled as ERROR.

        .. code-block:: python

            b = BibWorkflowObject.query.get(1)
            b.get_log(BibWorkflowObjectLog.log_type == logging.ERROR)

        :return: list of BibWorkflowObjectLog
        """
        criterions = [BibWorkflowObjectLog.id_object == self.id] + \
            list(criteria)
        res = BibWorkflowObjectLog.query.filter(
            *criterions
        ).filter_by(**filters)
        return res.all()

    def __getstate__(self):
        """Return internal dict."""
        return self.__dict__

    def __setstate__(self, state):
        """Update internal dict with given state."""
        self.__dict__ = state

    def copy(self, other):
        """Copy data and metadata except id and id_workflow."""
        self._data = other._data
        self._extra_data = other._extra_data
        self.version = other.version
        self.id_parent = other.id_parent
        self.created = other.created
        self.modified = other.modified
        self.status = other.status
        self.data_type = other.data_type
        self.uri = other.uri

    @session_manager
    def save(self, version=None, task_counter=None, id_workflow=None):
        """Save object to persistent storage.

        :param version: new version code; ``modified`` is refreshed when
            it differs from the current one.
        :param task_counter: task counter to store in the extra data.
        :type task_counter: list
        :param id_workflow: uuid of the workflow to associate.
        :raises ValueError: when ``task_counter`` is not a list.
        """
        if task_counter is not None:
            if not isinstance(task_counter, list):
                raise ValueError("Task counter must be a list!")
            self.log.debug("Saving task counter: %s" % (task_counter,))
            extra_data = self.get_extra_data()
            extra_data["_task_counter"] = task_counter
            self.set_extra_data(extra_data)

        if version is not None:
            if version != self.version:
                self.modified = datetime.now()
            self.version = version
        if id_workflow is not None:
            self.id_workflow = id_workflow
        db.session.add(self)
        if self.id is not None:
            self.log.debug("Saving object: %s" % (self.id or "new",))

    @classmethod
    def get(cls, *criteria, **filters):
        """Wrapper of SQLAlchemy to get a BibWorkflowObject.

        A wrapper for the filter and filter_by functions of SQLAlchemy.
        Define a dict with which columns should be filtered by which values.

        .. code-block:: python

            BibWorkflowObject.get(id=oid)
            BibWorkflowObject.get(BibWorkflowObject.id != oid)

        The function supports also "hybrid" arguments.

        .. code-block:: python

            BibWorkflowObject.get(BibWorkflowObject.version != 0,
                                  id_user=user_id)

        See also SQLAlchemy BaseQuery's filter and filter_by documentation.

        :return: the (not yet executed) query object.
        """
        return cls.query.filter(*criteria).filter_by(**filters)

    @classmethod
    @session_manager
    def delete(cls, oid):
        """Delete the BibWorkflowObject with id ``oid``."""
        cls.get(BibWorkflowObject.id == oid).delete()

    @classmethod
    @session_manager
    def create_object(cls, **kwargs):
        """Create a new Workflow Object with given content.

        :param kwargs: passed to the BibWorkflowObject constructor.
        :return: the new object, added to the session.
        """
        obj = BibWorkflowObject(**kwargs)
        db.session.add(obj)
        return obj

    @classmethod
    @session_manager
    def create_object_revision(cls, old_obj, version, **kwargs):
        """Create a Workflow Object copy with customized values.

        :param old_obj: object whose data and metadata are copied.
        :param version: version code of the new object.
        :param kwargs: attribute values overriding the copied ones.
        :return: the new object, added to the session.
        """
        # Create new object and copy it
        obj = BibWorkflowObject(**kwargs)
        obj.copy(old_obj)

        # Overwrite some changes
        obj.version = version
        obj.created = datetime.now()
        obj.modified = datetime.now()
        # copy() may have overwritten values given in kwargs: apply again.
        for key, value in iteritems(kwargs):
            setattr(obj, key, value)
        db.session.add(obj)
        return obj
class BibWorkflowObjectLog(db.Model):
    """Represents a log entry for BibWorkflowObjects.

    Each row is one log record emitted by an object.  The object must be
    saved before using this class, as the record refers to the object id.
    """

    __tablename__ = 'bwlOBJECTLOGGING'

    id = db.Column(db.Integer, primary_key=True)
    id_object = db.Column(db.Integer(255),
                          db.ForeignKey('bwlOBJECT.id'),
                          nullable=False)
    log_type = db.Column(db.Integer, default=0, nullable=False)
    created = db.Column(db.DateTime, default=datetime.now)
    message = db.Column(db.TEXT, default="", nullable=False)

    def __str__(self):
        """Return the log as ``<log_type>: <created> - <message>``."""
        fields = {
            "severity": self.log_type,
            "created": self.created,
            "message": self.message,
        }
        return "%(severity)s: %(created)s - %(message)s" % fields

    def __repr__(self):
        """Return a constructor-like representation of the log."""
        described = [
            "log_type='%s'" % self.log_type,
            "created='%s'" % self.created,
            "message='%s'" % self.message,
            "id_object='%s'" % self.id_object,
        ]
        return "BibWorkflowObjectLog(%s)" % (", ".join(described))

    @classmethod
    def get(cls, *criteria, **filters):
        """SQLAlchemy wrapper to get BibWorkflowObjectLog rows.

        Positional criteria go to ``filter`` and keyword filters go to
        ``filter_by``; the resulting query is returned unexecuted.

        See also SQLAlchemy BaseQuery's filter and filter_by documentation.
        """
        query = cls.query.filter(*criteria)
        return query.filter_by(**filters)

    @classmethod
    def get_most_recent(cls, *criteria, **filters):
        """Return the most recently created log.

        :raises NoResultFound: when no log matches.
        """
        newest_first = cls.get(*criteria, **filters).order_by(
            desc(BibWorkflowObjectLog.created))
        latest = newest_first.first()
        if latest is not None:
            return latest
        raise NoResultFound

    @classmethod
    def delete(cls, id=None):
        """Delete the log with the given id and commit the session."""
        matching = cls.get(BibWorkflowObjectLog.id == id)
        matching.delete()
        db.session.commit()
class BibWorkflowEngineLog(db.Model):
    """Represents a log entry for BibWorkflowEngine.

    Each row is one log record; ``id_object`` refers to the uuid of a
    row in ``bwlWORKFLOW``, which therefore has to exist beforehand.
    """

    __tablename__ = "bwlWORKFLOWLOGGING"

    id = db.Column(db.Integer, primary_key=True)
    id_object = db.Column(db.String(255),
                          db.ForeignKey('bwlWORKFLOW.uuid'),
                          nullable=False)
    log_type = db.Column(db.Integer, default=0, nullable=False)
    created = db.Column(db.DateTime, default=datetime.now)
    message = db.Column(db.TEXT, default="", nullable=False)

    def __str__(self):
        """Return the log as ``<log_type>: <created> - <message>``."""
        fields = {
            "severity": self.log_type,
            "created": self.created,
            "message": self.message,
        }
        return "%(severity)s: %(created)s - %(message)s" % fields

    def __repr__(self):
        """Return a constructor-like representation of the log."""
        described = [
            "log_type='%s'" % self.log_type,
            "created='%s'" % self.created,
            "message='%s'" % self.message,
            "id_object='%s'" % self.id_object,
        ]
        return "BibWorkflowEngineLog(%s)" % (", ".join(described))

    @classmethod
    def get(cls, *criteria, **filters):
        """SQLAlchemy wrapper to get BibWorkflowEngineLog rows.

        Positional criteria go to ``filter`` and keyword filters go to
        ``filter_by``; the resulting query is returned unexecuted.

        See also SQLAlchemy BaseQuery's filter and filter_by documentation.
        """
        query = cls.query.filter(*criteria)
        return query.filter_by(**filters)

    @classmethod
    def get_most_recent(cls, *criteria, **filters):
        """Return the most recently created log.

        :raises NoResultFound: when no log matches.
        """
        newest_first = cls.get(*criteria, **filters).order_by(
            desc(BibWorkflowEngineLog.created))
        latest = newest_first.first()
        if latest is not None:
            return latest
        raise NoResultFound

    @classmethod
    def delete(cls, uuid=None):
        """Delete an instance in database and commit the session.

        NOTE(review): despite its name, ``uuid`` is compared with the
        integer ``id`` column of the log, not with the workflow uuid
        stored in ``id_object`` - confirm this is intended.
        """
        matching = cls.get(BibWorkflowEngineLog.id == uuid)
        matching.delete()
        db.session.commit()
# Names exported by ``from <this module> import *``.
__all__ = (
    'Workflow',
    'BibWorkflowObject',
    'BibWorkflowObjectLog',
    'BibWorkflowEngineLog',
)
Event Timeline
Log In to Comment