Skip to content

Commit

Permalink
[ASP-4245] Update agent download logic file (#419)
Browse files Browse the repository at this point in the history
* ASP-4245 Add env var to control download of files

* ASP-4245 Update submit to use new env var

* ASP-4245 Add fixture to test supporting files

* Add unit tests

* ASP-4245 Add entry to CHANGELOG

* ASP-4245 Refact logic for submitting job script

* ASP-4245 Rename env var to WRITE_SUBMISSION_FILES

* ASP-4245 Update unit tests

* ASP-4245 Update entry on CHANGELOG
  • Loading branch information
julianaklulo authored Nov 28, 2023
1 parent f660cb7 commit 38d6e09
Show file tree
Hide file tree
Showing 5 changed files with 347 additions and 19 deletions.
2 changes: 1 addition & 1 deletion jobbergate-agent/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This file keeps track of all notable changes to jobbergate-agent

## Unreleased

- Added setting to specify if the job script files should be written to the submit directory

## 4.2.0a1 -- 2023-11-13
- Added setting to control the timeout on `httpx` requests
Expand Down
110 changes: 92 additions & 18 deletions jobbergate-agent/jobbergate_agent/jobbergate/submit.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, cast

from buzz import handle_errors
Expand All @@ -10,6 +12,7 @@
from jobbergate_agent.jobbergate.api import SubmissionNotifier, fetch_pending_submissions, mark_as_submitted
from jobbergate_agent.jobbergate.constants import FileType, JobSubmissionStatus
from jobbergate_agent.jobbergate.schemas import (
JobScriptFile,
PendingJobSubmission,
SlurmJobParams,
SlurmJobSubmission,
Expand Down Expand Up @@ -45,6 +48,91 @@ def get_job_parameters(slurm_parameters: Dict[str, Any], **kwargs) -> SlurmJobPa
return SlurmJobParams.parse_obj(merged_parameters)


async def retrieve_submission_file(file: JobScriptFile) -> str:
    """
    Fetch a single submission file from the backend and return its decoded content.

    Raises an HTTP error if the backend responds with a non-success status.
    """
    result = await backend_client.get(file.path)
    result.raise_for_status()
    raw_bytes = result.content
    return raw_bytes.decode("utf-8")


async def write_submission_file(file_content: str, filename: str, submit_dir: Path):
    """
    Write a decoded file content to the submit_dir.

    Parent directories are created as needed so nested filenames are supported.
    """
    target_path = submit_dir / filename
    target_path.parent.mkdir(parents=True, exist_ok=True)
    encoded = file_content.encode("utf-8")
    target_path.write_bytes(encoded)

    logger.debug(f"Copied file to {target_path}")


async def process_supporting_files(pending_job_submission: PendingJobSubmission, submit_dir: Path):
    """
    Process the submission support files.

    Write the support files to the submit_dir if WRITE_SUBMISSION_FILES is set to True.
    Reject the submission if there are support files with WRITE_SUBMISSION_FILES set to False.
    """
    support_files = [f for f in pending_job_submission.job_script.files if f.file_type == FileType.SUPPORT]

    if not SETTINGS.WRITE_SUBMISSION_FILES:
        # Reject the submission if there are supporting files that can't be written
        logger.debug(f"Can't write files for submission {pending_job_submission.id}")

        JobSubmissionError.require_condition(
            not support_files,
            "Job submission rejected. The submission has supporting files that can't be downloaded to "
            "the execution dir. Set `WRITE_SUBMISSION_FILES` setting to `True` to download the "
            "job script files to the execution dir.",
        )
        return

    logger.debug(f"Writing supporting submission files to {submit_dir}")

    # Fetch all supporting files from the backend concurrently
    fetched_contents = await asyncio.gather(*(retrieve_submission_file(f) for f in support_files))

    # Write each fetched file into the submit dir, also concurrently
    await asyncio.gather(
        *(
            write_submission_file(content, f.filename, submit_dir)
            for content, f in zip(fetched_contents, support_files)
        )
    )


async def get_job_script_file(pending_job_submission: PendingJobSubmission, submit_dir: Path) -> str:
    """
    Get the job script file from the backend.

    Write the job script file to the submit_dir if WRITE_SUBMISSION_FILES is set to True.
    Returns the decoded job script content.
    """
    # Should have only one entrypoint
    entrypoint = next(
        (f for f in pending_job_submission.job_script.files if f.file_type == FileType.ENTRYPOINT),
        None,
    )

    JobSubmissionError.require_condition(
        entrypoint,
        "Could not find an executable script in retrieved job script data.",
    )

    # Make static type checkers happy
    assert entrypoint is not None

    job_script = await retrieve_submission_file(entrypoint)

    if SETTINGS.WRITE_SUBMISSION_FILES:
        await write_submission_file(job_script, entrypoint.filename, submit_dir)

    return job_script


async def submit_job_script(
pending_job_submission: PendingJobSubmission,
user_mapper: SlurmUserMapper,
Expand Down Expand Up @@ -72,25 +160,11 @@ async def submit_job_script(

submit_dir = pending_job_submission.execution_directory or SETTINGS.DEFAULT_SLURM_WORK_DIR

job_script = None

for metadata in pending_job_submission.job_script.files:
local_script_path = submit_dir / metadata.filename
local_script_path.parent.mkdir(parents=True, exist_ok=True)

response = await backend_client.get(metadata.path)
response.raise_for_status()
local_script_path.write_bytes(response.content)
logger.debug(f"Processing submission files for job submission {pending_job_submission.id}")
await process_supporting_files(pending_job_submission, submit_dir)

if metadata.file_type == FileType.ENTRYPOINT:
job_script = response.content.decode("utf-8")

logger.debug(f"Copied job script file to {local_script_path}")

JobSubmissionError.require_condition(
job_script,
"Could not find an executable script in retrieved job script data.",
)
logger.debug(f"Fetching job script for job submission {pending_job_submission.id}")
job_script = await get_job_script_file(pending_job_submission, submit_dir)

async with handle_errors_async(
"Failed to extract Slurm parameters",
Expand Down
3 changes: 3 additions & 0 deletions jobbergate-agent/jobbergate_agent/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class Settings(BaseSettings):
TASK_JOBS_INTERVAL_SECONDS: int = Field(60, ge=10, le=3600) # seconds
TASK_GARBAGE_COLLECTION_HOUR: Optional[int] = Field(None, ge=0, le=23) # hour of day

# Job submission settings
WRITE_SUBMISSION_FILES: bool = True

@root_validator
def compute_extra_settings(cls, values):
"""
Expand Down
36 changes: 36 additions & 0 deletions jobbergate-agent/tests/jobbergate/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ def dummy_template_source():

@pytest.fixture
def dummy_job_script_files():
"""
Provide a fixture that returns a list of dummy job script files.
"""
return [
{
"parent_id": 1,
Expand All @@ -35,6 +38,25 @@ def dummy_job_script_files():
]


@pytest.fixture
def dummy_job_script_with_supporting_files():
    """
    Provide a fixture that returns a list of dummy job script files with supporting files.
    """
    entrypoint_file = {"parent_id": 1, "filename": "application.sh", "file_type": "ENTRYPOINT"}
    support_file = {"parent_id": 1, "filename": "input.txt", "file_type": "SUPPORT"}
    return [entrypoint_file, support_file]


@pytest.fixture
def dummy_pending_job_submission_data(dummy_job_script_files, tmp_path):
"""
Expand All @@ -47,3 +69,17 @@ def dummy_pending_job_submission_data(dummy_job_script_files, tmp_path):
job_script={"files": dummy_job_script_files},
slurm_job_id=13,
)


@pytest.fixture
def dummy_pending_job_submission_with_supporting_files_data(dummy_job_script_with_supporting_files, tmp_path):
    """
    Provide a fixture that returns a dict that is compatible with PendingJobSubmission and has supporting files.
    """
    return {
        "id": 1,
        "name": "sub1",
        "owner_email": "email1.@dummy.com",
        "job_script": {"files": dummy_job_script_with_supporting_files},
        "slurm_job_id": 13,
    }
Loading

0 comments on commit 38d6e09

Please sign in to comment.