diff --git a/jobbergate-agent/CHANGELOG.md b/jobbergate-agent/CHANGELOG.md index cd37345c..7480f778 100644 --- a/jobbergate-agent/CHANGELOG.md +++ b/jobbergate-agent/CHANGELOG.md @@ -4,6 +4,7 @@ This file keeps track of all notable changes to jobbergate-agent ## Unreleased - Fixed issue when downloading job script files for job submission with large names +- Cache slurm submissions to avoid the resubmission of the same job if the job status update fails [PENG-2342] ## 5.3.0a5 -- 2024-08-30 ## 5.3.0a4 -- 2024-08-23 diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/submit.py b/jobbergate-agent/jobbergate_agent/jobbergate/submit.py index db37f8b0..296f7f31 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/submit.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/submit.py @@ -281,9 +281,17 @@ async def submit_pending_jobs() -> None: do_else=lambda: logger.debug(f"Finished submitting pending job_submission {pending_job_submission.id}"), re_raise=False, ): - slurm_job_id = await submit_job_script(pending_job_submission, user_mapper) + cache_file = SETTINGS.CACHE_DIR / f"{pending_job_submission.id}.slurm_job_id" + if cache_file.exists(): + logger.debug(f"Found cache file for job submission {pending_job_submission.id}") + slurm_job_id = int(cache_file.read_text()) + else: + slurm_job_id = await submit_job_script(pending_job_submission, user_mapper) + cache_file.write_text(str(slurm_job_id)) + slurm_job_data: SlurmJobData = await fetch_job_data(slurm_job_id, info_handler) await mark_as_submitted(pending_job_submission.id, slurm_job_id, slurm_job_data) + cache_file.unlink(missing_ok=True) logger.debug("...Finished submitting pending jobs") diff --git a/jobbergate-agent/tests/jobbergate/test_submit.py b/jobbergate-agent/tests/jobbergate/test_submit.py index 808d178f..103a9fba 100644 --- a/jobbergate-agent/tests/jobbergate/test_submit.py +++ b/jobbergate-agent/tests/jobbergate/test_submit.py @@ -637,6 +637,8 @@ async def test_submit_job_script__raises_exception_if_sbatch_fails( @pytest.mark.asyncio @pytest.mark.usefixtures("mock_access_token") async def test_submit_pending_jobs( + tweak_settings, + tmp_path, dummy_job_script_files, dummy_template_source, mocker, @@ -666,6 +668,12 @@ async def test_submit_pending_jobs( owner_email="email3@dummy.com", job_script={"files": dummy_job_script_files}, ), + PendingJobSubmission( + id=4, + name="sub4", + owner_email="email4@dummy.com", + job_script={"files": dummy_job_script_files}, + ), ] mocker.patch( @@ -678,7 +686,7 @@ def _mocked_submit_job_script(pending_job_submission: PendingJobSubmission, user raise Exception("BOOM!") return pending_job_submission.id * 11 - def _mocked_mark_as_submitted(job_submission_id: int, slurm_job_id: int): + def _mocked_mark_as_submitted(job_submission_id: int, slurm_job_id: int, slurm_job_data: SlurmJobData): if job_submission_id == 2: raise Exception("BANG!") @@ -701,7 +709,10 @@ def _mocked_mark_as_submitted(job_submission_id: int, slurm_job_id: int): test_mapper = manufacture() - await submit_pending_jobs() + with tweak_settings(CACHE_DIR=tmp_path): + cached_submissions = {sub.id: tmp_path / f"{sub.id}.slurm_job_id" for sub in pending_submissions} + cached_submissions[4].write_text("44") + await submit_pending_jobs() mock_submit.assert_has_calls( [ @@ -716,6 +727,13 @@ def _mocked_mark_as_submitted(job_submission_id: int, slurm_job_id: int): [ mocker.call(1, 11, SlurmJobData(job_state="RUNNING", job_info="{}")), mocker.call(2, 22, SlurmJobData(job_state="RUNNING", job_info="{}")), + mocker.call(4, 44, SlurmJobData(job_state="RUNNING", job_info="{}")), ] ) - assert mock_mark.call_count == 2 + assert mock_mark.call_count == 3 + + assert not cached_submissions[1].exists() + assert cached_submissions[2].exists() + assert cached_submissions[2].read_text() == "22" + assert not cached_submissions[3].exists() + assert not cached_submissions[4].exists()