
Commit d1f5c17

Merge pull request #183 from jameshcorbett/kvs-workflows

dws: saving workflows to KVS

mergify[bot] authored Jul 17, 2024
2 parents: 3b4ff91 + 619ee55
Showing 2 changed files with 58 additions and 23 deletions.
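In short: before a workflow is moved to Teardown, move_workflow_to_teardown() now writes the DWS Workflow resource into the job's KVS directory under the key rabbit_workflow, so it remains inspectable after the job finishes (the updated sharness tests read it back with "flux job info JOBID rabbit_workflow"). Illustrative sketches of the write and read sides follow each file's diff below.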
67 changes: 46 additions & 21 deletions src/modules/coral2_dws.py
@@ -148,11 +148,18 @@ def remove_finalizer(workflow_name, k8s_api, workflow):
)


def move_workflow_to_teardown(winfo, k8s_api, workflow=None):
def move_workflow_to_teardown(handle, winfo, k8s_api, workflow=None):
"""Helper function for moving a workflow to Teardown."""
if workflow is None:
workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, winfo.name)
LOGGER.debug("Moving workflow %s to Teardown, dump is %s", winfo.name, workflow)
try:
kvsdir = flux.job.job_kvs(handle, winfo.jobid)
kvsdir["rabbit_workflow"] = workflow
kvsdir.commit()
except Exception:
LOGGER.exception(
"Failed to update KVS for job %s: workflow is", winfo.jobid, workflow
)
try:
workflow["metadata"]["finalizers"].remove(_FINALIZER)
except ValueError:
@@ -166,7 +173,7 @@ def move_workflow_to_teardown(winfo, k8s_api, workflow=None):
},
)
winfo.toredown = True
if LOGGER.isEnabledFor(logging.DEBUG):
if LOGGER.isEnabledFor(logging.INFO):
try:
api_response = k8s_api.list_namespaced_custom_object(
*DATAMOVEMENT_CRD,
@@ -176,14 +183,14 @@
),
)
except Exception as exc:
LOGGER.debug(
LOGGER.info(
"Failed to fetch nnfdatamovement crds for workflow '%s': %s",
winfo.name,
exc,
)
else:
for crd in api_response["items"]:
LOGGER.debug(
LOGGER.info(
"Found nnfdatamovement crd for workflow '%s': %s", winfo.name, crd
)

@@ -270,7 +277,10 @@ def setup_cb(handle, _t, msg, k8s_api):
nodes_per_nnf[nnf_name] = nodes_per_nnf.get(nnf_name, 0) + 1
handle.rpc(
"job-manager.memo",
payload={"id": jobid, "memo": {"rabbits": Hostlist(nodes_per_nnf.keys()).encode()}},
payload={
"id": jobid,
"memo": {"rabbits": Hostlist(nodes_per_nnf.keys()).encode()},
},
).then(log_rpc_response)
k8s_api.patch_namespaced_custom_object(
COMPUTE_CRD.group,
@@ -322,7 +332,7 @@ def post_run_cb(handle, _t, msg, k8s_api):
if not run_started:
# the job hit an exception before beginning to run; transition
# the workflow immediately to 'teardown'
move_workflow_to_teardown(winfo, k8s_api)
move_workflow_to_teardown(handle, winfo, k8s_api)
else:
move_workflow_desiredstate(winfo.name, "PostRun", k8s_api)

@@ -368,7 +378,7 @@ def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion):
jobid,
)
try:
move_workflow_to_teardown(winfo, k8s_api, workflow)
move_workflow_to_teardown(handle, winfo, k8s_api, workflow)
except ApiException:
LOGGER.exception(
"Failed to move workflow '%s' with jobid %s to 'teardown' "
@@ -460,10 +470,9 @@ def _workflow_state_change_cb_inner(
move_workflow_desiredstate(winfo.name, "DataOut", k8s_api)
elif state_complete(workflow, "DataOut"):
# move workflow to next stage, teardown
move_workflow_to_teardown(winfo, k8s_api, workflow)
move_workflow_to_teardown(handle, winfo, k8s_api, workflow)
if workflow["status"].get("status") == "Error":
# a fatal error has occurred in the workflows, raise a job exception
LOGGER.info("workflow '%s' hit an error: %s", winfo.name, workflow)
handle.job_raise(
jobid,
"exception",
@@ -476,10 +485,10 @@
# workflow is in PostRun or DataOut, the exception won't affect the dws-epilog
# action holding the job, so the workflow should be moved to Teardown now.
if workflow["spec"]["desiredState"] in ("PostRun", "DataOut"):
move_workflow_to_teardown(winfo, k8s_api, workflow)
move_workflow_to_teardown(handle, winfo, k8s_api, workflow)
elif workflow["status"].get("status") == "TransientCondition":
# a potentially fatal error has occurred, but may resolve itself
LOGGER.warning(
LOGGER.info(
"Workflow '%s' has TransientCondition set, message is '%s', workflow is %s",
winfo.name,
workflow["status"].get("message", ""),
@@ -642,30 +651,41 @@ def init_rabbits(k8s_api, handle, watchers, args):
)


def kill_workflows_in_tc(_reactor, watcher, _r, tc_timeout):
def kill_workflows_in_tc(_reactor, watcher, _r, arg):
"""Callback firing every (tc_timeout / 2) seconds.
Raise exceptions on jobs stuck in TransientCondition for more than
tc_timeout seconds.
"""
tc_timeout, k8s_api = arg
curr_time = time.time()
# iterate over a copy of the set
# otherwise an exception occurs because we modify the set as we
# iterate over it.
for winfo in WORKFLOWS_IN_TC.copy():
if curr_time - winfo.transient_condition.last_time > tc_timeout:
LOGGER.info(
"Workflow '%s' was in TransientCondition too long: %s",
winfo.name,
winfo.transient_condition.workflow,
)
watcher.flux_handle.job_raise(
winfo.jobid,
"exception",
0,
"DWS/Rabbit interactions failed: workflow in 'TransientCondition' "
f"state too long: {winfo.transient_condition.last_message}",
)
# for most states, raising an exception should be enough to trigger other
# logic that eventually moves the workflow to Teardown. However, if the
# workflow is in PostRun or DataOut, the exception won't affect the
# dws-epilog action holding the job, so the workflow should be moved
# to Teardown now.
if winfo.transient_condition.workflow["spec"]["desiredState"] in (
"PostRun",
"DataOut",
):
move_workflow_to_teardown(
watcher.flux_handle,
winfo,
k8s_api,
winfo.transient_condition.workflow,
)
WORKFLOWS_IN_TC.discard(winfo)


@@ -736,9 +756,9 @@ def setup_parsing():
def config_logging(args):
"""Configure logging for the script."""
log_level = logging.WARNING
if args.verbose > 1:
if args.verbose > 0:
log_level = logging.INFO
if args.verbose > 2:
if args.verbose > 1:
log_level = logging.DEBUG
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s",
@@ -793,11 +813,16 @@ def raise_self_exception(handle):
testing purposes.
Once https://github.com/flux-framework/flux-core/issues/3821 is
implemented/closed, this can be replaced with that solution.
Also, remove FLUX_KVS_NAMESPACE from the environment, because otherwise
KVS lookups will look relative to that namespace, changing the behavior
relative to when the script runs as a service.
"""
try:
jobid = id_parse(os.environ["FLUX_JOB_ID"])
except KeyError:
return
del os.environ["FLUX_KVS_NAMESPACE"]
Future(handle.job_raise(jobid, "exception", 7, "dws watchers setup")).get()


@@ -829,7 +854,7 @@ def main():
args.transient_condition_timeout / 2,
kill_workflows_in_tc,
repeat=args.transient_condition_timeout / 2,
args=args.transient_condition_timeout,
args=(args.transient_condition_timeout, k8s_api),
).start()
# start watching k8s workflow resources and operate on them when updates occur
# or new RPCs are received
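The core of the module change is the block added to move_workflow_to_teardown(): the Workflow resource is written into the job's KVS directory before the finalizer is removed, wrapped in try/except so a KVS failure cannot block teardown. Below is a minimal standalone sketch of that pattern, assuming a live Flux instance, an existing job, and a dict-like workflow dump; the function and variable names are illustrative, not part of the patch.

import flux
import flux.job


def save_workflow_to_kvs(handle, jobid, workflow):
    """Illustrative sketch: store a workflow dump under the job's KVS directory."""
    # flux.job.job_kvs() returns a KVSDir rooted at the job's KVS path,
    # the same call the patch uses in move_workflow_to_teardown().
    kvsdir = flux.job.job_kvs(handle, jobid)
    # Values assigned through a KVSDir are JSON-serialized, so a plain dict works.
    kvsdir["rabbit_workflow"] = workflow
    # Nothing is stored until the transaction is committed.
    kvsdir.commit()

In the patch this runs with the service's own handle and winfo.jobid just before the finalizer is removed; errors are logged with LOGGER.exception() rather than propagated.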
14 changes: 12 additions & 2 deletions t/t1002-dws-workflow-obj.t
@@ -145,7 +145,12 @@ test_expect_success 'job submission with valid DW string works' '
flux job wait-event -vt 30 -m description=${EPILOG_NAME} \
${jobid} epilog-finish &&
flux job wait-event -vt 15 ${jobid} clean &&
flux jobs -n ${jobid} -o "{user.rabbits}" | flux hostlist -q -
flux jobs -n ${jobid} -o "{user.rabbits}" | flux hostlist -q - &&
flux job info ${jobid} rabbit_workflow &&
flux job info ${jobid} rabbit_workflow | \
jq -e ".metadata.name == \"fluxjob-$(flux job id ${jobid})\"" &&
flux job info ${jobid} rabbit_workflow | jq -e ".spec.wlmID == \"flux\"" &&
flux job info ${jobid} rabbit_workflow | jq -e ".kind == \"Workflow\""
'

test_expect_success 'job requesting copy-offload in DW string works' '
@@ -172,7 +177,12 @@ test_expect_success 'job requesting copy-offload in DW string works' '
${jobid} epilog-start &&
flux job wait-event -vt 30 -m description=${EPILOG_NAME} \
${jobid} epilog-finish &&
flux job wait-event -vt 15 ${jobid} clean
flux job wait-event -vt 15 ${jobid} clean &&
flux job info ${jobid} rabbit_workflow &&
flux job info ${jobid} rabbit_workflow | \
jq -e ".metadata.name == \"fluxjob-$(flux job id ${jobid})\"" &&
flux job info ${jobid} rabbit_workflow | jq -e ".spec.wlmID == \"flux\"" &&
flux job info ${jobid} rabbit_workflow | jq -e ".kind == \"Workflow\""
'

test_expect_success 'job requesting too much storage is rejected' '
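The new test assertions read the stored resource back with "flux job info JOBID rabbit_workflow" and check a few fields with jq. The same lookup can be done from Python through the job's KVS directory; the sketch below is illustrative only (the helper name is not from the patch) and assumes the job has already stored the key.

import flux
import flux.job


def load_workflow_from_kvs(handle, jobid):
    """Illustrative sketch: read back the workflow stored by coral2_dws.py."""
    kvsdir = flux.job.job_kvs(handle, jobid)
    # The stored JSON is decoded back into a dict on lookup.
    return kvsdir["rabbit_workflow"]


# Checks mirroring the jq assertions in the tests above, e.g.:
#   workflow = load_workflow_from_kvs(flux.Flux(), jobid)
#   assert workflow["kind"] == "Workflow"
#   assert workflow["spec"]["wlmID"] == "flux"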
