Page MenuHomec4science

old_py2ttl.py
No OneTemporary

File Metadata

Created
Sat, May 18, 04:10

old_py2ttl.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" This program can be used to convert Python classes that are
subclasses of class_spec.SemanticClass to
an OWL ontology in Turtle format.
DEPRECATED
"""
# Copyright (C) University of Basel 2019 {{{1
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/> 1}}}
from deprecated import deprecated
import getopt
import importlib
import importlib.util
import inspect
import lxml.etree as ET
from os import sep, path, listdir
from os.path import isfile, isdir, dirname, basename
from progress.bar import Bar
from rdflib import Graph, URIRef, Literal, BNode, OWL, RDF, RDFS, XSD
import re
import sys
if dirname(__file__) not in sys.path:
sys.path.append(dirname(__file__))
from class_spec import SemanticClass
from config import check_config_files_exist, get_datatypes_dir, PROJECT_NAME, PROJECT_ONTOLOGY_FILE, PROJECT_URL, SHARED_ONTOLOGIES_DIR
from knora_base import KNORA_BASE
sys.path.append('shared_util')
from myxmlwriter import dict2xml
# Module metadata: authorship, licensing and version of this script.
__author__ = "Christian Steiner"
__maintainer__ = __author__
__copyright__ = 'University of Basel'
__email__ = "christian.steiner@unibas.ch"
__status__ = "Development"
__license__ = "GPL v3"
__version__ = "0.0.1"
# Reason text handed to the @deprecated decorator on Py2TTLConverter.__init__.
DEPRECATED_MSG = """This converter creates a KNORA conform (or quasi KNORA conform) ontology. Departing from the goal of creating such an ontology,
the semantic_dictionaries changed. Hence, it will no longer be possible to convert the semantic classes with this converter."""
class Py2TTLConverter:
"""This class can be used to convert semantic_dictionaries to an OWL ontology in Turtle format.
"""
UNITTESTING = False
INFO_DICT_KEYS = [ 'cardinality_restriction', 'comment', 'label', 'name', 'xpath' ]
@deprecated(reason=DEPRECATED_MSG)
def __init__(self, project_ontology_file=None, create_super_cls_for_multi_property=True):
"""Set up an empty project graph and, if available, seed it from an existing ontology.

:param project_ontology_file: path of a turtle file; it is parsed into
self.project_graph only if it is not None and exists as a file.
:param create_super_cls_for_multi_property: stored on the instance; createProperty
reads it to decide whether classes sharing a property get a common super class.
"""
# Placeholder for the "list" cardinality marker; get_semantic_classes replaces it
# with SemanticClass.LIST when that attribute exists.
self.list_value = -99
# class name -> python class, filled by createClassAndProperties.
self.class_uri_dict = {}
# subject uri -> { property uri: xpath }.
self.uri_xpath_mapping = {}
# Structure that convert_py2ttl hands to dict2xml as the mapping file content.
self.uri_mapping4cls_and_properties = {}
self.create_super_cls_for_multi_property = create_super_cls_for_multi_property
self.project_graph = Graph()
# Defaults taken from config; overridden below when an ontology has been parsed.
self.base_uriref = URIRef(PROJECT_URL)
self.project_name = PROJECT_NAME
self.ns = { self.base_uriref + '#': self.project_name }
if project_ontology_file is not None and isfile(project_ontology_file):
self.project_graph.parse(project_ontology_file, format="turtle")
if len(self.project_graph) > 0:
# The subject typed owl:Ontology becomes the base uri; any=False asks rdflib
# to complain if there is more than one such subject.
self.base_uriref = self.project_graph.value(predicate=RDF.type, object=OWL.Ontology, any=False)
# Invert the namespace table so a prefix can be looked up by namespace uri.
self.ns = { uriref: ns for ns, uriref in self.project_graph.namespace_manager.namespaces() }
# NOTE(review): this is None when the ontology binds no prefix to base_uriref + '#',
# and base_uriref itself is None when no owl:Ontology subject exists -- confirm inputs.
self.project_name = self.ns.get(self.base_uriref + '#')
self.project_graph.bind(self.project_name, self.base_uriref + '#')
self.uri_mapping4cls_and_properties.update({ 'ontology': { 'project_name': self.project_name, 'project_uri': self.base_uriref + '#' }})
self.uri_mapping4cls_and_properties.update({ 'classes': {} })
def addClass(self, cls_uri, comment='', label='', super_uri=KNORA_BASE.Resource):
    """Declare cls_uri as an owl:Class in self.project_graph.

    If no comment is given and the local name of cls_uri (the part after '#')
    is registered in self.class_uri_dict, comment and label are derived from
    that python class by get_comment_label.

    :param cls_uri: uri of the class to add
    :param comment: rdfs:comment text, skipped when empty
    :param label: rdfs:label text, skipped when empty
    :param super_uri: object of the rdfs:subClassOf statement
    """
    if comment == '':
        local_name = cls_uri.split('#')[1]
        if local_name in self.class_uri_dict:
            comment, label = self.get_comment_label(self.class_uri_dict.get(local_name))
    add_triple = self.project_graph.add
    add_triple((cls_uri, RDF.type, OWL.Class))
    if comment != '':
        add_triple((cls_uri, RDFS.comment, Literal(comment, lang='en')))
    if label != '':
        add_triple((cls_uri, RDFS.label, Literal(label, lang='en')))
    add_triple((cls_uri, RDFS.subClassOf, super_uri))
def addProperty(self, property_uri, super_uri, subject_uri, object_uri, comment, label, cardinality, info_dict={}):
    """Declare property_uri as an owl:ObjectProperty in self.project_graph.

    The property gets its super property, its KNORA subject/object class
    constraints, an English comment and label; afterwards a cardinality
    restriction on the property is attached to subject_uri.

    :param property_uri: uri of the property to add
    :param super_uri: object of the rdfs:subPropertyOf statement
    :param subject_uri: class used as knora subjectClassConstraint
    :param object_uri: class used as knora objectClassConstraint
    :param comment: rdfs:comment text
    :param label: rdfs:label text
    :param cardinality: forwarded to addRestriction2Class
    :param info_dict: forwarded to addRestriction2Class
    """
    add_triple = self.project_graph.add
    add_triple((property_uri, RDF.type, OWL.ObjectProperty))
    add_triple((property_uri, RDFS.subPropertyOf, super_uri))
    add_triple((property_uri, KNORA_BASE.objectClassConstraint, object_uri))
    add_triple((property_uri, KNORA_BASE.subjectClassConstraint, subject_uri))
    add_triple((property_uri, RDFS.comment, Literal(comment, lang='en')))
    add_triple((property_uri, RDFS.label, Literal(label, lang='en')))
    self.addRestriction2Class(subject_uri, property_uri, cardinality=cardinality, info_dict=info_dict)
def addRestriction2Class(self, cls_uri, property_uri, cardinality=0, comment="", label="", info_dict={}):
    """Attach an owl:Restriction on property_uri to the class cls_uri.

    The class is created first (via addClass) if the graph holds no statement
    about it yet. The restriction predicate is owl:<info_dict['cardinality_restriction']>
    when that key is present; otherwise owl:minCardinality for cardinality 0
    and owl:cardinality for any other value.

    :param cls_uri: class that receives the restriction
    :param property_uri: property the restriction is about
    :param cardinality: value of the restriction
    :param comment: comment used only if the class has to be created
    :param label: label used only if the class has to be created
    :param info_dict: may carry the key 'cardinality_restriction'
    """
    graph = self.project_graph
    if (cls_uri, None, None) not in graph:
        self.addClass(cls_uri, comment=comment, label=label)
    restriction_node = BNode()
    if 'cardinality_restriction' in info_dict.keys():
        restriction_predicate = URIRef(OWL + info_dict['cardinality_restriction'])
    elif cardinality == 0:
        restriction_predicate = OWL.minCardinality
    else:
        restriction_predicate = OWL.cardinality
    graph.add((cls_uri, RDFS.subClassOf, restriction_node))
    graph.add((restriction_node, RDF.type, OWL.Restriction))
    graph.add((restriction_node, OWL.onProperty, property_uri))
    graph.add((restriction_node, restriction_predicate, Literal(str(cardinality), datatype=XSD.nonNegativeInteger)))
def convert_py2ttl(self, datatypes_dir, target_ontology_file):
    """Convert all classes contained in datatypes_dir that are subclasses of class_spec.SemanticClass to rdf.

    The resulting graph is written to target_ontology_file in turtle format.
    Unless Py2TTLConverter.UNITTESTING is set, a progress bar is shown and a
    mapping xml file is written next to the ontology.

    :param datatypes_dir: directory containing the python modules to convert
    :param target_ontology_file: path of the turtle file to write
    :return: exit code (int): 0 on success, 1 if datatypes_dir does not exist
    """
    if not isdir(datatypes_dir):
        print('Error: dir {} does not exist!'.format(datatypes_dir))
        # The original referenced `usage` without calling it, so no help was printed.
        usage()
        return 1
    semantic_classes = self.get_semantic_classes(datatypes_dir)
    show_progress = not Py2TTLConverter.UNITTESTING
    bar = Bar('creating classes and properties', max=len(semantic_classes)) if show_progress else None
    for cls in semantic_classes:
        self.createClassAndProperties(cls)
        if bar is not None:
            bar.next()
    if bar is not None:
        bar.finish()
    # Serialize before opening the target so a failing serialization does not
    # leave a truncated file behind. Older rdflib returns bytes, rdflib >= 6
    # returns str; the file is opened in binary mode, so normalise to bytes.
    serialized = self.project_graph.serialize(format="turtle")
    if isinstance(serialized, str):
        serialized = serialized.encode('utf-8')
    with open(target_ontology_file, 'wb+') as target_file:
        target_file.write(serialized)
    if show_progress:
        xml_file = 'mapping_file4' + datatypes_dir.replace(sep, '.') + '2' + target_ontology_file.replace('.' + sep, '').replace(sep, '.').replace('.ttl', '.xml')
        dict2xml(self.uri_mapping4cls_and_properties, xml_file)
    return 0
def createClassAndProperties(self, cls):
"""Creates an owl:Class and its owl:ObjectProperty entries from the semantic dictionary of a python class.

The class is registered in self.class_uri_dict, added to the graph via addClass,
and every key of semantic_dict['properties'] is turned into a property (or mapped
to the property already registered for the super class). The resulting
class/property uris are stored in self.uri_mapping4cls_and_properties['classes'].

:param cls: python class providing get_semantic_dictionary()
"""
# class_uri_dict doubles as the registry of classes that have been processed.
if not cls.__name__ in self.class_uri_dict:
self.class_uri_dict.update({cls.__name__: cls})
semantic_dict = cls.get_semantic_dictionary()
super_uri = KNORA_BASE.Resource
super_cls = None
# An explicit 'rdfs:subClassOf' entry replaces the default super class uri.
if bool(semantic_dict['class'].get('rdfs:subClassOf')):
super_uri = URIRef(semantic_dict['class'].get('rdfs:subClassOf'))
# A 'type' entry names a python super class: it is converted first (recursively)
# and its uri becomes the super class uri.
if bool(semantic_dict['class'].get('type')):
super_cls = semantic_dict['class'].get('type')
self.createClassAndProperties(super_cls)
super_uri = URIRef(self.base_uriref + '#' + super_cls.__name__)
cls_uri = URIRef(self.base_uriref + '#' + cls.__name__)
comment, label = self.get_comment_label(cls)
self.addClass(cls_uri, comment, label, super_uri)
uri_mapping4properties = {}
for property_key in self._get_semantic_dictionary_keys_super_first(semantic_dict['properties']):
super_semantic_dict = {} if super_cls is None else super_cls.get_semantic_dictionary()
# A property is created for this class unless the super class declares the same
# key as a non-dict spec whose second element equals self.list_value and whose
# first element (the property class) is the same as in this class.
if len(super_semantic_dict) == 0 or not bool(super_semantic_dict['properties'].get(property_key))\
or type(super_semantic_dict['properties'].get(property_key)) == dict\
or super_semantic_dict['properties'].get(property_key)[1] != self.list_value\
or semantic_dict['properties'].get(property_key)[0] != super_semantic_dict['properties'].get(property_key)[0]:
try:
# Dict specs carry named fields ('class', 'cardinality', 'xpath', 'flag', 'name', ...).
if type(semantic_dict['properties'].get(property_key)) == dict:
property_dict4key = semantic_dict['properties'].get(property_key)
object_uri = None
property_cls = None
property_uri = None
cardinality = property_dict4key.get('cardinality')
xpath = property_dict4key.get('xpath')
# Ordered lists get an intermediate class with a seqnum and a part property.
if property_dict4key.get('flag') == 'ordered_list':
object_uri, seqnum_uri, part_property_uri = self.createPartProperty(property_dict4key)
property_uri = URIRef(self.base_uriref + '#' + property_dict4key.get('name'))
self.uri_mapping4cls_and_properties['classes'].update({\
property_dict4key['class'].get('class_name'):\
{ 'class_uri': object_uri,\
'properties': { SemanticClass.HAS_SEQNUM: seqnum_uri, SemanticClass.HAS_PART: part_property_uri} }\
})
#print(property_key, object_uri, property_uri, seqnum_uri, part_property_uri)
else:
property_cls = property_dict4key.get('class')
subject_uri, property_uri = self.createProperty(cls_uri, property_key, property_cls=property_cls,\
property_uri=property_uri, cardinality=cardinality, object_uri=object_uri, info_dict=property_dict4key)
else:
# Non-dict specs are unpacked as (class, cardinality, xpath).
property_cls, cardinality, xpath = semantic_dict['properties'].get(property_key)
subject_uri, property_uri = self.createProperty(cls_uri, property_key, property_cls, cardinality)
if not subject_uri in self.uri_xpath_mapping:
self.uri_xpath_mapping.update({ subject_uri: {}})
self.uri_xpath_mapping.get(subject_uri).update({property_uri: xpath})
uri_mapping4properties.update({ property_key: property_uri })
# Presumably triggered by the three-way unpacking above when a spec has the
# wrong number of elements -- any other ValueError in the try body is reported
# with the same message.
except ValueError:
raise Exception('Class {} does not have a xpath spec in its get_semantic_dictionary()'.format(cls))
# Otherwise reuse the property uri that was registered for the super class.
elif bool(self.uri_mapping4cls_and_properties.get('classes').get(super_cls.__name__).get('properties').get(property_key)):
property_uri = self.uri_mapping4cls_and_properties['classes'][super_cls.__name__]['properties'][property_key]
uri_mapping4properties.update({ property_key: property_uri})
self.uri_mapping4cls_and_properties.get('classes').update({ cls.__name__: { 'class_uri': cls_uri, 'properties': uri_mapping4properties }})
def createPartProperty(self, info_dict):
    """Create the intermediate class and its two properties for an 'ordered_list' spec.

    info_dict is expected to have flag == 'ordered_list' and a dict under
    'class' (as created by SemanticClass.get_cls_hasPart_objectCls_dictionaries());
    for any other input nothing is created and None is returned.

    :param info_dict: property spec from a semantic dictionary
    :return: tuple (subject_uri, seqnum_property_uri, part_property_uri), all rdflib.URIRef
    """
    if info_dict.get('flag') != 'ordered_list' or type(info_dict.get('class')) != dict:
        return None
    part_spec = info_dict.get('class')
    holder_name = part_spec.get('class_name')
    holder_label = part_spec.get('label')
    holder_comment = part_spec.get('comment')
    holder_uri = URIRef(self.base_uriref + '#' + holder_name)
    self.addClass(holder_uri, comment=holder_comment, label=holder_label)

    # Sequence number property of the intermediate class.
    seqnum_spec = part_spec.get(SemanticClass.HAS_SEQNUM)
    seqnum_xpath = seqnum_spec.get('xpath')
    seqnum_uri = URIRef(self.base_uriref + '#' + seqnum_spec.get('name'))
    holder_uri, seqnum_uri = self.createProperty(
        holder_uri, property_uri=seqnum_uri, property_cls=int,
        cardinality=seqnum_spec.get('cardinality'), super_uri=KNORA_BASE.seqnum, info_dict=seqnum_spec)
    if holder_uri not in self.uri_xpath_mapping:
        self.uri_xpath_mapping.update({ holder_uri: {}})
    self.uri_xpath_mapping.get(holder_uri).update({seqnum_uri: seqnum_xpath})

    # Property linking the intermediate class to the class of the list members.
    member_spec = part_spec.get(SemanticClass.HAS_PART)
    member_property_uri = URIRef(self.base_uriref + '#' + member_spec.get('name'))
    member_xpath = member_spec.get('xpath')
    member_cls_uri = URIRef(self.base_uriref + '#' + member_spec.get('class').__name__)
    holder_uri, created_property_uri = self.createProperty(
        holder_uri, property_uri=member_property_uri, object_uri=member_cls_uri,
        cardinality=member_spec.get('cardinality'), info_dict=member_spec)
    self.uri_xpath_mapping.get(holder_uri).update({member_property_uri: member_xpath})
    return holder_uri, seqnum_uri, created_property_uri
# NOTE(review): the default info_dict={} is a single dict shared by all calls, and the
# bool branch below mutates info_dict via update(). A call without info_dict for a bool
# property therefore leaves 'cardinality_restriction' in the default for later calls.
def createProperty(self, cls_uri, property_name=None, property_cls=None, cardinality=0, property_uri=None, super_uri=None, object_uri=None, info_dict={}):
"""Creates a owl:ObjectProperty.

:param cls_uri: uri of the class the property is declared for
:param property_name: key of the property in the semantic dictionary
:param property_cls: python class of the property's value (builtin or semantic class)
:param cardinality: cardinality of the property; self.list_value marks a list
:param property_uri: uri of the property; derived from the name if None
:param super_uri: super property; defaults to KNORA_BASE.hasValue
:param object_uri: uri of the value class; derived from property_cls if None
:param info_dict: optional spec, read for 'name', 'label', 'comment', 'cardinality_restriction'
:return: tuple of subject_uri (rdflib.URIRef) and property_uri (rdflib.URIRef) of created property
"""
# rdflib property path: rdfs:subClassOf repeated zero or more times.
inferredSubClass = RDFS.subClassOf * '*'
if property_uri is None:
name = self.createPropertyName(property_name=property_name)\
if 'name' not in info_dict.keys() else info_dict['name']
property_uri = URIRef(self.base_uriref + '#' + name)
subject_uri = cls_uri
# NOTE(review): `name` is only bound above when property_uri was None, so callers that
# pass property_uri appear to rely on info_dict containing 'label' -- confirm.
label = 'has ' + name.replace('has','')\
if 'label' not in info_dict.keys() else info_dict['label']
if super_uri is None:
super_uri = KNORA_BASE.hasValue
# The property is unknown to the graph so far: create it.
if (property_uri, None, None) not in self.project_graph:
if object_uri is None and property_cls.__module__ == 'builtins':
datatype_mapping = { float: KNORA_BASE.DecimalValue, int: KNORA_BASE.IntValue,\
str: KNORA_BASE.TextValue, bool: KNORA_BASE.BooleanValue }
# Boolean properties are forced to an exact cardinality of 1.
if property_cls == bool:
cardinality = 1
info_dict.update({'cardinality_restriction': 'cardinality'})
object_uri = datatype_mapping.get(property_cls)
# Two string property names are mapped to more specific KNORA value types.
if object_uri == KNORA_BASE.TextValue:
if property_name == 'URL':
object_uri = KNORA_BASE.UriValue
elif property_name == 'file_name':
object_uri = KNORA_BASE.FileValue
else:
if object_uri is None:
object_uri = URIRef(self.base_uriref + '#' + property_cls.__name__)
# if class X has a list of objects Y, we create a property YbelongsToX.
if cardinality == self.list_value:
subject_uri = object_uri
object_uri = cls_uri
result = self.project_graph.query(\
'select ?p where {'\
+ ' ?p <{0}> ?s .'.format(KNORA_BASE.subjectClassConstraint)\
+ ' ?p <{0}> <{1}> .'.format(KNORA_BASE.objectClassConstraint, object_uri)\
+ ' <{0}> <http://www.w3.org/2000/01/rdf-schema#subClassOf> ?s .'.format(subject_uri)\
+ ' }')
# if subject_uri is a subclass of a uri that is a subjectClassConstraint to a property_uri
# that has object_uri as its objectClassConstraint, then we do not create a new property YbelongsToX,
# instead we return subject_uri and this already existing property_uri.
# NOTE(review): the second element returned here is the first item yielded by the
# query result, not an unpacked ?p value -- confirm callers expect that.
if len(result) > 0:
return subject_uri, [ property_uri for property_uri in result ][0]
name = self.createPropertyName(subject_uri=subject_uri, object_uri=object_uri)
property_uri = URIRef(self.base_uriref + '#' + name)
cardinality = 1
label = subject_uri.split('#')[1] + ' belongs to ' + object_uri.split('#')[1]
super_uri = KNORA_BASE.hasLinkTo
# Companion '<property>Value' property with KNORA_BASE.LinkValue as object class.
property_value_uri = URIRef(property_uri + 'Value')
comment = 'Reification statement of relation between {} and {}'.format(subject_uri.split('#')[1], object_uri.split('#')[1])
reification_info_dict = {}
if 'cardinality_restriction' in info_dict.keys():
reification_info_dict.update({'cardinality_restriction': info_dict['cardinality_restriction']})
self.addProperty(property_value_uri, KNORA_BASE.hasLinkToValue, subject_uri, KNORA_BASE.LinkValue,\
comment, label + ' - statement', cardinality, info_dict=reification_info_dict)
comment = 'connects {} with {}'.format(subject_uri.split('#')[1], object_uri.split('#')[1])\
if 'comment' not in info_dict.keys() else info_dict['comment']
self.addProperty(property_uri, super_uri, subject_uri, object_uri, comment, label, cardinality, info_dict=info_dict)
# The property exists already: check whether cls_uri is covered by its subject constraint.
elif not True in [\
(cls_uri, inferredSubClass, o) in self.project_graph\
for o in self.project_graph.objects(property_uri, KNORA_BASE.subjectClassConstraint)\
]:
# if cls_uri is NOT a subclass of a cls specified by KNORA_BASE.subjectClassConstraint
self.addRestriction2Class(subject_uri, property_uri, cardinality=cardinality, info_dict=info_dict)
if self.create_super_cls_for_multi_property:
self.createSuperClassForSubjectClassConstraint(property_uri, subject_uri)
else:
# not sure if Knora accepts this, i.e. several subject_uris specified by KNORA_BASE.subjectClassConstraint.
self.project_graph.add((property_uri, KNORA_BASE.subjectClassConstraint, subject_uri))
return subject_uri, property_uri
def createPropertyName(self, property_name=None, subject_uri=None, object_uri=None, connector='BelongsTo', prefix='has'):
    """Build a property name from a snake_case name or from subject/object uris.

    The first argument that is not None decides the result:
    property_name -> prefix + CamelCase form of the name ('file_name' -> 'hasFileName');
    subject_uri   -> local name of subject_uri + connector + local name of object_uri,
                     with the first character lowercased;
    object_uri    -> prefix + local name of object_uri;
    otherwise the bare prefix is returned.

    :return: the property name (str)
    """
    if property_name is not None:
        head, *tail = property_name.split('_')
        camel_case = head.lower() + ''.join(part.capitalize() for part in tail)
        if camel_case[0].islower():
            return prefix + camel_case[0].upper() + camel_case[1:]
        return prefix + camel_case
    if subject_uri is not None:
        subject_name = subject_uri.split('#')[1]
        combined = subject_name + self.createPropertyName(object_uri=object_uri, prefix=connector)
        return combined[0].lower() + combined[1:]
    if object_uri is not None:
        return prefix + object_uri.split('#')[1]
    return prefix
def createSuperClassForSubjectClassConstraint(self, property_uri, sub_uri):
"""Creates a super class for classes that share a property.

The super class uri is property_uri with 'has' removed and 'Holder' appended.
sub_uri and all classes currently named as subjectClassConstraint of property_uri
are made subclasses of it (their subClassOf KNORA_BASE.Resource statement is removed),
and the super class becomes the property's subjectClassConstraint.

:param property_uri: uri of the shared property
:param sub_uri: uri of the additional class that uses the property
"""
# NOTE(review): replace('has', '') acts on the whole uri string, so a 'has' inside
# the namespace part would be removed as well -- confirm the base uri cannot contain it.
super_uri = URIRef(property_uri.replace('has', '') + 'Holder')
self.project_graph.add((sub_uri, RDFS.subClassOf, super_uri))
self.project_graph.remove((sub_uri, RDFS.subClassOf, KNORA_BASE.Resource))
if (super_uri, RDF.type, OWL.Class) not in self.project_graph:
label = 'holder of ' + property_uri.split('#')[1].replace('has', '')
comment = 'super class for classes that have a ' + property_uri.split('#')[1].replace('has', '')
# addRestriction2Class also creates the super class, since the graph has no statement about it yet.
self.addRestriction2Class(super_uri, property_uri, comment=comment, label=label)
# NOTE(review): triples are removed and added while iterating over objects();
# whether that is safe depends on the rdflib store -- confirm.
for object_uri in self.project_graph.objects(subject=property_uri, predicate=KNORA_BASE.subjectClassConstraint):
self.project_graph.remove((property_uri, KNORA_BASE.subjectClassConstraint, object_uri))
self.project_graph.add((object_uri, RDFS.subClassOf, super_uri))
self.project_graph.remove((object_uri, RDFS.subClassOf, KNORA_BASE.Resource))
self.project_graph.add((property_uri, KNORA_BASE.subjectClassConstraint, super_uri))
objectClass = self.project_graph.value(subject=property_uri, predicate=KNORA_BASE.objectClassConstraint, any=False)
# Replace the property's comment so that it names the new super class.
comment = 'connects {} with {}'.format(super_uri.split('#')[1], objectClass.split('#')[1].replace('has', ''))
self.project_graph.remove((property_uri, RDFS.comment, None))
self.project_graph.add((property_uri, RDFS.comment, Literal(comment, lang='en')))
def get_comment_label(self, cls):
    """Returns comment and label derived from the docstring and name of cls.

    comment: the docstring with newlines removed; if the docstring contains a
    '.', only its first non-empty line is used. A missing docstring yields ''.
    label: the text following an '@label' tag in the docstring if there is one,
    otherwise the class name split at CamelCase boundaries and lowercased
    ('WordPosition' -> 'word position').

    :param cls: a python class
    :return: tuple (comment, label) of str
    """
    # Classes without a docstring have __doc__ == None; treat that as empty text.
    doc = cls.__doc__ or ''
    comment = doc.replace('\n', '').lstrip()
    label = cls.__name__
    if '.' in doc:
        comment = [text for text in doc.split('\n') if text != ''][0].lstrip()
    # Raw string: '\s' and '\.' are regex escapes, not string escapes.
    label_match = re.search(r'(@label[:]*\s)(.*[\.]*)', doc) if '@label' in doc else None
    if label_match is not None:
        label = label_match.group(2)
    elif re.search(r'([A-Z][a-z]+)', label):
        # Also reached when an '@label' tag is present but carries no text to match.
        label = ' '.join(text.lower() for text in re.split(r'([A-Z][a-z]+)', label) if text != '')
    return comment, label
def get_semantic_classes(self, datatypes_dir):
    """Returns all classes found in the modules of datatypes_dir that are subclasses of class_spec.SemanticClass.

    Every '*.py' file in datatypes_dir whose name does not start with 'test_' or '_'
    is imported as '<datatypes_dir with dots>.<module>'. As a side effect the parent
    of this script's directory is put on sys.path and self.list_value is set to
    SemanticClass.LIST if that attribute exists.

    :param datatypes_dir: '/'-separated package directory, relative to an entry of sys.path
    :return: a list of classes (SemanticClass itself excluded), ordered by class name
    """
    base_dir = dirname(dirname(__file__))
    if base_dir not in sys.path:
        sys.path.append(base_dir)
    # A trailing '/' would otherwise produce an empty package component ('pkg..module').
    root_modul_name = datatypes_dir.rstrip('/').replace('/', '.')
    try:
        self.list_value = SemanticClass.LIST
    except AttributeError:
        pass
    # Strip only the '.py' suffix; str.replace would also remove '.py' inside the name.
    module_names = [file[:-len('.py')] for file in listdir(datatypes_dir)
                    if file.endswith('.py') and not file.startswith(('test_', '_'))]
    all_modules = [importlib.import_module('{}.{}'.format(root_modul_name, name)) for name in module_names]
    all_classes = set()
    for modul in all_modules:
        all_classes.update(inspect.getmembers(modul, inspect.isclass))
    # Sort on strings only: comparing the (name, class) tuples directly raises a
    # TypeError as soon as two different classes share a name.
    ordered_classes = sorted(all_classes, key=lambda item: (item[0], item[1].__module__, item[1].__qualname__))
    return [cls for _, cls in ordered_classes if issubclass(cls, SemanticClass) and not (cls == SemanticClass)]
def _get_builtin_cls_keys(self, property_dict):
"""Returns a list of keys for classes that are builtin.
"""
builtin_cls_keys = []
for key in property_dict.keys():
property_cls = property_dict.get(key).get('class')\
if type(property_dict.get(key)) is dict\
else property_dict.get(key)[0]
if type(property_cls) != dict\
and property_cls.__module__ == 'builtins':
builtin_cls_keys.append(key)
return builtin_cls_keys
def _get_semantic_dictionary_keys_super_first(self, property_dict):
"""Sorts the keys of the property part of a semantic dictionary
and returns the keys for super classes before keys of subclasses.
:return: a sorted list of keys.
"""
builtin_cls_keys = self._get_builtin_cls_keys(property_dict)
complex_cls_keys = []
for key in [ key for key in property_dict.keys()\
if key not in builtin_cls_keys ]:
current_cls = property_dict.get(key).get('class')\
if type(property_dict.get(key)) is dict\
else property_dict.get(key)[0]
key_inserted = False
for index, cls_key in enumerate(complex_cls_keys):
potential_sub_cls = property_dict.get(cls_key).get('class')\
if type(property_dict.get(cls_key)) is dict\
else property_dict.get(cls_key)[0]
if type(potential_sub_cls) != dict\
and type(current_cls) != dict\
and issubclass(potential_sub_cls, current_cls):
complex_cls_keys.insert(index, key)
key_inserted = True
break
if not key_inserted:
complex_cls_keys.append(key)
return builtin_cls_keys + complex_cls_keys
def write_mapping_file(self, datatypes_dir, target_ontology_file):
    """Placeholder for writing a mapping xml file for each semantic class.

    Not implemented: both arguments are ignored and None is returned.
    """
    return None
def create_dummy_cls(class_name):
    """Return a new, empty dummy class named class_name.

    The class is built with type() instead of exec()/eval(): code executed by
    exec() inside a function is not guaranteed to be visible to a following
    eval() (PEP 667, Python 3.13), and exec() would run arbitrary text passed
    as class_name.

    :param class_name: name of the class (str), must be a valid identifier
    :return: the new class
    :raises SyntaxError: if class_name is not a usable class name (same
        exception type the exec-based version raised for such input)
    """
    import keyword
    if not isinstance(class_name, str) or not class_name.isidentifier() or keyword.iskeyword(class_name):
        raise SyntaxError('invalid class name: {!r}'.format(class_name))
    return type(class_name, (), {})
def usage():
    """Print the help text of this script, i.e. the docstring of main.
    """
    help_text = main.__doc__
    print(help_text)
def main(argv):
    """This program can be used to convert py classes that are subclasses of class_spec.SemanticClass to owl:Class.
    py2ttl/py2ttl.py [OPTIONS <dir>]
    <dir> [optional] directory containing datatypes that are subclasses of class_spec.SemanticClass.
    Overwrites DATATYPES_DIR in py2ttl/config.py.
    OPTIONS:
    -h|--help: show help
    -s|--source=source_ontology_file source ontology ttl file, option overwrites PROJECT_ONTOLOGY_FILE in py2ttl/config.py
    -t|--target=target_ontology_file target ontology ttl file, default: 'PROJECT_PREFIX-ontology_autogenerated.ttl'
    :return: exit code (int)
    """
    # The docstring above is printed verbatim by usage(); keep it user-facing.
    check_config_files_exist()
    datatypes_dir = get_datatypes_dir()
    source_ontology_file = PROJECT_ONTOLOGY_FILE
    target_ontology_file = '.{0}{1}-ontology_autogenerated.ttl'.format(sep, PROJECT_NAME)
    try:
        opts, args = getopt.getopt(argv, "hs:t:", ["help", "source=", "target="])
    except getopt.GetoptError:
        usage()
        return 2
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            return 0
        elif opt in ('-t', '--target'):
            target_ontology_file = arg
        elif opt in ('-s', '--source'):
            source_ontology_file = arg
    converter = Py2TTLConverter(project_ontology_file=source_ontology_file)
    # Without positional arguments fall back to the configured datatypes directory.
    if len(args) < 1 and datatypes_dir is not None:
        return converter.convert_py2ttl(datatypes_dir, target_ontology_file)
    for datatypes_dir in args:
        if converter.convert_py2ttl(datatypes_dir, target_ontology_file) > 0:
            return 2
    # Success as soon as at least one directory was converted. The original tested
    # len(args) > 1 and so reported failure (2) for exactly one directory argument.
    # With no directory at all there was nothing to convert, which stays an error.
    return 0 if len(args) > 0 else 2
# Script entry point: run main with the command line arguments and use its
# return value as the process exit code.
if __name__ == "__main__":
    exit_code = main(sys.argv[1:])
    sys.exit(exit_code)

Event Timeline