Skip to content

Commit

Permalink
test changing task mode with broadcast
Browse files Browse the repository at this point in the history
broaden the definition of task-mode

Save workflow states to the runtime config object.
  • Loading branch information
wxtim committed Jan 16, 2025
1 parent 6f3a94b commit 9542506
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 10 deletions.
8 changes: 8 additions & 0 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
is_relative_to,
)
from cylc.flow.task_qualifiers import ALT_QUALIFIERS
from cylc.flow.run_modes import WORKFLOW_ONLY_MODES
from cylc.flow.run_modes.simulation import configure_sim_mode
from cylc.flow.run_modes.skip import skip_mode_validate
from cylc.flow.subprocctx import SubFuncContext
Expand Down Expand Up @@ -2448,6 +2449,13 @@ def _get_taskdef(self, name: str) -> TaskDef:

try:
rtcfg = self.cfg['runtime'][name]

# If the workflow mode is simulation or dummy always
# override the task config:
workflow_run_mode = RunMode.get(self.options)
if workflow_run_mode.value in WORKFLOW_ONLY_MODES:
rtcfg['run mode'] = workflow_run_mode.value

except KeyError:
raise WorkflowConfigError("Task not defined: %s" % name) from None
# We may want to put in some handling for cases of changing the
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
)
from cylc.flow.id import Tokens
from cylc.flow.run_modes import (
TASK_CONFIG_RUN_MODES, WORKFLOW_RUN_MODES, RunMode)
WORKFLOW_RUN_MODES, RunMode)
from cylc.flow.task_outputs import SORT_ORDERS
from cylc.flow.task_state import (
TASK_STATUS_DESC,
Expand Down Expand Up @@ -633,7 +633,8 @@ class Meta:
# The run mode for the task.
TaskRunMode = graphene.Enum(
'TaskRunMode',
[(m.capitalize(), m) for m in TASK_CONFIG_RUN_MODES],
[(k.capitalize(), k.lower()) for k in RunMode.__members__.keys()],
# [(m.capitalize(), m) for m in TASK_CONFIG_RUN_MODES],
description=lambda x: RunMode(x.value).describe() if x else None,
)

Expand Down
75 changes: 67 additions & 8 deletions tests/integration/scripts/test_show.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import json
import pytest
import re
from types import SimpleNamespace

from colorama import init as colour_init
Expand All @@ -26,6 +27,9 @@
)


RE_STATE = re.compile('state:.*')


@pytest.fixture(scope='module')
def mod_my_conf():
"""A workflow configuration with some workflow metadata."""
Expand Down Expand Up @@ -171,9 +175,9 @@ async def test_task_instance_query(
'scheduling': {
'graph': {'R1': 'zed & dog & cat & ant'},
},
}
},
),
paused_start=False
paused_start=False,
)
async with start(schd):
await schd.update_data_structure()
Expand All @@ -196,23 +200,32 @@ async def test_task_instance_query(
]


@pytest.mark.parametrize(
'workflow_run_mode, run_mode_info',
(
('live', 'Skip'),
('dummy', 'Dummy'),
('simulation', 'Simulation'),
)
)
@pytest.mark.parametrize(
'attributes_bool, flow_nums, expected_state, expected_flows',
[
pytest.param(
False, [1], 'state: waiting (skip)', None,
False, [1], 'state: waiting (run mode={})', None,
),
pytest.param(
True,
[1, 2],
'state: waiting (held,queued,runahead,skip)',
'state: waiting (held,queued,runahead,run mode={})',
'flows: [1,2]',
)
]
)
async def test_task_instance_state_flows(
flow, scheduler, start, capsys,
attributes_bool, flow_nums, expected_state, expected_flows
workflow_run_mode, run_mode_info,
attributes_bool, flow_nums, expected_state, expected_flows
):
"""It should print task instance state, attributes, and flows."""

Expand All @@ -232,9 +245,10 @@ async def test_task_instance_state_flows(
'runtime': {
'a': {'run mode': 'skip'}
}
}
},
),
paused_start=True
paused_start=True,
run_mode=workflow_run_mode,
)
async with start(schd):

Expand Down Expand Up @@ -264,7 +278,7 @@ async def test_task_instance_state_flows(
line for line in out.splitlines()
if line.startswith("state:")
] == [
expected_state,
expected_state.format(run_mode_info),
]
if expected_flows is not None:
assert [
Expand All @@ -273,3 +287,48 @@ async def test_task_instance_state_flows(
] == [
expected_flows,
]


async def test_mode_changes(flow, scheduler, start, capsys):
"""Broadcasting a change of run mode changes run mode shown by cylc show.
"""
opts = SimpleNamespace(
comms_timeout=5,
json=False,
task_defs=None,
list_prereqs=False,
)
schd = scheduler(
flow({'scheduling': {'graph': {'R1': 'a'}}}),
paused_start=True,
run_mode='live'
)

async with start(schd):
# Control: No mode set, the Run Mode setting is not shown:
await schd.update_data_structure()
ret = await show(
schd.workflow,
[Tokens('//1/a')],
opts,
)
assert ret == 0
out, _ = capsys.readouterr()
state, = RE_STATE.findall(out)
assert 'waiting' in state

# Broadcast change task to skip mode:
schd.broadcast_mgr.put_broadcast(['1'], ['a'], [{'run mode': 'skip'}])
await schd.update_data_structure()

# show now shows skip mode:
ret = await show(
schd.workflow,
[Tokens('//1/a')],
opts,
)
assert ret == 0

out, _ = capsys.readouterr()
state, = RE_STATE.findall(out)
assert 'run mode=Skip' in state

0 comments on commit 9542506

Please sign in to comment.