Skip to content

Commit

Permalink
Add unit tests for async VCF processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ehclark committed Nov 15, 2024
1 parent 2d56bb2 commit 01a30ac
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ test = [
"pytest",
"pytest-cov",
"pytest-mock",
"httpx"
"httpx",
"celery[pytest]",
]
dev = [
"ruff==0.5.0",
Expand Down
15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from anyvar.anyvar import AnyVar, create_storage, create_translator
from anyvar.restapi.main import app as anyvar_restapi

pytest_plugins = ("celery.contrib.pytest",)


def pytest_collection_modifyitems(items):
"""Modify test items in place to ensure test modules run in a given order."""
Expand Down Expand Up @@ -58,3 +60,16 @@ def alleles(test_data_dir) -> dict:
"""Provide allele fixture object."""
with (test_data_dir / "variations.json").open() as f:
return json.load(f)["alleles"]


@pytest.fixture(scope="session")
def celery_config():
return {
"broker_url": os.environ.get("CELERY_BROKER_URL", "redis://"),
"result_backend": os.environ.get("CELERY_BACKEND_URL", "redis://"),
"task_default_queue": "anyvar_q",
"event_queue_prefix": "anyvar_ev",
"task_serializer": "json",
"result_serializer": "json",
"accept_content": ["application/json"],
}
181 changes: 181 additions & 0 deletions tests/test_vcf.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
"""Test VCF input/output features."""

import io
import os
import pathlib
import shutil
import time
from http import HTTPStatus

import pytest
from billiard.exceptions import TimeLimitExceeded
from celery.contrib.testing.worker import start_worker
from celery.exceptions import WorkerLostError

import anyvar.anyvar
from anyvar.queueing.celery_worker import celery_app


@pytest.fixture()
Expand Down Expand Up @@ -130,3 +140,174 @@ def test_vcf_registration_invalid_assembly(client, sample_vcf_grch37):
files={"vcf": ("test.vcf", sample_vcf_grch37)},
)
assert resp.status_code == HTTPStatus.UNPROCESSABLE_ENTITY


def test_vcf_registration_async(client, sample_vcf_grch38, mocker):
"""Test the async VCF annotation process using a real Celery worker and background task"""
mocker.patch.dict(
os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "tests/tmp_async_work_dir"}
)
assert anyvar.anyvar.has_queueing_enabled(), "async vcf queueing is not enabled"
with start_worker(
celery_app,
pool="solo",
loglevel="info",
perform_ping_check=False,
shutdown_timeout=30,
):
resp = client.put(
"/vcf",
params={"assembly": "GRCh38", "run_id": "12345", "run_async": True},
files={"vcf": ("test.vcf", sample_vcf_grch38)},
)
assert resp.status_code == HTTPStatus.ACCEPTED
assert "status_message" in resp.json()
assert (
resp.json()["status_message"] == "Run submitted. Check status at /vcf/12345"
)
assert "status" in resp.json()
assert resp.json()["status"] == "PENDING"
assert "run_id" in resp.json()
assert resp.json()["run_id"] == "12345"

time.sleep(5)

resp = client.get("/vcf/12345")
assert resp.status_code == HTTPStatus.OK
assert (
b"VRS_Allele_IDs=ga4gh:VA.ryPubD68BB0D-D78L_kK4993mXmsNNWe,ga4gh:VA._QhHH18HBAIeLos6npRgR-S_0lAX5KR6"
in resp.content
)
shutil.rmtree("tests/tmp_async_work_dir")


def test_vcf_get_result_no_async(client, mocker):
"""Tests that a 400 is returned when async processing is not enabled"""
mocker.patch.dict(
os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "", "CELERY_BROKER_URL": ""}
)
resp = client.get("/vcf/12345")
assert resp.status_code == HTTPStatus.BAD_REQUEST
assert "error" in resp.json()
assert (
resp.json()["error"]
== "Required modules and/or configurations for asynchronous VCF annotation are missing"
)


def test_vcf_get_result_success(client, mocker):
"""Tests the get async VCF result endpoint when annotation was successful"""
mocker.patch.dict(
os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"}
)
mock_result = mocker.patch("anyvar.restapi.main.AsyncResult")
mock_result.return_value.status = "SUCCESS"
mock_result.return_value.result = __file__
mock_bg_tasks = mocker.patch("anyvar.restapi.main.BackgroundTasks.add_task")
resp = client.get("/vcf/12345")
assert resp.status_code == HTTPStatus.OK
with pathlib.Path(__file__).open(mode="rb") as fd:
assert resp.content == fd.read()
mock_result.return_value.forget.assert_called_once()
mock_bg_tasks.assert_called_with(os.unlink, __file__)


def test_vcf_get_result_failure_timeout(client, mocker):
"""Tests the get async VCF result endpoint when annotation fails due to task timeout"""
mocker.patch.dict(
os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"}
)
mock_result = mocker.patch("anyvar.restapi.main.AsyncResult")
mock_result.return_value.status = "FAILURE"
mock_result.return_value.result = TimeLimitExceeded("task timed out")
mock_result.return_value.kwargs = {"input_file_path": __file__}
mock_bg_tasks = mocker.patch("anyvar.restapi.main.BackgroundTasks.add_task")
resp = client.get("/vcf/12345")
assert resp.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
assert "error" in resp.json()
assert resp.json()["error"] == "TimeLimitExceeded('task timed out',)"
assert "error_code" in resp.json()
assert resp.json()["error_code"] == "TIME_LIMIT_EXCEEDED"
mock_result.return_value.forget.assert_called_once()
mock_bg_tasks.assert_called_once()


def test_vcf_get_result_failure_worker_lost(client, mocker):
"""Tests the get async VCF result endpoint when annotation failed due to lost worker"""
mocker.patch.dict(
os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"}
)
mock_result = mocker.patch("anyvar.restapi.main.AsyncResult")
mock_result.return_value.status = "FAILURE"
mock_result.return_value.result = WorkerLostError("killed")
mock_result.return_value.kwargs = {"input_file_path": __file__}
mock_bg_tasks = mocker.patch("anyvar.restapi.main.BackgroundTasks.add_task")
resp = client.get("/vcf/12345")
assert resp.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
assert "error" in resp.json()
assert resp.json()["error"] == "killed"
assert "error_code" in resp.json()
assert resp.json()["error_code"] == "WORKER_LOST_ERROR"
mock_result.return_value.forget.assert_called_once()
mock_bg_tasks.assert_called_once()


def test_vcf_get_result_failure_other(client, mocker):
"""Tests the get async VCF result endpoint when annotation failed due to an error"""
mocker.patch.dict(
os.environ,
{
"ANYVAR_VCF_ASYNC_WORK_DIR": "./",
"CELERY_BROKER_URL": "redis://",
"ANYVAR_VCF_ASYNC_FAILURE_STATUS_CODE": "200",
},
)
mock_result = mocker.patch("anyvar.restapi.main.AsyncResult")
mock_result.return_value.status = "FAILURE"
mock_result.return_value.result = KeyError("foo")
mock_result.return_value.kwargs = {"input_file_path": __file__}
mock_bg_tasks = mocker.patch("anyvar.restapi.main.BackgroundTasks.add_task")
resp = client.get("/vcf/12345")
assert resp.status_code == HTTPStatus.OK
assert "error" in resp.json()
assert resp.json()["error"] == "'foo'"
assert "error_code" in resp.json()
assert resp.json()["error_code"] == "RUN_FAILURE"
mock_result.return_value.forget.assert_called_once()
mock_bg_tasks.assert_called_once()


def test_vcf_get_result_notfound(client, mocker):
"""Tests the get async VCF result endpoint when run id is not found"""
mocker.patch.dict(
os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"}
)
mock_result = mocker.patch("anyvar.restapi.main.AsyncResult")
mock_result.return_value.status = "PENDING"
resp = client.get("/vcf/12345")
assert resp.status_code == HTTPStatus.NOT_FOUND
assert "status_message" in resp.json()
assert resp.json()["status_message"] == "Run not found"
assert "status" in resp.json()
assert resp.json()["status"] == "NOT_FOUND"
assert "run_id" in resp.json()
assert resp.json()["run_id"] == "12345"


def test_vcf_get_result_notcomplete(client, mocker):
"""Tests the get async VCF result endpoint when annotation is not yet complete"""
mocker.patch.dict(
os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"}
)
mock_result = mocker.patch("anyvar.restapi.main.AsyncResult")
mock_result.return_value.status = "SENT"
resp = client.get("/vcf/12345")
assert resp.status_code == HTTPStatus.ACCEPTED
assert "status_message" in resp.json()
assert (
resp.json()["status_message"] == "Run not completed. Check status at /vcf/12345"
)
assert "status" in resp.json()
assert resp.json()["status"] == "PENDING"
assert "run_id" in resp.json()
assert resp.json()["run_id"] == "12345"

0 comments on commit 01a30ac

Please sign in to comment.