split data and package versioning
glass-ships committed Jan 23, 2024
1 parent 4baae98 commit 3133882
Showing 4 changed files with 119 additions and 96 deletions.
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -41,7 +41,7 @@ pipeline {
}
stage('transform') {
steps {
- sh 'poetry run ingest transform --all --log --rdf'
+ sh 'poetry run ingest transform --all --log --rdf --write-versions'
sh '''
sed -i.bak 's@\r@@g' output/transform_output/*.tsv
rm output/transform_output/*.bak
2 changes: 1 addition & 1 deletion Jenkinsfile-download
@@ -19,7 +19,7 @@ pipeline {
stage('download') {
steps {
sh '''
- poetry run downloader src/monarch_ingest/download.yaml
+ poetry run ingest download --all --write-versions
'''
}
}
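The download stage now goes through the `ingest` CLI rather than the standalone `downloader` entrypoint, which is what lets the pipeline pass `--write-versions`. As a minimal sketch (condensed from the `download` command in src/monarch_ingest/main.py below), the new invocation amounts to:

    # Roughly what `poetry run ingest download --all --write-versions` does,
    # condensed from the Typer command in src/monarch_ingest/main.py:
    from kghub_downloader.download_utils import download_from_yaml
    from monarch_ingest.cli_utils import get_data_versions

    # fetch every dataset listed in download.yaml into the working directory
    download_from_yaml(yaml_file="src/monarch_ingest/download.yaml", output_dir=".")

    # record upstream data versions (phenio, hpo, mondo, alliance) in data/versions.yaml
    get_data_versions(output_dir="data")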
77 changes: 59 additions & 18 deletions src/monarch_ingest/cli_utils.py
@@ -1,7 +1,6 @@
import csv
import gc
import os
- import pathlib
import tarfile
from pathlib import Path
from typing import Optional
@@ -10,7 +9,6 @@
from linkml.utils.helpers import convert_to_snake_case

# from loguru import logger

import pandas
import sh

@@ -25,8 +23,6 @@
from monarch_ingest.utils.log_utils import get_logger
from monarch_ingest.utils.export_utils import export

- # from koza.utils.log_utils import get_logger


OUTPUT_DIR = "output"

@@ -134,7 +130,7 @@ def transform_phenio(
nodes = f"{output_dir}/transform_output/phenio_nodes.tsv"
edges = f"{output_dir}/transform_output/phenio_edges.tsv"

- if (force == False) and file_exists(nodes) and file_exists(edges):
+ if (force is False) and file_exists(nodes) and file_exists(edges):
logger.info(f"Transformed output exists - skipping ingest: Phenio - To run this ingest anyway, use --force")
# if log: logger.removeHandler(fh)
return
@@ -155,7 +151,7 @@ def transform_phenio(
# associations that we'll also need
exclude_prefixes = ["HGNC", "FlyBase", "http", "biolink"]

- pathlib.Path(f"{output_dir}/qc/").mkdir(parents=True, exist_ok=True)
+ Path(f"{output_dir}/qc/").mkdir(parents=True, exist_ok=True)
excluded_nodes = nodes_df[nodes_df["id"].str.startswith(tuple(exclude_prefixes))]
nodes_df = nodes_df[~nodes_df["id"].str.startswith(tuple(exclude_prefixes))]

@@ -291,6 +287,45 @@ def transform_all(
# if log: logger.removeHandler(fh)


+ def get_data_versions(output_dir: str = OUTPUT_DIR):
+     import requests as r
+
+     data = {}
+     data["phenio"] = r.get("https://api.github.com/repos/monarch-initiative/phenio/releases").json()[0]["tag_name"]
+     data["hpo"] = r.get("https://api.github.com/repos/obophenotype/human-phenotype-ontology/releases").json()[0][
+         "tag_name"
+     ]
+     data["mondo"] = r.get("https://api.github.com/repos/monarch-initiative/mondo/releases").json()[0]["tag_name"]
+     data["alliance"] = r.get("https://fms.alliancegenome.org/api/releaseversion/current").json()["releaseVersion"]
+     Path(f"{output_dir}").mkdir(parents=True, exist_ok=True)
+     with open(f"{output_dir}/versions.yaml", "w") as f:
+         f.write("data:\n")
+         for data_source, version in data.items():
+             f.write(f" {data_source}: {version}\n")
+
+
+ def get_pkg_versions(output_dir: str = OUTPUT_DIR):
+     import yaml
+     from importlib.metadata import version
+
+     packages = {}
+     packages["biolink"] = version("biolink-model")
+     packages["monarch-ingest"] = version("monarch-ingest")
+     packages["koza"] = version("koza")
+
+     with open("data/versions.yaml", "r") as f:
+         data_versions = yaml.load(f, Loader=yaml.FullLoader)["data"]
+
+     Path(f"{output_dir}").mkdir(parents=True, exist_ok=True)
+     with open(f"{output_dir}/versions.yaml", "w") as f:
+         f.write("packages:\n")
+         for p, v in packages.items():
+             f.write(f" {p}: {v}\n")
+         f.write("\ndata:\n")
+         for d, v in data_versions.items():
+             f.write(f" {d}: {v}\n")
+
+
def merge_files(
name: str = "monarch-kg",
input_dir: str = f"{OUTPUT_DIR}/transform_output",
@@ -316,18 +351,22 @@ def apply_closure(
output_dir: str = OUTPUT_DIR,
):
output_file = f"{output_dir}/{name}-denormalized-edges.tsv"
- add_closure(kg_archive=f"{output_dir}/{name}.tar.gz",
-     closure_file=closure_file,
-     output_file=output_file,
-     fields=['subject',
-         'object',
-         'qualifiers',
-         'frequency_qualifier',
-         'onset_qualifier',
-         'sex_qualifier',
-         'stage_qualifier'],
-     evidence_fields=['has_evidence', 'publications'],
-     grouping_fields=['subject', 'negated', 'predicate', 'object'])
+ add_closure(
+     kg_archive=f"{output_dir}/{name}.tar.gz",
+     closure_file=closure_file,
+     output_file=output_file,
+     fields=[
+         "subject",
+         "object",
+         "qualifiers",
+         "frequency_qualifier",
+         "onset_qualifier",
+         "sex_qualifier",
+         "stage_qualifier",
+     ],
+     evidence_fields=["has_evidence", "publications"],
+     grouping_fields=["subject", "negated", "predicate", "object"],
+ )
sh.pigz(output_file, force=True)


@@ -397,9 +436,11 @@ def load_jsonl():
os.remove("output/monarch-kg_nodes.jsonl")
os.remove("output/monarch-kg_edges.jsonl")


def export_tsv():
export()


def do_release(dir: str = OUTPUT_DIR, kghub: bool = False):
import datetime

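Taken together, the two new helpers split the manifest that the old inline code in main.py wrote in a single pass: get_data_versions queries the GitHub releases API (phenio, hpo, mondo) and the Alliance release endpoint and writes a data-only manifest, while get_pkg_versions reads that manifest back from data/versions.yaml and prepends the installed package versions. Note the ordering this implies: `download --write-versions` must run before `transform --write-versions`, since get_pkg_versions expects data/versions.yaml to already exist. With purely hypothetical version strings, the merged output/versions.yaml would look like:

    packages:
      biolink: 4.1.0          # hypothetical
      monarch-ingest: 0.1.0   # hypothetical
      koza: 0.4.0             # hypothetical

    data:
      phenio: v2024-01-11     # hypothetical
      hpo: v2024-01-16        # hypothetical
      mondo: v2024-01-03      # hypothetical
      alliance: 6.0.0         # hypothetical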
134 changes: 58 additions & 76 deletions src/monarch_ingest/main.py
@@ -2,19 +2,22 @@

from kghub_downloader.download_utils import download_from_yaml
from monarch_ingest.cli_utils import (
- apply_closure,
+ apply_closure,
do_release,
export_tsv,
+ get_data_versions,
+ get_pkg_versions,
load_jsonl,
- load_sqlite,
- load_solr,
- merge_files,
- transform_one,
- transform_phenio,
- transform_all,
+ load_sqlite,
+ load_solr,
+ merge_files,
+ transform_one,
+ transform_phenio,
+ transform_all,
)

import typer

typer_app = typer.Typer()

OUTPUT_DIR = "output"
@@ -24,63 +27,28 @@
def callback(version: Optional[bool] = typer.Option(None, "--version", is_eager=True)):
if version:
from monarch_ingest import __version__

typer.echo(f"monarch_ingest version: {__version__}")
- raise typer.Exit()
+ raise typer.Exit()


@typer_app.command()
def download(
- ingests: Optional[List[str]] = typer.Option(None, help="Which ingests to download data for"),
+ ingests: Optional[List[str]] = typer.Option(None, help="Which ingests to download data for"),
all: bool = typer.Option(False, help="Download all ingest datasets"),
- write_versions: bool = typer.Option(False, help="Write versions of ingests to versions.yaml")
- ):
+ write_versions: bool = typer.Option(False, help="Write versions of ingests to versions.yaml"),
+ ):
"""Downloads data defined in download.yaml"""

if ingests:
download_from_yaml(
- yaml_file='src/monarch_ingest/download.yaml',
- output_dir='.',
+ yaml_file="src/monarch_ingest/download.yaml",
+ output_dir=".",
tags=ingests,
)
elif all:
- download_from_yaml(
-     yaml_file='src/monarch_ingest/download.yaml',
-     output_dir='.'
- )

+ download_from_yaml(yaml_file="src/monarch_ingest/download.yaml", output_dir=".")
if write_versions:
- # TODO:
- # - Find a way to get versions of other data sources
- # - May need beautifulsoup to scrape some web pages
- # - Split data and packages into separate sections
- import requests as r
- from importlib.metadata import version
-
- packages = {}
- data = {}
-
- # get biolink model version
- packages["biolink"] = version("biolink-model")
- packages["monarch-ingest"] = version("monarch-ingest")
-
- # github api query for mondo, phenio, hpo versions
- data["phenio"] = r.get("https://api.github.com/repos/monarch-initiative/phenio/releases").json()[0]["tag_name"]
- data["hpo"] = r.get("https://api.github.com/repos/obophenotype/human-phenotype-ontology/releases").json()[0]["tag_name"]
- data["mondo"] = r.get("https://api.github.com/repos/monarch-initiative/mondo/releases").json()[0]["tag_name"]
-
- # get alliance version from alliance api endpoint
- data["alliance"] = r.get("https://fms.alliancegenome.org/api/releaseversion/current").json()["releaseVersion"]
-
- # zfin -> daily build, no version (or use beautifulsoup)
-
- # write to versions.yaml
- with open("versions.yaml", "w") as f:
-     f.write("packages:\n")
-     for package, version in packages.items():
-         f.write(f" {package}: {version}\n")
-     f.write("data:\n")
-     for data_source, version in data.items():
-         f.write(f" {data_source}: {version}\n")
+ get_data_versions(output_dir="data")


@typer_app.command()
@@ -90,48 +58,76 @@ def transform(
ingest: str = typer.Option(None, "--ingest", "-i", help="Run a single ingest (see ingests.yaml for a list)"),
phenio: bool = typer.Option(False, help="Run the phenio transform"),
all: bool = typer.Option(False, "--all", "-a", help="Ingest all sources"),
- force: bool = typer.Option(False, "--force", "-f", help="Force ingest, even if output exists (on by default for single ingests)"),
+ force: bool = typer.Option(
+     False, "--force", "-f", help="Force ingest, even if output exists (on by default for single ingests)"
+ ),
rdf: bool = typer.Option(False, help="Output rdf files along with tsv"),
- verbose: Optional[bool] = typer.Option(None, "--debug/--quiet", "-d/-q", help="Use --quiet to suppress log output, --debug for verbose, including Koza logs"),
+ verbose: Optional[bool] = typer.Option(
+     None,
+     "--debug/--quiet",
+     "-d/-q",
+     help="Use --quiet to suppress log output, --debug for verbose, including Koza logs",
+ ),
log: bool = typer.Option(False, "--log", "-l", help="Write DEBUG level logs to ./logs/ for each ingest"),
row_limit: int = typer.Option(None, "--row-limit", "-n", help="Number of rows to process"),
+ write_versions: bool = typer.Option(False, help="Write data/package versions to output_dir/versions.yaml"),
# parallel: int = typer.Option(None, "--parallel", "-p", help="Utilize Dask to perform multiple ingests in parallel"),
):
"""Run Koza transformation on specified Monarch ingests"""

if phenio:
- transform_phenio(
-     output_dir=output_dir,
-     force=force,
-     verbose=verbose
- )
+ transform_phenio(output_dir=output_dir, force=force, verbose=verbose)
elif ingest:
transform_one(
- ingest = ingest,
- output_dir = output_dir,
- row_limit = row_limit,
- rdf = rdf,
- force = True if force is None else force,
- verbose = verbose,
- log = log,
+ ingest=ingest,
+ output_dir=output_dir,
+ row_limit=row_limit,
+ rdf=rdf,
+ force=True if force is None else force,
+ verbose=verbose,
+ log=log,
)
elif all:
transform_all(
- output_dir = output_dir,
- row_limit = row_limit,
- rdf = rdf,
- force = force,
+ output_dir=output_dir,
+ row_limit=row_limit,
+ rdf=rdf,
+ force=force,
verbose=verbose,
- log = log,
+ log=log,
)
+ if write_versions:
+     get_pkg_versions(output_dir=output_dir)


@typer_app.command()
def merge(
- input_dir: str = typer.Option(f"{OUTPUT_DIR}/transform_output", help="Directory with nodes and edges to be merged",),
+ input_dir: str = typer.Option(
+     f"{OUTPUT_DIR}/transform_output",
+     help="Directory with nodes and edges to be merged",
+ ),
output_dir: str = typer.Option(f"{OUTPUT_DIR}", help="Directory to output data"),
- verbose: Optional[bool] = typer.Option(None, "--debug/--quiet", "-d/-q", help="Use --quiet to suppress log output, --debug for verbose"),
- ):
+ verbose: Optional[bool] = typer.Option(
+     None, "--debug/--quiet", "-d/-q", help="Use --quiet to suppress log output, --debug for verbose"
+ ),
+ ):
"""Merge nodes and edges into kg"""
merge_files(input_dir=input_dir, output_dir=output_dir, verbose=verbose)

@@ -140,10 +118,12 @@ def merge(
def closure():
apply_closure()


@typer_app.command()
def jsonl():
load_jsonl()


@typer_app.command()
def sqlite():
load_sqlite()
@@ -153,15 +133,17 @@ def sqlite():
def solr():
load_solr()


@typer_app.command()
def export():
export_tsv()


@typer_app.command()
def release(
dir: str = typer.Option(f"{OUTPUT_DIR}", help="Directory with kg to be released"),
- kghub: bool = typer.Option(False, help="Also release to kghub S3 bucket")
- ):
+ kghub: bool = typer.Option(False, help="Also release to kghub S3 bucket"),
+ ):
"""Copy data to Monarch GCP data buckets"""
do_release(dir, kghub)

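With both commands wired up, the CI pipeline records versions in two steps, matching the Jenkinsfile changes above:

    poetry run ingest download --all --write-versions                # writes data/versions.yaml
    poetry run ingest transform --all --log --rdf --write-versions   # writes output/versions.yaml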
