diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py
index 7b0310a..358c219 100755
--- a/src/modules/coral2_dws.py
+++ b/src/modules/coral2_dws.py
@@ -148,11 +148,18 @@ def remove_finalizer(workflow_name, k8s_api, workflow):
     )
 
 
-def move_workflow_to_teardown(winfo, k8s_api, workflow=None):
+def move_workflow_to_teardown(handle, winfo, k8s_api, workflow=None):
     """Helper function for moving a workflow to Teardown."""
     if workflow is None:
         workflow = k8s_api.get_namespaced_custom_object(*WORKFLOW_CRD, winfo.name)
-    LOGGER.debug("Moving workflow %s to Teardown, dump is %s", winfo.name, workflow)
+    try:
+        kvsdir = flux.job.job_kvs(handle, winfo.jobid)
+        kvsdir["rabbit_workflow"] = workflow
+        kvsdir.commit()
+    except Exception:
+        LOGGER.exception(
+            "Failed to update KVS for job %s: workflow is %s", winfo.jobid, workflow
+        )
     try:
         workflow["metadata"]["finalizers"].remove(_FINALIZER)
     except ValueError:
@@ -166,7 +173,7 @@ def move_workflow_to_teardown(winfo, k8s_api, workflow=None):
         },
     )
     winfo.toredown = True
-    if LOGGER.isEnabledFor(logging.DEBUG):
+    if LOGGER.isEnabledFor(logging.INFO):
         try:
             api_response = k8s_api.list_namespaced_custom_object(
                 *DATAMOVEMENT_CRD,
@@ -176,14 +183,14 @@ def move_workflow_to_teardown(winfo, k8s_api, workflow=None):
                 ),
             )
         except Exception as exc:
-            LOGGER.debug(
+            LOGGER.info(
                 "Failed to fetch nnfdatamovement crds for workflow '%s': %s",
                 winfo.name,
                 exc,
             )
         else:
             for crd in api_response["items"]:
-                LOGGER.debug(
+                LOGGER.info(
                     "Found nnfdatamovement crd for workflow '%s': %s", winfo.name, crd
                 )
 
@@ -270,7 +277,10 @@ def setup_cb(handle, _t, msg, k8s_api):
         nodes_per_nnf[nnf_name] = nodes_per_nnf.get(nnf_name, 0) + 1
     handle.rpc(
         "job-manager.memo",
-        payload={"id": jobid, "memo": {"rabbits": Hostlist(nodes_per_nnf.keys()).encode()}},
+        payload={
+            "id": jobid,
+            "memo": {"rabbits": Hostlist(nodes_per_nnf.keys()).encode()},
+        },
     ).then(log_rpc_response)
     k8s_api.patch_namespaced_custom_object(
         COMPUTE_CRD.group,
@@ -322,7 +332,7 @@ def post_run_cb(handle, _t, msg, k8s_api):
     if not run_started:
         # the job hit an exception before beginning to run; transition
         # the workflow immediately to 'teardown'
-        move_workflow_to_teardown(winfo, k8s_api)
+        move_workflow_to_teardown(handle, winfo, k8s_api)
     else:
         move_workflow_desiredstate(winfo.name, "PostRun", k8s_api)
 
@@ -368,7 +378,7 @@ def workflow_state_change_cb(event, handle, k8s_api, disable_fluxion):
                 jobid,
             )
             try:
-                move_workflow_to_teardown(winfo, k8s_api, workflow)
+                move_workflow_to_teardown(handle, winfo, k8s_api, workflow)
             except ApiException:
                 LOGGER.exception(
                     "Failed to move workflow '%s' with jobid %s to 'teardown' "
@@ -460,10 +470,9 @@ def _workflow_state_change_cb_inner(
         move_workflow_desiredstate(winfo.name, "DataOut", k8s_api)
     elif state_complete(workflow, "DataOut"):
         # move workflow to next stage, teardown
-        move_workflow_to_teardown(winfo, k8s_api, workflow)
+        move_workflow_to_teardown(handle, winfo, k8s_api, workflow)
     if workflow["status"].get("status") == "Error":
         # a fatal error has occurred in the workflows, raise a job exception
-        LOGGER.info("workflow '%s' hit an error: %s", winfo.name, workflow)
        handle.job_raise(
            jobid,
            "exception",
@@ -476,10 +485,10 @@ def _workflow_state_change_cb_inner(
         # workflow is in PostRun or DataOut, the exception won't affect the dws-epilog
         # action holding the job, so the workflow should be moved to Teardown now.
if workflow["spec"]["desiredState"] in ("PostRun", "DataOut"): - move_workflow_to_teardown(winfo, k8s_api, workflow) + move_workflow_to_teardown(handle, winfo, k8s_api, workflow) elif workflow["status"].get("status") == "TransientCondition": # a potentially fatal error has occurred, but may resolve itself - LOGGER.warning( + LOGGER.info( "Workflow '%s' has TransientCondition set, message is '%s', workflow is %s", winfo.name, workflow["status"].get("message", ""), @@ -642,23 +651,19 @@ def init_rabbits(k8s_api, handle, watchers, args): ) -def kill_workflows_in_tc(_reactor, watcher, _r, tc_timeout): +def kill_workflows_in_tc(_reactor, watcher, _r, arg): """Callback firing every (tc_timeout / 2) seconds. Raise exceptions on jobs stuck in TransientCondition for more than tc_timeout seconds. """ + tc_timeout, k8s_api = arg curr_time = time.time() # iterate over a copy of the set # otherwise an exception occurs because we modify the set as we # iterate over it. for winfo in WORKFLOWS_IN_TC.copy(): if curr_time - winfo.transient_condition.last_time > tc_timeout: - LOGGER.info( - "Workflow '%s' was in TransientCondition too long: %s", - winfo.name, - winfo.transient_condition.workflow, - ) watcher.flux_handle.job_raise( winfo.jobid, "exception", @@ -666,6 +671,21 @@ def kill_workflows_in_tc(_reactor, watcher, _r, tc_timeout): "DWS/Rabbit interactions failed: workflow in 'TransientCondition' " f"state too long: {winfo.transient_condition.last_message}", ) + # for most states, raising an exception should be enough to trigger other + # logic that eventually moves the workflow to Teardown. However, if the + # workflow is in PostRun or DataOut, the exception won't affect the + # dws-epilog action holding the job, so the workflow should be moved + # to Teardown now. + if winfo.transient_condition.workflow["spec"]["desiredState"] in ( + "PostRun", + "DataOut", + ): + move_workflow_to_teardown( + watcher.flux_handle, + winfo, + k8s_api, + winfo.transient_condition.workflow, + ) WORKFLOWS_IN_TC.discard(winfo) @@ -736,9 +756,9 @@ def setup_parsing(): def config_logging(args): """Configure logging for the script.""" log_level = logging.WARNING - if args.verbose > 1: + if args.verbose > 0: log_level = logging.INFO - if args.verbose > 2: + if args.verbose > 1: log_level = logging.DEBUG logging.basicConfig( format="%(asctime)s - %(levelname)s - %(message)s", @@ -793,11 +813,16 @@ def raise_self_exception(handle): testing purposes. Once https://github.com/flux-framework/flux-core/issues/3821 is implemented/closed, this can be replaced with that solution. + + Also, remove FLUX_KVS_NAMESPACE from the environment, because otherwise + KVS lookups will look relative to that namespace, changing the behavior + relative to when the script runs as a service. 
""" try: jobid = id_parse(os.environ["FLUX_JOB_ID"]) except KeyError: return + del os.environ["FLUX_KVS_NAMESPACE"] Future(handle.job_raise(jobid, "exception", 7, "dws watchers setup")).get() @@ -829,7 +854,7 @@ def main(): args.transient_condition_timeout / 2, kill_workflows_in_tc, repeat=args.transient_condition_timeout / 2, - args=args.transient_condition_timeout, + args=(args.transient_condition_timeout, k8s_api), ).start() # start watching k8s workflow resources and operate on them when updates occur # or new RPCs are received diff --git a/t/t1002-dws-workflow-obj.t b/t/t1002-dws-workflow-obj.t index f449422..674df07 100755 --- a/t/t1002-dws-workflow-obj.t +++ b/t/t1002-dws-workflow-obj.t @@ -145,7 +145,12 @@ test_expect_success 'job submission with valid DW string works' ' flux job wait-event -vt 30 -m description=${EPILOG_NAME} \ ${jobid} epilog-finish && flux job wait-event -vt 15 ${jobid} clean && - flux jobs -n ${jobid} -o "{user.rabbits}" | flux hostlist -q - + flux jobs -n ${jobid} -o "{user.rabbits}" | flux hostlist -q - && + flux job info ${jobid} rabbit_workflow && + flux job info ${jobid} rabbit_workflow | \ + jq -e ".metadata.name == \"fluxjob-$(flux job id ${jobid})\"" && + flux job info ${jobid} rabbit_workflow | jq -e ".spec.wlmID == \"flux\"" && + flux job info ${jobid} rabbit_workflow | jq -e ".kind == \"Workflow\"" ' test_expect_success 'job requesting copy-offload in DW string works' ' @@ -172,7 +177,12 @@ test_expect_success 'job requesting copy-offload in DW string works' ' ${jobid} epilog-start && flux job wait-event -vt 30 -m description=${EPILOG_NAME} \ ${jobid} epilog-finish && - flux job wait-event -vt 15 ${jobid} clean + flux job wait-event -vt 15 ${jobid} clean && + flux job info ${jobid} rabbit_workflow && + flux job info ${jobid} rabbit_workflow | \ + jq -e ".metadata.name == \"fluxjob-$(flux job id ${jobid})\"" && + flux job info ${jobid} rabbit_workflow | jq -e ".spec.wlmID == \"flux\"" && + flux job info ${jobid} rabbit_workflow | jq -e ".kind == \"Workflow\"" ' test_expect_success 'job requesting too much storage is rejected' '