From 38d6e09fa75720b84e2dfdbc4876185dbb83d556 Mon Sep 17 00:00:00 2001 From: Juliana Karoline de Sousa Date: Tue, 28 Nov 2023 11:35:33 -0300 Subject: [PATCH] [ASP-4245] Update agent download logic file (#419) * ASP-4245 Add env var to control download of files * ASP-4245 Update submit to use new env var * ASp-4245 Add fixture to test supporting files * Add unit tests * ASP-4245 Add entry to CHANGELOG * ASP-4245 Refact logic for submitting job script * ASP-4245 Rename env var to WRITE_SUBMISSION_FILES * ASP-4245 Update unit tests * ASP-4245 Update entry on CHANGELOG --- jobbergate-agent/CHANGELOG.md | 2 +- .../jobbergate_agent/jobbergate/submit.py | 110 +++++++-- jobbergate-agent/jobbergate_agent/settings.py | 3 + jobbergate-agent/tests/jobbergate/conftest.py | 36 +++ .../tests/jobbergate/test_submit.py | 215 ++++++++++++++++++ 5 files changed, 347 insertions(+), 19 deletions(-) diff --git a/jobbergate-agent/CHANGELOG.md b/jobbergate-agent/CHANGELOG.md index 8600c649..c2572695 100644 --- a/jobbergate-agent/CHANGELOG.md +++ b/jobbergate-agent/CHANGELOG.md @@ -3,7 +3,7 @@ This file keeps track of all notable changes to jobbergate-core ## Unreleased - +- Added setting to specify if the job script files should be written to the submit directory ## 4.2.0a1 -- 2023-11-13 - Added setting to control the timeout on `httpx` requests diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/submit.py b/jobbergate-agent/jobbergate_agent/jobbergate/submit.py index 1899a418..0b9a8e42 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/submit.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/submit.py @@ -1,4 +1,6 @@ +import asyncio import json +from pathlib import Path from typing import Any, Dict, cast from buzz import handle_errors @@ -10,6 +12,7 @@ from jobbergate_agent.jobbergate.api import SubmissionNotifier, fetch_pending_submissions, mark_as_submitted from jobbergate_agent.jobbergate.constants import FileType, JobSubmissionStatus from jobbergate_agent.jobbergate.schemas import ( + JobScriptFile, PendingJobSubmission, SlurmJobParams, SlurmJobSubmission, @@ -45,6 +48,91 @@ def get_job_parameters(slurm_parameters: Dict[str, Any], **kwargs) -> SlurmJobPa return SlurmJobParams.parse_obj(merged_parameters) +async def retrieve_submission_file(file: JobScriptFile) -> str: + """ + Get a submission file from the backend and return the decoded file content. + """ + response = await backend_client.get(file.path) + response.raise_for_status() + + return response.content.decode("utf-8") + + +async def write_submission_file(file_content: str, filename: str, submit_dir: Path): + """ + Write a decoded file content to the submit_dir. + """ + local_script_path = submit_dir / filename + local_script_path.parent.mkdir(parents=True, exist_ok=True) + local_script_path.write_bytes(file_content.encode("utf-8")) + + logger.debug(f"Copied file to {local_script_path}") + + +async def process_supporting_files(pending_job_submission: PendingJobSubmission, submit_dir: Path): + """ + Process the submission support files. + + Write the support files to the submit_dir if WRITE_SUBMISSION_FILES is set to True. + Reject the submission if there are support files with WRITE_SUBMISSION_FILES set to False. + """ + supporting_files = [file for file in pending_job_submission.job_script.files if file.file_type == FileType.SUPPORT] + + if SETTINGS.WRITE_SUBMISSION_FILES: + # Write the supporting submission support files to the submit dir + logger.debug(f"Writing supporting submission files to {submit_dir}") + + # Retrieve the files from the backend + files_to_retrieve = [retrieve_submission_file(file) for file in supporting_files] + files_content = await asyncio.gather(*files_to_retrieve) + + # Write the files to the submit dir + files_to_write = [ + write_submission_file(file_content, file.filename, submit_dir) + for file_content, file in zip(files_content, supporting_files) + ] + await asyncio.gather(*files_to_write) + else: + # Reject the submission if there are supporting files with WRITE_SUBMISSION_FILES set to False + logger.debug(f"Can't write files for submission {pending_job_submission.id}") + + JobSubmissionError.require_condition( + not supporting_files, + "Job submission rejected. The submission has supporting files that can't be downloaded to " + "the execution dir. Set `WRITE_SUBMISSION_FILES` setting to `True` to download the " + "job script files to the execution dir.", + ) + + +async def get_job_script_file(pending_job_submission: PendingJobSubmission, submit_dir: Path) -> str: + """ + Get the job script file from the backend. + + Write the job script file to the submit_dir if WRITE_SUBMISSION_FILES is set to True. + """ + job_script_file = None + + for file in pending_job_submission.job_script.files: + if file.file_type == FileType.ENTRYPOINT: # Should have only one entrypoint + job_script_file = file + break + + JobSubmissionError.require_condition( + job_script_file, + "Could not find an executable script in retrieved job script data.", + ) + + # Make static type checkers happy + assert job_script_file is not None + + job_script = await retrieve_submission_file(job_script_file) + + if SETTINGS.WRITE_SUBMISSION_FILES: + await write_submission_file(job_script, job_script_file.filename, submit_dir) + + return job_script + + async def submit_job_script( pending_job_submission: PendingJobSubmission, user_mapper: SlurmUserMapper, @@ -72,25 +160,11 @@ async def submit_job_script( submit_dir = pending_job_submission.execution_directory or SETTINGS.DEFAULT_SLURM_WORK_DIR - job_script = None - - for metadata in pending_job_submission.job_script.files: - local_script_path = submit_dir / metadata.filename - local_script_path.parent.mkdir(parents=True, exist_ok=True) - - response = await backend_client.get(metadata.path) - response.raise_for_status() - local_script_path.write_bytes(response.content) + logger.debug(f"Processing submission files for job submission {pending_job_submission.id}") + await process_supporting_files(pending_job_submission, submit_dir) - if metadata.file_type == FileType.ENTRYPOINT: - job_script = response.content.decode("utf-8") - - logger.debug(f"Copied job script file to {local_script_path}") - - JobSubmissionError.require_condition( - job_script, - "Could not find an executable script in retrieved job script data.", - ) + logger.debug(f"Fetching job script for job submission {pending_job_submission.id}") + job_script = await get_job_script_file(pending_job_submission, submit_dir) async with handle_errors_async( "Failed to extract Slurm parameters", diff --git a/jobbergate-agent/jobbergate_agent/settings.py b/jobbergate-agent/jobbergate_agent/settings.py index 116278ca..78d94eb3 100644 --- a/jobbergate-agent/jobbergate_agent/settings.py +++ b/jobbergate-agent/jobbergate_agent/settings.py @@ -50,6 +50,9 @@ class Settings(BaseSettings): TASK_JOBS_INTERVAL_SECONDS: int = Field(60, ge=10, le=3600) # seconds TASK_GARBAGE_COLLECTION_HOUR: Optional[int] = Field(None, ge=0, le=23) # hour of day + # Job submission settings + WRITE_SUBMISSION_FILES: bool = True + @root_validator def compute_extra_settings(cls, values): """ diff --git a/jobbergate-agent/tests/jobbergate/conftest.py b/jobbergate-agent/tests/jobbergate/conftest.py index a028037e..0e682dfc 100644 --- a/jobbergate-agent/tests/jobbergate/conftest.py +++ b/jobbergate-agent/tests/jobbergate/conftest.py @@ -26,6 +26,9 @@ def dummy_template_source(): @pytest.fixture def dummy_job_script_files(): + """ + Provide a fixture that returns a list of dummy job script files. + """ return [ { "parent_id": 1, @@ -35,6 +38,25 @@ def dummy_job_script_files(): ] +@pytest.fixture +def dummy_job_script_with_supporting_files(): + """ + Provide a fixture that returns a list of dummy job script files with supporting files. + """ + return [ + { + "parent_id": 1, + "filename": "application.sh", + "file_type": "ENTRYPOINT", + }, + { + "parent_id": 1, + "filename": "input.txt", + "file_type": "SUPPORT", + }, + ] + + @pytest.fixture def dummy_pending_job_submission_data(dummy_job_script_files, tmp_path): """ @@ -47,3 +69,17 @@ def dummy_pending_job_submission_data(dummy_job_script_files, tmp_path): job_script={"files": dummy_job_script_files}, slurm_job_id=13, ) + + +@pytest.fixture +def dummy_pending_job_submission_with_supporting_files_data(dummy_job_script_with_supporting_files, tmp_path): + """ + Provide a fixture that returns a dict that is compatible with PendingJobSubmission and has supporting files. + """ + return dict( + id=1, + name="sub1", + owner_email="email1.@dummy.com", + job_script={"files": dummy_job_script_with_supporting_files}, + slurm_job_id=13, + ) diff --git a/jobbergate-agent/tests/jobbergate/test_submit.py b/jobbergate-agent/tests/jobbergate/test_submit.py index 572c30d5..909414de 100644 --- a/jobbergate-agent/tests/jobbergate/test_submit.py +++ b/jobbergate-agent/tests/jobbergate/test_submit.py @@ -11,6 +11,7 @@ from jobbergate_agent.jobbergate.constants import JobSubmissionStatus from jobbergate_agent.jobbergate.schemas import ( + JobScriptFile, PendingJobSubmission, SlurmJobParams, SlurmJobSubmission, @@ -18,15 +19,229 @@ ) from jobbergate_agent.jobbergate.submit import ( get_job_parameters, + get_job_script_file, + process_supporting_files, + retrieve_submission_file, submit_job_script, submit_pending_jobs, unpack_error_from_slurm_response, + write_submission_file, ) from jobbergate_agent.settings import SETTINGS from jobbergate_agent.utils.exception import JobSubmissionError, SlurmrestdError from jobbergate_agent.utils.user_mapper import SingleUserMapper +@pytest.mark.usefixtures("mock_access_token") +@pytest.mark.asyncio +async def test_retrieve_submission_file__success(): + """ + Test that the ``retrieve_submission_file()`` function can retrieve a submission file + from the backend and return its content. + """ + job_script_file = JobScriptFile( + parent_id=1, + filename="application.sh", + file_type="ENTRYPOINT", + path="/jobbergate/job-scripts/1/upload/application.sh", + ) + + async with respx.mock: + download_route = respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-scripts/1/upload/application.sh") + download_route.mock( + return_value=httpx.Response( + status_code=200, + content="I am a job script".encode("utf-8"), + ), + ) + + actual_content = await retrieve_submission_file(job_script_file) + + assert actual_content == "I am a job script" + assert download_route.call_count == 1 + last_request = download_route.calls.last.request + assert last_request.url == f"{SETTINGS.BASE_API_URL}/jobbergate/job-scripts/1/upload/application.sh" + + +@pytest.mark.usefixtures("mock_access_token") +@pytest.mark.asyncio +async def test_retrieve_submission_file__raises_exception(): + """ + Test that the ``retrieve_submission_file()`` function raises an exception if the + backend returns a non-200 response. + """ + job_script_file = JobScriptFile( + parent_id=1, + filename="application.sh", + file_type="ENTRYPOINT", + path="/jobbergate/job-scripts/1/upload/application.sh", + ) + + async with respx.mock: + download_route = respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-scripts/1/upload/application.sh") + download_route.mock( + return_value=httpx.Response( + status_code=400, + ), + ) + + with pytest.raises( + httpx.HTTPStatusError, + ): + await retrieve_submission_file(job_script_file) + + +@pytest.mark.asyncio +async def test_write_submission_file__success(tmp_path): + """ + Test that the ``write_submission_file()`` function can write a submission file to disk. + """ + job_script_content = "I am a job script" + submit_dir = tmp_path / "submit" + submit_dir.mkdir() + + await write_submission_file(job_script_content, "application.sh", submit_dir) + + assert (submit_dir / "application.sh").read_text() == "I am a job script" + + +@pytest.mark.usefixtures("mock_access_token") +@pytest.mark.asyncio +async def test_process_supporting_files__with_write_submission_files_set_to_true( + tmp_path, + dummy_pending_job_submission_with_supporting_files_data, + tweak_settings, +): + """ + Test that the ``process_supporting_files()`` function can write submission files to disk. + + The files should be written to the submit_dir if WRITE_SUBMISSION_FILES is set to True. + """ + pending_job_submission = PendingJobSubmission(**dummy_pending_job_submission_with_supporting_files_data) + submit_dir = tmp_path / "submit" + submit_dir.mkdir() + + async with respx.mock: + download_support_route = respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-scripts/1/upload/input.txt") + download_support_route.mock( + return_value=httpx.Response( + status_code=200, + content="I am a supporting file".encode("utf-8"), + ), + ) + with tweak_settings(WRITE_SUBMISSION_FILES=True): + await process_supporting_files(pending_job_submission, submit_dir) + + assert (submit_dir / "input.txt").read_text() == "I am a supporting file" + assert download_support_route.call_count == 1 + + +@pytest.mark.asyncio +async def test_process_supporting_files__with_write_submission_files_set_to_false_and_supporting_files( + tmp_path, + dummy_pending_job_submission_with_supporting_files_data, + tweak_settings, +): + """ + Test that the ``process_supporting_files()`` function can reject a submission if there + are supporting files and WRITE_SUBMISSION_FILES is set to False. + """ + pending_job_submission = PendingJobSubmission(**dummy_pending_job_submission_with_supporting_files_data) + submit_dir = tmp_path / "submit" + submit_dir.mkdir() + + with tweak_settings(WRITE_SUBMISSION_FILES=False): + with pytest.raises( + JobSubmissionError, + match="Job submission rejected. The submission has supporting files that can't be downloaded to " + "the execution dir. Set `WRITE_SUBMISSION_FILES` setting to `True` to download the " + "job script files to the execution dir.", + ): + await process_supporting_files(pending_job_submission, submit_dir) + + +@pytest.mark.asyncio +async def test_process_supporting_files__with_write_submission_files_set_to_false_and_no_supporting_files( + tmp_path, + dummy_pending_job_submission_data, + tweak_settings, +): + """ + Test that the ``process_supporting_files()`` function can accept a submission if there + are no supporting files and WRITE_SUBMISSION_FILES is set to False. + """ + pending_job_submission = PendingJobSubmission(**dummy_pending_job_submission_data) + submit_dir = tmp_path / "submit" + submit_dir.mkdir() + + with tweak_settings(WRITE_SUBMISSION_FILES=False): + await process_supporting_files(pending_job_submission, submit_dir) + + +@pytest.mark.usefixtures("mock_access_token") +@pytest.mark.asyncio +async def test_get_job_script_file__success_with_write(tmp_path, dummy_pending_job_submission_data, tweak_settings): + """ + Test that the ``get_job_script_file()`` function can retrieve a job script file + from the backend, write the content to the submit dir, and return its content. + """ + pending_job_submission = PendingJobSubmission(**dummy_pending_job_submission_data) + submit_dir = tmp_path / "submit" + submit_dir.mkdir() + + async with respx.mock: + download_route = respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-scripts/1/upload/application.sh") + download_route.mock( + return_value=httpx.Response( + status_code=200, + content="I am a job script".encode("utf-8"), + ), + ) + + with tweak_settings(WRITE_SUBMISSION_FILES=True): + actual_content = await get_job_script_file(pending_job_submission, submit_dir) + + assert actual_content == "I am a job script" + assert (submit_dir / "application.sh").read_text() == "I am a job script" + assert download_route.call_count == 1 + last_request = download_route.calls.last.request + assert last_request.url == f"{SETTINGS.BASE_API_URL}/jobbergate/job-scripts/1/upload/application.sh" + + +@pytest.mark.usefixtures("mock_access_token") +@pytest.mark.asyncio +async def test_get_job_script_file__success_without_write( + tmp_path, + dummy_pending_job_submission_data, + tweak_settings, +): + """ + Test that the ``get_job_script_file()`` function can retrieve a job script file + from the backend and return its content. + """ + pending_job_submission = PendingJobSubmission(**dummy_pending_job_submission_data) + submit_dir = tmp_path / "submit" + submit_dir.mkdir() + + async with respx.mock: + download_route = respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-scripts/1/upload/application.sh") + download_route.mock( + return_value=httpx.Response( + status_code=200, + content="I am a job script".encode("utf-8"), + ), + ) + + with tweak_settings(WRITE_SUBMISSION_FILES=False): + actual_content = await get_job_script_file(pending_job_submission, submit_dir) + + assert actual_content == "I am a job script" + assert not (submit_dir / "application.sh").exists() + assert download_route.call_count == 1 + last_request = download_route.calls.last.request + assert last_request.url == f"{SETTINGS.BASE_API_URL}/jobbergate/job-scripts/1/upload/application.sh" + + @pytest.mark.asyncio @pytest.mark.usefixtures("mock_access_token") async def test_submit_job_script__success(dummy_pending_job_submission_data, dummy_template_source):