From cc8420bb9f4fd35280a13819b00616cc26bf126a Mon Sep 17 00:00:00 2001 From: Marco Donadoni Date: Tue, 23 Jan 2024 15:27:39 +0100 Subject: [PATCH] feat(run-manager): call shutdown endpoint before workflow stop (#559) Call the shutdown endpoint of reana-job-controller before stopping a running workflow, so that running jobs are correctly cleaned up. Closes reanahub/reana-workflow-controller#252 Closes reanahub/reana-workflow-controller#546 --- reana_workflow_controller/config.py | 3 +++ .../workflow_run_manager.py | 23 +++++++++---------- tests/test_workflow_run_manager.py | 20 ++++++---------- 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index 1bca65e6..fb3cff0c 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -143,6 +143,9 @@ JOB_CONTROLLER_CONTAINER_PORT = 5000 """Default container port for REANA Job Controller sidecar.""" +JOB_CONTROLLER_SHUTDOWN_ENDPOINT = "/shutdown" +"""Endpoint of reana-job-controller used to stop all the jobs.""" + JOB_CONTROLLER_NAME = "job-controller" """Default job controller container name.""" diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index 3e41c2e2..08d85afe 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -58,6 +58,8 @@ from reana_workflow_controller.config import ( # isort:skip IMAGE_PULL_SECRETS, + JOB_CONTROLLER_CONTAINER_PORT, + JOB_CONTROLLER_SHUTDOWN_ENDPOINT, REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT, REANA_KUBERNETES_JOBS_MEMORY_LIMIT, REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT, @@ -438,18 +440,6 @@ def _delete_k8s_job_quiet(self, job_name): def stop_batch_workflow_run(self): """Stop a batch workflow run along with all its dependent jobs.""" - jobs_to_delete = self.get_workflow_running_jobs() - - for job in jobs_to_delete: - job_id = job.backend_job_id - if self._delete_k8s_job_quiet(job_id): - job.status = JobStatus.stopped - Session.add(job) - - # Commit the session once all the jobs have been processed. - Session.commit() - - # Delete the workflow run batch job workflow_run_name = self._workflow_run_name_generator("batch") self._delete_k8s_job_quiet(workflow_run_name) @@ -575,6 +565,15 @@ def _create_job_spec( command=["/bin/bash", "-c"], args=self._create_job_controller_startup_cmd(user), ports=[], + # Make sure that all the jobs are stopped before the deletion of the run-batch pod + lifecycle=client.V1Lifecycle( + pre_stop=client.V1Handler( + http_get=client.V1HTTPGetAction( + port=JOB_CONTROLLER_CONTAINER_PORT, + path=JOB_CONTROLLER_SHUTDOWN_ENDPOINT, + ) + ) + ), ) job_controller_env_vars.extend( diff --git a/tests/test_workflow_run_manager.py b/tests/test_workflow_run_manager.py index f8411f1a..190a7e4f 100644 --- a/tests/test_workflow_run_manager.py +++ b/tests/test_workflow_run_manager.py @@ -110,25 +110,19 @@ def test_stop_workflow_backend_only_kubernetes( """Test deletion of workflows with only Kubernetes based jobs.""" workflow = sample_serial_workflow_in_db workflow.status = RunStatus.running - workflow_jobs = add_kubernetes_jobs_to_workflow(workflow) - backend_job_ids = [job.backend_job_id for job in workflow_jobs] with patch( "reana_workflow_controller.workflow_run_manager." "current_k8s_batchv1_api_client" ) as api_client: kwrm = KubernetesWorkflowRunManager(workflow) kwrm.stop_batch_workflow_run() - for delete_call in api_client.delete_namespaced_job.call_args_list: - job_id = delete_call.args[0] - if job_id in backend_job_ids: - del backend_job_ids[backend_job_ids.index(job_id)] - # Check that the status of the job with that ID in the database is set to stopped - assert ( - Job.query.filter_by(backend_job_id=job_id).one().status - == JobStatus.stopped - ) - - assert not backend_job_ids + # jobs are deleted by reana-job-controller, so this should be called + # only once to delete the run-batch pod + api_client.delete_namespaced_job.assert_called_once() + assert ( + api_client.delete_namespaced_job.call_args.args[0] + == f"reana-run-batch-{workflow.id_}" + ) def test_interactive_session_closure(sample_serial_workflow_in_db, session):