getting closer to done
jdr0887 committed Jan 22, 2025
1 parent d5081fb commit 2e45fa8
Showing 1 changed file with 48 additions and 84 deletions.
132 changes: 48 additions & 84 deletions parsers/MetabolomicsWorkbench/src/loadMetabolomicsWorkbench.py
@@ -1,25 +1,20 @@
import os
import enum
import zipfile
import pathlib
from collections.abc import Mapping

import polars as pl
import yaml
from yaml import SafeLoader

from Common.extractor import Extractor
from Common.kgxmodel import kgxnode, kgxedge
from Common.loader_interface import SourceDataLoader
from Common.biolink_constants import *
from Common.prefixes import PUBCHEM_COMPOUND
from Common.utils import GetData




class MetabolomicsWorkbenchLoader(SourceDataLoader):

source_id: str = 'MetabolomicsWorkbench'
provenance_id: str = 'infores:metabolomics_workbench'
parsing_version: str = '2024-05-08'

def __init__(self, test_mode: bool = False, source_data_dir: str = None):
"""
@@ -30,9 +25,12 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):
self.data_url = 'https://cfde-drc.s3.amazonaws.com/Metabolomics/KG%20Assertions'
self.data_file = "MW.zip"

with open('/ORION/cfde-config.yml', 'r') as file:
yaml_data = list(yaml.load_all(file, Loader=SafeLoader))
self.config = list(filter(lambda x: x["name"] == self.source_id, yaml_data))[0]

def get_latest_source_version(self) -> str:
latest_version = '2024-05-08'
return latest_version
return self.config['version']
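For reference, the hard-coded source details are replaced here by entries read from /ORION/cfde-config.yml. A minimal sketch of the parsed self.config entry this loader appears to expect, inferred from the keys referenced in the code (name, version, provenance_id, predicate_mapping, node_files, edge_file); the values shown are illustrative placeholders taken from the removed hard-coded logic, not the authoritative config:

# Hypothetical shape of the MetabolomicsWorkbench document in cfde-config.yml
# after yaml.load_all(); values are placeholders for illustration only.
config = {
    "name": "MetabolomicsWorkbench",
    "version": "2024-05-08",
    "provenance_id": "infores:mw",
    "predicate_mapping": {
        "produces": "biolink:produces",
        "causally_influences": "biolink:produces",
        "correlated_with_condition": "biolink:correlated_with",
    },
    "node_files": [
        {"node_file": {"name": "MW.Gene.nodes.csv",
                       "primary_id_column": "HGNC",
                       "secondary_id_column": "OMIM",
                       "type": "Gene"}},
    ],
    "edge_file": "MW.edges.csv",
}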

def get_data(self) -> bool:
data_puller = GetData()
@@ -53,39 +51,31 @@ def parse_data(self) -> dict:

nodes = pl.DataFrame(schema={"id": pl.String, "original_id": pl.String, "name": pl.String, "category": pl.List(pl.String)})

anatomy_df = pl.scan_csv(os.path.join(self.data_path, "MW.Anatomy.nodes.csv"), has_header=True).select(
pl.when(pl.col("UBERON").is_null()).then(pl.col("CHV")).otherwise(pl.col("UBERON")).alias("id"),
pl.col("").alias("original_id"),
pl.col("label").alias("name"),
pl.when(pl.col("type").is_null()).then(pl.lit("Anatomy")).otherwise(pl.col("type")).cast(pl.List(pl.String)).alias("category")
).collect()
null_nodes_count = int(anatomy_df.null_count().item(0,0))

disease_df = pl.scan_csv(os.path.join(self.data_path, "MW.Disease or Phenotype.nodes.csv"), has_header=True).select(
pl.when(pl.col("MONDO").is_null()).then(pl.col("CHV")).otherwise(pl.col("MONDO")).alias("id"),
pl.col("").alias("original_id"),
pl.col("label").alias("name"),
pl.when(pl.col("type").is_null()).then(pl.lit("Disease or Phenotype")).otherwise(pl.col("type")).cast(pl.List(pl.String)).alias("category")
).collect()
null_nodes_count = null_nodes_count + int(disease_df.null_count().item(0,0))

gene_df = pl.scan_csv(os.path.join(self.data_path, "MW.Gene.nodes.csv"), has_header=True).select(
pl.when(pl.col("HGNC").is_null()).then(pl.col("OMIM")).otherwise(pl.col("HGNC")).alias("id"),
pl.col("").alias("original_id"),
pl.col("label").alias("name"),
pl.when(pl.col("type").is_null()).then(pl.lit("Gene")).otherwise(pl.col("type")).cast(pl.List(pl.String)).alias("category")
).collect()
null_nodes_count = null_nodes_count + int(gene_df.null_count().item(0,0))
predicate_mapping = dict(self.config['predicate_mapping'])

for file in self.config["node_files"]:
if file["node_file"]["secondary_id_column"]:
tmp_df = pl.scan_csv(os.path.join(self.data_path, file["node_file"]["name"]), has_header=True).select(
pl.when(pl.col(file["node_file"]["primary_id_column"]).is_null()).then(pl.col(file["node_file"]["secondary_id_column"])).otherwise(pl.col(file["node_file"]["primary_id_column"])).alias("id"),
pl.col("").alias("original_id"),
pl.col("label").alias("name"),
pl.when(pl.col("type").is_null()).then(pl.lit(file["node_file"]["type"])).otherwise(pl.col("type")).cast(pl.List(pl.String)).alias("category")
)
else:
tmp_df = pl.scan_csv(os.path.join(self.data_path, file["node_file"]["name"]), has_header=True).select(
pl.col(file["node_file"]["primary_id_column"]).alias("id"),
pl.col("").alias("original_id"),
pl.col("label").alias("name"),
pl.when(pl.col("type").is_null()).then(pl.lit(file["node_file"]["type"])).otherwise(pl.col("type")).cast(pl.List(pl.String)).alias("category")
)
tmp_df = tmp_df.with_columns(
pl.when(pl.col("id").str.starts_with("PUBCHEM")).then(pl.col("id").str.replace("PUBCHEM", PUBCHEM_COMPOUND)).otherwise(pl.col("id")).alias("id"),
pl.col("original_id"),
pl.col("name"),
pl.col("category")
).collect()
nodes = pl.concat([nodes, tmp_df], how="vertical")

metabolite_df = pl.scan_csv(os.path.join(self.data_path, "MW.Metabolite.nodes.csv"), has_header=True).select(
pl.when(pl.col("PUBCHEM").is_null()).then(pl.col("PUBMED")).otherwise(pl.col("PUBCHEM")).alias("id"),
pl.col("").alias("original_id"),
pl.col("label").alias("name"),
pl.when(pl.col("type").is_null()).then(pl.lit("Metabolite")).otherwise(pl.col("type")).cast(pl.List(pl.String)).alias("category")
).collect()
null_nodes_count = null_nodes_count + int(metabolite_df.null_count().item(0,0))

nodes = pl.concat([nodes, anatomy_df, disease_df, gene_df, metabolite_df], how="vertical")
node_mapping = dict(zip(nodes["original_id"], nodes["id"]))
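The per-file select in the loop above uses polars' when/then/otherwise to fall back to a secondary identifier column when the primary one is null. A self-contained sketch of that pattern on toy data (not the real node files):

import polars as pl

# One node file in miniature: primary id (HGNC) sometimes null,
# secondary id (OMIM) used as the fallback.
df = pl.DataFrame({
    "HGNC": ["HGNC:5", None, "HGNC:7"],
    "OMIM": ["OMIM:100100", "OMIM:100200", None],
})

ids = df.select(
    pl.when(pl.col("HGNC").is_null())
      .then(pl.col("OMIM"))
      .otherwise(pl.col("HGNC"))
      .alias("id")
)
print(ids)  # id -> HGNC:5, OMIM:100200, HGNC:7

pl.coalesce("HGNC", "OMIM") would express the same fallback more compactly.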

df_missing = nodes.filter(pl.any_horizontal(pl.all().is_null()))
@@ -97,60 +87,34 @@
nodes = nodes.drop_nulls()
nodes.drop_in_place("original_id")

nodes_path = os.path.join(self.data_path, "source_nodes.jsonl")
nodes.write_ndjson(nodes_path)
for row in nodes.rows(named=True):
node = kgxnode(identifier=row['id'], name=row['name'], categories=row['category'])
self.final_node_list.append(node)

predicate_mapping = {"produces": "biolink:produces", "causally_influences": "biolink:produces", "correlated_with_condition": "biolink:correlated_with"}
# nodes_path = os.path.join(self.data_path, "source_nodes.jsonl")
# nodes.write_ndjson(nodes_path)

edges = pl.scan_csv(os.path.join(self.data_path, "MW.edges.csv"), has_header=True).select(
edges = pl.scan_csv(os.path.join(self.data_path, self.config['edge_file']), has_header=True).select(
pl.col("source").alias("subject"),
pl.col("relation").alias("predicate"),
pl.col("target").alias("object"),
pl.lit("infores:mw").alias(PRIMARY_KNOWLEDGE_SOURCE),
pl.lit(self.config['provenance_id']).alias(PRIMARY_KNOWLEDGE_SOURCE),
pl.lit("data_analysis_pipeline").alias(AGENT_TYPE),
pl.lit("knowledge_assertion").alias(KNOWLEDGE_LEVEL),
).collect()

edges = edges.with_columns(pl.col("subject").replace(missing_mapping), pl.col("predicate"), pl.col("object").replace(missing_mapping))
edges = edges.drop_nulls()

edges = edges.with_columns(pl.col("subject").replace(missing_mapping), pl.col("predicate"), pl.col("object").replace(missing_mapping)).drop_nulls()
edges = edges.with_columns(pl.col("subject").replace(node_mapping), pl.col("predicate").replace(predicate_mapping), pl.col("object").replace(node_mapping))
edges_path = os.path.join(self.data_path, "source_edges.jsonl")
edges.write_ndjson(edges_path)
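The chained replace calls above remap raw identifiers and relation strings through plain dicts; polars' Expr.replace leaves values absent from the mapping unchanged. The predicate remapping in isolation, on toy data:

import polars as pl

predicate_mapping = {
    "produces": "biolink:produces",
    "causally_influences": "biolink:produces",
    "correlated_with_condition": "biolink:correlated_with",
}

edges = pl.DataFrame({"predicate": ["produces", "correlated_with_condition"]})
edges = edges.with_columns(pl.col("predicate").replace(predicate_mapping))
print(edges)  # -> biolink:produces, biolink:correlated_with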

# class GENERICDATACOLS(enum.IntEnum):
# SOURCE_ID = 2
# SOURCE_LABEL = 3
# TARGET_ID = 5
# TARGET_LABEL = 6
# PREDICATE = 7
#
#
# PREDICATE_MAPPING = {
# "in_similarity_relationship_with": "biolink:chemically_similar_to",
# "negatively_regulates": "RO:0002212",
# "positively_regulates": "RO:0002213"
# }

# extractor = Extractor(file_writer=self.output_file_writer)
# mw_edge_file: str = os.path.join(self.data_path, "MW.edges.csv")
# with open(mw_edge_file, 'rt') as fp:
# extractor.csv_extract(fp,
# lambda line: self.resolve_id(line[GENERICDATACOLS.SOURCE_ID.value]), # source id
# lambda line: self.resolve_id(line[GENERICDATACOLS.TARGET_ID.value]), # target id
# lambda line: PREDICATE_MAPPING[line[GENERICDATACOLS.PREDICATE.value]], # predicate extractor
# lambda line: {}, # subject properties
# lambda line: {}, # object properties
# lambda line: self.get_edge_properties(), # edge properties
# comment_character='#',
# delim=',',
# has_header_row=True)

return { 'record_counter': 0, 'skipped_record_counter': null_nodes_count, 'errors': []}

if __name__ == '__main__':
for row in edges.rows(named=True):
edge = kgxedge(subject_id=row['subject'], predicate=row['predicate'], object_id=row['object'], primary_knowledge_source=row[PRIMARY_KNOWLEDGE_SOURCE], edgeprops={ KNOWLEDGE_LEVEL: row[KNOWLEDGE_LEVEL], AGENT_TYPE: row[AGENT_TYPE]})
self.final_edge_list.append(edge)

return { 'record_counter': len(edges), 'skipped_record_counter': len(df_missing), 'errors': []}


if __name__ == '__main__':
source_data_dir = str(os.path.join(os.environ.get("ORION_STORAGE"), "MetabolomicsWorkbench", "2024-05-08"))
loader = MetabolomicsWorkbenchLoader(source_data_dir=source_data_dir)
loader.get_data()
# print(loader.parse_data())
print(loader.parse_data())
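Run directly, the script assumes the ORION_STORAGE environment variable points at the ORION storage root; parse_data() then prints the dict returned above (record_counter, skipped_record_counter, errors).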
