From 2f9d0cfe716305c95b539a5bc16af41daa896df4 Mon Sep 17 00:00:00 2001 From: Eugene Clark Date: Mon, 18 Nov 2024 11:19:06 -0500 Subject: [PATCH] Don't allow duplicate run ids to be active concurrently --- src/anyvar/restapi/main.py | 11 ++++++++++- tests/test_vcf.py | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/anyvar/restapi/main.py b/src/anyvar/restapi/main.py index 09df063..879d750 100644 --- a/src/anyvar/restapi/main.py +++ b/src/anyvar/restapi/main.py @@ -326,8 +326,17 @@ async def _annotate_vcf_async( allow_async_write: bool, assembly: str, run_id: str | None, -) -> RunStatusResponse: +) -> RunStatusResponse | ErrorResponse: """Annotate with VRS IDs asynchronously. See `annotate_vcf()` for parameter definitions.""" + # if run_id is provided, validate it does not already exist + if run_id: + existing_result = AsyncResult(id=run_id) + if existing_result.status != "PENDING": + response.status_code = status.HTTP_400_BAD_REQUEST + return ErrorResponse( + error=f"An existing run with id {run_id} is {existing_result.status}. Fetch the completed run result before submitting with the same run_id." + ) + # write file to shared storage area with a directory for each day and a random file name async_work_dir = os.environ.get("ANYVAR_VCF_ASYNC_WORK_DIR", None) utc_now = datetime.datetime.now(tz=datetime.UTC) diff --git a/tests/test_vcf.py b/tests/test_vcf.py index d5019a0..b0d1578 100644 --- a/tests/test_vcf.py +++ b/tests/test_vcf.py @@ -181,6 +181,44 @@ def test_vcf_registration_async(client, sample_vcf_grch38, mocker): shutil.rmtree("tests/tmp_async_work_dir") +def test_vcf_submit_no_async(client, sample_vcf_grch38, mocker): + """Tests that a 400 is returned when async processing is not enabled""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "", "CELERY_BROKER_URL": ""} + ) + resp = client.put( + "/vcf", + params={"assembly": "GRCh38", "run_id": "12345", "run_async": True}, + files={"vcf": ("test.vcf", sample_vcf_grch38)}, + ) + assert resp.status_code == HTTPStatus.BAD_REQUEST + assert "error" in resp.json() + assert ( + resp.json()["error"] + == "Required modules and/or configurations for asynchronous VCF annotation are missing" + ) + + +def test_vcf_submit_duplicate_run_id(client, sample_vcf_grch38, mocker): + """Tests the submit VCF endpoint when there is already a run for the specified run id""" + mocker.patch.dict( + os.environ, {"ANYVAR_VCF_ASYNC_WORK_DIR": "./", "CELERY_BROKER_URL": "redis://"} + ) + mock_result = mocker.patch("anyvar.restapi.main.AsyncResult") + mock_result.return_value.status = "SENT" + resp = client.put( + "/vcf", + params={"assembly": "GRCh38", "run_id": "12345", "run_async": True}, + files={"vcf": ("test.vcf", sample_vcf_grch38)}, + ) + assert resp.status_code == HTTPStatus.BAD_REQUEST + assert "error" in resp.json() + assert ( + resp.json()["error"] + == "An existing run with id 12345 is SENT. Fetch the completed run result before submitting with the same run_id." + ) + + def test_vcf_get_result_no_async(client, mocker): """Tests that a 400 is returned when async processing is not enabled""" mocker.patch.dict(