diff --git a/.github/scripts/Bio_QC_check.py b/.github/scripts/Bio_QC_check.py
new file mode 100644
index 00000000..4b6848ee
--- /dev/null
+++ b/.github/scripts/Bio_QC_check.py
@@ -0,0 +1,46 @@
+import os
+import requests
+
+PREDICATE_KEYWORDS = ["predicate", "biolink:", "edges"]
+LABEL_NAME = "Biological Context QC"  # Label to add if keywords are found
+
+# GitHub API variables
+GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
+REPO_NAME = os.getenv("GITHUB_REPOSITORY")
+ISSUE_NUMBER = os.getenv("ISSUE_NUMBER")
+print("GITHUB_TOKEN is set:", bool(GITHUB_TOKEN))  # avoid echoing the secret itself into CI logs
+print("REPO_NAME:", REPO_NAME)
+print("ISSUE_NUMBER:", ISSUE_NUMBER)
+
+headers = {"Authorization": f"Bearer {GITHUB_TOKEN}"}
+api_url = f"https://api.github.com/repos/{REPO_NAME}"
+
+def get_issue_details(issue_number):
+    response = requests.get(f"{api_url}/issues/{issue_number}", headers=headers)
+    response.raise_for_status()
+    return response.json()
+
+def add_label(issue_number, label_name):
+    response = requests.post(
+        f"{api_url}/issues/{issue_number}/labels",
+        headers=headers,
+        json={"labels": [label_name]}
+    )
+    response.raise_for_status()
+    print(f"Label '{label_name}' added to issue/PR #{issue_number}")
+
+def check_keywords_in_text(text, keywords):
+    return any(keyword in text for keyword in keywords)
+
+def main():
+    issue_details = get_issue_details(ISSUE_NUMBER)
+    title = issue_details["title"]
+    body = issue_details["body"] or ""  # body can be None when a PR/issue has no description
+
+    if check_keywords_in_text(title, PREDICATE_KEYWORDS) or check_keywords_in_text(body, PREDICATE_KEYWORDS):
+        add_label(ISSUE_NUMBER, LABEL_NAME)
+    else:
+        print("No predicate keywords found.")
+
+if __name__ == "__main__":
+    main()
diff --git a/.github/workflows/label-predicate-changes.yml b/.github/workflows/label-predicate-changes.yml
new file mode 100644
index 00000000..fd663254
--- /dev/null
+++ b/.github/workflows/label-predicate-changes.yml
@@ -0,0 +1,32 @@
+name: 'Label Predicate Changes'
+
+on:
+  pull_request:
+    types: [opened, edited, synchronize]
+  issues:
+    types: [opened, edited]
+
+jobs:
+  label_check:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Check out code
+        uses: actions/checkout@v3
+
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: 3.9
+
+      - name: Install dependencies
+        run: |
+          pip install -r requirements.txt
+          pip install PyGithub
+
+      - name: Run predicate check
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          ISSUE_NUMBER: ${{ github.event.pull_request.number || github.event.issue.number }}
+        run: |
+          python .github/scripts/Bio_QC_check.py
\ No newline at end of file
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index a78d4833..eb959fe5 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -13,27 +13,27 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Check out the repo
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Get the version
         id: get_version
         run: echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//}
-      - name: Extract metadata (tags, labels) for Docker
-        id: meta
-        uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38
-        with:
-          images:
-            ghcr.io/${{ github.repository }}
       - name: Login to ghcr
-        uses: docker/login-action@v1
+        uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
+      - name: Extract metadata (tags, labels) for Docker
+        id: meta
+        uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
+        with:
+          images:
ghcr.io/${{ github.repository }} - name: Push to GitHub Packages - uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc + uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 with: context: . push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - build-args: VERSION=${{ steps.get_version.outputs.VERSION }} \ No newline at end of file + build-args: VERSION=${{ steps.get_version.outputs.VERSION }} diff --git a/Common/biolink_constants.py b/Common/biolink_constants.py index 80b12438..15bd0a32 100644 --- a/Common/biolink_constants.py +++ b/Common/biolink_constants.py @@ -48,10 +48,13 @@ PREDICATE = 'predicate' PRIMARY_KNOWLEDGE_SOURCE = 'primary_knowledge_source' AGGREGATOR_KNOWLEDGE_SOURCES = 'aggregator_knowledge_source' +SUPPORTING_DATA_SOURCE = 'supporting_data_source' P_VALUE = 'p_value' ADJUSTED_P_VALUE = 'adjusted_p_value' AGENT_TYPE = 'agent_type' KNOWLEDGE_LEVEL = 'knowledge_level' +MAX_RESEARCH_PHASE = 'max_research_phase' +HAS_SUPPORTING_STUDY_RESULT = 'has_supporting_study_result' # enums for knowledge level KNOWLEDGE_ASSERTION = 'knowledge_assertion' @@ -137,6 +140,7 @@ PREDICATE, PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, + SUPPORTING_DATA_SOURCE, PUBLICATIONS, SYNONYMS, DESCRIPTION, @@ -147,6 +151,8 @@ FDA_APPROVAL_STATUS, KNOWLEDGE_LEVEL, MECHANISM_OF_ACTION, + MAX_RESEARCH_PHASE, + HAS_SUPPORTING_STUDY_RESULT, # qualifiers ANATOMICAL_CONTEXT_QUALIFIER, CAUSAL_MECHANISM_QUALIFIER, diff --git a/Common/build_manager.py b/Common/build_manager.py index b44e1a08..1b199b7f 100644 --- a/Common/build_manager.py +++ b/Common/build_manager.py @@ -12,17 +12,19 @@ from Common.load_manager import SourceDataManager from Common.kgx_file_merger import KGXFileMerger from Common.neo4j_tools import create_neo4j_dump -from Common.kgxmodel import GraphSpec, SubGraphSource, DataSource, NormalizationScheme -from Common.normalization import NORMALIZATION_CODE_VERSION +from Common.kgxmodel import GraphSpec, SubGraphSource, DataSource +from Common.normalization import NORMALIZATION_CODE_VERSION, NormalizationScheme from Common.metadata import Metadata, GraphMetadata, SourceMetadata from Common.supplementation import SequenceVariantSupplementation from Common.biolink_constants import PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, PREDICATE, PUBLICATIONS from Common.meta_kg import MetaKnowledgeGraphBuilder, META_KG_FILENAME, TEST_DATA_FILENAME from Common.redundant_kg import generate_redundant_kg +from Common.collapse_qualifiers import generate_collapsed_qualifiers_kg NODES_FILENAME = 'nodes.jsonl' EDGES_FILENAME = 'edges.jsonl' REDUNDANT_EDGES_FILENAME = 'redundant_edges.jsonl' +COLLAPSED_QUALIFIERS_FILENAME = 'collapsed_qualifier_edges.jsonl' class GraphBuilder: @@ -115,6 +117,49 @@ def build_graph(self, graph_id: str): output_formats = graph_spec.graph_output_format.lower().split('+') if graph_spec.graph_output_format else [] nodes_filepath = os.path.join(graph_output_dir, NODES_FILENAME) edges_filepath = os.path.join(graph_output_dir, EDGES_FILENAME) + + if 'redundant_jsonl' in output_formats: + self.logger.info(f'Generating redundant edge KG for {graph_id}...') + redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME) + generate_redundant_kg(edges_filepath, redundant_filepath) + + if 'redundant_neo4j' in output_formats: + self.logger.info(f'Generating redundant edge KG for {graph_id}...') + redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME) + 
generate_redundant_kg(edges_filepath, redundant_filepath) + self.logger.info(f'Starting Neo4j dump pipeline for redundant {graph_id}...') + dump_success = create_neo4j_dump(nodes_filepath=nodes_filepath, + edges_filepath=redundant_filepath, + output_directory=graph_output_dir, + graph_id=graph_id, + graph_version=graph_version, + logger=self.logger) + + if dump_success: + graph_output_url = self.get_graph_output_URL(graph_id, graph_version) + graph_metadata.set_dump_url(f'{graph_output_url}graph_{graph_version}_redundant.db.dump') + + if 'collapsed_qualifiers_jsonl' in output_formats: + self.logger.info(f'Generating collapsed qualifier predicates KG for {graph_id}...') + collapsed_qualifiers_filepath = edges_filepath.replace(EDGES_FILENAME, COLLAPSED_QUALIFIERS_FILENAME) + generate_collapsed_qualifiers_kg(edges_filepath, collapsed_qualifiers_filepath) + + if 'collapsed_qualifiers_neo4j' in output_formats: + self.logger.info(f'Generating collapsed qualifier predicates KG for {graph_id}...') + collapsed_qualifiers_filepath = edges_filepath.replace(EDGES_FILENAME, COLLAPSED_QUALIFIERS_FILENAME) + generate_collapsed_qualifiers_kg(edges_filepath, collapsed_qualifiers_filepath) + self.logger.info(f'Starting Neo4j dump pipeline for {graph_id} with collapsed qualifiers...') + dump_success = create_neo4j_dump(nodes_filepath=nodes_filepath, + edges_filepath=collapsed_qualifiers_filepath, + output_directory=graph_output_dir, + graph_id=graph_id, + graph_version=graph_version, + logger=self.logger) + + if dump_success: + graph_output_url = self.get_graph_output_URL(graph_id, graph_version) + graph_metadata.set_dump_url(f'{graph_output_url}graph_{graph_version}_collapsed_qualifiers.db.dump') + if 'neo4j' in output_formats: self.logger.info(f'Starting Neo4j dump pipeline for {graph_id}...') dump_success = create_neo4j_dump(nodes_filepath=nodes_filepath, @@ -128,19 +173,13 @@ def build_graph(self, graph_id: str): graph_output_url = self.get_graph_output_URL(graph_id, graph_version) graph_metadata.set_dump_url(f'{graph_output_url}graph_{graph_version}.db.dump') - if 'redundant_jsonl' in output_formats: - self.logger.info(f'Generating redundant edge KG for {graph_id}...') - redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME) - generate_redundant_kg(edges_filepath, redundant_filepath) - def build_dependencies(self, graph_spec: GraphSpec): for subgraph_source in graph_spec.subgraphs: subgraph_id = subgraph_source.id subgraph_version = subgraph_source.version if self.check_for_existing_graph_dir(subgraph_id, subgraph_version): # load previous metadata - graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version) - subgraph_source.graph_metadata = graph_metadata.metadata + subgraph_source.graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version) elif self.current_graph_versions[subgraph_id] == subgraph_version: self.logger.warning(f'For graph {graph_spec.graph_id} subgraph dependency ' f'{subgraph_id} version {subgraph_version} is not ready. 
Building now...') diff --git a/Common/collapse_qualifiers.py b/Common/collapse_qualifiers.py new file mode 100644 index 00000000..f3563a0a --- /dev/null +++ b/Common/collapse_qualifiers.py @@ -0,0 +1,171 @@ +try: + from tqdm import tqdm + TQDM_AVAILABLE = True +except ImportError: + TQDM_AVAILABLE = False + +from Common.biolink_constants import PREDICATE, QUALIFIED_PREDICATE, SUBJECT_DERIVATIVE_QUALIFIER, SUBJECT_FORM_OR_VARIANT_QUALIFIER, SUBJECT_PART_QUALIFIER, \ + SUBJECT_DIRECTION_QUALIFIER, SUBJECT_ASPECT_QUALIFIER, OBJECT_DERIVATIVE_QUALIFIER, OBJECT_FORM_OR_VARIANT_QUALIFIER, \ + OBJECT_PART_QUALIFIER, OBJECT_DIRECTION_QUALIFIER, OBJECT_ASPECT_QUALIFIER, CAUSAL_MECHANISM_QUALIFIER, \ + ANATOMICAL_CONTEXT_QUALIFIER, SPECIES_CONTEXT_QUALIFIER +from Common.biolink_utils import get_biolink_model_toolkit +from Common.utils import quick_jsonl_file_iterator +from Common.kgx_file_writer import KGXFileWriter + +### The goal of this script is to collapse the qualifiers, which are in edge properties, into a single statement, then replace the +### existing predicate label with the collapsed qualifier statement. + +### Call the biolink model toolkit to get the list of all qualifiers. This may change, but the way qualifiers are handled is currently hard-coded in this script. +bmt = get_biolink_model_toolkit() + +def write_edge_no_q(edge, predicate, qualifiers): + tmp_edge = edge.copy() + tmp_edge[PREDICATE] = f"{predicate}" + for qualifier in qualifiers.keys(): + tmp_edge.pop(qualifier, None) + return tmp_edge + +def aspect_qualifier_semantic_adjustment(aspect_qualifier): + # TODO check if other aspect qualifiers besides molecular interaction need to be treated differently. + if aspect_qualifier.split('_')[-1] == 'interaction': + aspect_conversion = aspect_qualifier + "_with" + else: + aspect_conversion = aspect_qualifier + "_of" + return aspect_conversion + +def form_or_variant_qualifier_semantic_adjustment(form_or_variant_qualifier): + # TODO check if other form_or_variant_qualifier qualifiers besides molecular interaction need to be treated differently. + form_or_variant_conversion = form_or_variant_qualifier + "_of" + return form_or_variant_conversion + +def causal_mechanism_qualifier_semantic_adjustment(causal_mechanism_qualifier): + # TODO check if other causal_mechanism qualifiers besides molecular interaction need to be treated differently. 
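+    # e.g. "phosphorylation" -> "via_phosphorylation" (mirrors the "_of"/"_with" handling in the aspect and form_or_variant helpers above)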
+ causal_mechanism_qualifier = "via_"+ causal_mechanism_qualifier + return causal_mechanism_qualifier + +def species_context_qualifier_semantic_adjustment(species_context_qualifier): + species_context_qualifier = "in_"+ species_context_qualifier + return species_context_qualifier + +def anatomical_context_qualifier_semantic_adjustment(anatomical_context_qualifier, species_context_qualifier=False): + if species_context_qualifier == False: + anatomical_context_qualifier = "in_"+ anatomical_context_qualifier + return anatomical_context_qualifier + +def generate_collapsed_qualifiers_kg(infile, edges_file_path): + + with KGXFileWriter(edges_output_file_path=edges_file_path) as kgx_file_writer: + for edge in tqdm(quick_jsonl_file_iterator(infile)) if TQDM_AVAILABLE else quick_jsonl_file_iterator(infile): + + try: + edge_predicate = edge['predicate'] + except KeyError: + print(f"Collapsed Qualifiers Graph Failed - missing predicate on edge: {edge}") + break + + qualifiers = {key:value for key, value in edge.items() if bmt.is_qualifier(key)} + # Count the number of qualifiers and print a warning if number of qualifiers we handle in the next section doesn't match number of qualifiers detected. + # This will help warn us if new qualifiers are added in the future while giving us the option to still run the script as is. + qualifier_count = len(qualifiers.keys()) + counted_qualifiers = 0 + + # The following section crafts a new collapsed qualifier statement to replace the edge predicate, but needs to do some semantic adjustment. + # This is where to edit if the biolink model ever changes and handles qualifiers differently. + # Take guidance from: https://biolink.github.io/biolink-model/reading-a-qualifier-based-statement/ + # Example jsonl edge used here: {"subject":"UNII:7PK6VC94OU","predicate":"biolink:affects","object":"NCBIGene:6531","primary_knowledge_source":"infores:ctd","description":"decreases activity of","NCBITaxon":"9606","publications":["PMID:30776375"],"knowledge_level":"knowledge_assertion","agent_type":"manual_agent","subject_direction_qualifier":"increased","subject_aspect_qualifier":"abundance","subject_form_or_variant_qualifier":"mutant_form","subject_derivative_qualifier":"transcript","subject_part_qualifier":"polyA_tail","object_aspect_qualifier":"activity","object_direction_qualifier":"upregulated","object_form_or_variant_qualifier":"wildtype_form","object_derivative_qualifier":"protein","object_part_qualifier":"catalytic_site","causal_mechanism_qualifier":"phosyphorylation","species_context_qualifier":"human","anatomical_context_qualifier":"liver","qualified_predicate":"biolink:causes"} + + qualifier_statement = "" + + # Add on subject direction and aspect qualifiers first. eg. "increased_abundance_of_" + if SUBJECT_DIRECTION_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= qualifiers[SUBJECT_DIRECTION_QUALIFIER] + qualifier_statement+= "_" + if SUBJECT_ASPECT_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= aspect_qualifier_semantic_adjustment(qualifiers[SUBJECT_ASPECT_QUALIFIER]) + qualifier_statement+= "_" + # Add on subject form_or_variant qualifiers. eg. "increased_abundance_of_mutant_form_of_" + if SUBJECT_FORM_OR_VARIANT_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= form_or_variant_qualifier_semantic_adjustment(qualifiers[SUBJECT_FORM_OR_VARIANT_QUALIFIER]) + qualifier_statement+= "_" + # Add placeholder slot for subject node. eg. 
"increased_abundance_of_mutant_form_of_" + qualifier_statement+= "_" + # Add on subject derivative and part qualifiers. eg. "increased_abundance_of_mutant_form_of_transcript_poly_A_tail" + if SUBJECT_DERIVATIVE_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= qualifiers[SUBJECT_DERIVATIVE_QUALIFIER] + qualifier_statement+= "_" + if SUBJECT_PART_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= qualifiers[SUBJECT_PART_QUALIFIER] + qualifier_statement+= "_" + + # Add the qualified predicate. eg. "increased_abundance_of_mutant_form_of__transcript_poly_A_tail_causes" + if QUALIFIED_PREDICATE in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= qualifiers[QUALIFIED_PREDICATE].replace("biolink:","") + qualifier_statement+= "_" + + # Add on object direction and aspect qualifiers. eg. "increased_abundance_of_mutant_form_of_transcript_poly_A_tail_causes_upregulated_activity_of" + if OBJECT_DIRECTION_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= qualifiers[OBJECT_DIRECTION_QUALIFIER] + qualifier_statement+= "_" + if OBJECT_ASPECT_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= aspect_qualifier_semantic_adjustment(qualifiers[OBJECT_ASPECT_QUALIFIER]) + qualifier_statement+= "_" + # Add on object form_or_variant qualifiers. eg. "increased_abundance_of_mutant_form_of_transcript_poly_A_tail_causes_upregulated_activity_of_mutant_form_of" + if OBJECT_FORM_OR_VARIANT_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= form_or_variant_qualifier_semantic_adjustment(qualifiers[OBJECT_FORM_OR_VARIANT_QUALIFIER]) + qualifier_statement+= "_" + # Add placeholder slot for object node. eg. "increased_abundance_of_mutant_form_of_transcript_poly_A_tail_causes_upregulated_activity_of_mutant_form_of_" + qualifier_statement+= "" + + # Add on object derivative and part qualifiers. eg. "increased_abundance_of_mutant_form_of_transcript_poly_A_tail_causes_upregulated_activity_of_mutant_form_of__protein_catalytic_site" + # Need to start putting "_" before each qualifier as any given one could be the last in the statement. + if OBJECT_DERIVATIVE_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= "_" + qualifier_statement+= qualifiers[OBJECT_DERIVATIVE_QUALIFIER] + if OBJECT_PART_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= "_" + qualifier_statement+= qualifiers[OBJECT_PART_QUALIFIER] + + # Add on mechanism qualifiers. eg. "increased_abundance_of_mutant_form_of_transcript_poly_A_tail_causes_upregulated_activity_of_mutant_form_of__protein_catalytic_site_via_phosphorylation" + if CAUSAL_MECHANISM_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= "_" + qualifier_statement+= causal_mechanism_qualifier_semantic_adjustment(qualifiers[CAUSAL_MECHANISM_QUALIFIER]) + + # Add on species qualifiers. eg. "increased_abundance_of_mutant_form_of_transcript_poly_A_tail_causes_upregulated_activity_of_mutant_form_of__protein_catalytic_site_via_phosphorylation_in_human" + if SPECIES_CONTEXT_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= "_" + qualifier_statement+= species_context_qualifier_semantic_adjustment(qualifiers[SPECIES_CONTEXT_QUALIFIER]) + + # Add on anatomical context qualifiers. eg. 
"increased_abundance_of_mutant_form_of_transcript_poly_A_tail_causes_upregulated_activity_of_mutant_form_of__protein_catalytic_site_via_phosphorylation_in_human_liver" + if ANATOMICAL_CONTEXT_QUALIFIER in qualifiers.keys(): + counted_qualifiers+= 1 + qualifier_statement+= "_" + if SPECIES_CONTEXT_QUALIFIER in qualifiers.keys(): + species_qualifier = True + else: + species_qualifier = False + qualifier_statement+= anatomical_context_qualifier_semantic_adjustment(qualifiers[ANATOMICAL_CONTEXT_QUALIFIER], species_qualifier) + + if counted_qualifiers < qualifier_count: + print(f"Qualifiers on edge: {edge} are not all being handled correctly. Please revise collapse_qualifiers.py to handle all qualifiers.") + + # Either rewrite the original edge if no qualifier collapsing happened, or rewrite with new predicate from qualifier_statement. + edges_to_write = [] + if qualifier_statement != "": + edges_to_write.append(write_edge_no_q(edge, qualifier_statement, qualifiers)) + else: + edges_to_write.append(edge) + + kgx_file_writer.write_normalized_edges(edges_to_write) diff --git a/Common/data_sources.py b/Common/data_sources.py index cb39764c..0aeb437e 100644 --- a/Common/data_sources.py +++ b/Common/data_sources.py @@ -4,6 +4,7 @@ BINDING_DB = 'BINDING-DB' CAM_KP = 'CAM-KP' CHEBI_PROPERTIES = 'CHEBIProps' +CLINICAL_TRIALS_KP = 'ClinicalTrialsKP' CORD19 = 'Cord19' CTD = 'CTD' DRUG_CENTRAL = 'DrugCentral' @@ -18,6 +19,7 @@ HMDB = 'HMDB' HUMAN_GOA = 'HumanGOA' INTACT = 'IntAct' +LINCS = 'LINCS' LITCOIN = 'LitCoin' LITCOIN_SAPBERT = 'LitCoinSapBERT' LITCOIN_ENTITY_EXTRACTOR = 'LitCoinEntityExtractor' @@ -52,6 +54,7 @@ BINDING_DB: ("parsers.BINDING.src.loadBINDINGDB", "BINDINGDBLoader"), CAM_KP: ("parsers.camkp.src.loadCAMKP", "CAMKPLoader"), CHEBI_PROPERTIES: ("parsers.chebi.src.loadChebiProperties", "ChebiPropertiesLoader"), + CLINICAL_TRIALS_KP: ("parsers.clinicaltrials.src.loadCTKP", "CTKPLoader"), CORD19: ("parsers.cord19.src.loadCord19", "Cord19Loader"), CTD: ("parsers.CTD.src.loadCTD", "CTDLoader"), DRUG_CENTRAL: ("parsers.drugcentral.src.loaddrugcentral", "DrugCentralLoader"), @@ -66,6 +69,7 @@ HUMAN_GOA: ("parsers.GOA.src.loadGOA", "HumanGOALoader"), HUMAN_STRING: ("parsers.STRING.src.loadSTRINGDB", "HumanSTRINGDBLoader"), INTACT: ("parsers.IntAct.src.loadIA", "IALoader"), + LINCS: ("parsers.LINCS.src.loadLINCS", "LINCSLoader"), LITCOIN: ("parsers.LitCoin.src.loadLitCoin", "LitCoinLoader"), LITCOIN_ENTITY_EXTRACTOR: ("parsers.LitCoin.src.loadLitCoin", "LitCoinEntityExtractorLoader"), LITCOIN_SAPBERT: ("parsers.LitCoin.src.loadLitCoin", "LitCoinSapBERTLoader"), diff --git a/Common/kgx_file_converter.py b/Common/kgx_file_converter.py index 304c3b1b..e2f8d4bf 100644 --- a/Common/kgx_file_converter.py +++ b/Common/kgx_file_converter.py @@ -94,7 +94,7 @@ def __determine_properties_and_types(file_path: str, required_properties: dict): for key, value in entity.items(): if value is None: property_type_counts[key]["None"] += 1 - if key in required_properties: + if key in required_properties and key != "name": print(f'WARNING: Required property ({key}) was None: {entity.items()}') raise Exception( f'None found as a value for a required property (property: {key}) in line {entity.items()}') @@ -134,7 +134,7 @@ def __determine_properties_and_types(file_path: str, required_properties: dict): # if 'None' in prop_types: # print(f'WARNING: None found as a value for property {prop}') - if prop in required_properties and (num_prop_types > 1): + if prop in required_properties and (num_prop_types > 1) and prop != 
"name": # TODO this should just enforce that required properties are the correct type, # instead of trying to establish the type raise Exception(f'Required property {prop} had multiple conflicting types: {type_counts.items()}') @@ -192,7 +192,10 @@ def __convert_to_csv(input_file: str, for item in quick_jsonl_file_iterator(input_file): for key in list(item.keys()): if item[key] is None: - del item[key] + if key == "name": + item["name"] = item["id"] + else: + del item[key] else: prop_type = properties[key] # convert lists into strings with an array delimiter diff --git a/Common/kgx_file_merger.py b/Common/kgx_file_merger.py index 9ceb0105..b6d54159 100644 --- a/Common/kgx_file_merger.py +++ b/Common/kgx_file_merger.py @@ -85,8 +85,10 @@ def merge_primary_sources(self, needs_on_disk_merge = False for graph_source in graph_sources: if isinstance(graph_source, SubGraphSource): - needs_on_disk_merge = True - break + for source_id in graph_source.graph_metadata.get_source_ids(): + if source_id in RESOURCE_HOGS: + needs_on_disk_merge = True + break elif graph_source.id in RESOURCE_HOGS: needs_on_disk_merge = True break diff --git a/Common/kgx_file_normalizer.py b/Common/kgx_file_normalizer.py index b0ad5457..cdd97c9e 100644 --- a/Common/kgx_file_normalizer.py +++ b/Common/kgx_file_normalizer.py @@ -5,25 +5,13 @@ from Common.biolink_utils import BiolinkInformationResources, INFORES_STATUS_INVALID, INFORES_STATUS_DEPRECATED from Common.biolink_constants import SEQUENCE_VARIANT, PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, \ PUBLICATIONS, OBJECT_ID, SUBJECT_ID, PREDICATE, SUBCLASS_OF -from Common.normalization import NodeNormalizer, EdgeNormalizer, EdgeNormalizationResult +from Common.normalization import NormalizationScheme, NodeNormalizer, EdgeNormalizer, EdgeNormalizationResult, \ + NormalizationFailedError from Common.utils import LoggingUtil, chunk_iterator from Common.kgx_file_writer import KGXFileWriter -from Common.kgxmodel import NormalizationScheme from Common.merging import MemoryGraphMerger, DiskGraphMerger -class NormalizationBrokenError(Exception): - def __init__(self, error_message: str, actual_error: Exception=None): - self.error_message = error_message - self.actual_error = actual_error - - -class NormalizationFailedError(Exception): - def __init__(self, error_message: str, actual_error: Exception=None): - self.error_message = error_message - self.actual_error = actual_error - - EDGE_PROPERTIES_THAT_SHOULD_BE_SETS = {AGGREGATOR_KNOWLEDGE_SOURCES, PUBLICATIONS} NODE_NORMALIZATION_BATCH_SIZE = 1_000_000 EDGE_NORMALIZATION_BATCH_SIZE = 1_000_000 @@ -350,6 +338,7 @@ def normalize_edge_file(self): # this could happen due to rare cases of normalization splits where one node normalizes to many if edge_count > 1: edge_splits += edge_count - 1 + graph_merger.merge_edges(normalized_edges) self.logger.info(f'Processed {number_of_source_edges} edges so far...') diff --git a/Common/kgxmodel.py b/Common/kgxmodel.py index 65cb9088..d18c2f82 100644 --- a/Common/kgxmodel.py +++ b/Common/kgxmodel.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from Common.biolink_constants import NAMED_THING -from Common.normalization import NORMALIZATION_CODE_VERSION +from Common.metadata import GraphMetadata +from Common.normalization import NormalizationScheme class kgxnode: def __init__(self, @@ -33,31 +34,6 @@ def __init__(self, self.properties = {} -@dataclass -class NormalizationScheme: - node_normalization_version: str = 'latest' - edge_normalization_version: str = 'latest' - 
normalization_code_version: str = NORMALIZATION_CODE_VERSION - strict: bool = True - conflation: bool = False - - def get_composite_normalization_version(self): - composite_normalization_version = f'{self.node_normalization_version}_' \ - f'{self.edge_normalization_version}_{self.normalization_code_version}' - if self.conflation: - composite_normalization_version += '_conflated' - if self.strict: - composite_normalization_version += '_strict' - return composite_normalization_version - - def get_metadata_representation(self): - return {'node_normalization_version': self.node_normalization_version, - 'edge_normalization_version': self.edge_normalization_version, - 'normalization_code_version': self.normalization_code_version, - 'conflation': self.conflation, - 'strict': self.strict} - - @dataclass class GraphSpec: graph_id: str @@ -91,13 +67,13 @@ class GraphSource: @dataclass class SubGraphSource(GraphSource): - graph_metadata: dict = None + graph_metadata: GraphMetadata = None def get_metadata_representation(self): return {'graph_id': self.id, 'release_version': self.version, 'merge_strategy:': self.merge_strategy, - 'graph_metadata': self.graph_metadata} + 'graph_metadata': self.graph_metadata.metadata if self.graph_metadata else None} @dataclass diff --git a/Common/load_manager.py b/Common/load_manager.py index 323dfe53..9ba5aa44 100644 --- a/Common/load_manager.py +++ b/Common/load_manager.py @@ -5,9 +5,8 @@ from Common.data_sources import SourceDataLoaderClassFactory, RESOURCE_HOGS, get_available_data_sources from Common.utils import LoggingUtil, GetDataPullError -from Common.kgx_file_normalizer import KGXFileNormalizer, NormalizationBrokenError, NormalizationFailedError -from Common.kgxmodel import NormalizationScheme -from Common.normalization import NodeNormalizer, EdgeNormalizer +from Common.kgx_file_normalizer import KGXFileNormalizer +from Common.normalization import NormalizationScheme, NodeNormalizer, EdgeNormalizer, NormalizationFailedError from Common.metadata import SourceMetadata from Common.loader_interface import SourceDataBrokenError, SourceDataFailedError from Common.supplementation import SequenceVariantSupplementation, SupplementationFailedError @@ -356,17 +355,6 @@ def normalize_source(self, normalization_status=SourceMetadata.STABLE, normalization_info=normalization_info) return True - except NormalizationBrokenError as broken_error: - error_message = f"{source_id} NormalizationBrokenError: {broken_error.error_message}" - if broken_error.actual_error: - error_message += f" - {broken_error.actual_error}" - self.logger.error(error_message) - source_metadata.update_normalization_metadata(parsing_version, - composite_normalization_version, - normalization_status=SourceMetadata.BROKEN, - normalization_error=error_message, - normalization_time=current_time) - return False except NormalizationFailedError as failed_error: error_message = f"{source_id} NormalizationFailedError: {failed_error.error_message}" if failed_error.actual_error: diff --git a/Common/merging.py b/Common/merging.py index d1b01d85..ce617f0f 100644 --- a/Common/merging.py +++ b/Common/merging.py @@ -19,17 +19,34 @@ def edge_key_function(edge): def entity_merging_function(entity_1, entity_2, properties_that_are_sets): - for key, value in entity_2.items(): - # TODO - make sure this is the behavior we want - - # for properties that are lists append the values - # otherwise keep the first one - if key in entity_1: - if isinstance(value, list): - entity_1[key].extend(value) - if key in 
properties_that_are_sets: - entity_1[key] = list(set(entity_1[key])) + # for every property of entity 2 + for key, entity_2_value in entity_2.items(): + # if entity 1 also has the property and entity_2_value is not null/empty: + # concatenate values if one is a list, otherwise ignore the property from entity 2 + if (key in entity_1) and entity_2_value: + entity_1_value = entity_1[key] + entity_1_is_list = isinstance(entity_1_value, list) + entity_2_is_list = isinstance(entity_2_value, list) + if entity_1_is_list and entity_2_is_list: + # if they're both lists just combine them + entity_1_value.extend(entity_2_value) + elif entity_1_is_list: + # if 1 is a list and 2 isn't, append the value of 2 to the list from 1 + entity_1_value.append(entity_2_value) + elif entity_2_is_list: + if entity_1_value: + # if 2 is a list and 1 has a value, add the value of 1 to the list from 2 + entity_1[key] = [entity_1_value] + entity_2_value + else: + # if 2 is a list and 1 doesn't have a value, just use the list from 2 + entity_1[key] = entity_2_value + # else: + # if neither is a list, do nothing (keep the value from 1) + if (entity_1_is_list or entity_2_is_list) and (key in properties_that_are_sets): + entity_1[key] = list(set(entity_1[key])) else: - entity_1[key] = value + # if entity 1 doesn't have the property, add the property from entity 2 + entity_1[key] = entity_2_value return entity_1 diff --git a/Common/metadata.py b/Common/metadata.py index ec9bfecd..9a467f7d 100644 --- a/Common/metadata.py +++ b/Common/metadata.py @@ -3,7 +3,7 @@ import json from xxhash import xxh64_hexdigest -from Common.kgxmodel import NormalizationScheme +from Common.normalization import NormalizationScheme class Metadata: @@ -122,6 +122,9 @@ def get_build_status(self): def get_graph_version(self): return self.metadata['graph_version'] + def get_source_ids(self): + return [source['source_id'] for source in self.metadata['sources']] + class SourceMetadata(Metadata): diff --git a/Common/neo4j_tools.py b/Common/neo4j_tools.py index 889db44b..0b3b69e6 100644 --- a/Common/neo4j_tools.py +++ b/Common/neo4j_tools.py @@ -37,11 +37,12 @@ def import_csv_files(self, return password_exit_code self.logger.info(f'Importing csv files to neo4j...') - neo4j_import_cmd = ["neo4j-admin", "import", f"--nodes={csv_nodes_filename}", - f"--relationships={csv_edges_filename}", + neo4j_import_cmd = ['neo4j-admin', 'database', 'import', 'full', + f'--nodes={csv_nodes_filename}', + f'--relationships={csv_edges_filename}', '--delimiter=TAB', '--array-delimiter=U+001F', - '--force'] + '--overwrite-destination=true'] import_results: subprocess.CompletedProcess = subprocess.run(neo4j_import_cmd, cwd=graph_directory, capture_output=True) @@ -60,7 +61,7 @@ def load_backup_dump(self, return password_exit_code self.logger.info(f'Loading a neo4j backup dump {dump_file_path}...') - neo4j_load_cmd = ['neo4j-admin', 'load', f'--from={dump_file_path}', '--force'] + neo4j_load_cmd = ['neo4j-admin', 'database', 'load', f'--from-path={dump_file_path}', '--overwrite-destination=true', 'neo4j'] load_results: subprocess.CompletedProcess = subprocess.run(neo4j_load_cmd, capture_output=True) self.logger.info(load_results.stdout) @@ -71,10 +72,23 @@ def load_backup_dump(self, self.logger.error(error_message) return load_results_return_code + def migrate_dump_to_neo4j_5(self): + self.logger.info(f'Migrating db dump to neo4j 5...') + neo4j_migrate_cmd = ['neo4j-admin', 'database', 'migrate', '--force-btree-indexes-to-range', 'neo4j'] + migrate_results: 
subprocess.CompletedProcess = subprocess.run(neo4j_migrate_cmd, + capture_output=True) + self.logger.info(migrate_results.stdout) + results_return_code = migrate_results.returncode + if results_return_code != 0: + error_message = f'Neo4j migrate subprocess error (ExitCode {results_return_code}): ' \ + f'{migrate_results.stderr.decode("UTF-8")}' + self.logger.error(error_message) + return results_return_code + def create_backup_dump(self, - dump_file_path: str = None): + dump_directory: str = None): self.logger.info(f'Creating a backup dump of the neo4j...') - neo4j_dump_cmd = ['neo4j-admin', 'dump', f'--to={dump_file_path}'] + neo4j_dump_cmd = ['neo4j-admin', 'database', 'dump', 'neo4j', f'--to-path={dump_directory}'] dump_results: subprocess.CompletedProcess = subprocess.run(neo4j_dump_cmd, capture_output=True) self.logger.info(dump_results.stdout) @@ -107,7 +121,7 @@ def __issue_neo4j_command(self, command: str): def set_initial_password(self): self.logger.info('Setting initial password for Neo4j...') - neo4j_cmd = ['neo4j-admin', 'set-initial-password', self.password] + neo4j_cmd = ['neo4j-admin', 'dbms', 'set-initial-password', self.password] neo4j_results: subprocess.CompletedProcess = subprocess.run(neo4j_cmd, capture_output=True) self.logger.info(neo4j_results.stdout) @@ -139,7 +153,7 @@ def add_db_indexes(self): with self.neo4j_driver.session() as session: # node name index - node_name_index_cypher = f'CREATE INDEX node_name_index FOR (n:`{NAMED_THING}`) on (n.name)' + node_name_index_cypher = f'CREATE INDEX node_name_index FOR (n:`{NAMED_THING}`) ON (n.name)' self.logger.info(f'Adding node name index on {NAMED_THING}.name') session.run(node_name_index_cypher).consume() indexes_added += 1 @@ -151,8 +165,8 @@ def add_db_indexes(self): self.logger.info(f'Adding node id indexes for node labels: {node_labels}') for node_label in node_labels: node_label_index = f'node_id_{node_label.replace(":", "_")}' - node_name_index_cypher = f'CREATE CONSTRAINT {node_label_index} ON (n:`{node_label}`) ' \ - f'ASSERT n.id IS UNIQUE' + node_name_index_cypher = f'CREATE CONSTRAINT {node_label_index} FOR (n:`{node_label}`) ' \ + f'REQUIRE n.id IS UNIQUE' session.run(node_name_index_cypher).consume() indexes_added += 1 index_names.append(node_label_index) @@ -227,7 +241,11 @@ def create_neo4j_dump(nodes_filepath: str, edges_output_file=csv_edges_file_path) if logger: logger.info(f'CSV files created for {graph_id}({graph_version})...') - graph_dump_name = f'graph_{graph_version}.db.dump' if graph_version else 'graph.db.dump' + + # would like to do the following, but apparently you can't specify a custom name for the dump now + # graph_dump_name = f'graph_{graph_version}.neo4j5.db.dump' if graph_version else 'graph.neo4j5.db.dump' + # graph_dump_file_path = os.path.join(output_directory, graph_dump_name) + graph_dump_name = 'neo4j.dump' graph_dump_file_path = os.path.join(output_directory, graph_dump_name) if os.path.exists(graph_dump_file_path): if logger: @@ -258,7 +276,7 @@ def create_neo4j_dump(nodes_filepath: str, if stop_exit_code != 0: return False - dump_exit_code = neo4j_access.create_backup_dump(graph_dump_file_path) + dump_exit_code = neo4j_access.create_backup_dump(output_directory) if dump_exit_code != 0: return False diff --git a/Common/normalization.py b/Common/normalization.py index fd43d151..39150eeb 100644 --- a/Common/normalization.py +++ b/Common/normalization.py @@ -3,6 +3,9 @@ import requests import time +from requests.adapters import HTTPAdapter, Retry +from dataclasses import 
dataclass + from robokop_genetics.genetics_normalization import GeneticsNormalizer from Common.biolink_constants import * from Common.utils import LoggingUtil @@ -15,6 +18,36 @@ # predicate to use when normalization fails FALLBACK_EDGE_PREDICATE = 'biolink:related_to' +@dataclass +class NormalizationScheme: + node_normalization_version: str = 'latest' + edge_normalization_version: str = 'latest' + normalization_code_version: str = NORMALIZATION_CODE_VERSION + strict: bool = True + conflation: bool = False + + def get_composite_normalization_version(self): + composite_normalization_version = f'{self.node_normalization_version}_' \ + f'{self.edge_normalization_version}_{self.normalization_code_version}' + if self.conflation: + composite_normalization_version += '_conflated' + if self.strict: + composite_normalization_version += '_strict' + return composite_normalization_version + + def get_metadata_representation(self): + return {'node_normalization_version': self.node_normalization_version, + 'edge_normalization_version': self.edge_normalization_version, + 'normalization_code_version': self.normalization_code_version, + 'conflation': self.conflation, + 'strict': self.strict} + + +class NormalizationFailedError(Exception): + def __init__(self, error_message: str, actual_error: Exception = None): + self.error_message = error_message + self.actual_error = actual_error + class NodeNormalizer: """ @@ -70,109 +103,95 @@ def __init__(self, self.sequence_variant_normalizer = None self.variant_node_types = None - def hit_node_norm_service(self, curies, retries=0): - resp: requests.models.Response = requests.post(f'{self.node_norm_endpoint}get_normalized_nodes', - json={'curies': curies, - 'conflate': self.conflate_node_types, - 'drug_chemical_conflate': self.conflate_node_types, - 'description': True}) + self.requests_session = self.get_normalization_requests_session() + + def hit_node_norm_service(self, curies): + resp = self.requests_session.post(f'{self.node_norm_endpoint}get_normalized_nodes', + json={'curies': curies, + 'conflate': self.conflate_node_types, + 'drug_chemical_conflate': self.conflate_node_types, + 'description': True}) if resp.status_code == 200: # if successful return the json as an object - return resp.json() - else: - error_message = f'Node norm response code: {resp.status_code}' - if resp.status_code >= 500: - # if 5xx retry 3 times - retries += 1 - if retries == 4: - error_message += ', retried 3 times, giving up..' - self.logger.error(error_message) - resp.raise_for_status() - else: - error_message += f', retrying.. 
(attempt {retries})' - time.sleep(retries * 3) - self.logger.error(error_message) - return self.hit_node_norm_service(curies, retries) + response_json = resp.json() + if response_json: + return response_json else: - # we should never get a legitimate 4xx response from node norm, - # crash with an error for troubleshooting - if resp.status_code == 422: - error_message += f'(curies: {curies})' - self.logger.error(error_message) - resp.raise_for_status() + error_message = f"Node Normalization service {self.node_norm_endpoint} returned 200 " \ + f"but with an empty result for (curies: {curies})" + raise NormalizationFailedError(error_message=error_message) + else: + error_message = f'Node norm response code: {resp.status_code} (curies: {curies})' + self.logger.error(error_message) + resp.raise_for_status() - def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list: + def normalize_node_data(self, node_list: list, batch_size: int = 1000) -> list: """ - This method calls the NodeNormalization web service to get the normalized identifier and name of the node. - the data comes in as a node list. + This method calls the NodeNormalization web service and normalizes a list of nodes. - :param node_list: A list with items to normalize - :param block_size: the number of curies in the request + :param node_list: A list of unique nodes to normalize + :param batch_size: the number of curies to be sent to NodeNormalization at once :return: """ - self.logger.debug(f'Start of normalize_node_data. items: {len(node_list)}') - - # init the cache - this accumulates all the results from the node norm service - cached_node_norms: dict = {} + # look up all valid biolink node types if needed + # this is used when strict normalization is off to ensure only valid types go into the graph as NODE_TYPES + if not self.strict_normalization and not self.biolink_compliant_node_types: + biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version) + self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types() - # create a unique set of node ids - tmp_normalize: set = set([node['id'] for node in node_list]) + # make a list of the node ids, we used to deduplicate here, but now we expect the list to be unique ids + to_normalize: list = [node['id'] for node in node_list] - # convert the set to a list so we can iterate through it - to_normalize: list = list(tmp_normalize) - - # init the array index lower boundary + # use indexes and slice to grab batch_size sized chunks of ids from the list start_index: int = 0 - - # get the last index of the list last_index: int = len(to_normalize) - - self.logger.debug(f'{last_index} unique nodes found in this group.') - - # grab chunks of the data frame + chunks_of_ids = [] while True: if start_index < last_index: - # define the end index of the slice - end_index: int = start_index + block_size + end_index: int = start_index + batch_size - # force the end index to be the last index to insure no overflow + # force the end index to be no greater than the last index to ensure no overflow if end_index >= last_index: end_index = last_index - self.logger.debug(f'Working block {start_index} to {end_index}.') - - # collect a slice of records from the data frame - data_chunk: list = to_normalize[start_index: end_index] - - # hit the node norm api - normalization_json = self.hit_node_norm_service(curies=data_chunk) - if normalization_json: - # merge the normalization results with what we have gotten so far - 
cached_node_norms.update(**normalization_json) - else: - # this shouldn't happen but if the API returns an empty dict instead of nulls, - # assume none of the curies normalize - empty_responses = {curie: None for curie in data_chunk} - cached_node_norms.update(empty_responses) + # collect a slice of block_size curies from the full list + chunks_of_ids.append(to_normalize[start_index: end_index]) # move on down the list - start_index += block_size + start_index += batch_size else: break + # we should be able to do the following, but it's causing RemoteDisconnected errors with node norm + # + # hit the node norm api with the chunks of curies in parallel + # we could try to optimize the number of max_workers for ThreadPoolExecutor more specifically, + # by default python attempts to find a reasonable # based on os.cpu_count() + # with ThreadPoolExecutor() as executor: + # executor_results = executor.map(self.hit_node_norm_service, chunks_of_ids) + # + # normalization_results = list(executor_results) + # for normalization_json, ids in zip(normalization_results, chunks_of_ids): + # if not normalization_json: + # raise NormalizationFailedError(f'!!! Normalization json results missing for ids: {ids}') + # else: + # merge the normalization results into one dictionary + # node_normalization_results.update(**normalization_json) + + # until we can get threading working, hit node norm sequentially + node_normalization_results: dict = {} + for chunk in chunks_of_ids: + results = self.hit_node_norm_service(chunk) + node_normalization_results.update(**results) + # reset the node index node_idx = 0 # node ids that failed to normalize failed_to_normalize: list = [] - # look up valid node types if needed - if not self.strict_normalization and not self.biolink_compliant_node_types: - biolink_lookup = EdgeNormalizer(edge_normalization_version=self.biolink_version) - self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types() - # for each node update the node with normalized information # store the normalized IDs in self.node_normalization_lookup for later look up while node_idx < len(node_list): @@ -217,7 +236,7 @@ def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list: current_node[NODE_TYPES] = list(set(current_node[NODE_TYPES])) # did we get a response from the normalizer - current_node_normalization = cached_node_norms[current_node_id] + current_node_normalization = node_normalization_results[current_node_id] if current_node_normalization is not None: current_node_id_section = current_node_normalization['id'] @@ -341,6 +360,17 @@ def get_current_node_norm_version(self): # this shouldn't happen, raise an exception resp.raise_for_status() + @staticmethod + def get_normalization_requests_session(): + pool_maxsize = max(os.cpu_count(), 10) + s = requests.Session() + retries = Retry(total=8, + backoff_factor=1, + status_forcelist=[502, 503, 504, 403, 429]) + s.mount('https://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize)) + s.mount('http://', HTTPAdapter(max_retries=retries, pool_maxsize=pool_maxsize)) + return s + class EdgeNormalizationResult: def __init__(self, @@ -398,10 +428,8 @@ def normalize_edge_data(self, """ # find the predicates that have not been normalized yet - predicates_to_normalize = set() - for edge in edge_list: - if edge[PREDICATE] not in self.edge_normalization_lookup: - predicates_to_normalize.add(edge[PREDICATE]) + predicates_to_normalize = {edge[PREDICATE] for edge in edge_list + if edge[PREDICATE] not in 
self.edge_normalization_lookup} # convert the set to a list so we can iterate through it predicates_to_normalize_list = list(predicates_to_normalize) diff --git a/Common/supplementation.py b/Common/supplementation.py index 9665ceb9..8a27f4f1 100644 --- a/Common/supplementation.py +++ b/Common/supplementation.py @@ -8,11 +8,10 @@ from zipfile import ZipFile from collections import defaultdict from Common.biolink_constants import * -from Common.normalization import FALLBACK_EDGE_PREDICATE +from Common.normalization import FALLBACK_EDGE_PREDICATE, NormalizationScheme from Common.utils import LoggingUtil from Common.kgx_file_writer import KGXFileWriter from Common.kgx_file_normalizer import KGXFileNormalizer -from Common.kgxmodel import NormalizationScheme SNPEFF_SO_PREDICATES = { diff --git a/Common/utils.py b/Common/utils.py index cca7257a..30ef06f1 100644 --- a/Common/utils.py +++ b/Common/utils.py @@ -264,23 +264,30 @@ def get_http_file_modified_date(self, file_url: str): self.logger.error(error_message) raise GetDataPullError(error_message) - def pull_via_http(self, url: str, data_dir: str, is_gzip=False) -> int: + def pull_via_http(self, url: str, data_dir: str, is_gzip=False, saved_file_name: str = None) -> int: """ gets the file from an http stream. :param url: :param data_dir: :param is_gzip: + :param saved_file_name: :return: the number of bytes read """ - # get the filename - data_file: str = url.split('/')[-1] + # is_gzip isn't used on the main branch, but it's probably on some branches or forks, + # lets throw this for a while, so it's not mysteriously removed + if is_gzip: + raise NotImplementedError(f'is_gzip is deprecated, unzip files during parsing not retrieval!') - # init the byte counter + # get the name of the file to write + data_file: str = saved_file_name if saved_file_name else url.split('/')[-1] + + # this tracks how much, if any, of the file is downloaded + # (it's not really used anymore, it could be more simple) byte_counter: int = 0 - # get the file if its not there + # check if the file exists already if not os.path.exists(os.path.join(data_dir, data_file)): self.logger.debug(f'Retrieving {url} -> {data_dir}') @@ -288,17 +295,9 @@ def pull_via_http(self, url: str, data_dir: str, is_gzip=False) -> int: hdr = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'} req = request.Request(url, headers=hdr) - # get the the file data handle + # get the file data handle file_data = request.urlopen(req) - # is this a gzip file - if is_gzip: - # get a handle to the data - file_data = gzip.GzipFile(fileobj=file_data) - - # strip off the .gz if exists - data_file = data_file.replace('.gz', '') - with open(os.path.join(data_dir, data_file), 'wb') as fp: # specify the buffered data block size block = 131072 diff --git a/Dockerfile b/Dockerfile index a65ba0b6..4031f26d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # A docker container with neo4j, java and python for Data Services -FROM neo4j:4.4.10 +FROM neo4j:5.19.0-community-bullseye RUN apt-get update \ && apt-get -y install python3 \ diff --git a/docker-compose.yml b/docker-compose.yml index 49f50611..a22dd7b7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,5 @@ -version: "3.7" services: orion: - platform: linux/amd64 build: context: . 
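    # illustrative invocation (an assumption, not part of this compose file): override the default command to
    # build a single graph spec entry, e.g. the CTKP graph defined in graph_specs/ctkp-graph-spec.yaml below,
    #   docker compose run --rm orion python /ORION/Common/build_manager.py CTKP_Automat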
command: [python, /ORION/Common/build_manager.py, all] @@ -12,7 +10,6 @@ services: - ORION_GRAPH_SPEC - ORION_GRAPH_SPEC_URL - ORION_OUTPUT_URL - - ORION_NEO4J_PASSWORD - EDGE_NORMALIZATION_ENDPOINT - NODE_NORMALIZATION_ENDPOINT - NAME_RESOLVER_ENDPOINT diff --git a/graph_specs/ctkp-graph-spec.yaml b/graph_specs/ctkp-graph-spec.yaml new file mode 100644 index 00000000..cffefab6 --- /dev/null +++ b/graph_specs/ctkp-graph-spec.yaml @@ -0,0 +1,9 @@ +graphs: + + - graph_id: CTKP_Automat + graph_name: Clinical Trials KP + graph_description: 'The Clinical Trials KP, created and maintained by the Multiomics Provider, provides information on Clinical Trials, ultimately derived from researcher submissions to clinicaltrials.gov, via the Aggregate Analysis of Clinical Trials (AACT) database). Information on select trials includes the NCT Identifier of the trial, interventions used, diseases/conditions relevant to the trial, adverse events, etc.' + graph_url: https://github.com/NCATSTranslator/Translator-All/wiki/Clinical-Trials-KP + output_format: neo4j + sources: + - source_id: ClinicalTrialsKP \ No newline at end of file diff --git a/helm/orion/renci-values.yaml b/helm/orion/renci-values.yaml index 1b4a2e6c..4a81cd9d 100644 --- a/helm/orion/renci-values.yaml +++ b/helm/orion/renci-values.yaml @@ -46,8 +46,8 @@ orion: normalization: nodeNormEndpoint: https://nodenormalization-sri.renci.org/ edgeNormEndpoint: https://bl-lookup-sri.renci.org/ - bl_version: 4.2.0 - outputURL: https://stars.renci.org/var/plater/bl-4.2.0/ + bl_version: 4.2.1 + outputURL: https://stars.renci.org/var/plater/bl-4.2.1/ pharos: host: pod-host-or-ip diff --git a/parsers/BINDING/src/loadBINDINGDB.py b/parsers/BINDING/src/loadBINDINGDB.py index 226dd2e5..042c240c 100644 --- a/parsers/BINDING/src/loadBINDINGDB.py +++ b/parsers/BINDING/src/loadBINDINGDB.py @@ -2,15 +2,17 @@ import enum import math import json +import requests + from zipfile import ZipFile -import requests as rq -import requests.exceptions +from requests.adapters import HTTPAdapter, Retry from parsers.BINDING.src.bindingdb_constraints import LOG_SCALE_AFFINITY_THRESHOLD #Change the binding affinity threshold here. Default is 10 uM Ki,Kd,EC50,orIC50 -from Common.utils import GetData +from Common.utils import GetData, GetDataPullError from Common.loader_interface import SourceDataLoader from Common.extractor import Extractor -from Common.biolink_constants import PUBLICATIONS, AFFINITY, AFFINITY_PARAMETER, KNOWLEDGE_LEVEL, AGENT_TYPE, KNOWLEDGE_ASSERTION, MANUAL_AGENT +from Common.biolink_constants import PUBLICATIONS, AFFINITY, AFFINITY_PARAMETER, KNOWLEDGE_LEVEL, AGENT_TYPE, \ + KNOWLEDGE_ASSERTION, MANUAL_AGENT # Full Binding Data. 
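### A minimal sketch (illustrative, not part of the loader below) of the requests Session + urllib3 Retry
### pattern used in get_latest_source_version here and in Common/normalization.py: mount a retrying
### HTTPAdapter on a Session so transient failures are retried with backoff. Retries only apply to requests
### made through the session object itself, not to bare requests.get() calls.
import requests
from requests.adapters import HTTPAdapter, Retry

session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=Retry(total=5, backoff_factor=2)))
# the BindingDB download page is the URL polled for the version string in the next hunk
response = session.get('https://www.bindingdb.org/rwd/bind/chemsearch/marvin/Download.jsp', timeout=8)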
@@ -78,9 +80,9 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): self.bindingdb_version = self.get_latest_source_version() self.bindingdb_data_url = f"https://www.bindingdb.org/bind/downloads/" - self.BD_archive_file_name = f"BindingDB_All_{self.bindingdb_version}_tsv.zip" - self.BD_file_name = f"BindingDB_All_{self.bindingdb_version}.tsv" - self.data_files = [self.BD_archive_file_name] + self.bd_archive_file_name = f"BindingDB_All_{self.bindingdb_version}_tsv.zip" + self.bd_file_name = f"BindingDB_All.tsv" + self.data_files = [self.bd_archive_file_name] def get_latest_source_version(self) -> str: """ @@ -90,26 +92,35 @@ def get_latest_source_version(self) -> str: if self.bindingdb_version: return self.bindingdb_version try: + s = requests.Session() + retries = Retry(total=5, + backoff_factor=2) + s.mount('https://', HTTPAdapter(max_retries=retries)) + ### The method below gets the database version from the html, but this may be subject to change. ### - binding_db_download_page_response = rq.get('https://www.bindingdb.org/rwd/bind/chemsearch/marvin/Download.jsp',) + binding_db_download_page_response = requests.get('https://www.bindingdb.org/rwd/bind/chemsearch/marvin/Download.jsp', timeout=8) version_index = binding_db_download_page_response.text.index('BindingDB_All_2D_') + 17 bindingdb_version = binding_db_download_page_response.text[version_index:version_index + 6] - except requests.exceptions.SSLError: - # currently the binding db SSL implementation is outdated/broken with the latest packages - self.logger.error(f'BINDING-DB had an SSL error while attempting to retrieve version. Returning default.') - return '202404' + self.bindingdb_version = bindingdb_version + return f"{bindingdb_version}" - return f"{bindingdb_version}" + except requests.exceptions.SSLError: + # BINDING-DB often has ssl related errors with the jsp page + error_message = f'BINDING-DB had an SSL error while attempting to retrieve version..' + except requests.exceptions.Timeout: + error_message = f'BINDING-DB timed out attempting to retrieve version...' + except ValueError: + error_message = f'BINDING-DB get_latest_source_version got a response but could not determine the version' + raise GetDataPullError(error_message=error_message) def get_data(self) -> int: """ Gets the bindingdb data. 
- """ + # download the zipped data data_puller = GetData() - for source in self.data_files: - source_url = f"{self.bindingdb_data_url}{source}" - data_puller.pull_via_http(source_url, self.data_path) + source_url = f"{self.bindingdb_data_url}{self.bd_archive_file_name}" + data_puller.pull_via_http(source_url, self.data_path) return True def parse_data(self) -> dict: @@ -123,7 +134,8 @@ def parse_data(self) -> dict: data_store= dict() columns = [[x.value,x.name] for x in BD_EDGEUMAN if x.name not in ['PMID','PUBCHEM_AID','PATENT_NUMBER','PUBCHEM_CID','UNIPROT_TARGET_CHAIN']] - for n,row in enumerate(generate_zipfile_rows(os.path.join(self.data_path,self.BD_archive_file_name), self.BD_file_name)): + zipped_data_path = os.path.join(self.data_path, self.bd_archive_file_name) + for n,row in enumerate(generate_zipfile_rows(zipped_data_path, self.bd_file_name)): if n == 0: continue if self.test_mode: diff --git a/parsers/CTD/src/loadCTD.py b/parsers/CTD/src/loadCTD.py index 608f561d..9bd9e75c 100644 --- a/parsers/CTD/src/loadCTD.py +++ b/parsers/CTD/src/loadCTD.py @@ -533,7 +533,7 @@ def convert_predicates(predicate): :return: """ # the capture regex - regex = '\/|\ |\^' + regex = r'\/|\ |\^' # clean up the predicate cleaned_predicate = re.sub(regex, '_', predicate) diff --git a/parsers/FooDB/src/loadFDB.py b/parsers/FooDB/src/loadFDB.py index 2622d4bc..149a78d1 100644 --- a/parsers/FooDB/src/loadFDB.py +++ b/parsers/FooDB/src/loadFDB.py @@ -82,6 +82,7 @@ def get_data(self): # and get a reference to the data gatherer gd: GetData = GetData(self.logger.level) + if(self.full_url_path==None): self.get_latest_source_version() # get all the files noted above file_count, foodb_dir, self.tar_dir_name = gd.get_foodb_files(self.full_url_path, self.data_path, self.archive_name, self.data_files) diff --git a/parsers/GTEx/src/loadGTEx.py b/parsers/GTEx/src/loadGTEx.py index 86a3e8be..dc15ea93 100644 --- a/parsers/GTEx/src/loadGTEx.py +++ b/parsers/GTEx/src/loadGTEx.py @@ -22,6 +22,7 @@ class GTExLoader(SourceDataLoader): has_sequence_variants = True # this probably won't change very often - just hard code it for now + # TODO have GTEX dynamically get version and file url (starting point: https://gtexportal.org/api/v2/metadata/dataset) GTEX_VERSION = "8" # tissue name to uberon curies, the tissue names will match gtex file names diff --git a/parsers/GenomeAlliance/src/loadGenomeAlliance.py b/parsers/GenomeAlliance/src/loadGenomeAlliance.py index 9027f519..059b86d6 100644 --- a/parsers/GenomeAlliance/src/loadGenomeAlliance.py +++ b/parsers/GenomeAlliance/src/loadGenomeAlliance.py @@ -2,6 +2,7 @@ import os import enum import gzip +import requests from Common.utils import GetData from Common.loader_interface import SourceDataLoader @@ -36,8 +37,11 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): self.latest_version = None self.latest_version = self.get_latest_source_version() - self.genome_alliance_url = f'https://download.alliancegenome.org/{self.get_latest_source_version()}/ORTHOLOGY-ALLIANCE/COMBINED/' - self.genome_alliance_ortholog_file = 'ORTHOLOGY-ALLIANCE_COMBINED_25.tsv.gz' + #self.genome_alliance_url = f'https://download.alliancegenome.org/{self.get_latest_source_version()}/ORTHOLOGY-ALLIANCE/COMBINED/' + #self.genome_alliance_ortholog_file = 'ORTHOLOGY-ALLIANCE_COMBINED_25.tsv.gz' + + self.genome_alliance_url = 'https://fms.alliancegenome.org/download/' + self.self.genome_alliance_ortholog_file = 'ORTHOLOGY-ALLIANCE_COMBINED.tsv.gz' self.data_files = 
[self.genome_alliance_ortholog_file] def get_latest_source_version(self) -> str: @@ -46,8 +50,11 @@ def get_latest_source_version(self) -> str: :return: """ - if not self.latest_version: - self.latest_version = '5.3.0' + #if not self.latest_version: + # self.latest_version = '5.3.0' + + self.latest_version = requests.get("https://www.alliancegenome.org/api/releaseInfo").json()['releaseVersion'] + return self.latest_version def get_data(self) -> int: diff --git a/parsers/IntAct/src/loadIA.py b/parsers/IntAct/src/loadIA.py index b77cd40b..84422394 100644 --- a/parsers/IntAct/src/loadIA.py +++ b/parsers/IntAct/src/loadIA.py @@ -84,6 +84,9 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): """ super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) + self.ftp_site = 'ftp.ebi.ac.uk' + self.ftp_dir = '/pub/databases/IntAct/current/psimitab/' + self.data_file: str = 'intact.zip' self.source_db: str = 'IntAct Molecular Interaction Database' @@ -100,7 +103,7 @@ def get_latest_source_version(self) -> str: gd = GetData(self.logger.level) # get the file date - ret_val: str = gd.get_ftp_file_date('ftp.ebi.ac.uk', '/pub/databases/IntAct/current/psimitab/', self.data_file) + ret_val: str = gd.get_ftp_file_date(self.ftp_site, self.ftp_dir, self.data_file) # return to the caller return ret_val @@ -112,7 +115,7 @@ def get_data(self) -> int: """ # get a reference to the data gathering class gd: GetData = GetData(self.logger.level) - file_count: int = gd.pull_via_ftp('ftp.ebi.ac.uk', '/pub/databases/IntAct/current/psimitab/', [self.data_file], + file_count: int = gd.pull_via_ftp(self.ftp_site, self.ftp_dir, [self.data_file], self.data_path) return file_count diff --git a/parsers/KinAce/src/loadKinAce.py b/parsers/KinAce/src/loadKinAce.py index 62175f05..e66b373a 100644 --- a/parsers/KinAce/src/loadKinAce.py +++ b/parsers/KinAce/src/loadKinAce.py @@ -1,37 +1,43 @@ import os import enum -from zipfile import ZipFile as zipfile -import pandas as pd +import requests from Common.utils import GetData from Common.loader_interface import SourceDataLoader from Common.extractor import Extractor -from Common.biolink_constants import PUBLICATIONS +from Common.biolink_constants import * -# Full Kinase-Substrate Phosphorylation Data. - -#make this reflect the column that the data is found in -class BD_EDGEUMAN(enum.IntEnum): - KINASE = 1 - SUBSTRATE = 2 - P_SITE = 3 - PRIMARY_SOURCE = 4 - SECONDARY_SOURCE = 5 ############## # Class: Loading kinase-substrate phosphorylation reactions from KinAce # By: Jon-Michael Beasley # Date: 03/7/2024 ############## + +class DATACOLS(enum.IntEnum): + kinase = 0 + substrate = 2 + p_site = 4 + primary_source = 5 + PUBLICATIONS = 7 + + class KinAceLoader(SourceDataLoader): source_id: str = 'KinAce' provenance_id: str = 'infores:kinace' - description = "The KinAce web portal aggregates and visualizes the network of interactions between protein-kinases and their substrates in the human genome." 
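As a sketch only, the Alliance releaseInfo lookup introduced in the hunk above could carry a timeout and explicit error handling. The endpoint and the releaseVersion field come from the diff; wrapping failures in GetDataPullError mirrors other ORION loaders and is an assumption here.

import requests

from Common.utils import GetDataPullError  # wrapper used by other loaders; an assumption here

def get_latest_alliance_version() -> str:
    # https://www.alliancegenome.org/api/releaseInfo returns JSON with a 'releaseVersion' field
    try:
        response = requests.get('https://www.alliancegenome.org/api/releaseInfo', timeout=10)
        response.raise_for_status()
        return response.json()['releaseVersion']
    except (requests.exceptions.RequestException, KeyError, ValueError) as error:
        raise GetDataPullError(error_message=f'Unable to determine latest Alliance release: {error}')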
+ description = ("The KinAce web portal aggregates and visualizes the network of interactions between " + "protein-kinases and their substrates in the human genome.") source_data_url = "https://kinace.kinametrix.com/session/ff792906de38db0d1c9900ac5882497b/download/download0?w=" - license = "All data and download files in bindingDB are freely available under a 'Creative Commons BY 3.0' license.'" + license = "Creative Commons Attribution 4.0 International" attribution = 'https://kinace.kinametrix.com/#section-about' - parsing_version = '1.0' + parsing_version = '1.2' + + KINACE_INFORES_MAPPING = { + 'PhosphoSitePlus': 'infores:psite-plus', + 'EPSD': 'infores:epsd', + 'iPTMNet': 'infores:iptmnet' + } def __init__(self, test_mode: bool = False, source_data_dir: str = None): """ @@ -41,12 +47,12 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): # call the super super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) - self.kinace_version = "2023-10-30" - #self.kinace_version = self.get_latest_source_version() - self.kinace_data_url = f"https://raw.githubusercontent.com/GauravPandeyLab/KinAce/master/data/{self.kinace_version}-kinace-dataset.zip" - - self.archive_file_name = f"{self.kinace_version}-kinace-dataset.zip" - self.interactions_file_name = f"ksi_source.csv" + # self.kinace_version = "2023-10-30" + # KinAce downloaded this data on the 30th of October 2023. However, they have made changes to the files since + # I suggest using the last commit date of the file to version this data set + self.kinace_data_url = f"https://github.com/GauravPandeyLab/KiNet/raw/master/data/ksi_source_full_dataset.csv" + # Let's use the full source for completeness rather than the pruned list + self.interactions_file_name = f"ksi_source_full_dataset.csv" self.data_files = [self.interactions_file_name] def get_latest_source_version(self) -> str: @@ -54,10 +60,14 @@ def get_latest_source_version(self) -> str: gets the latest version of the data :return: """ - if self.kinace_version: - return self.kinace_version - - return f"{self.kinace_version}" + url = (f"https://api.github.com/repos/GauravPandeyLab/KiNet/commits?" 
+ f"path=./data/{self.interactions_file_name}&per_page=1") + response = requests.get(url) + commits = response.json() + last_commit_date = commits[0]['commit']['committer']['date'] + date_version = last_commit_date[:10] + + return f"{date_version}" def get_data(self) -> int: """ @@ -67,10 +77,34 @@ def get_data(self) -> int: data_puller = GetData() source_url = f"{self.kinace_data_url}" data_puller.pull_via_http(source_url, self.data_path) - with zipfile(os.path.join(self.data_path, self.archive_file_name), 'r') as zip_ref: - zip_ref.extract(self.interactions_file_name, self.data_path) + return True + def get_pmids(self, line): + publication_list = [] + + if line[DATACOLS.PUBLICATIONS.value] in ['', 'NA']: + return publication_list + + ids = line[DATACOLS.PUBLICATIONS.value].split(';') + publication_list = ['PMID:' + i.strip() for i in ids if i.strip()] + + return publication_list + + def get_KL_AT_assignments(self, line): + knowledge_level = NOT_PROVIDED + agent_type = NOT_PROVIDED + if line[DATACOLS.primary_source.value] == 'PhosphoSitePlus': + knowledge_level = KNOWLEDGE_ASSERTION + agent_type = MANUAL_AGENT + elif line[DATACOLS.primary_source.value] == 'EPSD': + knowledge_level = NOT_PROVIDED + agent_type = NOT_PROVIDED + elif line[DATACOLS.primary_source.value] == 'iPTMNet': + knowledge_level = NOT_PROVIDED + agent_type = TEXT_MINING_AGENT + return [knowledge_level, agent_type] + def parse_data(self) -> dict: """ Parses the data file for graph nodes/edges @@ -78,35 +112,27 @@ def parse_data(self) -> dict: :return: ret_val: load_metadata """ - print('ok parsing') - # with zipfile(os.path.join(self.data_path, self.archive_file_name), 'r') as zip_ref: - # zip_ref.extract(self.interactions_file_name, self.data_path) - data = pd.read_csv(os.path.join(self.data_path, self.interactions_file_name)) - data = data.groupby(["Kinase", "Substrate"]).agg({"Site": list, "PrimarySource": list, "SecondarySource": list}).reset_index() - # Define a function to deduplicate lists - def deduplicate_list(lst): - lst = [x for x in lst if x == x] - return list(set(lst)) - # Apply deduplication function to each aggregated list - data['Site'] = data.apply(lambda row: list(set([x for x in row['Site'] if x==x])), axis=1) - data['PrimarySource'] = data.apply(lambda row: list(set([x for x in row['PrimarySource'] if x==x])), axis=1) - data['SecondarySource'] = data.apply(lambda row: list(set([x for x in row['SecondarySource'] if x==x])), axis=1) - data.to_csv(os.path.join(self.data_path, self.interactions_file_name)) + extractor = Extractor(file_writer=self.output_file_writer) - with open(os.path.join(self.data_path, self.interactions_file_name), 'rt') as fp: - extractor.csv_extract(fp, - lambda line: f"UniProtKB:{line[1]}", # subject id - lambda line: f"UniProtKB:{line[2]}", # object id - lambda line: "biolink:phosphorylates", # predicate - lambda line: {}, #Node 1 props - lambda line: {}, #Node 2 props - lambda line: { - 'phosphorylation_sites':line[3], - 'primary_sources':line[4], - 'secondary_sources':line[5] - }, #Edge props - comment_character=None, - delim=",", - has_header_row=True - ) - return extractor.load_metadata \ No newline at end of file + + with open(os.path.join(self.data_path, self.interactions_file_name)) as csvfile: + # change to csv reader + extractor.csv_extract(csvfile, + subject_extractor=lambda line: f"UniProtKB:{line[DATACOLS.kinase.value]}", + object_extractor=lambda line: f"UniProtKB:{line[DATACOLS.substrate.value]}", + predicate_extractor=lambda line: "biolink:affects", # predicate + 
edge_property_extractor=lambda line: + {QUALIFIED_PREDICATE: 'biolink:causes', + OBJECT_DIRECTION_QUALIFIER: 'increased', + OBJECT_ASPECT_QUALIFIER: 'phosphorylation', + 'phosphorylation_sites': [line[DATACOLS.p_site.value]], + KNOWLEDGE_LEVEL: self.get_KL_AT_assignments(line)[0], + AGENT_TYPE: self.get_KL_AT_assignments(line)[1], + PRIMARY_KNOWLEDGE_SOURCE: + self.KINACE_INFORES_MAPPING.get(line[DATACOLS.primary_source.value], None), + AGGREGATOR_KNOWLEDGE_SOURCES: [self.provenance_id], + PUBLICATIONS: self.get_pmids(line)}, + has_header_row=True, + delim=',' + ) + return extractor.load_metadata diff --git a/parsers/LINCS/src/loadLINCS.py b/parsers/LINCS/src/loadLINCS.py new file mode 100644 index 00000000..f71e16a7 --- /dev/null +++ b/parsers/LINCS/src/loadLINCS.py @@ -0,0 +1,95 @@ +import os +import enum + +from Common.extractor import Extractor +from Common.loader_interface import SourceDataLoader +from Common.biolink_constants import * +from Common.prefixes import PUBCHEM_COMPOUND +from Common.utils import GetData + + +class GENERICDATACOLS(enum.IntEnum): + SOURCE_ID = 2 + SOURCE_LABEL = 3 + TARGET_ID = 5 + TARGET_LABEL = 6 + PREDICATE = 7 + + +PREDICATE_MAPPING = { + "in_similarity_relationship_with": "biolink:chemically_similar_to", + "negatively_regulates": "RO:0002212", + "positively_regulates": "RO:0002213" +} + + +############## +# Class: LINCS loader +# +# By: James Chung +# Date: 10/30/2024 +# Desc: Class that loads/parses the data in Library of Integrated Network-Based Cellular Signatures. +# +############## +class LINCSLoader(SourceDataLoader): + + source_id: str = 'LINCS' + provenance_id: str = 'infores:lincs' + parsing_version: str = '1.0' + + def __init__(self, test_mode: bool = False, source_data_dir: str = None): + """ + :param test_mode - sets the run into test mode + :param source_data_dir - the specific storage directory to save files in + """ + super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) + + self.data_url = 'https://stars.renci.org/var/data_services/LINCS/' + self.edge_file = "LINCS.lookup.edges.csv" + self.data_files = [self.edge_file] + + def get_latest_source_version(self) -> str: + # The KG was generated from Data Distillery KG. There was no version defined. 
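A hedged sketch of the commit-date versioning pattern the KinAce loader adopts above, with a timeout, a status check, and a guard against an empty commit list; the repository and file path come from the diff, and the GetDataPullError wrapper is an assumption rather than the committed behaviour.

import requests

from Common.utils import GetDataPullError  # assumed wrapper, mirroring other loaders

def latest_commit_date_version(repo: str = 'GauravPandeyLab/KiNet',
                               file_path: str = 'data/ksi_source_full_dataset.csv') -> str:
    # ask the GitHub API for the most recent commit touching the data file
    url = f'https://api.github.com/repos/{repo}/commits?path={file_path}&per_page=1'
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    commits = response.json()
    if not commits:
        raise GetDataPullError(error_message=f'No commits found for {file_path} in {repo}')
    # committer date is ISO 8601, e.g. '2024-05-01T12:34:56Z'; keep only the date part as the version
    return commits[0]['commit']['committer']['date'][:10]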
+ latest_version = 'v1.0' + return latest_version + + def get_data(self) -> bool: + source_data_url = f'{self.data_url}{self.edge_file}' + data_puller = GetData() + data_puller.pull_via_http(source_data_url, self.data_path) + return True + + def parse_data(self) -> dict: + """ + Parses the data file for graph nodes/edges + + :return: ret_val: load_metadata + """ + extractor = Extractor(file_writer=self.output_file_writer) + lincs_file: str = os.path.join(self.data_path, self.edge_file) + with open(lincs_file, 'rt') as fp: + extractor.csv_extract(fp, + lambda line: self.resolve_id(line[GENERICDATACOLS.SOURCE_ID.value]), # source id + lambda line: self.resolve_id(line[GENERICDATACOLS.TARGET_ID.value]), # target id + lambda line: PREDICATE_MAPPING[line[GENERICDATACOLS.PREDICATE.value]], # predicate extractor + lambda line: {}, # subject properties + lambda line: {}, # object properties + lambda line: self.get_edge_properties(), # edge properties + comment_character='#', + delim=',', + has_header_row=True) + return extractor.load_metadata + + @staticmethod + def resolve_id(idstring: str): + if idstring.startswith("PUBCHEM"): + return idstring.replace("PUBCHEM", PUBCHEM_COMPOUND) + return idstring + + def get_edge_properties(self): + properties = { + PRIMARY_KNOWLEDGE_SOURCE: self.provenance_id, + KNOWLEDGE_LEVEL: KNOWLEDGE_ASSERTION, + AGENT_TYPE: DATA_PIPELINE + } + return properties diff --git a/parsers/PHAROS/src/loadPHAROS.py b/parsers/PHAROS/src/loadPHAROS.py index 6fc62bef..d1ddd298 100644 --- a/parsers/PHAROS/src/loadPHAROS.py +++ b/parsers/PHAROS/src/loadPHAROS.py @@ -1,7 +1,6 @@ import os import argparse import re -import requests from Common.loader_interface import SourceDataLoader, SourceDataBrokenError, SourceDataFailedError from Common.kgxmodel import kgxnode, kgxedge @@ -94,6 +93,7 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): self.source_db = 'Target Central Resource Database' self.pharos_db = None self.genetic_association_predicate = 'WIKIDATA_PROPERTY:P2293' + self.target_for_predicate = "biolink:target_for" def get_latest_source_version(self) -> str: """ @@ -216,9 +216,13 @@ def parse_gene_to_disease(self) -> (int, int): self.output_file_writer.write_kgx_node(gene_node) if edge_provenance: + if edge_provenance == "infores:drugcentral": + assigned_predicate = self.target_for_predicate + else: + assigned_predicate = self.genetic_association_predicate gene_to_disease_edge = kgxedge(subject_id=gene_id, object_id=disease_id, - predicate=self.genetic_association_predicate, + predicate=assigned_predicate, edgeprops=edge_properties, primary_knowledge_source=edge_provenance, aggregator_knowledge_sources=[self.provenance_id]) @@ -381,11 +385,11 @@ def get_edge_props(self, result) -> (str, list, dict, str): # if there was affinity data save it affinity = result['affinity'] if affinity is not None and affinity != '': - props['affinity'] = float(affinity) + props[AFFINITY] = float(affinity) affinity_paramater = result['affinity_parameter'] if affinity_paramater: - props['affinity_parameter'] = f'p{result["affinity_parameter"]}' + props[AFFINITY_PARAMETER] = f'p{result["affinity_parameter"]}' # return to the caller return predicate, pmids, props, provenance diff --git a/parsers/Reactome/src/loadReactome.py b/parsers/Reactome/src/loadReactome.py index dac5b697..a816269b 100755 --- a/parsers/Reactome/src/loadReactome.py +++ b/parsers/Reactome/src/loadReactome.py @@ -109,9 +109,12 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): 
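For the PHAROS gene-to-disease hunk above, the provenance-based predicate switch could also be expressed as a small lookup table with the genetic-association predicate as the default. The identifiers are the ones used in the diff; this is an illustration of the design choice, not the committed logic.

GENETIC_ASSOCIATION_PREDICATE = 'WIKIDATA_PROPERTY:P2293'
TARGET_FOR_PREDICATE = 'biolink:target_for'

# predicates keyed by the edge's primary knowledge source; anything else keeps the default
PROVENANCE_PREDICATE_OVERRIDES = {
    'infores:drugcentral': TARGET_FOR_PREDICATE,
}

def assign_predicate(edge_provenance: str) -> str:
    return PROVENANCE_PREDICATE_OVERRIDES.get(edge_provenance, GENETIC_ASSOCIATION_PREDICATE)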
super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) self.version_url: str = 'https://reactome.org/about/news' + # we'll rename the neo4j dump as we download it to make neo4j usage easier + # (community edition only allows one database, having just one named 'neo4j' helps) self.neo4j_dump_file = 'reactome.graphdb.dump' + self.saved_neo4j_dump_file = 'neo4j.dump' self.data_url = 'https://reactome.org/download/current/' - self.data_files = [self.neo4j_dump_file] + self.data_files = [self.saved_neo4j_dump_file] self.triple_file: str = 'reactomeContents_CriticalTriples.csv' self.triple_path = os.path.dirname(os.path.abspath(__file__)) @@ -142,15 +145,25 @@ def get_latest_source_version(self) -> str: def get_data(self) -> bool: gd: GetData = GetData(self.logger.level) - for dt_file in self.data_files: - gd.pull_via_http(f'{self.data_url}{dt_file}', - self.data_path) + gd.pull_via_http(f'{self.data_url}{self.neo4j_dump_file}', + self.data_path, saved_file_name=self.saved_neo4j_dump_file) return True def parse_data(self): neo4j_tools = Neo4jTools() - neo4j_tools.load_backup_dump(f'{self.data_path}/{self.neo4j_dump_file}') - neo4j_tools.start_neo4j() + + neo4j_status_code = neo4j_tools.load_backup_dump(f'{self.data_path}/') + if neo4j_status_code: + raise SystemError('Neo4j failed to load the backup dump.') + + neo4j_status_code = neo4j_tools.migrate_dump_to_neo4j_5() + if neo4j_status_code: + raise SystemError('Neo4j failed to migrate the dump to neo4j 5.') + + neo4j_status_code = neo4j_tools.start_neo4j() + if neo4j_status_code: + raise SystemError('Neo4j failed to start.') + neo4j_tools.wait_for_neo4j_initialization() neo4j_driver = neo4j_tools.neo4j_driver diff --git a/parsers/ViralProteome/src/loadUniRef.py b/parsers/ViralProteome/src/loadUniRef.py index 7bd0d900..efdbbc41 100644 --- a/parsers/ViralProteome/src/loadUniRef.py +++ b/parsers/ViralProteome/src/loadUniRef.py @@ -90,6 +90,7 @@ def get_uniref_data(self) -> set: # are we in test mode if not self.test_mode: # get the list of taxa + #TODO: It looks like gd.get_ncbi_taxon_id_set doesn't resolve. 
It was removed in https://github.com/RobokopU24/ORION/commit/d3860356f2dac5779d1c15d651e644921dc48f88 target_taxon_set: set = gd.get_ncbi_taxon_id_set(self.data_path, self.TYPE_VIRUS) else: # create a test set of target taxa diff --git a/parsers/clinicaltrials/src/loadCTKP.py b/parsers/clinicaltrials/src/loadCTKP.py new file mode 100644 index 00000000..4b95ba1f --- /dev/null +++ b/parsers/clinicaltrials/src/loadCTKP.py @@ -0,0 +1,222 @@ +import enum +import os +import requests +import json + +from Common.biolink_constants import * +from Common.extractor import Extractor +from Common.utils import GetData +from Common.loader_interface import SourceDataLoader +from Common.utils import GetDataPullError + + +# the data header columns the nodes files are: +class NODESDATACOLS(enum.IntEnum): + ID = 0 + NAME = 1 + CATEGORY = 2 + + +# the data header columns for the edges file are: +class EDGESDATACOLS(enum.IntEnum): + ID = 0 + SUBJECT = 1 + PREDICATE = 2 + OBJECT = 3 + SUBJECT_NAME = 4 + OBJECT_NAME = 5 + CATEGORY = 6 + KNOWLEDGE_LEVEL = 7 + AGENT_TYPE = 8 + NCTID = 9 + PHASE = 10 + PRIMARY_PURPOSE = 11 + INTERVENTION_MODEL = 12 + TIME_PERSPECTIVE = 13 + OVERALL_STATUS = 14 + START_DATE = 15 + ENROLLMENT = 16 + ENROLLMENT_TYPE = 17 + AGE_RANGE = 18 + CHILD = 19 + ADULT = 20 + OLDER_ADULT = 21 + UNII = 22 + + +class CTKPLoader(SourceDataLoader): + source_id: str = "ClinicalTrialsKP" + provenance_id: str = "infores:biothings-multiomics-clinicaltrials" + description = "The Clinical Trials KP, created and maintained by the Multiomics Provider, provides information on Clinical Trials, ultimately derived from researcher submissions to clinicaltrials.gov, via the Aggregate Analysis of Clinical Trials (AACT) database). Information on select trials includes the NCT Identifier of the trial, interventions used, diseases/conditions relevant to the trial, adverse events, etc." 
+ source_data_url = "https://aact.ctti-clinicaltrials.org/" + license = "https://github.com/ctti-clinicaltrials/aact/blob/dev/LICENSE" + attribution = "" + parsing_version = "1.0" + + def __init__(self, test_mode: bool = False, source_data_dir: str = None): + """ + :param test_mode - sets the run into test mode + :param source_data_dir - the specific storage directory to save files in + """ + super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) + + # until we can use the manifest to determine versions and source data file locations we'll hard code it + self.node_file_name = 'clinical_trials_kg_nodes_v2.2.10.tsv' + self.edge_file_name = 'clinical_trials_kg_edges_v2.2.10.tsv' + self.data_url = "https://db.systemsbiology.net/gestalt/KG/" + + # once we use the manifest, we'll rename the files while downloading and they can be called something generic + # self.node_file_name = 'nodes.tsv' + # self.edge_file_name = 'edges.tsv' + + self.data_files = [ + self.node_file_name, + self.edge_file_name + ] + + self.aact_infores = "infores:aact" + self.ctgov_infores = "infores:clinicaltrials" + self.treats_predicate = "biolink:treats" + self.source_record_url = "https://db.systemsbiology.net/gestalt/cgi-pub/KGinfo.pl?id=" + + def get_latest_source_version(self) -> str: + latest_version = "2.2.10" + # we'd like to do this but for now we're using the dev version which is not in the manifest + # latest_version = self.get_manifest()['version'] + return latest_version + + @staticmethod + def get_manifest(): + manifest_response = requests.get('https://github.com/multiomicsKP/clinical_trials_kp/blob/main/manifest.json') + if manifest_response.status_code == 200: + manifest = manifest_response.json() + return manifest + else: + manifest_response.raise_for_status() + + def get_data(self) -> int: + """ + manifest = self.get_manifest() + source_data_urls = manifest['dumper']['data_url'] + nodes_url = None + edges_url = None + for data_url in source_data_urls: + if 'nodes' in data_url: + nodes_url = data_url + elif 'edges' in data_url: + edges_url = data_url + if not nodes_url and edges_url: + raise GetDataPullError(f'Could not determine nodes and edges files in CTKP manifest data urls: {source_data_urls}') + data_puller = GetData() + data_puller.pull_via_http(nodes_url, self.data_path, saved_file_name=self.node_file_name) + data_puller.pull_via_http(edges_url, self.data_path, saved_file_name=self.edge_file_name) + """ + data_puller = GetData() + for source in self.data_files: + source_url = f"{self.data_url}{source}" + data_puller.pull_via_http(source_url, self.data_path) + return True + + def parse_data(self) -> dict: + """ + Parses the data file for graph nodes/edges and writes them to the KGX files. 
+ + :return: ret_val: record counts + """ + + extractor = Extractor(file_writer=self.output_file_writer) + + # get the nodes + # it's not really necessary because normalization will overwrite the only information here (name and category) + nodes_file: str = os.path.join(self.data_path, self.node_file_name) + with open(nodes_file, 'r') as fp: + extractor.csv_extract(fp, + lambda line: line[NODESDATACOLS.ID.value], # subject id + lambda line: None, # object id + lambda line: None, # predicate + lambda line: {NAME: line[NODESDATACOLS.NAME.value], + NODE_TYPES: line[NODESDATACOLS.CATEGORY.value]}, # subject props + lambda line: {}, # object props + lambda line: {}, # edgeprops + comment_character=None, + delim='\t', + has_header_row=True) + + edges_file: str = os.path.join(self.data_path, self.edge_file_name) + with open(edges_file, 'r') as fp: + extractor.csv_extract(fp, + lambda line: line[EDGESDATACOLS.SUBJECT.value], # subject id + lambda line: line[EDGESDATACOLS.OBJECT.value], # object id + lambda line: line[EDGESDATACOLS.PREDICATE.value], # predicate + lambda line: {}, # subject props + lambda line: {}, # object props + lambda line: self.get_edge_properties(line), # edgeprops + comment_character=None, + delim='\t', + has_header_row=True) + + return extractor.load_metadata + + def get_edge_properties(self, line): + + supporting_studies = [] + pred = str(line[EDGESDATACOLS.PREDICATE.value]) + nctids = str(line[EDGESDATACOLS.NCTID.value]).split(',') + phases = str(line[EDGESDATACOLS.PHASE.value]).split(',') + status = str(line[EDGESDATACOLS.OVERALL_STATUS.value]).split(',') + enroll = str(line[EDGESDATACOLS.ENROLLMENT.value]).split(',') + en_typ = str(line[EDGESDATACOLS.ENROLLMENT_TYPE.value]).split(',') + max_phase = 0 + elevate_to_prediction = False + for nctid, phase, stat, enrollment, enrollment_type in zip(nctids, phases, status, enroll, en_typ): + if float(phase) > max_phase: + max_phase = float(phase) + try: + enrollment = int(enrollment) + except ValueError: + enrollment = -1 + + supporting_study_attributes = { + "id": nctid, + "tested_intervention": "unsure" if pred == "biolink:mentioned_in_trials_for" else "yes", + "phase": phase, + "status": stat, + "study_size": enrollment + } + # convert to TRAPI format + supporting_studies.append( + {"attribute_type_id": HAS_SUPPORTING_STUDY_RESULT, + "value": nctid, + "attributes": [{"attribute_type_id": key, + "value": value} for key, value in supporting_study_attributes.items()]}) + + # if pred == "biolink:in_clinical_trials_for" and max_phase >= 4: + # elevate_to_prediction = True + + if pred == self.treats_predicate: + primary_knowledge_source = self.provenance_id + aggregator_knowledge_sources = [self.aact_infores] + supporting_data_source = self.ctgov_infores + else: + primary_knowledge_source = self.ctgov_infores + aggregator_knowledge_sources = [self.aact_infores, self.provenance_id] + supporting_data_source = None + + edge_attributes = { + EDGE_ID: line[EDGESDATACOLS.ID.value], + PRIMARY_KNOWLEDGE_SOURCE: primary_knowledge_source, + AGGREGATOR_KNOWLEDGE_SOURCES: aggregator_knowledge_sources, + KNOWLEDGE_LEVEL: line[EDGESDATACOLS.KNOWLEDGE_LEVEL.value], + AGENT_TYPE: line[EDGESDATACOLS.AGENT_TYPE.value], + MAX_RESEARCH_PHASE: str(float(max_phase)), + "elevate_to_prediction": elevate_to_prediction, # this isn't in biolink so not using a constant for now + # note source_record_urls should be paired with specific knowledge sources but currently + # there's no implementation for that, just pass it as a normal attribute for now + 
"source_record_urls": [self.source_record_url + line[EDGESDATACOLS.ID.value]] + } + if supporting_data_source: + edge_attributes[SUPPORTING_DATA_SOURCE] = supporting_data_source + # to handle nested attributes, use the "attributes" property which supports TRAPI attributes as json strings + if supporting_studies: + edge_attributes["attributes"] = [json.dumps(study) for study in supporting_studies] + return edge_attributes diff --git a/parsers/drugcentral/src/loaddrugcentral.py b/parsers/drugcentral/src/loaddrugcentral.py index bd828a85..50b56b8b 100644 --- a/parsers/drugcentral/src/loaddrugcentral.py +++ b/parsers/drugcentral/src/loaddrugcentral.py @@ -8,7 +8,7 @@ from Common.loader_interface import SourceDataLoader, SourceDataFailedError, SourceDataBrokenError from Common.utils import GetData, snakify from Common.biolink_constants import PRIMARY_KNOWLEDGE_SOURCE, AGGREGATOR_KNOWLEDGE_SOURCES, PUBLICATIONS, \ - KNOWLEDGE_LEVEL, KNOWLEDGE_ASSERTION, AGENT_TYPE, MANUAL_AGENT + KNOWLEDGE_LEVEL, KNOWLEDGE_ASSERTION, AGENT_TYPE, MANUAL_AGENT, AFFINITY, AFFINITY_PARAMETER from Common.prefixes import DRUGCENTRAL, MEDDRA, UMLS, UNIPROTKB, PUBMED from Common.predicates import DGIDB_PREDICATE_MAPPING from Common.db_connectors import PostgresConnector @@ -186,8 +186,8 @@ def get_bioactivity_attributes(self, line): edge_props = {KNOWLEDGE_LEVEL: KNOWLEDGE_ASSERTION, AGENT_TYPE: MANUAL_AGENT} if line['act_type'] is not None: - edge_props['affinity'] = line['act_value'] - edge_props['affinityParameter'] = line['act_type'] + edge_props[AFFINITY] = line['act_value'] + edge_props[AFFINITY_PARAMETER] = f"p{line['act_type']}" if line['act_source'] == 'SCIENTIFIC LITERATURE' and line['act_source_url'] is not None: papersource = line['act_source_url'] if papersource.startswith('http://www.ncbi.nlm.nih.gov/pubmed'): diff --git a/parsers/hgnc/src/loadHGNC.py b/parsers/hgnc/src/loadHGNC.py index 9eee0d56..09a0df57 100644 --- a/parsers/hgnc/src/loadHGNC.py +++ b/parsers/hgnc/src/loadHGNC.py @@ -1,6 +1,7 @@ import argparse import csv import os +import requests from Common.utils import GetData from Common.loader_interface import SourceDataLoader @@ -10,7 +11,7 @@ ############## -# Class: HGNC metabolites loader +# Class: HGNC loader # # By: Phil Owen # Date: 3/31/2021 @@ -21,10 +22,10 @@ class HGNCLoader(SourceDataLoader): source_id: str = HGNC provenance_id: str = 'infores:hgnc' description = "The HUGO Gene Nomenclature Committee (HGNC) database provides open access to HGNC-approved unique symbols and names for human genes, gene groups, and associated resources, including links to genomic, proteomic and phenotypic information." 
- source_data_url = "ftp://ftp.ebi.ac.uk/pub/databases/genenames/hgnc/archive/" + source_data_url = "https://www.genenames.org/download/archive/" license = "https://www.genenames.org/about/" attribution = "https://www.genenames.org/about/" - parsing_version: str = '1.2' + parsing_version: str = '1.3' def __init__(self, test_mode: bool = False, source_data_dir: str = None): """ @@ -32,17 +33,10 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): :param source_data_dir - the specific storage directory to save files in """ super().__init__(test_mode=test_mode, source_data_dir=source_data_dir) - + self.source_db = 'HUGO Gene Nomenclature Committee' self.complete_set_file_name = 'hgnc_complete_set.txt' - # self.gene_groups_file_name = 'hgnc_genes_in_groups.txt' - self.data_files: list = [self.complete_set_file_name, - # self.gene_groups_file_name - ] - self.test_mode: bool = test_mode - self.source_db: str = 'HUGO Gene Nomenclature Committee' - - self.ftp_site = 'ftp.ebi.ac.uk' - self.ftp_dir = '/pub/databases/genenames/hgnc/tsv/' + self.data_file = self.complete_set_file_name + self.data_url = "https://storage.googleapis.com/public-download-files/hgnc/tsv/tsv/" def get_latest_source_version(self) -> str: """ @@ -50,27 +44,25 @@ def get_latest_source_version(self) -> str: :return: the data version """ - data_puller = GetData() - # HGNC files change very frequently, excluding the day makes sure we only update it once per month - data_file_date = data_puller.get_ftp_file_date(self.ftp_site, - self.ftp_dir, - self.data_files[0], - exclude_day=True) - return data_file_date + headers = {"Accept": "application/json"} + info_response = requests.get('https://www.genenames.org/rest/info', headers=headers) + if info_response.ok: + info_json = info_response.json() + modified_date = info_json['lastModified'] + latest_version = modified_date.split('T')[0] + return latest_version + else: + info_response.raise_for_status() def get_data(self) -> int: """ Gets the HGNC data from two sources. 
""" - # get a reference to the data gathering class - gd: GetData = GetData(self.logger.level) - file_count: int = gd.pull_via_ftp(self.ftp_site, self.ftp_dir, [self.complete_set_file_name], self.data_path) - - # get the gene groups dataset - # byte_count: int = gd.pull_via_http('https://www.genenames.org/cgi-bin/genegroup/download-all/' + self.self.gene_groups_file_name, self.data_path) - - return file_count + gd: GetData = GetData() + data_file_url = self.data_url + self.data_file + gd.pull_via_http(url=data_file_url, data_dir=self.data_path) + return True def parse_data(self) -> dict: """ diff --git a/parsers/monarchkg/src/loadMonarchKG.py b/parsers/monarchkg/src/loadMonarchKG.py index 2417cb2f..73b4ee10 100644 --- a/parsers/monarchkg/src/loadMonarchKG.py +++ b/parsers/monarchkg/src/loadMonarchKG.py @@ -2,11 +2,12 @@ import os import tarfile import orjson +import requests from Common.loader_interface import SourceDataLoader from Common.kgxmodel import kgxedge from Common.biolink_constants import * -from Common.utils import GetData +from Common.utils import GetData, GetDataPullError ############## @@ -29,7 +30,7 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): # there is a /latest/ for this url, but without a valid get_latest_source_version function, # it could create a mismatch, pin to this version for now - self.data_url = 'https://data.monarchinitiative.org/monarch-kg-dev/2024-03-18/' + self.data_url = 'https://data.monarchinitiative.org/monarch-kg-dev/latest/' self.monarch_graph_archive = 'monarch-kg.jsonl.tar.gz' self.monarch_edge_file_archive_path = 'monarch-kg_edges.jsonl' self.data_files = [self.monarch_graph_archive] @@ -63,9 +64,17 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): } def get_latest_source_version(self) -> str: - # possible to retrieve from /latest/index.html with beautifulsoup or some html parser but not ideal, - # planning to try to set up a better method with owners - latest_version = '2024-03-18' + """ + Gets the name of latest monarch kg version from metadata. 
+ """ + latest_version = None + try: + metadata_yaml : requests.Response = requests.get("https://data.monarchinitiative.org/monarch-kg-dev/latest/metadata.yaml") + for line in metadata_yaml.text.split('\n'): + if("kg-version:" in line): latest_version = line.replace("kg-version:","").strip() + if(latest_version==None):raise ValueError("Cannot find 'kg-version' in Monarch KG metadata yaml.") + except Exception as e: + raise GetDataPullError(error_message=f'Unable to determine latest version for Monarch KG: {e}') return latest_version def get_data(self) -> bool: @@ -85,6 +94,10 @@ def parse_data(self) -> dict: skipped_ignore_knowledge_source = 0 skipped_undesired_predicate = 0 full_tar_path = os.path.join(self.data_path, self.monarch_graph_archive) + protected_edge_labels = [SUBJECT_ID, OBJECT_ID, PREDICATE,PRIMARY_KNOWLEDGE_SOURCE, + AGGREGATOR_KNOWLEDGE_SOURCES, KNOWLEDGE_LEVEL, AGENT_TYPE, + PUBLICATIONS, "biolink:primary_knowledge_source", "biolink:aggregator_knowledge_source"] + with tarfile.open(full_tar_path, 'r') as tar_files: with tar_files.extractfile(self.monarch_edge_file_archive_path) as edges_file: for line in edges_file: @@ -116,13 +129,14 @@ def parse_data(self) -> dict: KNOWLEDGE_LEVEL: monarch_edge[KNOWLEDGE_LEVEL] if KNOWLEDGE_LEVEL in monarch_edge else NOT_PROVIDED, AGENT_TYPE: monarch_edge[AGENT_TYPE] if AGENT_TYPE in monarch_edge else NOT_PROVIDED } + if monarch_edge[PUBLICATIONS]: edge_properties[PUBLICATIONS] = monarch_edge[PUBLICATIONS] + for edge_attribute in monarch_edge: - if '_qualifier' in edge_attribute and monarch_edge[edge_attribute]: + if edge_attribute not in protected_edge_labels and monarch_edge[edge_attribute]: edge_properties[edge_attribute] = monarch_edge[edge_attribute] - elif edge_attribute == QUALIFIED_PREDICATE and monarch_edge[QUALIFIED_PREDICATE]: - edge_properties[QUALIFIED_PREDICATE] = monarch_edge[QUALIFIED_PREDICATE] + output_edge = kgxedge( subject_id=subject_id, predicate=predicate, diff --git a/parsers/panther/src/loadPanther.py b/parsers/panther/src/loadPanther.py index d32133a5..ec8144f0 100644 --- a/parsers/panther/src/loadPanther.py +++ b/parsers/panther/src/loadPanther.py @@ -115,6 +115,7 @@ def get_data(self) -> int: # do the real thing if we arent in debug mode if not self.test_mode: # get the complete data set + #TODO make these class level variables. 
file_count: int = gd.pull_via_ftp('ftp.pantherdb.org', f'/sequence_classifications/{self.data_version}/PANTHER_Sequence_Classification_files/', [self.data_file], self.data_path) else: file_count: int = 1 diff --git a/parsers/yeast/src/loadHistoneMap.py b/parsers/yeast/src/loadHistoneMap.py index 617462e7..84859adc 100644 --- a/parsers/yeast/src/loadHistoneMap.py +++ b/parsers/yeast/src/loadHistoneMap.py @@ -275,9 +275,8 @@ def fetch_histone_data(self, "CTD:increases_abundance_of"] self.logger.debug('Histone Modifications Mapped to GO Terms!') - csv_fname = f"HistonePTM2GO.csv" histonePTM2GO_df = pd.DataFrame.from_dict(histonePTM2GO) - histonePTM2GO_df.to_csv(os.path.join(output_directory, csv_fname), encoding="utf-8-sig", index=False) + histonePTM2GO_df.to_csv(os.path.join(output_directory, self.histone_mod_to_go_term_file_name), encoding="utf-8-sig", index=False) for chr in chromosome_lengths.keys(): m = int(chromosome_lengths[chr]) for i in range(m): # Create loci nodes for chromosomes @@ -303,7 +302,7 @@ def fetch_histone_data(self, data['histoneMod'].append(ptm) genomelocidf = pd.DataFrame(data) self.logger.debug('Histone Modifications Loci Collected!') - genomelocidf.to_csv(os.path.join(output_directory, HISTONE_LOCI_FILE), encoding="utf-8-sig", index=False) + genomelocidf.to_csv(os.path.join(output_directory, self.histone_mod_list_file_name), encoding="utf-8-sig", index=False) if not generate_gene_mapping: return @@ -336,5 +335,4 @@ def fetch_histone_data(self, genomelocidf = genomelocidf.merge(just_windows, how='inner', on=['chromosomeID', 'start', 'end', 'loci']) self.logger.debug(f"Histone Modifications Mapping Complete!") - csv_f3name = f"HistoneMod2Gene.csv" - genomelocidf.to_csv(os.path.join(output_directory, csv_f3name), encoding="utf-8-sig", index=False) + genomelocidf.to_csv(os.path.join(output_directory, self.histone_mod_to_gene_file_name), encoding="utf-8-sig", index=False) diff --git a/requirements.txt b/requirements.txt index 95a54b3c..03a7ca86 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,19 +1,19 @@ -pandas==2.2.1 +pandas==2.2.2 requests==2.32.3 requests-toolbelt>=1.0.0 -pytest==8.1.1 -git+https://github.com/ObesityHub/robokop-genetics.git +pytest==8.2.0 +robokop-genetics==0.5.0 # intermine is on pypi but as of 6/23 it's broken for python 3.10+, this fork fixes the issue git+https://github.com/EvanDietzMorris/intermine-ws-python.git jsonlines==4.0.0 -pyyaml==6.0 -beautifulsoup4==4.11.1 +pyyaml==6.0.1 +beautifulsoup4==4.12.3 psycopg2-binary==2.9.9 -orjson==3.9.15 +orjson==3.10.3 xxhash==3.4.1 -mysql-connector-python==8.3.0 -neo4j==5.10.0 +mysql-connector-python==9.1.0 +neo4j==5.20.0 pyoxigraph==0.3.22 -curies==0.7.8 -prefixmaps==0.2.2 -bmt==1.2.1 +curies==0.7.9 +prefixmaps==0.2.4 +bmt==1.4.1 \ No newline at end of file diff --git a/set_up_test_env.sh b/set_up_test_env.sh index 2cf5a754..1ef6edd3 100644 --- a/set_up_test_env.sh +++ b/set_up_test_env.sh @@ -26,8 +26,11 @@ export PYTHONPATH="$PYTHONPATH:$PWD" #The following environment variables are optional -export EDGE_NORMALIZATION_ENDPOINT=https://bl-lookup-sri.renci.org/ -export NODE_NORMALIZATION_ENDPOINT=https://nodenormalization-sri.renci.org/ -export NAME_RESOLVER_ENDPOINT=https://name-resolution-sri.renci.org/ -export ORION_OUTPUT_URL=https://localhost/ # this is currently only used to generate metadata -export BL_VERSION=4.1.6 +# export EDGE_NORMALIZATION_ENDPOINT=https://bl-lookup-sri.renci.org/ +# export NODE_NORMALIZATION_ENDPOINT=https://nodenormalization-sri.renci.org/ +# export 
NAME_RESOLVER_ENDPOINT=https://name-resolution-sri.renci.org/ +# export ORION_OUTPUT_URL=https://localhost/ # this is currently only used to generate metadata +# export BL_VERSION=4.2.1 + +# if you are building your own docker image and run into issues, setting the correct platform may help +# export DOCKER_PLATFORM=linux/arm64
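A minimal sketch of how the optional endpoints commented out in set_up_test_env.sh might be read with defaults on the Python side, assuming ORION falls back to these public instances when the variables are unset; the fallback behaviour itself is an assumption, as this diff does not show it.

import os

# defaults are the URLs commented out in set_up_test_env.sh; override by exporting the variables
EDGE_NORMALIZATION_ENDPOINT = os.environ.get('EDGE_NORMALIZATION_ENDPOINT',
                                             'https://bl-lookup-sri.renci.org/')
NODE_NORMALIZATION_ENDPOINT = os.environ.get('NODE_NORMALIZATION_ENDPOINT',
                                             'https://nodenormalization-sri.renci.org/')
NAME_RESOLVER_ENDPOINT = os.environ.get('NAME_RESOLVER_ENDPOINT',
                                        'https://name-resolution-sri.renci.org/')
BL_VERSION = os.environ.get('BL_VERSION', '4.2.1')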