Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F62867353
utils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, May 16, 04:31
Size
7 KB
Mime Type
text/x-python
Expires
Sat, May 18, 04:31 (2 d)
Engine
blob
Format
Raw Data
Handle
17700948
Attached To
rTWTEST master_thesis_Lee
utils.py
View Options
# Copyright 2020 enviPath UG & Co. KG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of
# the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
# TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
from
collections
import
defaultdict
from
enviPath_python.objects
import
Pathway
,
Setting
class
MultiGenUtils
(
object
):
@staticmethod
def
evaluate
(
pathway
:
Pathway
,
setting
:
Setting
):
"""
Takes an pathway and uses the setting to predict a pathway with the exact same root node and compares
the resulting pathway against the provided one.
:param pathway: The pathway that is tried to predict.
:param setting: The setting used for prediction
:return:
"""
# TODO
pass
@staticmethod
def
assemble_upsream
(
pathway
:
Pathway
)
->
dict
:
res
=
defaultdict
(
set
)
for
edge
in
pathway
.
get_edges
():
res
[
edge
.
get_end_nodes
]
.
add
(
edge
.
get_start_nodes
())
return
res
@staticmethod
def
assemble_eval_weights
(
pathway
:
Pathway
)
->
defaultdict
[
set
]:
res
=
defaultdict
(
lambda
x
:
1
)
for
node
in
pathway
.
get_nodes
():
res
[
node
]
=
1
/
2
**
node
.
get_depth
()
return
res
@staticmethod
def
compare_pathways
(
pred
:
Pathway
,
data
:
Pathway
):
correct_ndoes
=
set
()
incorrect_nodes
=
set
()
correct_edges
=
set
()
incorrect_edges
=
set
()
pred_upstream
=
MultiGenUtils
.
assemble_upsream
(
pred
)
pred_eval_weights
=
MultiGenUtils
.
assemble_eval_weights
(
pred
)
data_upstream
=
MultiGenUtils
.
assemble_upsream
(
data
)
data_eval_weights
=
MultiGenUtils
.
assemble_eval_weights
(
data
)
tp_pred
=
0.0
tp_data
=
0.0
fp
=
0.0
fn
=
0.0
for
node
,
outgoing_nodes
in
data_upstream
.
items
():
if
node
in
pred_upstream
:
if
node
.
get_depth
()
==
1
:
# No upstream nodes available as this is the root
continue
else
:
if
data_upstream
[
node
]
.
intersection
(
pred_upstream
[
node
]):
correct_ndoes
.
add
(
node
)
for
edge
in
data
.
get_edges
():
if
node
in
edge
.
get_end_nodes
:
correct_edges
.
add
(
edge
)
tp_pred
=
tp_pred
+
pred_eval_weights
[
node
]
else
:
# No overlap
# TODO duplicate
fn
=
fn
+
pred_eval_weights
[
node
]
incorrect_nodes
.
add
(
node
)
for
edge
in
data
.
get_edges
():
if
node
in
edge
.
get_end_nodes
:
incorrect_edges
.
add
(
edge
)
else
:
# TODO duplicate
fn
=
fn
+
pred_eval_weights
[
node
]
incorrect_nodes
.
add
(
node
)
for
edge
in
data
.
get_edges
():
if
node
in
edge
.
get_end_nodes
:
incorrect_edges
.
add
(
edge
)
return
tp_pred
,
tp_data
,
fp
,
fn
class
PackageUtils
(
object
):
@staticmethod
def
merge_packages
(
name
,
description
,
*
packages
):
result
=
{
"aliases"
:
[],
"description"
:
description
,
"name"
:
name
,
}
id_lookup
=
dict
()
# Combine unique objects and for duplicates obtain a lookup
compounds
,
compound_id_lookup
=
PackageUtils
.
_merge_compounds
(
packages
)
result
[
"compounds"
]
=
compounds
id_lookup
.
update
(
**
compound_id_lookup
)
rules
,
rule_id_lookup
=
PackageUtils
.
_merge_rules
(
packages
)
result
[
"rules"
]
=
rules
id_lookup
.
update
(
**
rule_id_lookup
)
reactions
,
reaction_id_lookup
=
PackageUtils
.
_merge_reactions
(
packages
)
result
[
"reactions"
]
=
reactions
id_lookup
.
update
(
**
reaction_id_lookup
)
pathways
,
pathway_id_lookup
=
PackageUtils
.
_merge_pathways
(
packages
)
result
[
"pathways"
]
=
pathways
id_lookup
.
update
(
**
pathway_id_lookup
)
scenarios
,
scenario_id_lookup
=
PackageUtils
.
_merge_scenarios
(
packages
)
result
[
"scenarios"
]
=
scenarios
id_lookup
.
update
(
**
scenario_id_lookup
)
# Fix links to base entities
result
=
PackageUtils
.
_fix_links
(
result
,
id_lookup
)
return
result
@staticmethod
def
_merge_compounds
(
*
packages
):
lookup
=
dict
()
compounds
=
defaultdict
(
list
)
# Collect all compounds grouped by normalized structure SMILES
for
package
in
packages
:
for
compound
in
package
[
"compounds"
]:
compounds
[
compound
[
"normalizedStructure"
]]
.
append
(
compound
)
result
=
list
()
for
key
,
values
in
compounds
.
items
():
merged_compound
=
None
for
val
in
values
:
if
merged_compound
is
None
:
merged_compound
=
val
continue
for
structure
in
val
[
'structures'
]:
already_existing
=
False
for
existing_structure
in
merged_compound
[
'structures'
]:
if
structure
[
'smiles'
]
==
existing_structure
[
'smiles'
]:
lookup
[
structure
[
'id'
]]
=
existing_structure
[
'id'
]
# TODO merge names?
# Other names as alias
already_existing
=
True
break
if
already_existing
:
continue
merged_compound
[
'structures'
]
.
append
(
structure
)
result
.
append
(
merged_compound
)
return
result
,
lookup
@staticmethod
def
_merge_rules
(
*
packages
):
lookup
=
dict
()
simple_rules
=
defaultdict
(
list
)
parallel_rules
=
defaultdict
(
list
)
sequential_rules
=
defaultdict
(
list
)
for
package
in
packages
:
for
rule
in
package
[
"rules"
]:
if
rule
[
"identifier"
]
==
"simple-rule"
:
simple_rules
[
rule
[
"smirks"
]]
.
append
(
rule
)
elif
rule
[
"identifier"
]
==
"parallel-rule"
:
sorted_smirks
=
tuple
(
sorted
([
r
[
'smirks'
]
for
r
in
rule
[
'simpleRules'
]]))
parallel_rules
[
sorted_smirks
]
.
append
(
rule
)
elif
rule
[
'identifier'
]
==
"sequential-rule"
:
sorted_smirks
=
tuple
(
sorted
([
r
[
'smirks'
]
for
r
in
rule
[
'simpleRules'
]]))
sequential_rules
[
sorted_smirks
]
.
append
(
rule
)
else
:
raise
ValueError
(
"Unknown rule identifiere {}"
.
format
(
rule
[
"identifier"
]))
result
=
[]
return
result
,
lookup
@staticmethod
def
_merge_reactions
(
*
packages
):
return
[],
{}
@staticmethod
def
_merge_pathways
(
*
packages
):
result
=
list
()
for
package
in
packages
:
result
.
extend
(
package
[
"pathways"
])
return
result
,
{}
@staticmethod
def
_merge_scenarios
(
*
packages
):
result
=
list
()
for
package
in
packages
:
result
.
extend
(
package
[
"scenarios"
])
return
result
,
{}
@staticmethod
def
_fix_links
(
package
,
lookup
):
return
package
Event Timeline
Log In to Comment