diff --git a/Common/kgx_file_normalizer.py b/Common/kgx_file_normalizer.py index cdd97c9e..d0354c37 100644 --- a/Common/kgx_file_normalizer.py +++ b/Common/kgx_file_normalizer.py @@ -246,7 +246,9 @@ def normalize_edge_file(self): knowledge_sources = set() try: - with jsonlines.open(self.source_edges_file_path) as source_json_reader: + with jsonlines.open(self.source_edges_file_path) as source_json_reader, \ + open(self.edges_output_file_path, 'w') as edges_out: + for edges_subset in chunk_iterator(source_json_reader, EDGE_NORMALIZATION_BATCH_SIZE): number_of_source_edges += len(edges_subset) @@ -258,7 +260,6 @@ def normalize_edge_file(self): self.logger.error( f'Edge normalization service failed to return results for {edge_norm_failures}') - normalized_edges = [] for edge in edges_subset: normalized_subject_ids = None normalized_object_ids = None @@ -325,21 +326,21 @@ def normalize_edge_file(self): normalized_edge.update(normalized_edge_properties) # if normalization switched the direction of the predicate, swap the nodes + normalized_edge[SUBJECT_ID] = norm_subject_id + normalized_edge[OBJECT_ID] = norm_object_id + if edge_inverted_by_normalization: - normalized_edge[OBJECT_ID] = norm_subject_id - normalized_edge[SUBJECT_ID] = norm_object_id - else: - normalized_edge[SUBJECT_ID] = norm_subject_id - normalized_edge[OBJECT_ID] = norm_object_id + normalized_edge = invert_edge(normalized_edge) - normalized_edges.append(normalized_edge) + edges_out.write(normalized_edge) + normalized_edge_count += 1 - # this counter tracks the number of new edges created from each individual edge in the original file - # this could happen due to rare cases of normalization splits where one node normalizes to many + # this counter tracks the number of new edges created from each individual edge in the + # original file this could happen due to rare cases of normalization splits where one + # node normalizes to many if edge_count > 1: edge_splits += edge_count - 1 - graph_merger.merge_edges(normalized_edges) self.logger.info(f'Processed {number_of_source_edges} edges so far...') except OSError as e: @@ -356,16 +357,6 @@ def normalize_edge_file(self): elif infores_status == INFORES_STATUS_INVALID: invalid_infores_ids.append(knowledge_source) - try: - self.logger.debug(f'Writing normalized edges to file...') - with open(self.edges_output_file_path, 'w') as edges_out: - for edge_line in graph_merger.get_merged_edges_jsonl(): - edges_out.write(edge_line) - normalized_edge_count += 1 - except OSError as e: - norm_error_msg = f'Error writing edges file {self.edges_output_file_path}' - raise NormalizationFailedError(error_message=norm_error_msg, actual_error=e) - try: self.logger.debug(f'Writing predicate map to file...') edge_norm_json = {} @@ -399,6 +390,17 @@ def normalize_edge_file(self): self.logger.warning(f'Normalization found invalid infores identifiers: {invalid_infores_ids}') +def invert_edge(edge): + inverted_edge = {} + for key, value in edge.items(): + if key.startswith(SUBJECT_ID): + inverted_edge[key.replace(SUBJECT_ID, OBJECT_ID)] = value + elif key.startswith(OBJECT_ID): + inverted_edge[key.replace(OBJECT_ID, SUBJECT_ID)] = value + else: + inverted_edge[key] = value + return inverted_edge + """ Given a nodes file and an edges file, remove all of the nodes from the nodes file that aren't attached to edges. diff --git a/tests/test_normalization.py b/tests/test_normalization.py index 0fe347b6..f466977d 100644 --- a/tests/test_normalization.py +++ b/tests/test_normalization.py @@ -1,7 +1,8 @@ import pytest -from Common.biolink_constants import NAMED_THING, GENE, SEQUENCE_VARIANT, INFORMATION_CONTENT, NODE_TYPES +from Common.biolink_constants import * from Common.normalization import NodeNormalizer, EdgeNormalizer, EdgeNormalizationResult, \ FALLBACK_EDGE_PREDICATE, CUSTOM_NODE_TYPES +from Common.kgx_file_normalizer import invert_edge INVALID_NODE_TYPE = "testing:Type1" @@ -138,7 +139,8 @@ def test_edge_normalization(): edge_list = [{'predicate': 'SEMMEDDB:CAUSES'}, {'predicate': 'RO:0000052'}, {'predicate': 'RO:0002200'}, - {'predicate': 'BADPREFIX:123456'}] + {'predicate': 'BADPREFIX:123456'}, + {'predicate': 'biolink:affected_by'}] edge_normalizer = EdgeNormalizer() edge_normalizer.normalize_edge_data(edge_list) @@ -151,3 +153,25 @@ def test_edge_normalization(): edge_norm_result: EdgeNormalizationResult = edge_normalizer.edge_normalization_lookup['BADPREFIX:123456'] assert edge_norm_result.predicate == FALLBACK_EDGE_PREDICATE + + edge_norm_result: EdgeNormalizationResult = edge_normalizer.edge_normalization_lookup['biolink:affected_by'] + assert edge_norm_result.predicate == 'biolink:affects' + assert edge_norm_result.inverted is True + + +def test_edge_inversion(): + edge_1 = { + SUBJECT_ID: 'hgnc:1', + OBJECT_ID: 'hgnc:2', + SUBJECT_ASPECT_QUALIFIER: 'some_aspect', + SUBJECT_DIRECTION_QUALIFIER: 'up', + f'{OBJECT_ID}_fake_qualifier': 'test_value' + } + inverted_edge = invert_edge(edge_1) + assert inverted_edge == { + OBJECT_ID: 'hgnc:1', + SUBJECT_ID: 'hgnc:2', + OBJECT_ASPECT_QUALIFIER: 'some_aspect', + OBJECT_DIRECTION_QUALIFIER: 'up', + f'{SUBJECT_ID}_fake_qualifier': 'test_value' + }