
Commit

initial commit
jdr0887 committed Jan 23, 2025
1 parent b4345bd commit ca8cee0
Showing 3 changed files with 180 additions and 0 deletions.
49 changes: 49 additions & 0 deletions cfde-config.yml
@@ -0,0 +1,49 @@
---
name: "MetabolomicsWorkbench"
version: "2024-05-08"
provenance_id: "infores:metabolomics_workbench"
predicate_mapping:
  produces: "biolink:produces"
  causally_influences: "biolink:produces"
  correlated_with_condition: "biolink:correlated_with"
edge_file: "MW.edges.csv"
node_files:
  - node_file:
      name: "MW.Anatomy.nodes.csv"
      type: "Anatomy"
      primary_id_column: "UBERON"
      secondary_id_column: "CHV"
  - node_file:
      name: "MW.Disease or Phenotype.nodes.csv"
      type: "Disease or Phenotype"
      primary_id_column: "MONDO"
      secondary_id_column: "CHV"
  - node_file:
      name: "MW.Gene.nodes.csv"
      type: "Gene"
      primary_id_column: "HGNC"
      secondary_id_column: "OMIM"
  - node_file:
      name: "MW.Metabolite.nodes.csv"
      type: "Metabolite"
      primary_id_column: "PUBCHEM"
      secondary_id_column: "PUBMED"
---
name: "LINCS"
version: "2024-05-08"
provenance_id: "infores:lincs"
predicate_mapping:
  in_similarity_relationship_with: "biolink:chemically_similar_to"
  negatively_regulates: "RO:0002212"
  positively_regulates: "RO:0002213"
edge_file: "LINCS.edges.csv"
node_files:
  - node_file:
      name: "LINCS.Gene.nodes.csv"
      type: "Gene"
      primary_id_column: "HGNC"
      secondary_id_column: "OMIM"
  - node_file:
      name: "LINCS.Compound.nodes.csv"
      type: "Compound"
      primary_id_column: "PUBCHEM"
11 changes: 11 additions & 0 deletions graph_specs/cfde-graph-spec.yml
@@ -0,0 +1,11 @@
graphs:

  - graph_id: CFDE_Automat
    graph_name: CFDE KP
    graph_description: 'Common Fund Data Ecosystem KG'
    graph_url: https://github.com/NCATSTranslator/Translator-All/wiki/CFDE-KP
    output_format: none
    sources:
      - source_id: LINCS
      - source_id: MetabolomicsWorkbench
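
As a hedged sketch of how this spec might be consumed (the path and loop here are assumptions, not part of this commit), standard YAML tooling is enough to enumerate which sources feed each graph:

import yaml

with open('graph_specs/cfde-graph-spec.yml', 'r') as file:
    spec = yaml.safe_load(file)

for graph in spec['graphs']:
    source_ids = [source['source_id'] for source in graph['sources']]
    print(graph['graph_id'], '<-', ', '.join(source_ids))
    # prints: CFDE_Automat <- LINCS, MetabolomicsWorkbench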

120 changes: 120 additions & 0 deletions parsers/MetabolomicsWorkbench/src/loadMetabolomicsWorkbench.py
@@ -0,0 +1,120 @@
import os
import zipfile

import polars as pl
import yaml
from yaml import SafeLoader

from Common.kgxmodel import kgxnode, kgxedge
from Common.loader_interface import SourceDataLoader
from Common.biolink_constants import *
from Common.prefixes import PUBCHEM_COMPOUND
from Common.utils import GetData


class MetabolomicsWorkbenchLoader(SourceDataLoader):

    source_id: str = 'MetabolomicsWorkbench'

    def __init__(self, test_mode: bool = False, source_data_dir: str = None):
        """
        :param test_mode - sets the run into test mode
        :param source_data_dir - the specific storage directory to save files in
        """
        super().__init__(test_mode=test_mode, source_data_dir=source_data_dir)
        self.data_url = 'https://cfde-drc.s3.amazonaws.com/Metabolomics/KG%20Assertions'
        self.data_file = "MW.zip"

        # cfde-config.yml is a multi-document YAML stream; keep the document
        # whose "name" matches this loader's source_id
        with open('/ORION/cfde-config.yml', 'r') as file:
            yaml_data = list(yaml.load_all(file, Loader=SafeLoader))
        self.config = list(filter(lambda x: x["name"] == self.source_id, yaml_data))[0]

    def get_latest_source_version(self) -> str:
        return self.config['version']

    def get_data(self) -> bool:
        data_puller = GetData()
        source_data_url = f'{self.data_url}/{self.get_latest_source_version()}/{self.data_file}'
        data_puller.pull_via_http(source_data_url, self.data_path)
        return True

    def parse_data(self) -> dict:
        """
        Parses the data file for graph nodes/edges
        :return: ret_val: load_metadata
        """
        zip_file = os.path.join(self.data_path, self.data_file)

        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            zip_ref.extractall(self.data_path)

        nodes = pl.DataFrame(schema={"id": pl.String,
                                     "original_id": pl.String,
                                     "name": pl.String,
                                     "category": pl.List(pl.String)})

        predicate_mapping = dict(self.config['predicate_mapping'])

        for file in self.config["node_files"]:
            node_file = file["node_file"]
            node_file_path = os.path.join(self.data_path, node_file["name"])
            # .get() guards against node files that have no secondary_id_column
            # (e.g. LINCS.Compound in cfde-config.yml)
            if node_file.get("secondary_id_column"):
                tmp_df = pl.scan_csv(node_file_path, has_header=True).select(
                    # fall back to the secondary id column when the primary id is null
                    pl.when(pl.col(node_file["primary_id_column"]).is_null())
                    .then(pl.col(node_file["secondary_id_column"]))
                    .otherwise(pl.col(node_file["primary_id_column"]))
                    .alias("id"),
                    # the unnamed first column holds the source's original node id
                    pl.col("").alias("original_id"),
                    pl.col("label").alias("name"),
                    # default the category to the configured node type when "type" is missing
                    pl.when(pl.col("type").is_null())
                    .then(pl.lit(node_file["type"]))
                    .otherwise(pl.col("type"))
                    .cast(pl.List(pl.String))
                    .alias("category")
                )
            else:
                tmp_df = pl.scan_csv(node_file_path, has_header=True).select(
                    pl.col(node_file["primary_id_column"]).alias("id"),
                    pl.col("").alias("original_id"),
                    pl.col("label").alias("name"),
                    pl.when(pl.col("type").is_null())
                    .then(pl.lit(node_file["type"]))
                    .otherwise(pl.col("type"))
                    .cast(pl.List(pl.String))
                    .alias("category")
                )
            # normalize bare PUBCHEM prefixes to the PUBCHEM.COMPOUND CURIE prefix
            tmp_df = tmp_df.with_columns(
                pl.when(pl.col("id").str.starts_with("PUBCHEM"))
                .then(pl.col("id").str.replace("PUBCHEM", PUBCHEM_COMPOUND))
                .otherwise(pl.col("id"))
                .alias("id")
            ).collect()
            nodes = pl.concat([nodes, tmp_df], how="vertical")

        node_mapping = dict(zip(nodes["original_id"], nodes["id"]))

        # write rows that could not be mapped to an id out for later inspection
        df_missing = nodes.filter(pl.any_horizontal(pl.all().is_null()))
        unmapped_path = os.path.join(self.data_path, "unmapped.jsonl")
        df_missing.write_ndjson(unmapped_path)

        missing_mapping = dict(zip(df_missing["original_id"], df_missing["id"]))

        nodes = nodes.drop_nulls()
        nodes.drop_in_place("original_id")

        for row in nodes.rows(named=True):
            node = kgxnode(identifier=row['id'], name=row['name'], categories=row['category'])
            self.final_node_list.append(node)

        # nodes_path = os.path.join(self.data_path, "source_nodes.jsonl")
        # nodes.write_ndjson(nodes_path)

        edges = pl.scan_csv(os.path.join(self.data_path, self.config['edge_file']), has_header=True).select(
            pl.col("source").alias("subject"),
            pl.col("relation").alias("predicate"),
            pl.col("target").alias("object"),
            pl.lit(self.config['provenance_id']).alias(PRIMARY_KNOWLEDGE_SOURCE),
            pl.lit("data_analysis_pipeline").alias(AGENT_TYPE),
            pl.lit("knowledge_assertion").alias(KNOWLEDGE_LEVEL),
        ).collect()

        # map ids of unmapped nodes to null and drop those edges, then translate
        # the remaining original ids and source predicates to curated equivalents
        edges = edges.with_columns(
            pl.col("subject").replace(missing_mapping),
            pl.col("object").replace(missing_mapping)
        ).drop_nulls()
        edges = edges.with_columns(
            pl.col("subject").replace(node_mapping),
            pl.col("predicate").replace(predicate_mapping),
            pl.col("object").replace(node_mapping)
        )

        for row in edges.rows(named=True):
            edge = kgxedge(subject_id=row['subject'],
                           predicate=row['predicate'],
                           object_id=row['object'],
                           primary_knowledge_source=row[PRIMARY_KNOWLEDGE_SOURCE],
                           edgeprops={KNOWLEDGE_LEVEL: row[KNOWLEDGE_LEVEL],
                                      AGENT_TYPE: row[AGENT_TYPE]})
            self.final_edge_list.append(edge)

        return {'record_counter': len(edges), 'skipped_record_counter': len(df_missing), 'errors': []}


if __name__ == '__main__':
    source_data_dir = str(os.path.join(os.environ.get("ORION_STORAGE"), "MetabolomicsWorkbench", "2024-05-08"))
    loader = MetabolomicsWorkbenchLoader(source_data_dir=source_data_dir)
    loader.get_data()
    print(loader.parse_data())
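
For reference, the id handling in parse_data can be exercised in isolation: the select coalesces the secondary id column in when the primary id is null, and a bare PUBCHEM prefix is then rewritten to the full CURIE prefix. A self-contained toy sketch with invented sample values, assuming the PUBCHEM_COMPOUND constant resolves to 'PUBCHEM.COMPOUND':

import polars as pl

df = pl.DataFrame({
    "PUBCHEM": ["PUBCHEM:5793", None],
    "PUBMED": [None, "PUBMED:12345"],
})

out = df.select(
    # fall back to the secondary id column when the primary id is null
    pl.when(pl.col("PUBCHEM").is_null())
    .then(pl.col("PUBMED"))
    .otherwise(pl.col("PUBCHEM"))
    .alias("id")
).with_columns(
    # rewrite the bare PUBCHEM prefix to the PUBCHEM.COMPOUND CURIE prefix
    pl.when(pl.col("id").str.starts_with("PUBCHEM"))
    .then(pl.col("id").str.replace("PUBCHEM", "PUBCHEM.COMPOUND"))
    .otherwise(pl.col("id"))
    .alias("id")
)
print(out["id"].to_list())  # ['PUBCHEM.COMPOUND:5793', 'PUBMED:12345']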
