Page MenuHomec4science

utils.py
No OneTemporary

File Metadata

Created
Thu, May 16, 04:31

utils.py

# Copyright 2020 enviPath UG & Co. KG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of
# the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
# TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
from collections import defaultdict
from enviPath_python.objects import Pathway, Setting
class MultiGenUtils(object):
    """Static helpers for comparing a predicted pathway against a reference pathway."""

    @staticmethod
    def evaluate(pathway: Pathway, setting: Setting):
        """
        Take a pathway and use the setting to predict a pathway with the exact same root node, then
        compare the resulting pathway against the provided one.

        Not implemented yet: the method currently does nothing and returns None.

        :param pathway: The pathway that should be reproduced by the prediction.
        :param setting: The setting used for prediction.
        :return: None (placeholder).
        """
        # TODO
        pass

    @staticmethod
    def assemble_upsream(pathway: Pathway) -> dict:
        """
        Map each node that is the end of at least one edge to the set of its direct upstream nodes.

        :param pathway: The pathway whose edges are inspected via ``get_edges()``.
        :return: A ``defaultdict(set)`` keyed by end node; each value is the union of the start
            nodes of all edges ending in that node. Nodes without incoming edges have no entry.
        """
        res = defaultdict(set)
        for edge in pathway.get_edges():
            # Materialise once so the same start nodes can be attached to every end node.
            # Assumes get_start_nodes() returns an iterable of hashable nodes - TODO confirm.
            start_nodes = set(edge.get_start_nodes())
            # get_end_nodes must be *called*; the bare attribute is a bound method, not nodes.
            for end_node in edge.get_end_nodes():
                res[end_node].update(start_nodes)
        return res

    @staticmethod
    def assemble_eval_weights(pathway: Pathway) -> defaultdict:
        """
        Compute an evaluation weight of ``1 / 2 ** depth`` for every node of the pathway.

        :param pathway: The pathway whose nodes are inspected via ``get_nodes()``.
        :return: A ``defaultdict`` mapping node -> weight. Looking up a node that is not part of
            the pathway yields (and stores) the default weight 1.
        """
        # The default factory is invoked without arguments, so it must not declare a parameter.
        res = defaultdict(lambda: 1)
        for node in pathway.get_nodes():
            res[node] = 1 / 2 ** node.get_depth()
        return res

    @staticmethod
    def compare_pathways(pred: Pathway, data: Pathway):
        """
        Compare a predicted pathway against a reference pathway node by node.

        Every node of ``data`` that has upstream nodes is checked: it counts as correct when it
        also has upstream nodes in ``pred`` and both upstream sets share at least one node;
        otherwise it counts as a false negative.

        :param pred: The predicted pathway.
        :param data: The reference pathway.
        :return: The tuple ``(tp_pred, tp_data, fp, fn)``. ``tp_data`` and ``fp`` are never
            updated by the current implementation and are always ``0.0``.
        """
        # The node/edge sets are filled but not part of the return value yet.
        correct_nodes = set()
        incorrect_nodes = set()
        correct_edges = set()
        incorrect_edges = set()

        pred_upstream = MultiGenUtils.assemble_upsream(pred)
        pred_eval_weights = MultiGenUtils.assemble_eval_weights(pred)
        data_upstream = MultiGenUtils.assemble_upsream(data)
        # NOTE(review): computed but unused so far - presumably meant for tp_data; confirm.
        data_eval_weights = MultiGenUtils.assemble_eval_weights(data)

        # The edge list does not change while iterating, so fetch it only once.
        data_edges = list(data.get_edges())

        tp_pred = 0.0
        tp_data = 0.0
        fp = 0.0
        fn = 0.0

        for node, upstream_nodes in data_upstream.items():
            predicted = node in pred_upstream
            if predicted and node.get_depth() == 1:
                # No upstream nodes available as this is the root
                continue

            incoming_edges = {edge for edge in data_edges if node in edge.get_end_nodes()}

            if predicted and upstream_nodes.intersection(pred_upstream[node]):
                correct_nodes.add(node)
                correct_edges.update(incoming_edges)
                tp_pred = tp_pred + pred_eval_weights[node]
            else:
                # Either the node was not predicted at all or its upstream sets do not overlap.
                # NOTE(review): the weight is taken from the predicted pathway, so a node that is
                # missing there falls back to the default weight 1 - confirm this is intended.
                incorrect_nodes.add(node)
                incorrect_edges.update(incoming_edges)
                fn = fn + pred_eval_weights[node]

        return tp_pred, tp_data, fp, fn
class PackageUtils(object):
    """Static helpers for merging several package dictionaries into a single package."""

    @staticmethod
    def merge_packages(name, description, *packages):
        """
        Merge any number of package dictionaries into one new package dictionary.

        :param name: Name of the resulting package.
        :param description: Description of the resulting package.
        :param packages: The package dictionaries to merge; the helpers read their
            "compounds", "rules", "pathways" and "scenarios" entries.
        :return: A dict with the keys "aliases", "description", "name", "compounds", "rules",
            "reactions", "pathways" and "scenarios".
        """
        result = {
            "aliases": [],
            "description": description,
            "name": name,
        }
        id_lookup = dict()

        # Combine unique objects and for duplicates obtain a lookup
        mergers = (
            ("compounds", PackageUtils._merge_compounds),
            ("rules", PackageUtils._merge_rules),
            ("reactions", PackageUtils._merge_reactions),
            ("pathways", PackageUtils._merge_pathways),
            ("scenarios", PackageUtils._merge_scenarios),
        )
        for key, merge in mergers:
            # The helpers are variadic, so the packages have to be unpacked again; passing the
            # tuple itself would make each helper see a single "package" that is a tuple.
            merged, lookup = merge(*packages)
            result[key] = merged
            # Plain update() also works for ids that are not valid keyword names.
            id_lookup.update(lookup)

        # Fix links to base entities
        return PackageUtils._fix_links(result, id_lookup)

    @staticmethod
    def _merge_compounds(*packages):
        """
        Merge the compounds of all packages, grouped by their "normalizedStructure".

        Compounds sharing a normalized structure are collapsed into the first one encountered.
        Structures of the later duplicates are appended unless a structure with the same
        "smiles" is already present; in that case the duplicate structure's "id" is mapped to
        the "id" of the structure that is kept.

        :param packages: Package dictionaries providing a "compounds" list.
        :return: Tuple ``(compounds, lookup)`` with the merged compound list and the mapping
            from dropped structure ids to kept structure ids.
        """
        lookup = dict()
        grouped = defaultdict(list)

        # Collect all compounds grouped by normalized structure SMILES
        for package in packages:
            for compound in package["compounds"]:
                grouped[compound["normalizedStructure"]].append(compound)

        result = list()
        for duplicates in grouped.values():
            first, *others = duplicates
            # Work on a copy so the caller's package data is not modified by the merge.
            merged_compound = dict(first)
            if others:
                merged_compound["structures"] = list(first["structures"])
                # Index by SMILES; setdefault keeps the earliest structure for each SMILES.
                known = dict()
                for structure in merged_compound["structures"]:
                    known.setdefault(structure["smiles"], structure)

                for compound in others:
                    for structure in compound["structures"]:
                        existing = known.get(structure["smiles"])
                        if existing is not None:
                            # TODO merge names?
                            # Other names as alias
                            lookup[structure["id"]] = existing["id"]
                            continue
                        merged_compound["structures"].append(structure)
                        known[structure["smiles"]] = structure
            result.append(merged_compound)
        return result, lookup

    @staticmethod
    def _merge_rules(*packages):
        """
        Group the rules of all packages by kind and SMIRKS.

        NOTE: the actual merge is not implemented yet. The rules are validated and grouped, but
        the returned rule list and lookup are always empty.

        :param packages: Package dictionaries providing a "rules" list.
        :return: Tuple ``(rules, lookup)``; currently ``([], {})``.
        :raises ValueError: If a rule has an unknown "identifier".
        """
        lookup = dict()
        simple_rules = defaultdict(list)
        parallel_rules = defaultdict(list)
        sequential_rules = defaultdict(list)
        # Composite rules are keyed by the sorted SMIRKS of their simple rules.
        composite_groups = {
            "parallel-rule": parallel_rules,
            "sequential-rule": sequential_rules,
        }

        for package in packages:
            for rule in package["rules"]:
                identifier = rule["identifier"]
                if identifier == "simple-rule":
                    simple_rules[rule["smirks"]].append(rule)
                elif identifier in composite_groups:
                    sorted_smirks = tuple(sorted(r["smirks"] for r in rule["simpleRules"]))
                    composite_groups[identifier][sorted_smirks].append(rule)
                else:
                    raise ValueError("Unknown rule identifiere {}".format(identifier))

        # TODO build the merged rule list and id lookup from the groups above
        result = []
        return result, lookup

    @staticmethod
    def _merge_reactions(*packages):
        """
        Placeholder for merging reactions; not implemented yet.

        :param packages: Package dictionaries (currently ignored).
        :return: Tuple ``([], {})``.
        """
        return [], {}

    @staticmethod
    def _merge_pathways(*packages):
        """
        Concatenate the "pathways" lists of all packages without de-duplication.

        :param packages: Package dictionaries providing a "pathways" list.
        :return: Tuple ``(pathways, {})``; the lookup is always empty.
        """
        result = list()
        for package in packages:
            result.extend(package["pathways"])
        return result, {}

    @staticmethod
    def _merge_scenarios(*packages):
        """
        Concatenate the "scenarios" lists of all packages without de-duplication.

        :param packages: Package dictionaries providing a "scenarios" list.
        :return: Tuple ``(scenarios, {})``; the lookup is always empty.
        """
        result = list()
        for package in packages:
            result.extend(package["scenarios"])
        return result, {}

    @staticmethod
    def _fix_links(package, lookup):
        """
        Placeholder for rewriting ids of merged entities; currently returns the package as is.

        :param package: The merged package dictionary.
        :param lookup: Mapping from dropped ids to kept ids (currently unused).
        :return: The unchanged ``package``.
        """
        return package

Event Timeline