Commit
Merge pull request #252 from jameshcorbett/add-requires-hostlist-for-down-rabbits

dws: add negative requires hostlist for down rabbits
mergify[bot] authored Jan 17, 2025
2 parents 33771d1 + 7509ba4 commit e15e1da
Showing 4 changed files with 185 additions and 22 deletions.
87 changes: 79 additions & 8 deletions src/job-manager/plugins/dws-jobtap.c
@@ -556,44 +556,115 @@ static int exception_cb (flux_plugin_t *p,
return 0;
}

/*
* Generate a new jobspec constraints object for a job so that it can avoid
* attempting to run on nodes attached to down rabbits.
*/
static json_t *generate_constraints (flux_t *h,
                                     flux_plugin_t *p,
                                     flux_jobid_t jobid,
                                     const char *exclude_str)
{
flux_plugin_arg_t *args = flux_jobtap_job_lookup (p, jobid);
json_t *constraints = NULL;
json_t *not;
if (!args
|| flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:{s:{s:{s?o}}}}",
"jobspec",
"attributes",
"system",
"constraints",
&constraints)
< 0) {
flux_log_error (h, "Failed to unpack args");
flux_plugin_arg_destroy (args);
return NULL;
}
if (!constraints) {
if (!(constraints = json_pack ("{s:[{s:[s]}]}", "not", "hostlist", exclude_str))){
flux_log_error (h, "Failed to create new constraints object");
flux_plugin_arg_destroy (args);
return NULL;
}
flux_plugin_arg_destroy (args);
return constraints;
}
else { // deep copy the constraints because we don't want to modify it in-place
if (!(constraints = json_deep_copy (constraints))) {
flux_log_error (h, "Failed to deep copy constraints object");
flux_plugin_arg_destroy (args);
return NULL;
}
}
flux_plugin_arg_destroy (args);
if (!(not = json_object_get (constraints, "not"))) {
if (json_object_set_new (constraints, "not", json_pack ("[{s:[s]}]", "hostlist", exclude_str)) < 0) {
flux_log_error (h, "Failed to create new NOT constraints object");
json_decref (constraints);
return NULL;
}
return constraints;
}
if (json_array_append_new (not, json_pack ("{s:[s]}", "hostlist", exclude_str)) < 0) {
flux_log_error (h, "Failed to create new NOT constraints object");
json_decref (constraints);
return NULL;
}
return constraints;
}

static void resource_update_msg_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
flux_plugin_t *p = (flux_plugin_t *)arg;
json_int_t jobid;
-    json_t *resources = NULL, *errmsg;
+    json_t *resources = NULL, *errmsg, *constraints = NULL;
     int copy_offload;
-    const char *errmsg_str;
+    const char *errmsg_str, *exclude_str;

     if (flux_msg_unpack (msg,
-                         "{s:I, s:o, s:b, s:o}",
+                         "{s:I, s:o, s:b, s:o, s:s}",
                          "id", &jobid,
                          "resources", &resources,
                          "copy-offload", &copy_offload,
-                         "errmsg", &errmsg)
+                         "errmsg", &errmsg,
+                         "exclude", &exclude_str)
< 0) {
flux_log_error (h, "received malformed dws.resource-update RPC");
return;
}
if (strlen(exclude_str) > 0) {
if (!(constraints = generate_constraints (h, p, jobid, exclude_str))) {
flux_log_error (h, "Could not generate exclusion hostlist");
raise_job_exception (p, jobid, "dws", "Could not generate exclusion hostlist");
return;
}
}
if (!json_is_null (errmsg)) {
if (!(errmsg_str = json_string_value (errmsg))){
flux_log_error (h, "received malformed dws.resource-update RPC, errmsg must be string or JSON null");
errmsg_str = "<could not fetch error message>";
}
raise_job_exception (p, jobid, "dws", errmsg_str);
json_decref (constraints);
return;
}
-    else if (flux_jobtap_jobspec_update_id_pack (p,
+    else if (flux_jobtap_job_aux_set (p,
+                                      jobid,
+                                      "flux::dws-copy-offload",
+                                      copy_offload ? (void*) 1 : (void*) 0,
+                                      NULL) < 0
+             || flux_jobtap_jobspec_update_id_pack (p,
                                                     (flux_jobid_t) jobid,
-                                                    "{s: O}", "resources",
-                                                    resources) < 0
-             || flux_jobtap_job_aux_set (p, jobid, "flux::dws-copy-offload", copy_offload ? (void*) 1 : (void*) 0, NULL) < 0 ) {
+                                                    "{s:O, s:o*}",
+                                                    "resources", resources,
+                                                    "attributes.system.constraints", constraints) < 0) {
+        flux_log_error (h, "could not update jobspec with new constraints and resources");
raise_job_exception (p,
jobid,
"dws",
"Internal error: failed to update jobspec");
json_decref (constraints);
return;
}
if (flux_jobtap_dependency_remove (p, jobid, CREATE_DEP_NAME) < 0) {
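For reference, here is a rough Python sketch of the merge that generate_constraints() above performs on a job's jobspec constraints: start a new "not" clause when the job has no constraints yet, otherwise deep-copy the existing object and append the negated hostlist. The hostnames and the pre-existing "properties" constraint are invented for illustration and are not taken from this commit.

# Illustration only: mirrors the logic of generate_constraints() above.
import copy
import json

def add_exclusion(constraints, exclude_str):
    """Return a copy of `constraints` that also forbids the hosts in exclude_str."""
    if not constraints:
        return {"not": [{"hostlist": [exclude_str]}]}
    constraints = copy.deepcopy(constraints)  # never modify the jobspec object in place
    constraints.setdefault("not", []).append({"hostlist": [exclude_str]})
    return constraints

existing = {"properties": ["ssd"]}            # hypothetical pre-existing constraint
print(json.dumps(add_exclusion(existing, "compute-[01-04]")))
# {"properties": ["ssd"], "not": [{"hostlist": ["compute-[01-04]"]}]}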
36 changes: 23 additions & 13 deletions src/modules/coral2_dws.py
@@ -40,6 +40,7 @@
WORKFLOW_NAME_PREFIX = "fluxjob-"
WORKFLOW_NAME_FORMAT = WORKFLOW_NAME_PREFIX + "{jobid}"
_MIN_ALLOCATION_SIZE = 4 # minimum rabbit allocation size
_EXCLUDE_HOSTS = Hostlist()

_EXITCODE_NORESTART = 3 # exit code indicating to systemd not to restart

@@ -500,6 +501,7 @@ def _workflow_state_change_cb_inner(workflow, winfo, handle, k8s_api, disable_fl
"resources": resources,
"copy-offload": copy_offload,
"errmsg": errmsg,
"exclude": _EXCLUDE_HOSTS.encode(),
},
).then(log_rpc_response)
save_workflow_to_kvs(handle, jobid, workflow)
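A rough sketch of the dws.resource-update payload that coral2_dws.py now sends to the dws-jobtap plugin follows. The topic string, jobid, and resources shown are assumptions for illustration, not taken from this diff; the module itself sends the RPC asynchronously via .then(log_rpc_response).

import flux
from flux.hostlist import Hostlist

handle = flux.Flux()
exclude = Hostlist("compute-[01-04]")             # computes attached to down rabbits (made up)
payload = {
    "id": 1234,                                   # hypothetical jobid
    "resources": [{"type": "node", "count": 1}],  # placeholder jobspec resources section
    "copy-offload": False,
    "errmsg": None,                               # a string here raises a fatal job exception
    "exclude": exclude.encode(),                  # "" means "nothing to exclude"
}
# Topic assumed: jobtap plugin services are reached through the job manager.
handle.rpc("job-manager.dws.resource-update", payload).get()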
@@ -597,14 +599,21 @@ def drain_offline_nodes(handle, rabbit_name, nodelist, allowlist):
).then(log_rpc_response)


-def mark_rabbit(handle, status, resource_path, ssdcount, name):
+def mark_rabbit(handle, status, resource_path, ssdcount, name, disable_fluxion):
"""Send an RPC to mark a rabbit as up or down."""
if status == "Ready":
LOGGER.debug("Marking rabbit %s as up", name)
status = "up"
if disable_fluxion:
_EXCLUDE_HOSTS.delete(_RABBITS_TO_HOSTLISTS[name])
return
else:
LOGGER.debug("Marking rabbit %s as down, status is %s", name, status)
status = "down"
if disable_fluxion:
_EXCLUDE_HOSTS.append(_RABBITS_TO_HOSTLISTS[name])
_EXCLUDE_HOSTS.uniq()
return
for ssdnum in range(ssdcount):
payload = {"resource_path": resource_path + f"/ssd{ssdnum}", "status": status}
handle.rpc("sched-fluxion-resource.set_status", payload).then(log_rpc_response)
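The module-level exclude list is plain flux.hostlist.Hostlist bookkeeping: append a rabbit's compute nodes when it goes down, delete them when it comes back. A small standalone sketch follows; the rabbit names and hostlists are invented, and _RABBITS_TO_HOSTLISTS stands in for the name-to-hostlist mapping the module already maintains.

from flux.hostlist import Hostlist

_EXCLUDE_HOSTS = Hostlist()
_RABBITS_TO_HOSTLISTS = {"rabbit1": "compute-[01-04]", "rabbit2": "compute-[05-08]"}

# rabbit1 reported down: exclude its computes (uniq() guards against duplicates
# if the same rabbit is reported down repeatedly)
_EXCLUDE_HOSTS.append(_RABBITS_TO_HOSTLISTS["rabbit1"])
_EXCLUDE_HOSTS.uniq()
print(_EXCLUDE_HOSTS.encode())   # compute-[01-04]

# rabbit1 back up: remove its computes from the exclude list
_EXCLUDE_HOSTS.delete(_RABBITS_TO_HOSTLISTS["rabbit1"])
print(_EXCLUDE_HOSTS.encode())   # (empty string)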
@@ -622,14 +631,13 @@ def rabbit_state_change_cb(event, handle, rabbit_rpaths, disable_fluxion, allowl
"Encountered an unknown Storage object '%s' in the event stream", name
)
return
-    if not disable_fluxion:
-        try:
-            status = rabbit["status"]["status"]
-        except KeyError:
-            # if rabbit doesn't have a status, consider it down
-            mark_rabbit(handle, "Down", *rabbit_rpaths[name], name)
-        else:
-            mark_rabbit(handle, status, *rabbit_rpaths[name], name)
+    try:
+        status = rabbit["status"]["status"]
+    except KeyError:
+        # if rabbit doesn't have a status, consider it down
+        mark_rabbit(handle, "Down", *rabbit_rpaths[name], name, disable_fluxion)
+    else:
+        mark_rabbit(handle, status, *rabbit_rpaths[name], name, disable_fluxion)
try:
computes = rabbit["status"]["access"]["computes"]
except KeyError:
@@ -694,8 +702,8 @@ def init_rabbits(k8s_api, handle, watchers, disable_fluxion, drain_queues):
)
if disable_fluxion:
# don't mark the rabbit up or down but add the rabbit to the mapping
-            rabbit_rpaths[name] = None
-        elif name not in rabbit_rpaths:
+            rabbit_rpaths[name] = (None, None)
+        if name not in rabbit_rpaths:
LOGGER.error(
"Encountered an unknown Storage object '%s' in the event stream", name
)
@@ -704,9 +712,11 @@ def init_rabbits(k8s_api, handle, watchers, disable_fluxion, drain_queues):
rabbit_status = rabbit["status"]["status"]
except KeyError:
# if rabbit doesn't have a status, consider it down
mark_rabbit(handle, "Down", *rabbit_rpaths[name], name)
mark_rabbit(handle, "Down", *rabbit_rpaths[name], name, disable_fluxion)
else:
mark_rabbit(handle, rabbit_status, *rabbit_rpaths[name], name)
mark_rabbit(
handle, rabbit_status, *rabbit_rpaths[name], name, disable_fluxion
)
# rabbits don't have a 'status' field until they boot
try:
computes = rabbit["status"]["access"]["computes"]
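A note on the (None, None) change above: mark_rabbit() is always invoked as mark_rabbit(handle, status, *rabbit_rpaths[name], name, disable_fluxion), so each mapping entry must unpack into (resource_path, ssdcount) even when Fluxion is disabled and those two values go unused. A tiny sketch with invented values:

rabbit_rpaths = {}
rabbit_rpaths["rabbit1"] = ("/cluster0/rack0/rabbit1", 16)  # Fluxion enabled (made-up path)
rabbit_rpaths["rabbit2"] = (None, None)                     # --disable-fluxion placeholders

# Both entries splat cleanly into mark_rabbit(); with the placeholders,
# mark_rabbit() returns before it would touch resource_path or ssdcount.
resource_path, ssdcount = rabbit_rpaths["rabbit2"]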
1 change: 1 addition & 0 deletions t/dws-dependencies/coral2_dws.py
@@ -32,6 +32,7 @@ def create_cb(fh, t, msg, arg):
"resources": msg.payload["resources"],
"copy-offload": False,
"errmsg": None,
"exclude": "",
},
)

83 changes: 82 additions & 1 deletion t/t1003-dws-nnf-watch.t
@@ -13,6 +13,7 @@ flux setattr log-stderr-level 1
DATA_DIR=${SHARNESS_TEST_SRCDIR}/data/nnf-watch/
DWS_MODULE_PATH=${FLUX_SOURCE_DIR}/src/modules/coral2_dws.py
RPC=${FLUX_BUILD_DIR}/t/util/rpc
PLUGINPATH=${FLUX_BUILD_DIR}/src/job-manager/plugins/.libs

if test_have_prereq NO_DWS_K8S; then
skip_all='skipping DWS workflow tests due to no DWS K8s'
@@ -209,8 +210,88 @@ test_expect_success 'return the storage resource to Live mode' '
kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Ready\""
'

test_expect_success 'exec Storage watching script with --disable-fluxion' '
flux cancel ${jobid} &&
flux resource undrain compute-01 &&
echo "
[rabbit]
drain_compute_nodes = false
" | flux config load &&
flux jobtap load ${PLUGINPATH}/dws-jobtap.so &&
jobid=$(flux submit \
--setattr=system.alloc-bypass.R="$(flux R encode -r0)" --output=dws5.out \
--error=dws5.err -o per-resource.type=node flux python ${DWS_MODULE_PATH} \
-vvv --disable-fluxion) &&
flux job wait-event -vt 15 -p guest.exec.eventlog ${jobid} shell.start &&
flux job wait-event -vt 15 -m "note=dws watchers setup" ${jobid} exception &&
${RPC} "dws.watch_test"
'

test_expect_success 'Storages are up and rabbit jobs can run' '
kubectl get storages kind-worker2 -ojson | jq -e ".spec.state == \"Enabled\"" &&
kubectl get storages kind-worker2 -ojson | jq -e ".status.status == \"Ready\"" &&
kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Ready\"" &&
kubectl get storages kind-worker3 -ojson | jq -e ".spec.state == \"Enabled\"" &&
kubectl get storages kind-worker3 -ojson | jq -e ".status.status == \"Ready\"" &&
JOBID=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs \
name=project1" -N1 -n1 hostname) &&
flux job wait-event -vt 10 ${JOBID} jobspec-update &&
flux job wait-event -vt 10 ${JOBID} alloc &&
flux job wait-event -vt 10 -m status=0 ${JOBID} finish &&
flux job wait-event -vt 20 ${JOBID} clean &&
flux job attach $JOBID
'

test_expect_success 'update to the Storage status is caught by the watch' '
kubectl patch storages kind-worker2 \
--type merge --patch-file ${DATA_DIR}/down.yaml &&
kubectl get storages kind-worker2 -ojson | jq -e ".spec.state == \"Disabled\"" &&
sleep 0.2 &&
kubectl get storages kind-worker2 -ojson | jq -e ".status.status == \"Disabled\"" &&
kubectl patch storages kind-worker3 \
--type merge --patch-file ${DATA_DIR}/down.yaml &&
kubectl get storages kind-worker3 -ojson | jq -e ".spec.state == \"Disabled\"" &&
sleep 0.2 &&
kubectl get storages kind-worker3 -ojson | jq -e ".status.status == \"Disabled\"" &&
sleep 3
'

test_expect_success 'rabbits now marked as down are not allocated' '
JOBID=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs \
name=project1" -N1 -n1 hostname) &&
flux job wait-event -vt 10 ${JOBID} jobspec-update &&
test_must_fail flux job wait-event -vt 3 ${JOBID} alloc &&
flux job wait-event -vt 1 ${JOBID} exception &&
flux job wait-event -vt 2 ${JOBID} clean
'

test_expect_success 'revert the changes to the Storage' '
kubectl patch storages kind-worker2 \
--type merge --patch-file ${DATA_DIR}/up.yaml &&
kubectl get storages kind-worker2 -ojson | jq -e ".spec.state == \"Enabled\"" &&
sleep 0.2 &&
kubectl get storages kind-worker2 -ojson | jq -e ".status.status == \"Ready\"" &&
kubectl patch storages kind-worker3 \
--type merge --patch-file ${DATA_DIR}/up.yaml &&
kubectl get storages kind-worker3 -ojson | jq -e ".spec.state == \"Enabled\"" &&
sleep 0.2 &&
kubectl get storages kind-worker3 -ojson | jq -e ".status.status == \"Ready\"" &&
sleep 1
'

test_expect_success 'rabbits now marked as up and can be allocated' '
JOBID=$(flux submit --setattr=system.dw="#DW jobdw capacity=10GiB type=xfs \
name=project1" -N1 -n1 hostname) &&
flux jobs && flux resource list &&
flux job wait-event -vt 10 ${JOBID} jobspec-update &&
flux job wait-event -vt 5 ${JOBID} alloc &&
flux job wait-event -vt 25 -m status=0 ${JOBID} finish &&
flux job wait-event -vt 20 ${JOBID} clean
'

test_expect_success 'unload fluxion' '
-	flux cancel ${jobid}; flux module remove sched-fluxion-qmanager &&
+	flux cancel ${jobid}; flux job wait-event -vt 1 ${jobid} clean &&
+	flux module remove sched-fluxion-qmanager &&
flux module remove sched-fluxion-resource
'

