Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F90931566
bibworkflow_engine.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, Nov 6, 03:24
Size
15 KB
Mime Type
text/x-python
Expires
Fri, Nov 8, 03:24 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
22155325
Attached To
R3600 invenio-infoscience
bibworkflow_engine.py
View Options
# -*- coding: utf-8 -*-
## This file is part of Invenio.
## Copyright (C) 2012, 2013 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
import
sys
import
cPickle
import
traceback
import
logging
from
invenio.bibworkflow_model
import
(
Workflow
,
WorkflowLogging
,
BibWorkflowObject
)
from
workflow.engine
import
(
GenericWorkflowEngine
,
ContinueNextToken
,
HaltProcessing
,
StopProcessing
,
JumpTokenBack
,
JumpTokenForward
,
WorkflowError
)
from
datetime
import
datetime
from
invenio.sqlalchemyutils
import
db
from
invenio.config
import
CFG_DEVEL_SITE
from
invenio.bibworkflow_utils
import
get_workflow_definition
from
uuid
import
uuid1
as
new_uuid
from
invenio.bibworkflow_utils
import
dictproperty
from
invenio.bibworkflow_config
import
(
CFG_WORKFLOW_STATUS
,
CFG_OBJECT_VERSION
,
CFG_LOG_TYPE
)
DEBUG
=
CFG_DEVEL_SITE
>
0
class
BibWorkflowEngine
(
GenericWorkflowEngine
):
def
__init__
(
self
,
name
=
None
,
uuid
=
None
,
curr_obj
=
0
,
workflow_object
=
None
,
id_user
=
0
,
module_name
=
"Unknown"
,
**
kwargs
):
super
(
BibWorkflowEngine
,
self
)
.
__init__
()
self
.
db_obj
=
None
if
isinstance
(
workflow_object
,
Workflow
):
self
.
db_obj
=
workflow_object
else
:
# If uuid is defined we try to get the db object from DB.
if
uuid
is
not
None
:
self
.
db_obj
=
\
Workflow
.
get
(
Workflow
.
uuid
==
uuid
)
.
first
()
else
:
uuid
=
new_uuid
()
if
self
.
db_obj
is
None
:
self
.
db_obj
=
Workflow
(
name
=
name
,
id_user
=
id_user
,
current_object
=
curr_obj
,
module_name
=
module_name
,
uuid
=
uuid
)
self
.
_create_db_obj
()
self
.
set_workflow_by_name
(
name
)
self
.
set_extra_data_params
(
**
kwargs
)
def
log_info
(
self
,
message
,
error_msg
=
""
):
self
.
log
.
info
(
message
+
"
\n
"
+
error_msg
)
log_obj
=
WorkflowLogging
(
id_workflow
=
self
.
db_obj
.
uuid
,
log_type
=
CFG_LOG_TYPE
.
INFO
,
message
=
message
,
error_msg
=
error_msg
,
extra_data
=
self
.
db_obj
.
extra_data
)
db
.
session
.
add
(
log_obj
)
def
log_error
(
self
,
message
,
error_msg
=
""
):
self
.
log
.
error
(
message
+
"
\n
"
+
error_msg
)
log_obj
=
WorkflowLogging
(
id_workflow
=
self
.
db_obj
.
uuid
,
log_type
=
CFG_LOG_TYPE
.
ERROR
,
message
=
message
,
error_msg
=
error_msg
,
extra_data
=
self
.
db_obj
.
extra_data
)
db
.
session
.
add
(
log_obj
)
def
log_debug
(
self
,
message
,
error_msg
=
""
):
self
.
log
.
debug
(
message
+
"
\n
"
+
error_msg
)
log_obj
=
WorkflowLogging
(
id_workflow
=
self
.
db_obj
.
uuid
,
log_type
=
CFG_LOG_TYPE
.
DEBUG
,
message
=
message
,
error_msg
=
error_msg
,
extra_data
=
self
.
db_obj
.
extra_data
)
db
.
session
.
add
(
log_obj
)
def
extra_data_get
(
self
,
key
):
if
key
not
in
self
.
db_obj
.
extra_data
.
keys
():
raise
KeyError
return
self
.
db_obj
.
extra_data
[
key
]
def
extra_data_set
(
self
,
key
,
value
):
self
.
db_obj
.
extra_data
[
key
]
=
value
extra_data
=
dictproperty
(
fget
=
extra_data_get
,
fset
=
extra_data_set
,
doc
=
"Sets up property"
)
del
extra_data_get
,
extra_data_set
@property
def
counter_object
(
self
):
return
self
.
db_obj
.
counter_object
@property
def
uuid
(
self
):
return
self
.
db_obj
.
uuid
@property
def
id_user
(
self
):
return
self
.
db_obj
.
id_user
@property
def
module_name
(
self
):
return
self
.
db_obj
.
module_name
@property
def
name
(
self
):
return
self
.
db_obj
.
name
@property
def
status
(
self
):
return
self
.
db_obj
.
status
def
__getstate__
(
self
):
if
not
self
.
_picklable_safe
:
raise
cPickle
.
PickleError
(
"The instance of the workflow engine "
"cannot be serialized, "
"because it was constructed with "
"custom, user-supplied callbacks. "
"Either use PickableWorkflowEngine or "
"provide your own __getstate__ method."
)
state
=
self
.
__dict__
.
copy
()
del
state
[
'log'
]
return
state
def
__setstate__
(
self
,
state
):
if
len
(
self
.
_objects
)
<
self
.
_i
[
0
]:
raise
cPickle
.
PickleError
(
"The workflow instance "
"inconsistent state, "
"too few objects"
)
self
.
log
=
logging
.
getLogger
(
"workflow.
%s
"
%
self
.
__class__
)
# default logging
self
.
__dict__
=
state
def
__repr__
(
self
):
return
"<BibWorkflow_engine(
%s
)>"
%
(
self
.
db_obj
.
name
,)
def
__str__
(
self
,
log
=
False
):
return
"""-------------------------------
BibWorkflowEngine
-------------------------------
%s
-------------------------------
"""
%
(
self
.
db_obj
.
__str__
(),)
@staticmethod
def
before_processing
(
objects
,
self
):
"""
Executed before processing the workflow.
"""
# 1. Save workflow (ourselves).
if
not
self
.
db_obj
.
uuid
:
self
.
save
()
self
.
set_counter_initial
(
len
(
objects
))
self
.
log_info
(
"Workflow has been started"
)
# 2. We want to save all the objects as version 0.
for
o
in
objects
:
same_workflow
=
\
o
.
id_workflow
and
\
o
.
id_workflow
==
self
.
db_obj
.
uuid
if
o
.
id
and
same_workflow
:
# If object exists and we are running the same workflow,
# do nothing
o
.
log_info
(
"object saving process : was already existing"
)
continue
# Set the current workflow id in the object
if
o
.
version
==
CFG_OBJECT_VERSION
.
INITIAL
\
and
o
.
id_workflow
is
not
None
:
o
.
log_info
(
"object saving process : was already existing"
)
pass
else
:
o
.
id_workflow
=
self
.
uuid
o
.
save
(
o
.
version
)
GenericWorkflowEngine
.
before_processing
(
objects
,
self
)
@staticmethod
def
after_processing
(
objects
,
self
):
self
.
_i
=
[
-
1
,
[
0
]]
if
self
.
has_completed
():
self
.
save
(
CFG_WORKFLOW_STATUS
.
COMPLETED
)
else
:
self
.
save
(
CFG_WORKFLOW_STATUS
.
FINISHED
)
def
_create_db_obj
(
self
):
db
.
session
.
add
(
self
.
db_obj
)
db
.
session
.
commit
()
self
.
log_info
(
"Workflow saved to db as new object."
)
def
_update_db
(
self
):
db
.
session
.
commit
()
self
.
log_info
(
"Workflow saved to db."
)
def
has_completed
(
self
):
"""
Returns True of workflow is fully completed meaning
that all associated objects are in FINAL state.
"""
number_of_objects
=
BibWorkflowObject
.
query
.
filter
(
BibWorkflowObject
.
id_workflow
==
self
.
uuid
,
BibWorkflowObject
.
version
.
in_
([
CFG_OBJECT_VERSION
.
HALTED
,
CFG_OBJECT_VERSION
.
RUNNING
])
)
.
count
()
return
number_of_objects
==
0
def
save
(
self
,
status
=
CFG_WORKFLOW_STATUS
.
NEW
):
"""
Save the workflow instance to database.
Just storing the necessary data.
No serialization (!).
Status: 0 - new, 1 - running, 2 - halted, 3 - error, 4 - finished
"""
if
not
self
.
db_obj
.
uuid
:
# We do not have an ID,
# so we need to add ourselves (first execution).
self
.
_create_db_obj
()
else
:
# This workflow continues a previous execution.
if
status
in
(
CFG_WORKFLOW_STATUS
.
FINISHED
,
CFG_WORKFLOW_STATUS
.
HALTED
):
self
.
db_obj
.
current_object
=
0
self
.
db_obj
.
modified
=
datetime
.
now
()
self
.
db_obj
.
status
=
status
self
.
_update_db
()
def
process
(
self
,
objects
):
super
(
BibWorkflowEngine
,
self
)
.
process
(
objects
)
def
restart
(
self
,
obj
,
task
):
"""Restart the workflow engine after it was deserialized
"""
self
.
log_info
(
"Restarting workflow from
%s
object and
%s
task"
%
(
str
(
obj
),
str
(
task
),))
# set the point from which to start processing
if
obj
==
'prev'
:
# start with the previous object
self
.
_i
[
0
]
-=
2
#TODO: check if there is any object there
elif
obj
==
'current'
:
# continue with the current object
self
.
_i
[
0
]
-=
1
elif
obj
==
'next'
:
pass
else
:
raise
Exception
(
'Unknown start point for object:
%s
'
%
obj
)
# set the task that will be executed first
if
task
==
'prev'
:
# the previous
self
.
_i
[
1
][
-
1
]
-=
1
elif
task
==
'current'
:
# restart the task again
self
.
_i
[
1
][
-
1
]
-=
0
elif
task
==
'next'
:
# continue with the next task
self
.
_i
[
1
][
-
1
]
+=
1
else
:
raise
Exception
(
'Unknown start pointfor task:
%s
'
%
obj
)
self
.
process
(
self
.
_objects
)
self
.
_unpickled
=
False
@staticmethod
def
processing_factory
(
objects
,
self
):
"""Default processing factory extended with saving objects
before succesful processing.
Default processing factory, will process objects in order
@var objects: list of objects (passed in by self.process())
@keyword cls: engine object itself, because this method may
be implemented by the standalone function, we pass the
self also as a cls argument
As the WFE proceeds, it increments the internal counter, the
first position is the number of the element. This pointer increases
before the object is taken
2nd pos is reserved for the array that points to the task position.
The number there points to the task that is currently executed;
when error happens, it will be there unchanged. The pointer is
updated after the task finished running.
"""
self
.
before_processing
(
objects
,
self
)
i
=
self
.
_i
# negative index not allowed, -1 is special
while
i
[
0
]
<
len
(
objects
)
-
1
and
i
[
0
]
>=
-
1
:
i
[
0
]
+=
1
obj
=
objects
[
i
[
0
]]
obj
.
log_info
(
"Object is selected for processing"
)
callbacks
=
self
.
callback_chooser
(
obj
,
self
)
if
callbacks
:
try
:
self
.
run_callbacks
(
callbacks
,
objects
,
obj
)
i
[
1
]
=
[
0
]
# reset the callbacks pointer
except
StopProcessing
:
if
DEBUG
:
self
.
log_debug
(
"Processing was stopped: '
%s
' "
"(object:
%s
)"
%
(
str
(
callbacks
),
repr
(
obj
)))
obj
.
log_debug
(
"Processing has stopped"
)
break
except
JumpTokenBack
,
step
:
if
step
.
args
[
0
]
>
0
:
raise
WorkflowError
(
"JumpTokenBack cannot"
" be positive number"
)
if
DEBUG
:
self
.
log_debug
(
'Warning, we go back [
%s
] objects'
%
step
.
args
[
0
])
obj
.
log_debug
(
"Object preempted"
)
i
[
0
]
=
max
(
-
1
,
i
[
0
]
-
1
+
step
.
args
[
0
])
i
[
1
]
=
[
0
]
# reset the callbacks pointer
except
JumpTokenForward
,
step
:
if
step
.
args
[
0
]
<
0
:
raise
WorkflowError
(
"JumpTokenForward cannot"
" be negative number"
)
if
DEBUG
:
self
.
log_debug
(
'We skip [
%s
] objects'
%
step
.
args
[
0
])
obj
.
log_debug
(
"Object preempted"
)
i
[
0
]
=
min
(
len
(
objects
),
i
[
0
]
-
1
+
step
.
args
[
0
])
i
[
1
]
=
[
0
]
# reset the callbacks pointer
except
ContinueNextToken
:
if
DEBUG
:
self
.
log_debug
(
'Stop processing for this object, '
'continue with next'
)
obj
.
log_debug
(
"Object preempted"
)
i
[
1
]
=
[
0
]
# reset the callbacks pointer
continue
except
HaltProcessing
:
self
.
increase_counter_halted
()
if
DEBUG
:
self
.
log_info
(
'Processing was halted at step:
%s
'
%
i
)
# reraise the exception,
#this is the only case when a WFE can be completely
# stopped
obj
.
log_info
(
"Object proccesing is halted"
)
raise
except
Exception
,
e
:
self
.
log_info
(
message
=
"Unexpected error:
%r
"
%
(
e
,),
error_msg
=
traceback
.
format_exc
())
obj
.
log_error
(
"Something terribly wrong"
" happen to this object"
)
raise
# We save the object once it is fully run through
obj
.
save
(
CFG_OBJECT_VERSION
.
FINAL
)
obj
.
log_info
(
"Object proccesing is finished"
)
self
.
increase_counter_finished
()
##### Update the workflow
####self.save()
#obj.get_log().info('Success!')
self
.
log_info
(
"Done saving object:
%i
"
%
(
obj
.
id
,
))
self
.
after_processing
(
objects
,
self
)
def
execute_callback
(
self
,
callback
,
obj
):
"""Executes the callback - override this method to implement logging"""
obj
.
data
=
obj
.
get_data
()
try
:
callback
(
obj
,
self
)
finally
:
obj
.
set_data
(
obj
.
data
)
def
halt
(
self
,
msg
):
"""Halt the workflow (stop also any parent wfe)"""
self
.
log_debug
(
"Processing halted at task
%s
with message:
%s
"
%
(
self
.
getCurrTaskId
(),
msg
,
))
raise
HaltProcessing
(
"Processing halted at task
%s
with message:
%s
"
%
(
self
.
getCurrTaskId
(),
msg
,
))
def
set_counter_initial
(
self
,
obj_count
):
self
.
db_obj
.
counter_initial
=
obj_count
self
.
db_obj
.
counter_halted
=
0
self
.
db_obj
.
counter_error
=
0
self
.
db_obj
.
counter_finished
=
0
def
increase_counter_halted
(
self
):
self
.
db_obj
.
counter_halted
+=
1
def
increase_counter_error
(
self
):
self
.
db_obj
.
counter_error
+=
1
def
increase_counter_finished
(
self
):
self
.
db_obj
.
counter_finished
+=
1
def
set_workflow_by_name
(
self
,
workflow_name
):
workflow
=
get_workflow_definition
(
workflow_name
)
self
.
workflow_definition
=
workflow
self
.
setWorkflow
(
self
.
workflow_definition
.
workflow
)
def
set_extra_data_params
(
self
,
**
kwargs
):
for
key
,
value
in
kwargs
.
iteritems
():
self
.
extra_data
[
key
]
=
value
Event Timeline
Log In to Comment