From 9762484382b861d6dc8ef24265c797813a83c505 Mon Sep 17 00:00:00 2001 From: James Corbett Date: Tue, 16 Jul 2024 16:34:28 -0700 Subject: [PATCH 1/7] dws: remove FLUX_KVS_NAMESPACE from environment Problem: in tests, coral2_dws.py runs as a Flux job, meaning FLUX_KVS_NAMESPACE is set. However, this makes KVS lookups look relative to that namespace, which is undesirable. Lookups would behave differently in production when coral2_dws runs as a systemd service. Remove FLUX_KVS_NAMESPACE from the environment. --- src/modules/coral2_dws.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 7b0310a..61c2837 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -793,11 +793,16 @@ def raise_self_exception(handle): testing purposes. Once https://github.com/flux-framework/flux-core/issues/3821 is implemented/closed, this can be replaced with that solution. + + Also, remove FLUX_KVS_NAMESPACE from the environment, because otherwise + KVS lookups will look relative to that namespace, changing the behavior + relative to when the script runs as a service. """ try: jobid = id_parse(os.environ["FLUX_JOB_ID"]) except KeyError: return + del os.environ["FLUX_KVS_NAMESPACE"] Future(handle.job_raise(jobid, "exception", 7, "dws watchers setup")).get() From 17b140318f043ca37c7bfdafdde7b8de82f99eab Mon Sep 17 00:00:00 2001 From: James Corbett Date: Tue, 16 Jul 2024 16:37:45 -0700 Subject: [PATCH 2/7] dws: write workflows to jobs' kvs Problem: HPE and others working with k8s workflows have been frustrated that k8s resources disappear shortly after their associated flux job moves to cleanup, and have requested that Flux save some of the resources. Save workflow resources to jobs' KVS. More resources may be needed in the future, but start small to reduce KVS pressure. Fix some whitespace/style errors in the same file. --- src/modules/coral2_dws.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 61c2837..7e1a6c6 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -148,11 +148,18 @@ def remove_finalizer(workflow_name, k8s_api, workflow): ) -def move_workflow_to_teardown(winfo, k8s_api, workflow=None): +def move_workflow_to_teardown(handle, winfo, k8s_api, workflow=None): """Helper function for moving a workflow to Teardown.""" if workflow is None: workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, winfo.name) - LOGGER.debug("Moving workflow %s to Teardown, dump is %s", winfo.name, workflow) + try: + kvsdir = flux.job.job_kvs(handle, winfo.jobid) + kvsdir["rabbit_workflow"] = workflow + kvsdir.commit() + except Exception: + LOGGER.exception( + "Failed to update KVS for job %s: workflow is", winfo.jobid, workflow + ) try: workflow["metadata"]["finalizers"].remove(_FINALIZER) except ValueError: @@ -270,7 +277,10 @@ def setup_cb(handle, _t, msg, k8s_api): nodes_per_nnf[nnf_name] = nodes_per_nnf.get(nnf_name, 0) + 1 handle.rpc( "job-manager.memo", - payload={"id": jobid, "memo": {"rabbits": Hostlist(nodes_per_nnf.keys()).encode()}}, + payload={ + "id": jobid, + "memo": {"rabbits": Hostlist(nodes_per_nnf.keys()).encode()}, + }, ).then(log_rpc_response) k8s_api.patch_namespaced_custom_object( COMPUTE_CRD.group, @@ -322,7 +332,7 @@ def post_run_cb(handle, _t, msg, k8s_api): if not run_started: # the job hit an exception before beginning to run; transition # the workflow immediately to 'teardown' - move_workflow_to_teardown(winfo, k8s_api) + move_workflow_to_teardown(handle, winfo, k8s_api) else: move_workflow_desiredstate(winfo.name, "PostRun", k8s_api) @@ -368,7 +378,7 @@ def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion): jobid, ) try: - move_workflow_to_teardown(winfo, k8s_api, workflow) + move_workflow_to_teardown(handle, winfo, k8s_api, workflow) except ApiException: LOGGER.exception( "Failed to move workflow '%s' with jobid %s to 'teardown' " @@ -460,7 +470,7 @@ def _workflow_state_change_cb_inner( move_workflow_desiredstate(winfo.name, "DataOut", k8s_api) elif state_complete(workflow, "DataOut"): # move workflow to next stage, teardown - move_workflow_to_teardown(winfo, k8s_api, workflow) + move_workflow_to_teardown(handle, winfo, k8s_api, workflow) if workflow["status"].get("status") == "Error": # a fatal error has occurred in the workflows, raise a job exception LOGGER.info("workflow '%s' hit an error: %s", winfo.name, workflow) @@ -476,7 +486,7 @@ def _workflow_state_change_cb_inner( # workflow is in PostRun or DataOut, the exception won't affect the dws-epilog # action holding the job, so the workflow should be moved to Teardown now. if workflow["spec"]["desiredState"] in ("PostRun", "DataOut"): - move_workflow_to_teardown(winfo, k8s_api, workflow) + move_workflow_to_teardown(handle, winfo, k8s_api, workflow) elif workflow["status"].get("status") == "TransientCondition": # a potentially fatal error has occurred, but may resolve itself LOGGER.warning( From 130da176953f47e17560984c983ee4e1cf5ccebb Mon Sep 17 00:00:00 2001 From: James Corbett Date: Tue, 16 Jul 2024 16:46:35 -0700 Subject: [PATCH 3/7] test: add workflow job-kvs tests Problem: there are no tests to ensure that workflows are written out to jobs' kvs properly. Add tests. --- t/t1002-dws-workflow-obj.t | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/t/t1002-dws-workflow-obj.t b/t/t1002-dws-workflow-obj.t index f449422..674df07 100755 --- a/t/t1002-dws-workflow-obj.t +++ b/t/t1002-dws-workflow-obj.t @@ -145,7 +145,12 @@ test_expect_success 'job submission with valid DW string works' ' flux job wait-event -vt 30 -m description=${EPILOG_NAME} \ ${jobid} epilog-finish && flux job wait-event -vt 15 ${jobid} clean && - flux jobs -n ${jobid} -o "{user.rabbits}" | flux hostlist -q - + flux jobs -n ${jobid} -o "{user.rabbits}" | flux hostlist -q - && + flux job info ${jobid} rabbit_workflow && + flux job info ${jobid} rabbit_workflow | \ + jq -e ".metadata.name == \"fluxjob-$(flux job id ${jobid})\"" && + flux job info ${jobid} rabbit_workflow | jq -e ".spec.wlmID == \"flux\"" && + flux job info ${jobid} rabbit_workflow | jq -e ".kind == \"Workflow\"" ' test_expect_success 'job requesting copy-offload in DW string works' ' @@ -172,7 +177,12 @@ test_expect_success 'job requesting copy-offload in DW string works' ' ${jobid} epilog-start && flux job wait-event -vt 30 -m description=${EPILOG_NAME} \ ${jobid} epilog-finish && - flux job wait-event -vt 15 ${jobid} clean + flux job wait-event -vt 15 ${jobid} clean && + flux job info ${jobid} rabbit_workflow && + flux job info ${jobid} rabbit_workflow | \ + jq -e ".metadata.name == \"fluxjob-$(flux job id ${jobid})\"" && + flux job info ${jobid} rabbit_workflow | jq -e ".spec.wlmID == \"flux\"" && + flux job info ${jobid} rabbit_workflow | jq -e ".kind == \"Workflow\"" ' test_expect_success 'job requesting too much storage is rejected' ' From dbf1ff6b991825dc7914a1c8aed42c0d3829b2d3 Mon Sep 17 00:00:00 2001 From: James Corbett Date: Tue, 16 Jul 2024 17:34:52 -0700 Subject: [PATCH 4/7] dws: stop logging workflows Problem: now that workflows are saved to the KVS, there is no need to log them out, which only clutters the journal. Stop logging them. --- src/modules/coral2_dws.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 7e1a6c6..6c99465 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -473,7 +473,6 @@ def _workflow_state_change_cb_inner( move_workflow_to_teardown(handle, winfo, k8s_api, workflow) if workflow["status"].get("status") == "Error": # a fatal error has occurred in the workflows, raise a job exception - LOGGER.info("workflow '%s' hit an error: %s", winfo.name, workflow) handle.job_raise( jobid, "exception", @@ -664,11 +663,6 @@ def kill_workflows_in_tc(_reactor, watcher, _r, tc_timeout): # iterate over it. for winfo in WORKFLOWS_IN_TC.copy(): if curr_time - winfo.transient_condition.last_time > tc_timeout: - LOGGER.info( - "Workflow '%s' was in TransientCondition too long: %s", - winfo.name, - winfo.transient_condition.workflow, - ) watcher.flux_handle.job_raise( winfo.jobid, "exception", From 0aa0da98f8109e4b929355f28b2a9cf52a0e91ea Mon Sep 17 00:00:00 2001 From: James Corbett Date: Tue, 16 Jul 2024 17:45:39 -0700 Subject: [PATCH 5/7] dws: move workflows in TC to teardown Problem: as in #169, for most states, raising an exception should be enough to trigger other logic that eventually moves the workflow to Teardown. However, if the workflow is in PostRun or DataOut, the exception won't affect the dws-epilog action holding the job, so the workflow should be moved to Teardown immediately. Move workflows that are stuck in TransientCondition in DataOut or PostRun to Teardown immediately. --- src/modules/coral2_dws.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 6c99465..7221bf4 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -651,12 +651,13 @@ def init_rabbits(k8s_api, handle, watchers, args): ) -def kill_workflows_in_tc(_reactor, watcher, _r, tc_timeout): +def kill_workflows_in_tc(_reactor, watcher, _r, arg): """Callback firing every (tc_timeout / 2) seconds. Raise exceptions on jobs stuck in TransientCondition for more than tc_timeout seconds. """ + tc_timeout, k8s_api = arg curr_time = time.time() # iterate over a copy of the set # otherwise an exception occurs because we modify the set as we @@ -670,6 +671,21 @@ def kill_workflows_in_tc(_reactor, watcher, _r, tc_timeout): "DWS/Rabbit interactions failed: workflow in 'TransientCondition' " f"state too long: {winfo.transient_condition.last_message}", ) + # for most states, raising an exception should be enough to trigger other + # logic that eventually moves the workflow to Teardown. However, if the + # workflow is in PostRun or DataOut, the exception won't affect the + # dws-epilog action holding the job, so the workflow should be moved + # to Teardown now. + if winfo.transient_condition.workflow["spec"]["desiredState"] in ( + "PostRun", + "DataOut", + ): + move_workflow_to_teardown( + watcher.flux_handle, + winfo, + k8s_api, + winfo.transient_condition.workflow, + ) WORKFLOWS_IN_TC.discard(winfo) @@ -838,7 +854,7 @@ def main(): args.transient_condition_timeout / 2, kill_workflows_in_tc, repeat=args.transient_condition_timeout / 2, - args=args.transient_condition_timeout, + args=(args.transient_condition_timeout, k8s_api), ).start() # start watching k8s workflow resources and operate on them when updates occur # or new RPCs are received From 6143e5b95da6a08ff9c56ed10a141efaf01692af Mon Sep 17 00:00:00 2001 From: James Corbett Date: Tue, 16 Jul 2024 17:55:37 -0700 Subject: [PATCH 6/7] dws: change log level of some messages to INFO Problem: setting the log level to DEBUG makes the k8s library very verbose, which is fine sometimes but too much for some use cases. However, the INFO level is very open. Change some log messages to INFO from WARNING and DEBUG. --- src/modules/coral2_dws.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 7221bf4..3e220da 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -173,7 +173,7 @@ def move_workflow_to_teardown(handle, winfo, k8s_api, workflow=None): }, ) winfo.toredown = True - if LOGGER.isEnabledFor(logging.DEBUG): + if LOGGER.isEnabledFor(logging.INFO): try: api_response = k8s_api.list_namespaced_custom_object( *DATAMOVEMENT_CRD, @@ -183,14 +183,14 @@ def move_workflow_to_teardown(handle, winfo, k8s_api, workflow=None): ), ) except Exception as exc: - LOGGER.debug( + LOGGER.info( "Failed to fetch nnfdatamovement crds for workflow '%s': %s", winfo.name, exc, ) else: for crd in api_response["items"]: - LOGGER.debug( + LOGGER.info( "Found nnfdatamovement crd for workflow '%s': %s", winfo.name, crd ) @@ -488,7 +488,7 @@ def _workflow_state_change_cb_inner( move_workflow_to_teardown(handle, winfo, k8s_api, workflow) elif workflow["status"].get("status") == "TransientCondition": # a potentially fatal error has occurred, but may resolve itself - LOGGER.warning( + LOGGER.info( "Workflow '%s' has TransientCondition set, message is '%s', workflow is %s", winfo.name, workflow["status"].get("message", ""), From 619ee55c39b5ca0e1fe2e61598ffdc2545ff8e78 Mon Sep 17 00:00:00 2001 From: James Corbett Date: Tue, 16 Jul 2024 18:01:39 -0700 Subject: [PATCH 7/7] dws: fix argparse log-level config Problem: a single -v flag has no effect on coral2_dws, because it checks for > 1 verbose flags when it should be > 0 or >= 1. Change the logic to be > 0. --- src/modules/coral2_dws.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 3e220da..358c219 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -756,9 +756,9 @@ def setup_parsing(): def config_logging(args): """Configure logging for the script.""" log_level = logging.WARNING - if args.verbose > 1: + if args.verbose > 0: log_level = logging.INFO - if args.verbose > 2: + if args.verbose > 1: log_level = logging.DEBUG logging.basicConfig( format="%(asctime)s - %(levelname)s - %(message)s",