Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unlinking old job out/err files. #6533

Merged
merged 5 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions cylc/flow/job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,15 @@ def job_kill(self, st_file_path):

@classmethod
def _create_nn(cls, job_file_path):
"""Create NN symbolic link, if necessary.
"""Create NN symbolic link if necessary, and remove any old job logs.

If NN => 01, remove numbered dirs with submit numbers greater than 01.

If NN => 01, remove numbered directories with submit numbers greater
than 01.
Helper for "self._job_submit_impl".

"""
job_file_dir = os.path.dirname(job_file_path)

source = os.path.basename(job_file_dir)
task_log_dir = os.path.dirname(job_file_dir)
nn_path = os.path.join(task_log_dir, "NN")
Expand All @@ -393,6 +394,7 @@ def _create_nn(cls, job_file_path):
old_source = None
if old_source is None:
os.symlink(source, nn_path)

# On submit 1, remove any left over digit directories from prev runs
if source == "01":
for name in os.listdir(task_log_dir):
Expand All @@ -401,6 +403,11 @@ def _create_nn(cls, job_file_path):
rmtree(
os.path.join(task_log_dir, name), ignore_errors=True)

# Delete old job logs if necessary
for name in JOB_LOG_ERR, JOB_LOG_OUT:
with suppress(FileNotFoundError):
os.unlink(os.path.join(job_file_dir, name))

@classmethod
def _filter_submit_output(cls, st_file_path, job_runner, out, err):
"""Filter submit command output, if relevant."""
Expand Down Expand Up @@ -567,9 +574,6 @@ def _job_submit_impl(

# Create NN symbolic link, if necessary
self._create_nn(job_file_path)
for name in JOB_LOG_ERR, JOB_LOG_OUT:
with suppress(OSError):
os.unlink(os.path.join(job_file_path, name))

# Start new status file
with open(f"{job_file_path}.status", "w") as job_status_file:
Expand Down
51 changes: 51 additions & 0 deletions tests/integration/test_job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from cylc.flow.pathutil import get_workflow_run_job_dir
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.subprocctx import SubProcContext
from cylc.flow.task_job_logs import JOB_LOG_OUT, JOB_LOG_ERR


async def test_kill_error(one, start, test_dir, capsys, log_filter):
Expand Down Expand Up @@ -82,3 +83,53 @@ async def test_kill_error(one, start, test_dir, capsys, log_filter):
level=logging.WARNING,
)
assert itask.state(TASK_STATUS_RUNNING)


async def test_create_nn_new(one, start):
"""Test _create_nn.

It should create the NN symlink.
"""
async with start(one):
itask = one.pool.get_tasks()[0]

workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow))
job_id = itask.tokens.duplicate(job='01').relative_id
job_log_dir = Path(workflow_job_log_dir, job_id)
job_log_dir.mkdir(parents=True)

# call _create_nn
JobRunnerManager()._create_nn(job_log_dir / 'job')

# check the symlink exists
assert (job_log_dir.parent / "NN").is_symlink()


async def test_create_nn_old(one, start):
"""Test _create_nn.

It should remove existing job logs, if the dir already exists.
"""
async with start(one):
itask = one.pool.get_tasks()[0]

# fake some old job logs
workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow))
job_id = itask.tokens.duplicate(job='01').relative_id
job_log_dir = Path(workflow_job_log_dir, job_id)
job_log_dir.mkdir(parents=True)

job_logs = []
for name in JOB_LOG_OUT, JOB_LOG_ERR:
job_logs.append(job_log_dir / name)

# create the logs
for job_log in job_logs:
job_log.touch()

# call _create_nn
JobRunnerManager()._create_nn(job_log_dir / 'job')

# check they were removed
for job_log in job_logs:
assert not job_log.is_file()
Loading