Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F94919204
bibfield_utils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, Dec 11, 09:54
Size
17 KB
Mime Type
text/x-python
Expires
Fri, Dec 13, 09:54 (2 d)
Engine
blob
Format
Raw Data
Handle
22899902
Attached To
R3600 invenio-infoscience
bibfield_utils.py
View Options
# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2004, 2005, 2006, 2007, 2008, 2010, 2011, 2013 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
BibField Utils
Helper classes and functions to work with BibField
"""
__revision__
=
"$Id$"
import
datetime
import
os
import
re
import
six
from
invenio.config
import
CFG_PYLIBDIR
from
invenio.pluginutils
import
PluginContainer
from
invenio.containerutils
import
SmartDict
from
invenio.importutils
import
try_to_eval
from
invenio.bibfield_config_engine
import
BibFieldParser
as
FieldParser
def _bibfield_functions_glob(pattern):
    """Return the glob for `pattern` inside invenio/bibfield_functions."""
    return os.path.join(CFG_PYLIBDIR, 'invenio', 'bibfield_functions',
                        pattern)


# Plugin container built from every Python module of the directory.
CFG_BIBFIELD_FUNCTIONS = PluginContainer(_bibfield_functions_glob('*.py'))

# Plugin container restricted to the ``produce_*`` modules; looked up by
# SmartJson.produce() under the key 'produce_' + output.
CFG_BIBFIELD_PRODUCERS = PluginContainer(
    _bibfield_functions_glob('produce_*.py'))
class BibFieldException(Exception):
    """General exception to use within BibField."""
class InvenioBibFieldContinuableError(Exception):
    """
    BibField continuable error.

    SmartJson.check_record() records these in the record's
    ``__continuable_errors__`` list.
    """
class InvenioBibFieldError(Exception):
    """
    BibField fatal error, @see CFG_BIBUPLOAD_BIBFIELD_STOP_ERROR_POLICY

    SmartJson.check_record() records these in the record's ``__errors__``
    list.
    """
class
SmartJson
(
SmartDict
):
"""Base class for record Json structure"""
def __init__(self, json):
    """
    Wrap `json` and prepare the per-instance caches.

    @param json: passed unchanged to the SmartDict constructor.
    """
    super(SmartJson, self).__init__(json)
    # Cerberus validator, created lazily by validate().
    self._validator = None
    # Holds the loaded values; filled on demand by __getitem__/_loads.
    self._dict_bson = SmartDict()
    # Disabled upstream (kept for reference):
    # if '__meta_metadata__.__additional_info__.model_meta_classes' in self:
    #     meta_classes = [import_string(str_cls)
    #         for str_cls in self['__meta_metadata__.__additional_info__.model_meta_classes']]
    #     self.__class__ = type(self.__class__.__name__,
    #                           [self.__class__] + meta_classes, {})
def
__getitem__
(
self
,
key
):
"""
Uses the load capabilities to output the information stored in the DB.
"""
try
:
return
self
.
_dict_bson
[
key
]
except
KeyError
:
#We will try to find the key inside the json dict and load it
pass
main_key
=
SmartDict
.
main_key_pattern
.
sub
(
''
,
key
)
if
main_key
in
self
.
_dict
[
'__meta_metadata__'
][
'__aliases__'
]:
try
:
rest_of_key
=
SmartDict
.
main_key_pattern
.
findall
(
key
)[
0
]
except
IndexError
:
rest_of_key
=
''
return
self
[
self
.
_dict
[
'__meta_metadata__'
][
'__aliases__'
][
main_key
]
+
rest_of_key
]
try
:
if
self
.
_dict
[
'__meta_metadata__'
][
main_key
][
'type'
]
==
'calculated'
:
self
.
_load_precalculated_value
(
main_key
)
else
:
self
.
_loads
(
main_key
)
except
KeyError
:
self
.
_loads
(
main_key
)
return
self
.
_dict_bson
[
key
]
def __setitem__(self, key, value):
    """
    Store `value` under `key` in the loaded cache and dump its main key so
    that it can be stored in the DB.

    When the main key is not yet part of the record, the reader plugin
    matching the record's master format is asked to set it up first.
    """
    main_key = SmartDict.main_key_pattern.sub('', key)

    if main_key not in self:
        from invenio.bibfield import CFG_BIBFIELD_READERS as readers
        master_format = \
            self['__meta_metadata__']['__additional_info__']['master_format']
        reader_class = readers['bibfield_%sreader.py' % (master_format, )]
        reader_class().set(self, main_key)

    self._dict_bson[key] = value
    self._dumps(main_key)
def
__eq__
(
self
,
other
):
try
:
for
key
in
self
.
keys
():
if
key
in
(
'__meta_metadata__'
,
):
pass
if
not
self
.
get
(
k
)
==
other
.
get
(
k
):
return
False
except
:
return
False
return
True
def items(self):
    """
    Lazily yield ``(key, value)`` pairs, reading every value through
    ``self[key]``.
    """
    for name in self.keys():
        value = self[name]
        yield (name, value)
@property
def fatal_errors(self):
    """@return All the fatal/non-continuable errors that check_record has found"""
    errors_key = '__meta_metadata__.__errors__'
    return self.get(errors_key, [])
@property
def continuable_errors(self):
    """@return All the continuable errors that check_record has found"""
    errors_key = '__meta_metadata__.__continuable_errors__'
    return self.get(errors_key, [])
@property
def validation_errors(self):
    """
    @return The ``errors`` attribute of the record's validator; validate()
        is run first when no validator has been created yet.
    """
    validator = self._validator
    if validator is None:
        self.validate()
        validator = self._validator
    return validator.errors
def check_record(self, reset=True):
    """
    Run the checker rules from the BibField field definitions on the record.

    Problems are stored inside ``self['__meta_metadata__']``, split into
    ``__errors__`` (fatal/non-continuable) and ``__continuable_errors__``.

    @param reset: if True (default) both error lists are emptied before
        checking; if False existing lists are kept and only missing ones
        are created.
    """
    def check_rules(checker_functions, key):
        """Run on `key` each checker applying to 'all' or to the master format."""
        for checker_function in checker_functions:
            if 'all' in checker_function[0] or \
               self['__meta_metadata__.__additional_info__.master_format'] in checker_function[0]:
                try:
                    # NOTE(review): the evaluated expression is built from
                    # the checker name/parameters of the field definitions;
                    # presumably trusted configuration -- confirm.
                    try_to_eval("%s(self,'%s',%s)" % (checker_function[1], key, checker_function[2]))
                except InvenioBibFieldContinuableError as err:
                    self['__meta_metadata__']['__continuable_errors__'].append('Checking CError - ' + str(err))
                except InvenioBibFieldError as err:
                    self['__meta_metadata__']['__errors__'].append('Checking Error - ' + str(err))

    # The previous membership tests used misspelled keys, so the lists were
    # emptied on every call regardless of `reset`.
    if reset or '__meta_metadata__.__errors__' not in self:
        self['__meta_metadata__']['__errors__'] = []
    if reset or '__meta_metadata__.__continuable_errors__' not in self:
        self['__meta_metadata__']['__continuable_errors__'] = []

    definitions = FieldParser.field_definitions()
    for key in self.keys():
        try:
            check_rules(definitions[key]['checker'], key)
        except TypeError:
            # definitions[key] is not a mapping: treat it as a collection
            # of field names and check each of them.
            for kkey in definitions[key]:
                try:
                    checkers = definitions[kkey]['checker']
                except KeyError:
                    # Nothing to check for this field, same policy as below.
                    continue
                check_rules(checkers, kkey)
        except KeyError:
            continue
def get(self, key, default=None, reset_cache=False):
    """
    Return ``self[key]``, or `default` when the lookup raises KeyError.

    @param reset_cache: when true, the main key of `key` is first passed to
        _load_precalculated_value with ``force=True``.
    """
    if reset_cache:
        main_key = SmartDict.main_key_pattern.sub('', key)
        self._load_precalculated_value(main_key, force=True)

    try:
        value = self[key]
    except KeyError:
        value = default
    return value
def get_persistent_identifiers(self):
    """
    Using the `persistent_identifiers_keys` field (fetched with its cache
    reset), get the subset of the record containing all persistent
    identifiers.

    @return: dict mapping each listed key to ``self[key]``.
    """
    identifier_keys = self.get('persistent_identifiers_keys',
                               reset_cache=True)
    subset = {}
    for name in identifier_keys:
        subset[name] = self[name]
    return subset
# def is_empty(self):
# """
# One record is empty if there is nothing stored inside rec_json or there is
# only '__key'
# """
# if len(self.keys()) == 0 or \
# all(key.startswith('__') for key in self.keys()):
# return True
# return False
def dumps(self):
    """
    Dump every loaded field except ``__meta_metadata__`` through _dumps.

    @return: the underlying ``_dict``.
    """
    for name in self._dict_bson.keys():
        if name != '__meta_metadata__':
            self._dumps(name)
    return self._dict
def loads(self):
    """
    Load every stored field except ``__meta_metadata__`` through _loads.

    @return: the ``_dict`` attribute of the loaded cache (``_dict_bson``).
    """
    for name in self._dict.keys():
        if name != '__meta_metadata__':
            self._loads(name)
    return self._dict_bson._dict
def produce(self, output, fields=None):
    """
    Call the producer plugin registered as ``'produce_' + output``.

    @param output: suffix of the producer name.
    @param fields: forwarded to the producer as the `fields` keyword.
    @return: whatever the producer returns.
    """
    producer = CFG_BIBFIELD_PRODUCERS['produce_' + output]
    return producer(self, fields=fields)
def validate(self):
    """
    Validate the record against the schemas of its field definitions.

    On first use a Validator is built from the 'schema' entry of every
    field definition (in the record's namespace) matching a key of the
    record, and cached in ``self._validator``.

    @return: the result of ``self._validator.validate(self)``.
    """
    if self._validator is None:
        definitions = FieldParser.field_definitions(
            self['__meta_metadata__']['__additional_info__']['namespace'])

        def find_schema(json_id):
            """Yield the schema(s) defined for `json_id`."""
            definition = definitions.get(json_id, {})
            if isinstance(definition, list):
                # A list holds the ids of the fields to look up instead.
                for jjson_id in definition:
                    yield definitions.get(jjson_id, {}).get('schema', {})
            else:
                yield definition.get('schema', {})

        schema = {}
        # model_fields = ModelParser.model_definitions(self['__meta_metadata__']['__additional_info__']['namespace']).get(fields, {})
        # (model support is disabled: every key of the record is used)
        for json_id in self.keys():
            for field_schema in find_schema(json_id):
                schema.update(field_schema)
        self._validator = Validator(schema=schema)

    return self._validator.validate(self)
def _dumps(self, field):
    """
    Write the loaded value of `field` back into ``_dict``.

    The field's 'dumps' metadata is followed as a path through the field
    definitions of the record's namespace and the object found there is
    called on the loaded value. If that lookup raises KeyError or
    IndexError, the loaded value is copied as is, but only for memoized
    fields or fields of type 'derived', 'creator' or 'UNKNOW'.
    """
    try:
        dumps_path = self._dict['__meta_metadata__'][field]['dumps']
        namespace = \
            self['__meta_metadata__']['__additional_info__']['namespace']
        definitions = FieldParser.field_definitions(namespace)
        dumper = reduce(lambda node, step: node[step], dumps_path,
                        definitions)
        self._dict[field] = dumper(self._dict_bson[field])
    except (KeyError, IndexError):
        field_meta = self['__meta_metadata__'][field]
        if field_meta['memoize'] or \
           field_meta['type'] in ('derived', 'creator', 'UNKNOW'):
            self._dict[field] = self._dict_bson[field]
def
_loads
(
self
,
field
):
""" """
try
:
self
.
_dict_bson
[
field
]
=
reduce
(
lambda
obj
,
key
:
obj
[
key
],
\
self
.
_dict
[
'__meta_metadata__'
][
field
][
'loads'
],
\
FieldParser
.
field_definition
(
self
[
'__meta_metadata__'
][
'__additional_info__'
][
'namespace'
]))(
self
.
_dict
[
field
])
except
(
KeyError
,
IndexError
):
self
.
_dict_bson
[
field
]
=
self
.
_dict
[
field
]
def
_load_precalculated_value
(
self
,
field
,
force
=
False
):
"""
"""
if
self
.
_dict
[
'__meta_metadata__'
][
field
][
'memoize'
]
is
None
:
func
=
reduce
(
lambda
obj
,
key
:
obj
[
key
],
\
self
.
_dict
[
'__meta_metadata__'
][
field
][
'function'
],
\
FieldParser
.
field_definitions
())
self
.
_dict_bson
[
field
]
=
try_to_eval
(
func
,
CFG_BIBFIELD_FUNCTIONS
,
self
=
self
)
else
:
live_time
=
datetime
.
timedelta
(
0
,
self
.
_dict
[
'__meta_metadata__'
][
field
][
'memoize'
])
timestamp
=
datetime
.
datetime
.
strptime
(
self
.
_dict
[
'__meta_metadata__'
][
field
][
'timestamp'
],
"%Y-%m-
%d
T%H:%M:%S"
)
if
datetime
.
datetime
.
now
()
>
timestamp
+
live_time
or
force
:
old_value
=
self
.
_dict_bson
[
field
]
func
=
reduce
(
lambda
obj
,
key
:
obj
[
key
],
\
self
.
_dict
[
'__meta_metadata__'
][
field
][
'function'
],
\
FieldParser
.
field_definitions
(
self
[
'__meta_metadata__'
][
'__additional_info__'
][
'namespace'
]))
self
.
_dict_bson
[
field
]
=
try_to_eval
(
func
,
CFG_BIBFIELD_FUNCTIONS
,
self
=
self
)
if
not
old_value
==
self
.
_dict_bson
[
field
]:
#FIXME: trigger update in DB and fire signal to update others
pass
# Legacy methods, try not to use them as they are already deprecated
def legacy_export_as_marc(self):
    """
    It creates a valid marcxml using the legacy rules defined in the config
    file

    The dicts returned by ``self.produce('json_for_marc')`` are rendered one
    after the other: 3-character keys starting with '00' become control
    fields; 6-character keys are read as tag (3 chars), ind1, ind2 and
    subfield code, with '_' meaning an empty indicator. Other keys and None
    values are skipped.

    @return: the ``<record>...</record>`` string.
    """
    try:
        from collections.abc import Iterable
    except ImportError:  # Python 2
        from collections import Iterable
    from invenio.textutils import encode_for_xml

    def encode_for_marcxml(value):
        """XML-encode `value`, UTF-8 encoding unicode first on Python 2."""
        if six.PY2 and isinstance(value, six.text_type):
            value = value.encode('utf8')
        return encode_for_xml(str(value))

    datafield = '<datafield tag="%s" ind1="%s" ind2="%s">%s</datafield>\n'
    parts = ['<record>']
    for marc_dict in self.produce('json_for_marc'):
        content = ''
        tag = ind1 = ind2 = ''
        for key, value in six.iteritems(marc_dict):
            if isinstance(value, six.string_types) or \
               not isinstance(value, Iterable):
                value = [value]
            for v in value:
                if v is None:
                    continue
                if key.startswith('00') and len(key) == 3:
                    # Control Field (No indicators no subfields)
                    parts.append('<controlfield tag="%s">%s</controlfield>\n'
                                 % (key, encode_for_marcxml(v)))
                elif len(key) == 6:
                    field_id = (key[:3],
                                key[3].replace('_', ''),
                                key[4].replace('_', ''))
                    if (tag, ind1, ind2) != field_id:
                        if content:
                            # Flush the pending subfields under the tag and
                            # indicators they were collected for, before
                            # switching (they used to be emitted with the
                            # new ones).
                            parts.append(datafield
                                         % (tag, ind1, ind2, content))
                            content = ''
                        tag, ind1, ind2 = field_id
                    content += '<subfield code="%s">%s</subfield>' \
                        % (key[5], encode_for_marcxml(v))
        if content:
            parts.append(datafield % (tag, ind1, ind2, content))
    parts.append('</record>')
    return ''.join(parts)
def legacy_create_recstruct(self):
    """
    It creates the recstruct representation using the legacy rules defined
    in the configuration file: the record is exported as MARCXML and passed
    to bibrecord's create_record, whose first result item is returned.

    #CHECK: it might be a bit overkilling
    """
    from invenio.bibrecord import create_record

    marcxml = self.legacy_export_as_marc()
    parsed = create_record(marcxml)
    return parsed[0]
# def is_cacheable(self, field):
# """
# Check if a field is inside the __do_not_cache or not
# @return True if it is not in __do_not_cache
# """
# return not get_main_field(field) in self.rec_json['__do_not_cache']
# def update_field_cache(self, field):
# """
# Updates the value of the cache for the given calculated field
# """
# field = get_main_field(field)
# if re.search('^_[a-zA-Z0-9]', field) and not field in self.rec_json['__do_not_cache']:
# self.rec_json[field] = self._recalculate_field_value(field)[field]
#TODO: waiting for a pull request to Cerberus to be merged
from
cerberus
import
Validator
as
ValidatorBase
from
cerberus
import
ValidationError
,
SchemaError
from
cerberus
import
errors
class Validator(ValidatorBase):
    """
    Cerberus validator with BibField-friendly defaults (transparent schema
    rules and unknown fields allowed) and its own _validate loop.
    """

    def __init__(self, schema=None, transparent_schema_rules=True,
                 ignore_none_values=False, allow_unknown=True):
        super(Validator, self).__init__(schema, transparent_schema_rules,
                                        ignore_none_values, allow_unknown)

    def _validate(self, document, schema=None, update=False):
        """
        Validate `document`, collecting problems in ``self._errors``.

        @param document: object exposing ``get`` and ``items``.
        @param schema: replaces ``self.schema`` when not None.
        @param update: when true the required-fields check is skipped.
        @return: True when no error was recorded.
        @raise SchemaError: missing/non-dict schema, non-dict field
            definition, or unknown rule when rules are not transparent.
        @raise ValidationError: `document` is None or has no ``get``.
        """
        self._errors = {}
        self.update = update

        if schema is not None:
            self.schema = schema
        elif self.schema is None:
            raise SchemaError(errors.ERROR_SCHEMA_MISSING)
        if not isinstance(self.schema, dict):
            raise SchemaError(errors.ERROR_SCHEMA_FORMAT % str(self.schema))

        if document is None:
            raise ValidationError(errors.ERROR_DOCUMENT_MISSING)
        if not hasattr(document, 'get'):
            raise ValidationError(errors.ERROR_DOCUMENT_FORMAT % str(document))
        self.document = document

        # Rules handled explicitly here rather than via a _validate_<rule>.
        special_rules = ["required", "nullable", "type"]
        for field, value in self.document.items():
            if self.ignore_none_values and value is None:
                continue

            definition = self.schema.get(field)
            if not definition:
                if not self.allow_unknown:
                    self._error(field, errors.ERROR_UNKNOWN_FIELD)
                continue
            if not isinstance(definition, dict):
                raise SchemaError(errors.ERROR_DEFINITION_FORMAT % field)

            if value is None and \
               definition.get("nullable", False) == True:  # noqa
                continue

            if 'type' in definition:
                self._validate_type(definition['type'], field, value)
                if self.errors:
                    continue

            for rule in [name for name in definition.keys()
                         if name not in special_rules]:
                handler = getattr(self,
                                  "_validate_" + rule.replace(" ", "_"),
                                  None)
                if handler:
                    handler(definition[rule], field, value)
                elif not self.transparent_schema_rules:
                    raise SchemaError(errors.ERROR_UNKNOWN_RULE %
                                      (rule, field))

        if not self.update:
            self._validate_required_fields()

        return len(self._errors) == 0
Event Timeline
Log In to Comment