Skip to content

Commit

Permalink
removed edge merging during normalization, improved edge inversion
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanDietzMorris committed Dec 19, 2024
1 parent 2ee8804 commit fe9c544
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 23 deletions.
44 changes: 23 additions & 21 deletions Common/kgx_file_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ def normalize_edge_file(self):
knowledge_sources = set()

try:
with jsonlines.open(self.source_edges_file_path) as source_json_reader:
with jsonlines.open(self.source_edges_file_path) as source_json_reader, \
open(self.edges_output_file_path, 'w') as edges_out:

for edges_subset in chunk_iterator(source_json_reader, EDGE_NORMALIZATION_BATCH_SIZE):

number_of_source_edges += len(edges_subset)
Expand All @@ -258,7 +260,6 @@ def normalize_edge_file(self):
self.logger.error(
f'Edge normalization service failed to return results for {edge_norm_failures}')

normalized_edges = []
for edge in edges_subset:
normalized_subject_ids = None
normalized_object_ids = None
Expand Down Expand Up @@ -325,21 +326,21 @@ def normalize_edge_file(self):
normalized_edge.update(normalized_edge_properties)

# if normalization switched the direction of the predicate, swap the nodes
normalized_edge[SUBJECT_ID] = norm_subject_id
normalized_edge[OBJECT_ID] = norm_object_id

if edge_inverted_by_normalization:
normalized_edge[OBJECT_ID] = norm_subject_id
normalized_edge[SUBJECT_ID] = norm_object_id
else:
normalized_edge[SUBJECT_ID] = norm_subject_id
normalized_edge[OBJECT_ID] = norm_object_id
normalized_edge = invert_edge(normalized_edge)

normalized_edges.append(normalized_edge)
edges_out.write(normalized_edge)
normalized_edge_count += 1

# this counter tracks the number of new edges created from each individual edge in the original file
# this could happen due to rare cases of normalization splits where one node normalizes to many
# this counter tracks the number of new edges created from each individual edge in the
# original file this could happen due to rare cases of normalization splits where one
# node normalizes to many
if edge_count > 1:
edge_splits += edge_count - 1

graph_merger.merge_edges(normalized_edges)
self.logger.info(f'Processed {number_of_source_edges} edges so far...')

except OSError as e:
Expand All @@ -356,16 +357,6 @@ def normalize_edge_file(self):
elif infores_status == INFORES_STATUS_INVALID:
invalid_infores_ids.append(knowledge_source)

try:
self.logger.debug(f'Writing normalized edges to file...')
with open(self.edges_output_file_path, 'w') as edges_out:
for edge_line in graph_merger.get_merged_edges_jsonl():
edges_out.write(edge_line)
normalized_edge_count += 1
except OSError as e:
norm_error_msg = f'Error writing edges file {self.edges_output_file_path}'
raise NormalizationFailedError(error_message=norm_error_msg, actual_error=e)

try:
self.logger.debug(f'Writing predicate map to file...')
edge_norm_json = {}
Expand Down Expand Up @@ -399,6 +390,17 @@ def normalize_edge_file(self):
self.logger.warning(f'Normalization found invalid infores identifiers: {invalid_infores_ids}')


def invert_edge(edge):
inverted_edge = {}
for key, value in edge.items():
if key.startswith(SUBJECT_ID):
inverted_edge[key.replace(SUBJECT_ID, OBJECT_ID)] = value
elif key.startswith(OBJECT_ID):
inverted_edge[key.replace(OBJECT_ID, SUBJECT_ID)] = value
else:
inverted_edge[key] = value
return inverted_edge


"""
Given a nodes file and an edges file, remove all of the nodes from the nodes file that aren't attached to edges.
Expand Down
28 changes: 26 additions & 2 deletions tests/test_normalization.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest
from Common.biolink_constants import NAMED_THING, GENE, SEQUENCE_VARIANT, INFORMATION_CONTENT, NODE_TYPES
from Common.biolink_constants import *
from Common.normalization import NodeNormalizer, EdgeNormalizer, EdgeNormalizationResult, \
FALLBACK_EDGE_PREDICATE, CUSTOM_NODE_TYPES
from Common.kgx_file_normalizer import invert_edge

INVALID_NODE_TYPE = "testing:Type1"

Expand Down Expand Up @@ -138,7 +139,8 @@ def test_edge_normalization():
edge_list = [{'predicate': 'SEMMEDDB:CAUSES'},
{'predicate': 'RO:0000052'},
{'predicate': 'RO:0002200'},
{'predicate': 'BADPREFIX:123456'}]
{'predicate': 'BADPREFIX:123456'},
{'predicate': 'biolink:affected_by'}]
edge_normalizer = EdgeNormalizer()
edge_normalizer.normalize_edge_data(edge_list)

Expand All @@ -151,3 +153,25 @@ def test_edge_normalization():

edge_norm_result: EdgeNormalizationResult = edge_normalizer.edge_normalization_lookup['BADPREFIX:123456']
assert edge_norm_result.predicate == FALLBACK_EDGE_PREDICATE

edge_norm_result: EdgeNormalizationResult = edge_normalizer.edge_normalization_lookup['biolink:affected_by']
assert edge_norm_result.predicate == 'biolink:affects'
assert edge_norm_result.inverted is True


def test_edge_inversion():
edge_1 = {
SUBJECT_ID: 'hgnc:1',
OBJECT_ID: 'hgnc:2',
SUBJECT_ASPECT_QUALIFIER: 'some_aspect',
SUBJECT_DIRECTION_QUALIFIER: 'up',
f'{OBJECT_ID}_fake_qualifier': 'test_value'
}
inverted_edge = invert_edge(edge_1)
assert inverted_edge == {
OBJECT_ID: 'hgnc:1',
SUBJECT_ID: 'hgnc:2',
OBJECT_ASPECT_QUALIFIER: 'some_aspect',
OBJECT_DIRECTION_QUALIFIER: 'up',
f'{SUBJECT_ID}_fake_qualifier': 'test_value'
}

0 comments on commit fe9c544

Please sign in to comment.