diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a78d4833..eb959fe5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,27 +13,27 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out the repo - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Get the version id: get_version run: echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//} - - name: Extract metadata (tags, labels) for Docker - id: meta - uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38 - with: - images: - ghcr.io/${{ github.repository }} - name: Login to ghcr - uses: docker/login-action@v1 + uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a with: registry: ${{ env.REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 + with: + images: + ghcr.io/${{ github.repository }} - name: Push to GitHub Packages - uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc + uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 with: context: . push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - build-args: VERSION=${{ steps.get_version.outputs.VERSION }} \ No newline at end of file + build-args: VERSION=${{ steps.get_version.outputs.VERSION }} diff --git a/Common/biolink_constants.py b/Common/biolink_constants.py index 80b12438..15bd0a32 100644 --- a/Common/biolink_constants.py +++ b/Common/biolink_constants.py @@ -48,10 +48,13 @@ PREDICATE = 'predicate' PRIMARY_KNOWLEDGE_SOURCE = 'primary_knowledge_source' AGGREGATOR_KNOWLEDGE_SOURCES = 'aggregator_knowledge_source' +SUPPORTING_DATA_SOURCE = 'supporting_data_source' P_VALUE = 'p_value' ADJUSTED_P_VALUE = 'adjusted_p_value' AGENT_TYPE = 'agent_type' KNOWLEDGE_LEVEL = 'knowledge_level' +MAX_RESEARCH_PHASE = 'max_research_phase' +HAS_SUPPORTING_STUDY_RESULT = 'has_supporting_study_result' # enums for knowledge level KNOWLEDGE_ASSERTION = 'knowledge_assertion' @@ -137,6 +140,7 @@ PREDICATE, PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, + SUPPORTING_DATA_SOURCE, PUBLICATIONS, SYNONYMS, DESCRIPTION, @@ -147,6 +151,8 @@ FDA_APPROVAL_STATUS, KNOWLEDGE_LEVEL, MECHANISM_OF_ACTION, + MAX_RESEARCH_PHASE, + HAS_SUPPORTING_STUDY_RESULT, # qualifiers ANATOMICAL_CONTEXT_QUALIFIER, CAUSAL_MECHANISM_QUALIFIER, diff --git a/Common/build_manager.py b/Common/build_manager.py index 059284f8..047942cb 100644 --- a/Common/build_manager.py +++ b/Common/build_manager.py @@ -12,8 +12,8 @@ from Common.load_manager import SourceDataManager from Common.kgx_file_merger import KGXFileMerger from Common.neo4j_tools import create_neo4j_dump -from Common.kgxmodel import GraphSpec, SubGraphSource, DataSource, NormalizationScheme -from Common.normalization import NORMALIZATION_CODE_VERSION +from Common.kgxmodel import GraphSpec, SubGraphSource, DataSource +from Common.normalization import NORMALIZATION_CODE_VERSION, NormalizationScheme from Common.metadata import Metadata, GraphMetadata, SourceMetadata from Common.supplementation import SequenceVariantSupplementation from Common.biolink_constants import PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, PREDICATE, PUBLICATIONS diff --git a/Common/data_sources.py b/Common/data_sources.py index b105a82a..082cf923 100644 --- a/Common/data_sources.py +++ b/Common/data_sources.py @@ -4,6 +4,7 @@ BINDING_DB = 
'BINDING-DB' CAM_KP = 'CAM-KP' CHEBI_PROPERTIES = 'CHEBIProps' +CLINICAL_TRIALS_KP = 'ClinicalTrialsKP' CORD19 = 'Cord19' CTD = 'CTD' DRUG_CENTRAL = 'DrugCentral' @@ -51,6 +52,7 @@ BINDING_DB: ("parsers.BINDING.src.loadBINDINGDB", "BINDINGDBLoader"), CAM_KP: ("parsers.camkp.src.loadCAMKP", "CAMKPLoader"), CHEBI_PROPERTIES: ("parsers.chebi.src.loadChebiProperties", "ChebiPropertiesLoader"), + CLINICAL_TRIALS_KP: ("parsers.clinicaltrials.src.loadCTKP", "CTKPLoader"), CORD19: ("parsers.cord19.src.loadCord19", "Cord19Loader"), CTD: ("parsers.CTD.src.loadCTD", "CTDLoader"), DRUG_CENTRAL: ("parsers.drugcentral.src.loaddrugcentral", "DrugCentralLoader"), diff --git a/Common/kgx_file_converter.py b/Common/kgx_file_converter.py index 304c3b1b..e2f8d4bf 100644 --- a/Common/kgx_file_converter.py +++ b/Common/kgx_file_converter.py @@ -94,7 +94,7 @@ def __determine_properties_and_types(file_path: str, required_properties: dict): for key, value in entity.items(): if value is None: property_type_counts[key]["None"] += 1 - if key in required_properties: + if key in required_properties and key != "name": print(f'WARNING: Required property ({key}) was None: {entity.items()}') raise Exception( f'None found as a value for a required property (property: {key}) in line {entity.items()}') @@ -134,7 +134,7 @@ def __determine_properties_and_types(file_path: str, required_properties: dict): # if 'None' in prop_types: # print(f'WARNING: None found as a value for property {prop}') - if prop in required_properties and (num_prop_types > 1): + if prop in required_properties and (num_prop_types > 1) and prop != "name": # TODO this should just enforce that required properties are the correct type, # instead of trying to establish the type raise Exception(f'Required property {prop} had multiple conflicting types: {type_counts.items()}') @@ -192,7 +192,10 @@ def __convert_to_csv(input_file: str, for item in quick_jsonl_file_iterator(input_file): for key in list(item.keys()): if item[key] is None: - del item[key] + if key == "name": + item["name"] = item["id"] + else: + del item[key] else: prop_type = properties[key] # convert lists into strings with an array delimiter diff --git a/Common/kgx_file_merger.py b/Common/kgx_file_merger.py index 9ceb0105..b6d54159 100644 --- a/Common/kgx_file_merger.py +++ b/Common/kgx_file_merger.py @@ -85,8 +85,10 @@ def merge_primary_sources(self, needs_on_disk_merge = False for graph_source in graph_sources: if isinstance(graph_source, SubGraphSource): - needs_on_disk_merge = True - break + for source_id in graph_source.graph_metadata.get_source_ids(): + if source_id in RESOURCE_HOGS: + needs_on_disk_merge = True + break elif graph_source.id in RESOURCE_HOGS: needs_on_disk_merge = True break diff --git a/Common/kgx_file_normalizer.py b/Common/kgx_file_normalizer.py index b0ad5457..cdd97c9e 100644 --- a/Common/kgx_file_normalizer.py +++ b/Common/kgx_file_normalizer.py @@ -5,25 +5,13 @@ from Common.biolink_utils import BiolinkInformationResources, INFORES_STATUS_INVALID, INFORES_STATUS_DEPRECATED from Common.biolink_constants import SEQUENCE_VARIANT, PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, \ PUBLICATIONS, OBJECT_ID, SUBJECT_ID, PREDICATE, SUBCLASS_OF -from Common.normalization import NodeNormalizer, EdgeNormalizer, EdgeNormalizationResult +from Common.normalization import NormalizationScheme, NodeNormalizer, EdgeNormalizer, EdgeNormalizationResult, \ + NormalizationFailedError from Common.utils import LoggingUtil, chunk_iterator from Common.kgx_file_writer import KGXFileWriter 
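To illustrate the None handling that __convert_to_csv in Common/kgx_file_converter.py applies above, here is a small sketch with a made-up node: a null name is backfilled from the id, while any other null property is dropped.
```python
node = {'id': 'CHEBI:15365', 'name': None, 'description': None}

for key in list(node.keys()):
    if node[key] is None:
        if key == "name":
            # a null name is replaced with the node id instead of raising an error
            node["name"] = node["id"]
        else:
            # any other null property is simply removed
            del node[key]

assert node == {'id': 'CHEBI:15365', 'name': 'CHEBI:15365'}
```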
-from Common.kgxmodel import NormalizationScheme from Common.merging import MemoryGraphMerger, DiskGraphMerger -class NormalizationBrokenError(Exception): - def __init__(self, error_message: str, actual_error: Exception=None): - self.error_message = error_message - self.actual_error = actual_error - - -class NormalizationFailedError(Exception): - def __init__(self, error_message: str, actual_error: Exception=None): - self.error_message = error_message - self.actual_error = actual_error - - EDGE_PROPERTIES_THAT_SHOULD_BE_SETS = {AGGREGATOR_KNOWLEDGE_SOURCES, PUBLICATIONS} NODE_NORMALIZATION_BATCH_SIZE = 1_000_000 EDGE_NORMALIZATION_BATCH_SIZE = 1_000_000 @@ -350,6 +338,7 @@ def normalize_edge_file(self): # this could happen due to rare cases of normalization splits where one node normalizes to many if edge_count > 1: edge_splits += edge_count - 1 + graph_merger.merge_edges(normalized_edges) self.logger.info(f'Processed {number_of_source_edges} edges so far...') diff --git a/Common/kgxmodel.py b/Common/kgxmodel.py index 6f7954c5..1813a5a7 100644 --- a/Common/kgxmodel.py +++ b/Common/kgxmodel.py @@ -1,7 +1,8 @@ from dataclasses import dataclass, InitVar from typing import Callable from Common.biolink_constants import NAMED_THING -from Common.normalization import NORMALIZATION_CODE_VERSION +from Common.metadata import GraphMetadata +from Common.normalization import NormalizationScheme class kgxnode: @@ -35,31 +36,6 @@ def __init__(self, self.properties = {} -@dataclass -class NormalizationScheme: - node_normalization_version: str = 'latest' - edge_normalization_version: str = 'latest' - normalization_code_version: str = NORMALIZATION_CODE_VERSION - strict: bool = True - conflation: bool = False - - def get_composite_normalization_version(self): - composite_normalization_version = f'{self.node_normalization_version}_' \ - f'{self.edge_normalization_version}_{self.normalization_code_version}' - if self.conflation: - composite_normalization_version += '_conflated' - if self.strict: - composite_normalization_version += '_strict' - return composite_normalization_version - - def get_metadata_representation(self): - return {'node_normalization_version': self.node_normalization_version, - 'edge_normalization_version': self.edge_normalization_version, - 'normalization_code_version': self.normalization_code_version, - 'conflation': self.conflation, - 'strict': self.strict} - - @dataclass class GraphSpec: graph_id: str @@ -93,13 +69,13 @@ class GraphSource: @dataclass class SubGraphSource(GraphSource): - graph_metadata: dict = None + graph_metadata: GraphMetadata = None def get_metadata_representation(self): return {'graph_id': self.id, 'release_version': self.version, 'merge_strategy:': self.merge_strategy, - 'graph_metadata': self.graph_metadata} + 'graph_metadata': self.graph_metadata.metadata if self.graph_metadata else None} @dataclass diff --git a/Common/load_manager.py b/Common/load_manager.py index 323dfe53..9ba5aa44 100644 --- a/Common/load_manager.py +++ b/Common/load_manager.py @@ -5,9 +5,8 @@ from Common.data_sources import SourceDataLoaderClassFactory, RESOURCE_HOGS, get_available_data_sources from Common.utils import LoggingUtil, GetDataPullError -from Common.kgx_file_normalizer import KGXFileNormalizer, NormalizationBrokenError, NormalizationFailedError -from Common.kgxmodel import NormalizationScheme -from Common.normalization import NodeNormalizer, EdgeNormalizer +from Common.kgx_file_normalizer import KGXFileNormalizer +from Common.normalization import NormalizationScheme, 
NodeNormalizer, EdgeNormalizer, NormalizationFailedError from Common.metadata import SourceMetadata from Common.loader_interface import SourceDataBrokenError, SourceDataFailedError from Common.supplementation import SequenceVariantSupplementation, SupplementationFailedError @@ -356,17 +355,6 @@ def normalize_source(self, normalization_status=SourceMetadata.STABLE, normalization_info=normalization_info) return True - except NormalizationBrokenError as broken_error: - error_message = f"{source_id} NormalizationBrokenError: {broken_error.error_message}" - if broken_error.actual_error: - error_message += f" - {broken_error.actual_error}" - self.logger.error(error_message) - source_metadata.update_normalization_metadata(parsing_version, - composite_normalization_version, - normalization_status=SourceMetadata.BROKEN, - normalization_error=error_message, - normalization_time=current_time) - return False except NormalizationFailedError as failed_error: error_message = f"{source_id} NormalizationFailedError: {failed_error.error_message}" if failed_error.actual_error: diff --git a/Common/merging.py b/Common/merging.py index d1b01d85..ce617f0f 100644 --- a/Common/merging.py +++ b/Common/merging.py @@ -19,17 +19,34 @@ def edge_key_function(edge): def entity_merging_function(entity_1, entity_2, properties_that_are_sets): - for key, value in entity_2.items(): - # TODO - make sure this is the behavior we want - - # for properties that are lists append the values - # otherwise keep the first one - if key in entity_1: - if isinstance(value, list): - entity_1[key].extend(value) - if key in properties_that_are_sets: - entity_1[key] = list(set(entity_1[key])) + # for every property of entity 2 + for key, entity_2_value in entity_2.items(): + # if entity 1 also has the property and entity_2_value is not null/empty: + # concatenate values if one is a list, otherwise ignore the property from entity 2 + if (key in entity_1) and entity_2_value: + entity_1_value = entity_1[key] + entity_1_is_list = isinstance(entity_1_value, list) + entity_2_is_list = isinstance(entity_2_value, list) + if entity_1_is_list and entity_2_is_list: + # if they're both lists just combine them + entity_1_value.extend(entity_2_value) + elif entity_1_is_list: + # if 1 is a list and 2 isn't, append the value of 2 to the list from 1 + entity_1_value.append(entity_2_value) + elif entity_2_is_list: + if entity_1_value: + # if 2 is a list and 1 has a value, add the value of 1 to the list from 2 + entity_1[key] = [entity_1_value] + entity_2_value + else: + # if 2 is a list and 1 doesn't have a value, just use the list from 2 + entity_1[key] = entity_2_value + # else: + # if neither is a list, do nothing (keep the value from 1) + if (entity_1_is_list or entity_2_is_list) and (key in properties_that_are_sets): + entity_1[key] = list(set(entity_1[key])) else: - entity_1[key] = value + # if entity 1 doesn't have the property, add the property from entity 2 + entity_1[key] = entity_2_value return entity_1 diff --git a/Common/metadata.py b/Common/metadata.py index db9b6d92..98070eb5 100644 --- a/Common/metadata.py +++ b/Common/metadata.py @@ -3,7 +3,7 @@ import json from xxhash import xxh64_hexdigest -from Common.kgxmodel import NormalizationScheme +from Common.normalization import NormalizationScheme class Metadata: @@ -124,6 +124,9 @@ def get_build_status(self): def get_graph_version(self): return self.metadata['graph_version'] + def get_source_ids(self): + return [source['source_id'] for source in self.metadata['sources']] + class 
SourceMetadata(Metadata): diff --git a/Common/neo4j_tools.py b/Common/neo4j_tools.py index 889db44b..0b3b69e6 100644 --- a/Common/neo4j_tools.py +++ b/Common/neo4j_tools.py @@ -37,11 +37,12 @@ def import_csv_files(self, return password_exit_code self.logger.info(f'Importing csv files to neo4j...') - neo4j_import_cmd = ["neo4j-admin", "import", f"--nodes={csv_nodes_filename}", - f"--relationships={csv_edges_filename}", + neo4j_import_cmd = ['neo4j-admin', 'database', 'import', 'full', + f'--nodes={csv_nodes_filename}', + f'--relationships={csv_edges_filename}', '--delimiter=TAB', '--array-delimiter=U+001F', - '--force'] + '--overwrite-destination=true'] import_results: subprocess.CompletedProcess = subprocess.run(neo4j_import_cmd, cwd=graph_directory, capture_output=True) @@ -60,7 +61,7 @@ def load_backup_dump(self, return password_exit_code self.logger.info(f'Loading a neo4j backup dump {dump_file_path}...') - neo4j_load_cmd = ['neo4j-admin', 'load', f'--from={dump_file_path}', '--force'] + neo4j_load_cmd = ['neo4j-admin', 'database', 'load', f'--from-path={dump_file_path}', '--overwrite-destination=true', 'neo4j'] load_results: subprocess.CompletedProcess = subprocess.run(neo4j_load_cmd, capture_output=True) self.logger.info(load_results.stdout) @@ -71,10 +72,23 @@ def load_backup_dump(self, self.logger.error(error_message) return load_results_return_code + def migrate_dump_to_neo4j_5(self): + self.logger.info(f'Migrating db dump to neo4j 5...') + neo4j_migrate_cmd = ['neo4j-admin', 'database', 'migrate', '--force-btree-indexes-to-range', 'neo4j'] + migrate_results: subprocess.CompletedProcess = subprocess.run(neo4j_migrate_cmd, + capture_output=True) + self.logger.info(migrate_results.stdout) + results_return_code = migrate_results.returncode + if results_return_code != 0: + error_message = f'Neo4j migrate subprocess error (ExitCode {results_return_code}): ' \ + f'{migrate_results.stderr.decode("UTF-8")}' + self.logger.error(error_message) + return results_return_code + def create_backup_dump(self, - dump_file_path: str = None): + dump_directory: str = None): self.logger.info(f'Creating a backup dump of the neo4j...') - neo4j_dump_cmd = ['neo4j-admin', 'dump', f'--to={dump_file_path}'] + neo4j_dump_cmd = ['neo4j-admin', 'database', 'dump', 'neo4j', f'--to-path={dump_directory}'] dump_results: subprocess.CompletedProcess = subprocess.run(neo4j_dump_cmd, capture_output=True) self.logger.info(dump_results.stdout) @@ -107,7 +121,7 @@ def __issue_neo4j_command(self, command: str): def set_initial_password(self): self.logger.info('Setting initial password for Neo4j...') - neo4j_cmd = ['neo4j-admin', 'set-initial-password', self.password] + neo4j_cmd = ['neo4j-admin', 'dbms', 'set-initial-password', self.password] neo4j_results: subprocess.CompletedProcess = subprocess.run(neo4j_cmd, capture_output=True) self.logger.info(neo4j_results.stdout) @@ -139,7 +153,7 @@ def add_db_indexes(self): with self.neo4j_driver.session() as session: # node name index - node_name_index_cypher = f'CREATE INDEX node_name_index FOR (n:`{NAMED_THING}`) on (n.name)' + node_name_index_cypher = f'CREATE INDEX node_name_index FOR (n:`{NAMED_THING}`) ON (n.name)' self.logger.info(f'Adding node name index on {NAMED_THING}.name') session.run(node_name_index_cypher).consume() indexes_added += 1 @@ -151,8 +165,8 @@ def add_db_indexes(self): self.logger.info(f'Adding node id indexes for node labels: {node_labels}') for node_label in node_labels: node_label_index = f'node_id_{node_label.replace(":", "_")}' - 
node_name_index_cypher = f'CREATE CONSTRAINT {node_label_index} ON (n:`{node_label}`) ' \ - f'ASSERT n.id IS UNIQUE' + node_name_index_cypher = f'CREATE CONSTRAINT {node_label_index} FOR (n:`{node_label}`) ' \ + f'REQUIRE n.id IS UNIQUE' session.run(node_name_index_cypher).consume() indexes_added += 1 index_names.append(node_label_index) @@ -227,7 +241,11 @@ def create_neo4j_dump(nodes_filepath: str, edges_output_file=csv_edges_file_path) if logger: logger.info(f'CSV files created for {graph_id}({graph_version})...') - graph_dump_name = f'graph_{graph_version}.db.dump' if graph_version else 'graph.db.dump' + + # would like to do the following, but apparently you can't specify a custom name for the dump now + # graph_dump_name = f'graph_{graph_version}.neo4j5.db.dump' if graph_version else 'graph.neo4j5.db.dump' + # graph_dump_file_path = os.path.join(output_directory, graph_dump_name) + graph_dump_name = 'neo4j.dump' graph_dump_file_path = os.path.join(output_directory, graph_dump_name) if os.path.exists(graph_dump_file_path): if logger: @@ -258,7 +276,7 @@ def create_neo4j_dump(nodes_filepath: str, if stop_exit_code != 0: return False - dump_exit_code = neo4j_access.create_backup_dump(graph_dump_file_path) + dump_exit_code = neo4j_access.create_backup_dump(output_directory) if dump_exit_code != 0: return False diff --git a/Common/normalization.py b/Common/normalization.py index fd43d151..39150eeb 100644 --- a/Common/normalization.py +++ b/Common/normalization.py @@ -3,6 +3,9 @@ import requests import time +from requests.adapters import HTTPAdapter, Retry +from dataclasses import dataclass + from robokop_genetics.genetics_normalization import GeneticsNormalizer from Common.biolink_constants import * from Common.utils import LoggingUtil @@ -15,6 +18,36 @@ # predicate to use when normalization fails FALLBACK_EDGE_PREDICATE = 'biolink:related_to' +@dataclass +class NormalizationScheme: + node_normalization_version: str = 'latest' + edge_normalization_version: str = 'latest' + normalization_code_version: str = NORMALIZATION_CODE_VERSION + strict: bool = True + conflation: bool = False + + def get_composite_normalization_version(self): + composite_normalization_version = f'{self.node_normalization_version}_' \ + f'{self.edge_normalization_version}_{self.normalization_code_version}' + if self.conflation: + composite_normalization_version += '_conflated' + if self.strict: + composite_normalization_version += '_strict' + return composite_normalization_version + + def get_metadata_representation(self): + return {'node_normalization_version': self.node_normalization_version, + 'edge_normalization_version': self.edge_normalization_version, + 'normalization_code_version': self.normalization_code_version, + 'conflation': self.conflation, + 'strict': self.strict} + + +class NormalizationFailedError(Exception): + def __init__(self, error_message: str, actual_error: Exception = None): + self.error_message = error_message + self.actual_error = actual_error + class NodeNormalizer: """ @@ -70,109 +103,95 @@ def __init__(self, self.sequence_variant_normalizer = None self.variant_node_types = None - def hit_node_norm_service(self, curies, retries=0): - resp: requests.models.Response = requests.post(f'{self.node_norm_endpoint}get_normalized_nodes', - json={'curies': curies, - 'conflate': self.conflate_node_types, - 'drug_chemical_conflate': self.conflate_node_types, - 'description': True}) + self.requests_session = self.get_normalization_requests_session() + + def hit_node_norm_service(self, curies): + resp = 
self.requests_session.post(f'{self.node_norm_endpoint}get_normalized_nodes', + json={'curies': curies, + 'conflate': self.conflate_node_types, + 'drug_chemical_conflate': self.conflate_node_types, + 'description': True}) if resp.status_code == 200: # if successful return the json as an object - return resp.json() - else: - error_message = f'Node norm response code: {resp.status_code}' - if resp.status_code >= 500: - # if 5xx retry 3 times - retries += 1 - if retries == 4: - error_message += ', retried 3 times, giving up..' - self.logger.error(error_message) - resp.raise_for_status() - else: - error_message += f', retrying.. (attempt {retries})' - time.sleep(retries * 3) - self.logger.error(error_message) - return self.hit_node_norm_service(curies, retries) + response_json = resp.json() + if response_json: + return response_json else: - # we should never get a legitimate 4xx response from node norm, - # crash with an error for troubleshooting - if resp.status_code == 422: - error_message += f'(curies: {curies})' - self.logger.error(error_message) - resp.raise_for_status() + error_message = f"Node Normalization service {self.node_norm_endpoint} returned 200 " \ + f"but with an empty result for (curies: {curies})" + raise NormalizationFailedError(error_message=error_message) + else: + error_message = f'Node norm response code: {resp.status_code} (curies: {curies})' + self.logger.error(error_message) + resp.raise_for_status() - def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list: + def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list: """ - This method calls the NodeNormalization web service to get the normalized identifier and name of the node. - the data comes in as a node list. + This method calls the NodeNormalization web service and normalizes a list of nodes. - :param node_list: A list with items to normalize - :param block_size: the number of curies in the request + :param node_list: A list of unique nodes to normalize + :param batch_size: the number of curies to be sent to NodeNormalization at once :return: """ - self.logger.debug(f'Start of normalize_node_data. 
items: {len(node_list)}') - - # init the cache - this accumulates all the results from the node norm service - cached_node_norms: dict = {} + # look up all valid biolink node types if needed + # this is used when strict normalization is off to ensure only valid types go into the graph as NODE_TYPES + if not self.strict_normalization and not self.biolink_compliant_node_types: + biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version) + self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types() - # create a unique set of node ids - tmp_normalize: set = set([node['id'] for node in node_list]) + # make a list of the node ids, we used to deduplicate here, but now we expect the list to be unique ids + to_normalize: list = [node['id'] for node in node_list] - # convert the set to a list so we can iterate through it - to_normalize: list = list(tmp_normalize) - - # init the array index lower boundary + # use indexes and slice to grab batch_size sized chunks of ids from the list start_index: int = 0 - - # get the last index of the list last_index: int = len(to_normalize) - - self.logger.debug(f'{last_index} unique nodes found in this group.') - - # grab chunks of the data frame + chunks_of_ids = [] while True: if start_index < last_index: - # define the end index of the slice - end_index: int = start_index + block_size + end_index: int = start_index + batch_size - # force the end index to be the last index to insure no overflow + # force the end index to be no greater than the last index to ensure no overflow if end_index >= last_index: end_index = last_index - self.logger.debug(f'Working block {start_index} to {end_index}.') - - # collect a slice of records from the data frame - data_chunk: list = to_normalize[start_index: end_index] - - # hit the node norm api - normalization_json = self.hit_node_norm_service(curies=data_chunk) - if normalization_json: - # merge the normalization results with what we have gotten so far - cached_node_norms.update(**normalization_json) - else: - # this shouldn't happen but if the API returns an empty dict instead of nulls, - # assume none of the curies normalize - empty_responses = {curie: None for curie in data_chunk} - cached_node_norms.update(empty_responses) + # collect a slice of block_size curies from the full list + chunks_of_ids.append(to_normalize[start_index: end_index]) # move on down the list - start_index += block_size + start_index += batch_size else: break + # we should be able to do the following, but it's causing RemoteDisconnected errors with node norm + # + # hit the node norm api with the chunks of curies in parallel + # we could try to optimize the number of max_workers for ThreadPoolExecutor more specifically, + # by default python attempts to find a reasonable # based on os.cpu_count() + # with ThreadPoolExecutor() as executor: + # executor_results = executor.map(self.hit_node_norm_service, chunks_of_ids) + # + # normalization_results = list(executor_results) + # for normalization_json, ids in zip(normalization_results, chunks_of_ids): + # if not normalization_json: + # raise NormalizationFailedError(f'!!! 
Normalization json results missing for ids: {ids}') + # else: + # merge the normalization results into one dictionary + # node_normalization_results.update(**normalization_json) + + # until we can get threading working, hit node norm sequentially + node_normalization_results: dict = {} + for chunk in chunks_of_ids: + results = self.hit_node_norm_service(chunk) + node_normalization_results.update(**results) + # reset the node index node_idx = 0 # node ids that failed to normalize failed_to_normalize: list = [] - # look up valid node types if needed - if not self.strict_normalization and not self.biolink_compliant_node_types: - biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version) - self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types() - # for each node update the node with normalized information # store the normalized IDs in self.node_normalization_lookup for later look up while node_idx < len(node_list): @@ -217,7 +236,7 @@ def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list: current_node[NODE_TYPES] = list(set(current_node[NODE_TYPES])) # did we get a response from the normalizer - current_node_normalization = cached_node_norms[current_node_id] + current_node_normalization = node_normalization_results[current_node_id] if current_node_normalization is not None: current_node_id_section = current_node_normalization['id'] @@ -341,6 +360,17 @@ def get_current_node_norm_version(self): # this shouldn't happen, raise an exception resp.raise_for_status() + @staticmethod + def get_normalization_requests_session(): + pool_maxsize = max(os.cpu_count(), 10) + s = requests.Session() + retries = Retry(total=8, + backoff_factor=1, + status_forcelist=[502, 503, 504, 403, 429]) + s.mount('https://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize)) + s.mount('http://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize)) + return s + class EdgeNormalizationResult: def __init__(self, @@ -398,10 +428,8 @@ def normalize_edge_data(self, """ # find the predicates that have not been normalized yet - predicates_to_normalize = set() - for edge in edge_list: - if edge[PREDICATE] not in self.edge_normalization_lookup: - predicates_to_normalize.add(edge[PREDICATE]) + predicates_to_normalize = {edge[PREDICATE] for edge in edge_list + if edge[PREDICATE] not in self.edge_normalization_lookup} # convert the set to a list so we can iterate through it predicates_to_normalize_list = list(predicates_to_normalize) diff --git a/Common/predicates.py b/Common/predicates.py index e6fd89c2..966247cf 100644 --- a/Common/predicates.py +++ b/Common/predicates.py @@ -8,43 +8,43 @@ "allosteric_antagonist": f"{DGIDB}:antagonist", "allosteric_modulator": f"{DGIDB}:modulator", "antagonist": f"{DGIDB}:antagonist", - "antibody": f"{DGIDB}:binder", - "antibody_binding": f"{DGIDB}:binder", + "antibody": f"RO:0002436", + "antibody_binding": f"RO:0002436", "antisense_inhibitor": f"{DGIDB}:inhibitor", - "app_ki": f"RO:0002434", - "app_km": f"RO:0002434", - "binding_agent": f"{DGIDB}:binder", + "app_ki": f"RO:0002434", # apparent Ki? if so change to RO:0002436 + "app_km": f"RO:0002434", # apperent Km? if so change to RO:0002436 + "binding_agent": f"RO:0002436", "blocker": f"{DGIDB}:blocker", "channel_blocker": f"{DGIDB}:channel_blocker", "ec50": f"{DGIDB}:agonist", - "ed50": f"RO:0002434", + "ed50": f"RO:0002434", # Effective Dose. Where does this predicate come from? 
CB (2024_07): "it makes no sense to have an ed50 between a chemical and a gene/protein" "gating_inhibitor": f"{DGIDB}:gating_inhibitor", - "gi50": f"RO:0002434", + "gi50": f"{DGIDB}:Inhibitor", # Growth Inhibitor "ic50": f"{DGIDB}:inhibitor", "inhibitor": f"{DGIDB}:inhibitor", - "interacts_with": f"RO:0002434", + "interacts_with": f"RO:0002434", # Where does this predicate come from? Possibly needs to be modified to RO:0002436 "inverse_agonist": f"{DGIDB}:inverse_agonist", - "ka": f"RO:0002434", - "kact": f"RO:0002434", - "kb": f"{DGIDB}:binder", - "kd": f"{DGIDB}:binder", - "kd1": f"RO:0002434", + "ka": f"RO:0002436", + "kact": f"RO:0002436", # is this a typo of kcat? + "kb": f"RO:0002436", # {DGIDB}:binder maps to biolink:binds which is deprecated + "kd": f"RO:0002436", + "kd1": f"RO:0002436", # RO:0002434 maps to biolink:related_to "ki": f"{DGIDB}:inhibitor", - "km": f"RO:0002434", - "ks": f"RO:0002434", + "km": f"RO:0002436", + "ks": f"RO:0002436", "modulator": f"{DGIDB}:modulator", - "mic": f"RO:0002434", - "mpc": f"RO:0002434", + "mic": f"RO:0002434", # What is this referring to? + "mpc": f"RO:0002434", # What is this referring to? "negative_modulator": f"{CHEMBL_MECHANISM}:negative_modulator", "negative_allosteric_modulator": f"{CHEMBL_MECHANISM}:negative_modulator", "opener": f"{CHEMBL_MECHANISM}:opener", "other": f"{DGIDB}:other", "partial_agonist": f"{DGIDB}:partial_agonist", - "pa2": f"RO:0002434", + "pa2": f"RO:0002434", # What is this referring to? "pharmacological_chaperone": f"{DGIDB}:chaperone", "positive_allosteric_modulator": f"{CHEMBL_MECHANISM}:positive_modulator", "positive_modulator": f"{CHEMBL_MECHANISM}:positive_modulator", "releasing_agent": f"{CHEMBL_MECHANISM}:releasing_agent", "substrate": f"{CHEMBL_MECHANISM}:substrate", - "xc50": f"RO:0002434" + "xc50": f"RO:0002436" # This is related to ec50 and ic50, both of which describe binding events } diff --git a/Common/supplementation.py b/Common/supplementation.py index 9665ceb9..8a27f4f1 100644 --- a/Common/supplementation.py +++ b/Common/supplementation.py @@ -8,11 +8,10 @@ from zipfile import ZipFile from collections import defaultdict from Common.biolink_constants import * -from Common.normalization import FALLBACK_EDGE_PREDICATE +from Common.normalization import FALLBACK_EDGE_PREDICATE, NormalizationScheme from Common.utils import LoggingUtil from Common.kgx_file_writer import KGXFileWriter from Common.kgx_file_normalizer import KGXFileNormalizer -from Common.kgxmodel import NormalizationScheme SNPEFF_SO_PREDICATES = { diff --git a/Common/utils.py b/Common/utils.py index cca7257a..30ef06f1 100644 --- a/Common/utils.py +++ b/Common/utils.py @@ -264,23 +264,30 @@ def get_http_file_modified_date(self, file_url: str): self.logger.error(error_message) raise GetDataPullError(error_message) - def pull_via_http(self, url: str, data_dir: str, is_gzip=False) -> int: + def pull_via_http(self, url: str, data_dir: str, is_gzip=False, saved_file_name: str = None) -> int: """ gets the file from an http stream. 
:param url: :param data_dir: :param is_gzip: + :param saved_file_name: :return: the number of bytes read """ - # get the filename - data_file: str = url.split('/')[-1] + # is_gzip isn't used on the main branch, but it's probably on some branches or forks, + # lets throw this for a while, so it's not mysteriously removed + if is_gzip: + raise NotImplementedError(f'is_gzip is deprecated, unzip files during parsing not retrieval!') - # init the byte counter + # get the name of the file to write + data_file: str = saved_file_name if saved_file_name else url.split('/')[-1] + + # this tracks how much, if any, of the file is downloaded + # (it's not really used anymore, it could be more simple) byte_counter: int = 0 - # get the file if its not there + # check if the file exists already if not os.path.exists(os.path.join(data_dir, data_file)): self.logger.debug(f'Retrieving {url} -> {data_dir}') @@ -288,17 +295,9 @@ def pull_via_http(self, url: str, data_dir: str, is_gzip=False) -> int: hdr = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'} req = request.Request(url, headers=hdr) - # get the the file data handle + # get the file data handle file_data = request.urlopen(req) - # is this a gzip file - if is_gzip: - # get a handle to the data - file_data = gzip.GzipFile(fileobj=file_data) - - # strip off the .gz if exists - data_file = data_file.replace('.gz', '') - with open(os.path.join(data_dir, data_file), 'wb') as fp: # specify the buffered data block size block = 131072 diff --git a/Dockerfile b/Dockerfile index a65ba0b6..4031f26d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # A docker container with neo4j, java and python for Data Services -FROM neo4j:4.4.10 +FROM neo4j:5.19.0-community-bullseye RUN apt-get update \ && apt-get -y install python3 \ diff --git a/README.md b/README.md index 36f24fad..56dbbe1b 100644 --- a/README.md +++ b/README.md @@ -31,12 +31,20 @@ git clone https://github.com/RobokopU24/ORION.git Next create directories where data sources, graphs, and logs will be stored. ORION_STORAGE - for storing data sources + ORION_GRAPHS - for storing knowledge graphs + ORION_LOGS - for storing logs -You can do this manually, or use the script indicated below to set up a standard configuration (Option 1 or 2). +You can do this manually, or use the script indicated below to set up a default configuration. + +Option 1: Use this script to create the directories and set the environment variables: +``` +cd ~/ORION_root/ORION/ +source ./set_up_test_env.sh +``` -Option 1: Create three directories and set environment variables specifying paths to the locations of those directories. +Option 2: Create three directories and manually set environment variables specifying paths to the locations of those directories. ``` mkdir ~/ORION_root/storage/ export ORION_STORAGE=~/ORION_root/storage/ @@ -48,12 +56,6 @@ mkdir ~/ORION_root/logs/ export ORION_LOGS=~/ORION_root/logs/ ``` -Option 2: Use this script to create the directories and set the environment variables: -``` -cd ~/ORION_root/ORION/ -source ./set_up_test_env.sh -``` - Next create or select a Graph Spec yaml file where the content of knowledge graphs to be built will be specified. Use either of the following options, but not both: @@ -91,11 +93,11 @@ docker-compose up ``` If you want to specify an individual graph you can override the default command with a graph id from your Spec. 
``` -docker-compose run --rm data_services python /ORION/Common/build_manager.py Example_Graph_ID +docker-compose run --rm orion python /ORION/Common/build_manager.py Example_Graph_ID ``` To run the ORION pipeline for a single data source, you can use: ``` -docker-compose run --rm data_services python /ORION/Common/load_manager.py Example_Source +docker-compose run --rm orion python /ORION/Common/load_manager.py Example_Source ``` To see available arguments and a list of supported data sources: ``` @@ -142,5 +144,5 @@ Now you can use that source ID in a graph spec to include your new source in a g After you alter the codebase, or if you are experiencing issues or errors you may want to run tests: ``` -docker-compose run --rm data_services pytest /ORION +docker-compose run --rm orion pytest /ORION ``` \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 49f50611..a22dd7b7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,5 @@ -version: "3.7" services: orion: - platform: linux/amd64 build: context: . command: [python, /ORION/Common/build_manager.py, all] @@ -12,7 +10,6 @@ services: - ORION_GRAPH_SPEC - ORION_GRAPH_SPEC_URL - ORION_OUTPUT_URL - - ORION_NEO4J_PASSWORD - EDGE_NORMALIZATION_ENDPOINT - NODE_NORMALIZATION_ENDPOINT - NAME_RESOLVER_ENDPOINT diff --git a/graph_specs/ctkp-graph-spec.yaml b/graph_specs/ctkp-graph-spec.yaml new file mode 100644 index 00000000..cffefab6 --- /dev/null +++ b/graph_specs/ctkp-graph-spec.yaml @@ -0,0 +1,9 @@ +graphs: + + - graph_id: CTKP_Automat + graph_name: Clinical Trials KP + graph_description: 'The Clinical Trials KP, created and maintained by the Multiomics Provider, provides information on Clinical Trials, ultimately derived from researcher submissions to clinicaltrials.gov, via the Aggregate Analysis of Clinical Trials (AACT) database). Information on select trials includes the NCT Identifier of the trial, interventions used, diseases/conditions relevant to the trial, adverse events, etc.' + graph_url: https://github.com/NCATSTranslator/Translator-All/wiki/Clinical-Trials-KP + output_format: neo4j + sources: + - source_id: ClinicalTrialsKP \ No newline at end of file diff --git a/helm/orion/renci-values.yaml b/helm/orion/renci-values.yaml index 1b4a2e6c..4a81cd9d 100644 --- a/helm/orion/renci-values.yaml +++ b/helm/orion/renci-values.yaml @@ -46,8 +46,8 @@ orion: normalization: nodeNormEndpoint: https://nodenormalization-sri.renci.org/ edgeNormEndpoint: https://bl-lookup-sri.renci.org/ - bl_version: 4.2.0 - outputURL: https://stars.renci.org/var/plater/bl-4.2.0/ + bl_version: 4.2.1 + outputURL: https://stars.renci.org/var/plater/bl-4.2.1/ pharos: host: pod-host-or-ip diff --git a/parsers/BINDING/src/loadBINDINGDB.py b/parsers/BINDING/src/loadBINDINGDB.py index e587425e..9b2b0db1 100644 --- a/parsers/BINDING/src/loadBINDINGDB.py +++ b/parsers/BINDING/src/loadBINDINGDB.py @@ -2,12 +2,13 @@ import enum import math import json +import requests + from zipfile import ZipFile -import requests as rq -import requests.exceptions +from requests.adapters import HTTPAdapter, Retry from parsers.BINDING.src.bindingdb_constraints import LOG_SCALE_AFFINITY_THRESHOLD #Change the binding affinity threshold here. 
Default is 10 uM Ki,Kd,EC50,orIC50 -from Common.utils import GetData +from Common.utils import GetData, GetDataPullError from Common.loader_interface import SourceDataLoader from Common.extractor import Extractor from Common.biolink_constants import PUBLICATIONS, AFFINITY, AFFINITY_PARAMETER, KNOWLEDGE_LEVEL, AGENT_TYPE, KNOWLEDGE_ASSERTION, MANUAL_AGENT @@ -52,7 +53,7 @@ class BINDINGDBLoader(SourceDataLoader): source_data_url = "https://www.bindingdb.org/rwd/bind/chemsearch/marvin/SDFdownload.jsp?all_download=yes" license = "All data and download files in bindingDB are freely available under a 'Creative Commons BY 3.0' license.'" attribution = 'https://www.bindingdb.org/rwd/bind/info.jsp' - parsing_version = '1.5' + parsing_version = '1.6' def __init__(self, test_mode: bool = False, source_data_dir: str = None): """ @@ -66,21 +67,21 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): self.affinity_threshold = LOG_SCALE_AFFINITY_THRESHOLD self.measure_to_predicate = { - "pKi": "biolink:binds", + "pKi": "{DGIDB}:inhibitor", #inhibition constant "pIC50": "CTD:decreases_activity_of", - "pKd": "biolink:binds", + "pKd": "RO:0002436", "pEC50": "CTD:increases_activity_of", - "k_on": "biolink:binds", - "k_off": "biolink:binds" + "k_on": "RO:0002436", + "k_off": "RO:0002436" } self.bindingdb_version = None self.bindingdb_version = self.get_latest_source_version() self.bindingdb_data_url = f"https://www.bindingdb.org/bind/downloads/" - self.BD_archive_file_name = f"BindingDB_All_{self.bindingdb_version}_tsv.zip" - self.BD_file_name = f"BindingDB_All_{self.bindingdb_version}.tsv" - self.data_files = [self.BD_archive_file_name] + self.bd_archive_file_name = f"BindingDB_All_{self.bindingdb_version}_tsv.zip" + self.bd_file_name = f"BindingDB_All.tsv" + self.data_files = [self.bd_archive_file_name] def get_latest_source_version(self) -> str: """ @@ -90,26 +91,35 @@ def get_latest_source_version(self) -> str: if self.bindingdb_version: return self.bindingdb_version try: + s = requests.Session() + retries = Retry(total=5, + backoff_factor=2) + s.mount('https://', HTTPAdapter(max_retries=retries)) + ### The method below gets the database version from the html, but this may be subject to change. ### - binding_db_download_page_response = rq.get('https://www.bindingdb.org/rwd/bind/chemsearch/marvin/Download.jsp',) + binding_db_download_page_response = requests.get('https://www.bindingdb.org/rwd/bind/chemsearch/marvin/Download.jsp', timeout=8) version_index = binding_db_download_page_response.text.index('BindingDB_All_2D_') + 17 bindingdb_version = binding_db_download_page_response.text[version_index:version_index + 6] - except requests.exceptions.SSLError: - # currently the binding db SSL implementation is outdated/broken with the latest packages - self.logger.error(f'BINDING-DB had an SSL error while attempting to retrieve version. Returning default.') - return '202404' + self.bindingdb_version = bindingdb_version + return f"{bindingdb_version}" - return f"{bindingdb_version}" + except requests.exceptions.SSLError: + # BINDING-DB often has ssl related errors with the jsp page + error_message = f'BINDING-DB had an SSL error while attempting to retrieve version..' + except requests.exceptions.Timeout: + error_message = f'BINDING-DB timed out attempting to retrieve version...' 
+ except ValueError: + error_message = f'BINDING-DB get_latest_source_version got a response but could not determine the version' + raise GetDataPullError(error_message=error_message) def get_data(self) -> int: """ Gets the bindingdb data. - """ + # download the zipped data data_puller = GetData() - for source in self.data_files: - source_url = f"{self.bindingdb_data_url}{source}" - data_puller.pull_via_http(source_url, self.data_path) + source_url = f"{self.bindingdb_data_url}{self.bd_archive_file_name}" + data_puller.pull_via_http(source_url, self.data_path) return True def parse_data(self) -> dict: @@ -123,7 +133,8 @@ def parse_data(self) -> dict: data_store= dict() columns = [[x.value,x.name] for x in BD_EDGEUMAN if x.name not in ['PMID','PUBCHEM_AID','PATENT_NUMBER','PUBCHEM_CID','UNIPROT_TARGET_CHAIN']] - for n,row in enumerate(generate_zipfile_rows(os.path.join(self.data_path,self.BD_archive_file_name), self.BD_file_name)): + zipped_data_path = os.path.join(self.data_path, self.bd_archive_file_name) + for n,row in enumerate(generate_zipfile_rows(zipped_data_path, self.bd_file_name)): if n == 0: continue if self.test_mode: diff --git a/parsers/FooDB/src/loadFDB.py b/parsers/FooDB/src/loadFDB.py index 2622d4bc..149a78d1 100644 --- a/parsers/FooDB/src/loadFDB.py +++ b/parsers/FooDB/src/loadFDB.py @@ -82,6 +82,7 @@ def get_data(self): # and get a reference to the data gatherer gd: GetData = GetData(self.logger.level) + if(self.full_url_path==None): self.get_latest_source_version() # get all the files noted above file_count, foodb_dir, self.tar_dir_name = gd.get_foodb_files(self.full_url_path, self.data_path, self.archive_name, self.data_files) diff --git a/parsers/GTEx/src/loadGTEx.py b/parsers/GTEx/src/loadGTEx.py index 86a3e8be..dc15ea93 100644 --- a/parsers/GTEx/src/loadGTEx.py +++ b/parsers/GTEx/src/loadGTEx.py @@ -22,6 +22,7 @@ class GTExLoader(SourceDataLoader): has_sequence_variants = True # this probably won't change very often - just hard code it for now + # TODO have GTEX dynamically get version and file url (starting point: https://gtexportal.org/api/v2/metadata/dataset) GTEX_VERSION = "8" # tissue name to uberon curies, the tissue names will match gtex file names diff --git a/parsers/IntAct/src/loadIA.py b/parsers/IntAct/src/loadIA.py index b77cd40b..84422394 100644 --- a/parsers/IntAct/src/loadIA.py +++ b/parsers/IntAct/src/loadIA.py @@ -84,6 +84,9 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): """ super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) + self.ftp_site = 'ftp.ebi.ac.uk' + self.ftp_dir = '/pub/databases/IntAct/current/psimitab/' + self.data_file: str = 'intact.zip' self.source_db: str = 'IntAct Molecular Interaction Database' @@ -100,7 +103,7 @@ def get_latest_source_version(self) -> str: gd = GetData(self.logger.level) # get the file date - ret_val: str = gd.get_ftp_file_date('ftp.ebi.ac.uk', '/pub/databases/IntAct/current/psimitab/', self.data_file) + ret_val: str = gd.get_ftp_file_date(self.ftp_site, self.ftp_dir, self.data_file) # return to the caller return ret_val @@ -112,7 +115,7 @@ def get_data(self) -> int: """ # get a reference to the data gathering class gd: GetData = GetData(self.logger.level) - file_count: int = gd.pull_via_ftp('ftp.ebi.ac.uk', '/pub/databases/IntAct/current/psimitab/', [self.data_file], + file_count: int = gd.pull_via_ftp(self.ftp_site, self.ftp_dir, [self.data_file], self.data_path) return file_count diff --git a/parsers/PHAROS/src/loadPHAROS.py b/parsers/PHAROS/src/loadPHAROS.py 
index 13e11d16..6fc62bef 100644 --- a/parsers/PHAROS/src/loadPHAROS.py +++ b/parsers/PHAROS/src/loadPHAROS.py @@ -19,7 +19,7 @@ class PHAROSLoader(SourceDataLoader): source_data_url = "https://pharos.nih.gov/" license = "Data accessed from Pharos and TCRD is publicly available from the primary sources listed above. Please respect their individual licenses regarding proper use and redistribution." attribution = 'Sheils, T., Mathias, S. et al, "TCRD and Pharos 2021: mining the human proteome for disease biology", Nucl. Acids Res., 2021. DOI: 10.1093/nar/gkaa993' - parsing_version: str = '1.6' + parsing_version: str = '1.7' GENE_TO_DISEASE_QUERY: str = """select distinct x.value, d.did, d.name, p.sym, d.dtype, d.score from disease d @@ -63,16 +63,18 @@ class PHAROSLoader(SourceDataLoader): } # we might want more granularity here but for now it's one-to-one source with KL/AT + # we will need to develop a procedure for merging KL/AT moving forward PHAROS_KL_AT_lookup = { 'CTD': (PREDICATION, MANUAL_AGENT), 'DisGeNET': (NOT_PROVIDED, NOT_PROVIDED), 'DrugCentral Indication': (KNOWLEDGE_ASSERTION, MANUAL_AGENT), 'eRAM': (NOT_PROVIDED, NOT_PROVIDED), + # For more information about JensenLab Databases: DOI: https://doi.org/10.1093/database/baac019 'JensenLab Experiment TIGA': (PREDICATION, AUTOMATED_AGENT), - 'JensenLab Knowledge AmyCo': (NOT_PROVIDED, NOT_PROVIDED), - 'JensenLab Knowledge MedlinePlus': (NOT_PROVIDED, NOT_PROVIDED), - 'JensenLab Knowledge UniProtKB-KW': (NOT_PROVIDED, NOT_PROVIDED), - 'JensenLab Text Mining': (NOT_PROVIDED, NOT_PROVIDED), + 'JensenLab Knowledge AmyCo': (KNOWLEDGE_ASSERTION, MANUAL_AGENT), + 'JensenLab Knowledge MedlinePlus': (KNOWLEDGE_ASSERTION, MANUAL_AGENT), + 'JensenLab Knowledge UniProtKB-KW': (KNOWLEDGE_ASSERTION, MANUAL_VALIDATION_OF_AUTOMATED_AGENT), + 'JensenLab Text Mining': (NOT_PROVIDED, TEXT_MINING_AGENT), 'Monarch': (NOT_PROVIDED, NOT_PROVIDED), 'UniProt Disease': (KNOWLEDGE_ASSERTION, MANUAL_AGENT) } diff --git a/parsers/Reactome/src/loadReactome.py b/parsers/Reactome/src/loadReactome.py index dac5b697..a816269b 100755 --- a/parsers/Reactome/src/loadReactome.py +++ b/parsers/Reactome/src/loadReactome.py @@ -109,9 +109,12 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) self.version_url: str = 'https://reactome.org/about/news' + # we'll rename the neo4j dump as we download it to make neo4j usage easier + # (community edition only allows one database, having just one named 'neo4j' helps) self.neo4j_dump_file = 'reactome.graphdb.dump' + self.saved_neo4j_dump_file = 'neo4j.dump' self.data_url = 'https://reactome.org/download/current/' - self.data_files = [self.neo4j_dump_file] + self.data_files = [self.saved_neo4j_dump_file] self.triple_file: str = 'reactomeContents_CriticalTriples.csv' self.triple_path = os.path.dirname(os.path.abspath(__file__)) @@ -142,15 +145,25 @@ def get_latest_source_version(self) -> str: def get_data(self) -> bool: gd: GetData = GetData(self.logger.level) - for dt_file in self.data_files: - gd.pull_via_http(f'{self.data_url}{dt_file}', - self.data_path) + gd.pull_via_http(f'{self.data_url}{self.neo4j_dump_file}', + self.data_path, saved_file_name=self.saved_neo4j_dump_file) return True def parse_data(self): neo4j_tools = Neo4jTools() - neo4j_tools.load_backup_dump(f'{self.data_path}/{self.neo4j_dump_file}') - neo4j_tools.start_neo4j() + + neo4j_status_code = neo4j_tools.load_backup_dump(f'{self.data_path}/') + if neo4j_status_code: + 
raise SystemError('Neo4j failed to load the backup dump.') + + neo4j_status_code = neo4j_tools.migrate_dump_to_neo4j_5() + if neo4j_status_code: + raise SystemError('Neo4j failed to migrate the dump to neo4j 5.') + + neo4j_status_code = neo4j_tools.start_neo4j() + if neo4j_status_code: + raise SystemError('Neo4j failed to start.') + neo4j_tools.wait_for_neo4j_initialization() neo4j_driver = neo4j_tools.neo4j_driver diff --git a/parsers/ViralProteome/src/loadUniRef.py b/parsers/ViralProteome/src/loadUniRef.py index 7bd0d900..efdbbc41 100644 --- a/parsers/ViralProteome/src/loadUniRef.py +++ b/parsers/ViralProteome/src/loadUniRef.py @@ -90,6 +90,7 @@ def get_uniref_data(self) -> set: # are we in test mode if not self.test_mode: # get the list of taxa + #TODO: It looks like gd.get_ncbi_taxon_id_set doesn't resolve. It was removed in https://github.com/RobokopU24/ORION/commit/d3860356f2dac5779d1c15d651e644921dc48f88 target_taxon_set: set = gd.get_ncbi_taxon_id_set(self.data_path, self.TYPE_VIRUS) else: # create a test set of target taxa diff --git a/parsers/clinicaltrials/src/loadCTKP.py b/parsers/clinicaltrials/src/loadCTKP.py new file mode 100644 index 00000000..4b95ba1f --- /dev/null +++ b/parsers/clinicaltrials/src/loadCTKP.py @@ -0,0 +1,222 @@ +import enum +import os +import requests +import json + +from Common.biolink_constants import * +from Common.extractor import Extractor +from Common.utils import GetData +from Common.loader_interface import SourceDataLoader +from Common.utils import GetDataPullError + + +# the data header columns the nodes files are: +class NODESDATACOLS(enum.IntEnum): + ID = 0 + NAME = 1 + CATEGORY = 2 + + +# the data header columns for the edges file are: +class EDGESDATACOLS(enum.IntEnum): + ID = 0 + SUBJECT = 1 + PREDICATE = 2 + OBJECT = 3 + SUBJECT_NAME = 4 + OBJECT_NAME = 5 + CATEGORY = 6 + KNOWLEDGE_LEVEL = 7 + AGENT_TYPE = 8 + NCTID = 9 + PHASE = 10 + PRIMARY_PURPOSE = 11 + INTERVENTION_MODEL = 12 + TIME_PERSPECTIVE = 13 + OVERALL_STATUS = 14 + START_DATE = 15 + ENROLLMENT = 16 + ENROLLMENT_TYPE = 17 + AGE_RANGE = 18 + CHILD = 19 + ADULT = 20 + OLDER_ADULT = 21 + UNII = 22 + + +class CTKPLoader(SourceDataLoader): + source_id: str = "ClinicalTrialsKP" + provenance_id: str = "infores:biothings-multiomics-clinicaltrials" + description = "The Clinical Trials KP, created and maintained by the Multiomics Provider, provides information on Clinical Trials, ultimately derived from researcher submissions to clinicaltrials.gov, via the Aggregate Analysis of Clinical Trials (AACT) database). Information on select trials includes the NCT Identifier of the trial, interventions used, diseases/conditions relevant to the trial, adverse events, etc." 
+ source_data_url = "https://aact.ctti-clinicaltrials.org/" + license = "https://github.com/ctti-clinicaltrials/aact/blob/dev/LICENSE" + attribution = "" + parsing_version = "1.0" + + def __init__(self, test_mode: bool = False, source_data_dir: str = None): + """ + :param test_mode - sets the run into test mode + :param source_data_dir - the specific storage directory to save files in + """ + super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) + + # until we can use the manifest to determine versions and source data file locations we'll hard code it + self.node_file_name = 'clinical_trials_kg_nodes_v2.2.10.tsv' + self.edge_file_name = 'clinical_trials_kg_edges_v2.2.10.tsv' + self.data_url = "https://db.systemsbiology.net/gestalt/KG/" + + # once we use the manifest, we'll rename the files while downloading and they can be called something generic + # self.node_file_name = 'nodes.tsv' + # self.edge_file_name = 'edges.tsv' + + self.data_files = [ + self.node_file_name, + self.edge_file_name + ] + + self.aact_infores = "infores:aact" + self.ctgov_infores = "infores:clinicaltrials" + self.treats_predicate = "biolink:treats" + self.source_record_url = "https://db.systemsbiology.net/gestalt/cgi-pub/KGinfo.pl?id=" + + def get_latest_source_version(self) -> str: + latest_version = "2.2.10" + # we'd like to do this but for now we're using the dev version which is not in the manifest + # latest_version = self.get_manifest()['version'] + return latest_version + + @staticmethod + def get_manifest(): + manifest_response = requests.get('https://github.com/multiomicsKP/clinical_trials_kp/blob/main/manifest.json') + if manifest_response.status_code == 200: + manifest = manifest_response.json() + return manifest + else: + manifest_response.raise_for_status() + + def get_data(self) -> int: + """ + manifest = self.get_manifest() + source_data_urls = manifest['dumper']['data_url'] + nodes_url = None + edges_url = None + for data_url in source_data_urls: + if 'nodes' in data_url: + nodes_url = data_url + elif 'edges' in data_url: + edges_url = data_url + if not nodes_url and edges_url: + raise GetDataPullError(f'Could not determine nodes and edges files in CTKP manifest data urls: {source_data_urls}') + data_puller = GetData() + data_puller.pull_via_http(nodes_url, self.data_path, saved_file_name=self.node_file_name) + data_puller.pull_via_http(edges_url, self.data_path, saved_file_name=self.edge_file_name) + """ + data_puller = GetData() + for source in self.data_files: + source_url = f"{self.data_url}{source}" + data_puller.pull_via_http(source_url, self.data_path) + return True + + def parse_data(self) -> dict: + """ + Parses the data file for graph nodes/edges and writes them to the KGX files. 
+ + :return: ret_val: record counts + """ + + extractor = Extractor(file_writer=self.output_file_writer) + + # get the nodes + # it's not really necessary because normalization will overwrite the only information here (name and category) + nodes_file: str = os.path.join(self.data_path, self.node_file_name) + with open(nodes_file, 'r') as fp: + extractor.csv_extract(fp, + lambda line: line[NODESDATACOLS.ID.value], # subject id + lambda line: None, # object id + lambda line: None, # predicate + lambda line: {NAME: line[NODESDATACOLS.NAME.value], + NODE_TYPES: line[NODESDATACOLS.CATEGORY.value]}, # subject props + lambda line: {}, # object props + lambda line: {}, # edgeprops + comment_character=None, + delim='\t', + has_header_row=True) + + edges_file: str = os.path.join(self.data_path, self.edge_file_name) + with open(edges_file, 'r') as fp: + extractor.csv_extract(fp, + lambda line: line[EDGESDATACOLS.SUBJECT.value], # subject id + lambda line: line[EDGESDATACOLS.OBJECT.value], # object id + lambda line: line[EDGESDATACOLS.PREDICATE.value], # predicate + lambda line: {}, # subject props + lambda line: {}, # object props + lambda line: self.get_edge_properties(line), # edgeprops + comment_character=None, + delim='\t', + has_header_row=True) + + return extractor.load_metadata + + def get_edge_properties(self, line): + + supporting_studies = [] + pred = str(line[EDGESDATACOLS.PREDICATE.value]) + nctids = str(line[EDGESDATACOLS.NCTID.value]).split(',') + phases = str(line[EDGESDATACOLS.PHASE.value]).split(',') + status = str(line[EDGESDATACOLS.OVERALL_STATUS.value]).split(',') + enroll = str(line[EDGESDATACOLS.ENROLLMENT.value]).split(',') + en_typ = str(line[EDGESDATACOLS.ENROLLMENT_TYPE.value]).split(',') + max_phase = 0 + elevate_to_prediction = False + for nctid, phase, stat, enrollment, enrollment_type in zip(nctids, phases, status, enroll, en_typ): + if float(phase) > max_phase: + max_phase = float(phase) + try: + enrollment = int(enrollment) + except ValueError: + enrollment = -1 + + supporting_study_attributes = { + "id": nctid, + "tested_intervention": "unsure" if pred == "biolink:mentioned_in_trials_for" else "yes", + "phase": phase, + "status": stat, + "study_size": enrollment + } + # convert to TRAPI format + supporting_studies.append( + {"attribute_type_id": HAS_SUPPORTING_STUDY_RESULT, + "value": nctid, + "attributes": [{"attribute_type_id": key, + "value": value} for key, value in supporting_study_attributes.items()]}) + + # if pred == "biolink:in_clinical_trials_for" and max_phase >= 4: + # elevate_to_prediction = True + + if pred == self.treats_predicate: + primary_knowledge_source = self.provenance_id + aggregator_knowledge_sources = [self.aact_infores] + supporting_data_source = self.ctgov_infores + else: + primary_knowledge_source = self.ctgov_infores + aggregator_knowledge_sources = [self.aact_infores, self.provenance_id] + supporting_data_source = None + + edge_attributes = { + EDGE_ID: line[EDGESDATACOLS.ID.value], + PRIMARY_KNOWLEDGE_SOURCE: primary_knowledge_source, + AGGREGATOR_KNOWLEDGE_SOURCES: aggregator_knowledge_sources, + KNOWLEDGE_LEVEL: line[EDGESDATACOLS.KNOWLEDGE_LEVEL.value], + AGENT_TYPE: line[EDGESDATACOLS.AGENT_TYPE.value], + MAX_RESEARCH_PHASE: str(float(max_phase)), + "elevate_to_prediction": elevate_to_prediction, # this isn't in biolink so not using a constant for now + # note source_record_urls should be paired with specific knowledge sources but currently + # there's no implementation for that, just pass it as a normal attribute for now + 
"source_record_urls": [self.source_record_url + line[EDGESDATACOLS.ID.value]] + } + if supporting_data_source: + edge_attributes[SUPPORTING_DATA_SOURCE] = supporting_data_source + # to handle nested attributes, use the "attributes" property which supports TRAPI attributes as json strings + if supporting_studies: + edge_attributes["attributes"] = [json.dumps(study) for study in supporting_studies] + return edge_attributes diff --git a/parsers/drugcentral/src/loaddrugcentral.py b/parsers/drugcentral/src/loaddrugcentral.py index 8e8da236..bd828a85 100644 --- a/parsers/drugcentral/src/loaddrugcentral.py +++ b/parsers/drugcentral/src/loaddrugcentral.py @@ -22,14 +22,14 @@ class DrugCentralLoader(SourceDataLoader): source_data_url = "https://drugcentral.org/download" license = "https://drugcentral.org/privacy" attribution = "https://drugcentral.org/about" - parsing_version: str = '1.4' + parsing_version: str = '1.5' - omop_relationmap = {'off-label use': 'RO:0002606', # is substance that treats - 'reduce risk': 'RO:0002606', # is substance that treats + omop_relationmap = {'off-label use': 'biolink:applied_to_treat', # is substance that treats + 'reduce risk': 'biolink:preventative_for_condition', # is substance that treats 'contraindication': 'NCIT:C37933', # contraindication 'symptomatic treatment': 'RO:0002606', # is substance that treats 'indication': 'RO:0002606', # is substance that treats - 'diagnosis': 'RO:0002606'} # there's only one row like this. + 'diagnosis': 'DrugCentral:5271'} # there's only one row like this. act_type_to_knowledge_source_map = {'IUPHAR': 'infores:gtopdb', 'KEGG DRUG': 'infores:kegg', diff --git a/parsers/gtopdb/src/loadGtoPdb.py b/parsers/gtopdb/src/loadGtoPdb.py index d75542cd..71625048 100644 --- a/parsers/gtopdb/src/loadGtoPdb.py +++ b/parsers/gtopdb/src/loadGtoPdb.py @@ -57,7 +57,7 @@ class GtoPdbLoader(SourceDataLoader): source_data_url = "http://www.guidetopharmacology.org/" license = "https://www.guidetopharmacology.org/about.jsp#license" attribution = "https://www.guidetopharmacology.org/citing.jsp" - parsing_version: str = '1.3' + parsing_version: str = '1.4' def __init__(self, test_mode: bool = False, source_data_dir: str = None): """ diff --git a/parsers/panther/src/loadPanther.py b/parsers/panther/src/loadPanther.py index d32133a5..ec8144f0 100644 --- a/parsers/panther/src/loadPanther.py +++ b/parsers/panther/src/loadPanther.py @@ -115,6 +115,7 @@ def get_data(self) -> int: # do the real thing if we arent in debug mode if not self.test_mode: # get the complete data set + #TODO make these class level variables. 
             file_count: int = gd.pull_via_ftp('ftp.pantherdb.org', f'/sequence_classifications/{self.data_version}/PANTHER_Sequence_Classification_files/', [self.data_file], self.data_path)
         else:
             file_count: int = 1
diff --git a/parsers/yeast/src/loadHistoneMap.py b/parsers/yeast/src/loadHistoneMap.py
index 617462e7..84859adc 100644
--- a/parsers/yeast/src/loadHistoneMap.py
+++ b/parsers/yeast/src/loadHistoneMap.py
@@ -275,9 +275,8 @@ def fetch_histone_data(self,
                                              "CTD:increases_abundance_of"]
         self.logger.debug('Histone Modifications Mapped to GO Terms!')
-        csv_fname = f"HistonePTM2GO.csv"
         histonePTM2GO_df = pd.DataFrame.from_dict(histonePTM2GO)
-        histonePTM2GO_df.to_csv(os.path.join(output_directory, csv_fname), encoding="utf-8-sig", index=False)
+        histonePTM2GO_df.to_csv(os.path.join(output_directory, self.histone_mod_to_go_term_file_name), encoding="utf-8-sig", index=False)
 
         for chr in chromosome_lengths.keys():
             m = int(chromosome_lengths[chr])
             for i in range(m): # Create loci nodes for chromosomes
@@ -303,7 +302,7 @@ def fetch_histone_data(self,
                 data['histoneMod'].append(ptm)
         genomelocidf = pd.DataFrame(data)
         self.logger.debug('Histone Modifications Loci Collected!')
-        genomelocidf.to_csv(os.path.join(output_directory, HISTONE_LOCI_FILE), encoding="utf-8-sig", index=False)
+        genomelocidf.to_csv(os.path.join(output_directory, self.histone_mod_list_file_name), encoding="utf-8-sig", index=False)
 
         if not generate_gene_mapping:
             return
@@ -336,5 +335,4 @@ def fetch_histone_data(self,
         genomelocidf = genomelocidf.merge(just_windows, how='inner', on=['chromosomeID', 'start', 'end', 'loci'])
 
         self.logger.debug(f"Histone Modifications Mapping Complete!")
-        csv_f3name = f"HistoneMod2Gene.csv"
-        genomelocidf.to_csv(os.path.join(output_directory, csv_f3name), encoding="utf-8-sig", index=False)
+        genomelocidf.to_csv(os.path.join(output_directory, self.histone_mod_to_gene_file_name), encoding="utf-8-sig", index=False)
diff --git a/requirements.txt b/requirements.txt
index 736dc01a..519dcb40 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,18 +1,18 @@
-pandas==2.2.1
+pandas==2.2.2
 requests==2.32.3
-pytest==8.1.1
-git+https://github.com/ObesityHub/robokop-genetics.git
+pytest==8.2.0
+robokop-genetics==0.5.0
 # intermine is on pypi but as of 6/23 it's broken for python 3.10+, this fork fixes the issue
 git+https://github.com/EvanDietzMorris/intermine-ws-python.git
 jsonlines==4.0.0
-pyyaml==6.0
-beautifulsoup4==4.11.1
+pyyaml==6.0.1
+beautifulsoup4==4.12.3
 psycopg2-binary==2.9.9
-orjson==3.9.15
+orjson==3.10.3
 xxhash==3.4.1
-mysql-connector-python==8.3.0
-neo4j==5.10.0
+mysql-connector-python==8.4.0
+neo4j==5.20.0
 pyoxigraph==0.3.22
-curies==0.7.8
-prefixmaps==0.2.2
-bmt==1.2.1
+curies==0.7.9
+prefixmaps==0.2.4
+bmt==1.4.1
diff --git a/set_up_test_env.sh b/set_up_test_env.sh
index 2cf5a754..1ef6edd3 100644
--- a/set_up_test_env.sh
+++ b/set_up_test_env.sh
@@ -26,8 +26,11 @@ export PYTHONPATH="$PYTHONPATH:$PWD"
 
 #The following environment variables are optional
 
-export EDGE_NORMALIZATION_ENDPOINT=https://bl-lookup-sri.renci.org/
-export NODE_NORMALIZATION_ENDPOINT=https://nodenormalization-sri.renci.org/
-export NAME_RESOLVER_ENDPOINT=https://name-resolution-sri.renci.org/
-export ORION_OUTPUT_URL=https://localhost/ # this is currently only used to generate metadata
-export BL_VERSION=4.1.6
+# export EDGE_NORMALIZATION_ENDPOINT=https://bl-lookup-sri.renci.org/
+# export NODE_NORMALIZATION_ENDPOINT=https://nodenormalization-sri.renci.org/
+# export NAME_RESOLVER_ENDPOINT=https://name-resolution-sri.renci.org/
+# export ORION_OUTPUT_URL=https://localhost/ # this is currently only used to generate metadata
+# export BL_VERSION=4.2.1
+
+# if you are building your own docker image and issues occur, setting the correct platform may help
+# export DOCKER_PLATFORM=linux/arm64