Using new format for CFDE-based parsers
jdr0887 committed Jan 23, 2025
1 parent cd6eaf6 commit 5290a53
Showing 1 changed file with 80 additions and 62 deletions.
142 changes: 80 additions & 62 deletions parsers/LINCS/src/loadLINCS.py
@@ -1,61 +1,40 @@
import os
import enum
import zipfile

from Common.extractor import Extractor
import polars as pl
import yaml
from yaml import SafeLoader

from Common.kgxmodel import kgxnode, kgxedge
from Common.loader_interface import SourceDataLoader
from Common.biolink_constants import *
from Common.prefixes import PUBCHEM_COMPOUND
from Common.utils import GetData


class GENERICDATACOLS(enum.IntEnum):
SOURCE_ID = 2
SOURCE_LABEL = 3
TARGET_ID = 5
TARGET_LABEL = 6
PREDICATE = 7


PREDICATE_MAPPING = {
"in_similarity_relationship_with": "biolink:chemically_similar_to",
"negatively_regulates": "RO:0002212",
"positively_regulates": "RO:0002213"
}


##############
# Class: LINCS loader
#
# By: James Chung
# Date: 10/30/2024
# Desc: Class that loads/parses the data in the Library of Integrated Network-Based Cellular Signatures (LINCS).
#
##############
class LINCSLoader(SourceDataLoader):

source_id: str = 'LINCS'
provenance_id: str = 'infores:lincs'
parsing_version: str = '1.0'

def __init__(self, test_mode: bool = False, source_data_dir: str = None):
"""
:param test_mode - sets the run into test mode
:param source_data_dir - the specific storage directory to save files in
"""
super().__init__(test_mode=test_mode, source_data_dir=source_data_dir)
self.data_url = 'https://cfde-drc.s3.amazonaws.com/LINCS/KG%20Assertions'
self.data_file = "LINCS.zip"

self.data_url = 'https://stars.renci.org/var/data_services/LINCS/'
self.edge_file = "LINCS.lookup.edges.csv"
self.data_files = [self.edge_file]
with open('/ORION/cfde-config.yml', 'r') as file:
yaml_data = list(yaml.load_all(file, Loader=SafeLoader))
self.config = list(filter(lambda x: x["name"] == self.source_id, yaml_data))[0]
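# A minimal sketch of the cfde-config.yml entry this loader assumes, inferred only
# from how self.config is used in this file; the file names and column names below
# are hypothetical placeholders, and only the key names are implied by the code:
#
#   - name: LINCS
#     version: v1.0
#     provenance_id: infores:lincs
#     edge_file: LINCS.edges.csv
#     predicate_mapping:
#       in_similarity_relationship_with: biolink:chemically_similar_to
#       negatively_regulates: RO:0002212
#       positively_regulates: RO:0002213
#     node_files:
#       - node_file:
#           name: LINCS.nodes.csv
#           primary_id_column: id
#           secondary_id_column: secondary_id
#           type: biolink:NamedThing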

def get_latest_source_version(self) -> str:
# The KG was generated from Data Distillery KG. There was no version defined.
latest_version = 'v1.0'
return latest_version
return self.config['version']

def get_data(self) -> bool:
source_data_url = f'{self.data_url}{self.edge_file}'
data_puller = GetData()
source_data_url = f'{self.data_url}/{self.get_latest_source_version()}/{self.data_file}'
data_puller.pull_via_http(source_data_url, self.data_path)
return True

@@ -65,31 +44,70 @@ def parse_data(self) -> dict:
:return: ret_val: load_metadata
"""
extractor = Extractor(file_writer=self.output_file_writer)
lincs_file: str = os.path.join(self.data_path, self.edge_file)
with open(lincs_file, 'rt') as fp:
extractor.csv_extract(fp,
lambda line: self.resolve_id(line[GENERICDATACOLS.SOURCE_ID.value]), # source id
lambda line: self.resolve_id(line[GENERICDATACOLS.TARGET_ID.value]), # target id
lambda line: PREDICATE_MAPPING[line[GENERICDATACOLS.PREDICATE.value]], # predicate extractor
lambda line: {}, # subject properties
lambda line: {}, # object properties
lambda line: self.get_edge_properties(), # edge properties
comment_character='#',
delim=',',
has_header_row=True)
return extractor.load_metadata

@staticmethod
def resolve_id(idstring: str):
if idstring.startswith("PUBCHEM"):
return idstring.replace("PUBCHEM", PUBCHEM_COMPOUND)
return idstring

def get_edge_properties(self):
properties = {
PRIMARY_KNOWLEDGE_SOURCE: self.provenance_id,
KNOWLEDGE_LEVEL: KNOWLEDGE_ASSERTION,
AGENT_TYPE: DATA_PIPELINE
}
return properties
zip_file = os.path.join(self.data_path, self.data_file)

with zipfile.ZipFile(zip_file, 'r') as zip_ref:
zip_ref.extractall(self.data_path)

nodes = pl.DataFrame(schema={"id": pl.String, "original_id": pl.String, "name": pl.String, "category": pl.List(pl.String)})

predicate_mapping = dict(self.config['predicate_mapping'])

for file in self.config["node_files"]:
if "secondary_id_column" in file["node_file"]:
tmp_df = pl.scan_csv(os.path.join(self.data_path, file["node_file"]["name"]), has_header=True).select(
pl.when(pl.col(file["node_file"]["primary_id_column"]).is_null()).then(pl.col(file["node_file"]["secondary_id_column"])).otherwise(pl.col(file["node_file"]["primary_id_column"])).alias("id"),
pl.col("").alias("original_id"),
pl.col("label").alias("name"),
pl.when(pl.col("type").is_null()).then(pl.lit(file["node_file"]["type"])).otherwise(pl.col("type")).cast(pl.List(pl.String)).alias("category")
)
else:
tmp_df = pl.scan_csv(os.path.join(self.data_path, file["node_file"]["name"]), has_header=True).select(
pl.col(file["node_file"]["primary_id_column"]).alias("id"),
pl.col("").alias("original_id"),
pl.col("label").alias("name"),
pl.when(pl.col("type").is_null()).then(pl.lit(file["node_file"]["type"])).otherwise(pl.col("type")).cast(pl.List(pl.String)).alias("category")
)
tmp_df = tmp_df.with_columns(
pl.when(pl.col("id").str.starts_with("PUBCHEM")).then(pl.col("id").str.replace("PUBCHEM", PUBCHEM_COMPOUND)).otherwise(pl.col("id")).alias("id"),
pl.col("original_id"),
pl.col("name"),
pl.col("category")
).collect()
nodes = pl.concat([nodes, tmp_df], how="vertical")
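# The loop above assumes each configured node file is a CSV with an unnamed first
# column holding the source's original identifier (selected via pl.col("")), the
# configured primary/secondary id columns holding resolved identifiers, a "label"
# column for the node name, and a "type" column for the biolink category. A
# hypothetical row under those assumptions (the "id" column name is a placeholder
# for whatever primary_id_column names in the config):
#
#   ,id,label,type
#   LSM-1234,PUBCHEM:2244,aspirin,biolink:SmallMolecule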

node_mapping = dict(zip(nodes["original_id"], nodes["id"]))

df_missing = nodes.filter(pl.any_horizontal(pl.all().is_null()))
unmapped_path = os.path.join(self.data_path, "unmapped.jsonl")
df_missing.write_ndjson(unmapped_path)

missing_mapping = dict(zip(df_missing["original_id"], df_missing["id"]))
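# missing_mapping covers nodes that could not be fully resolved (rows with any null
# column); applying it to the edge table below and then calling drop_nulls() appears
# intended to discard edges that reference those unresolved nodes.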

nodes = nodes.drop_nulls()
nodes.drop_in_place("original_id")

for row in nodes.rows(named=True):
node = kgxnode(identifier=row['id'], name=row['name'], categories=row['category'])
self.final_node_list.append(node)

# nodes_path = os.path.join(self.data_path, "source_nodes.jsonl")
# nodes.write_ndjson(nodes_path)

edges = pl.scan_csv(os.path.join(self.data_path, self.config['edge_file']), has_header=True).select(
pl.col("source").alias("subject"),
pl.col("relation").alias("predicate"),
pl.col("target").alias("object"),
pl.lit(self.config['provenance_id']).alias(PRIMARY_KNOWLEDGE_SOURCE),
pl.lit("data_analysis_pipeline").alias(AGENT_TYPE),
pl.lit("knowledge_assertion").alias(KNOWLEDGE_LEVEL),
).collect()
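# The edge file is assumed (from the selections above) to be a CSV whose "source",
# "relation", and "target" columns carry the source's original node identifiers and
# predicate strings; the replace() calls below map them onto resolved node ids and
# biolink predicates. A hypothetical row under those assumptions:
#
#   source,relation,target
#   LSM-1234,negatively_regulates,HGNC:1097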

edges = edges.with_columns(pl.col("subject").replace(missing_mapping), pl.col("predicate"), pl.col("object").replace(missing_mapping)).drop_nulls()
edges = edges.with_columns(pl.col("subject").replace(node_mapping), pl.col("predicate").replace(predicate_mapping), pl.col("object").replace(node_mapping))

for row in edges.rows(named=True):
edge = kgxedge(subject_id=row['subject'], predicate=row['predicate'], object_id=row['object'], primary_knowledge_source=row[PRIMARY_KNOWLEDGE_SOURCE], edgeprops={ KNOWLEDGE_LEVEL: row[KNOWLEDGE_LEVEL], AGENT_TYPE: row[AGENT_TYPE]})
self.final_edge_list.append(edge)

return { 'record_counter': len(edges), 'skipped_record_counter': len(df_missing), 'errors': []}
