diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index 1bca65e6..fb3cff0c 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -143,6 +143,9 @@ JOB_CONTROLLER_CONTAINER_PORT = 5000 """Default container port for REANA Job Controller sidecar.""" +JOB_CONTROLLER_SHUTDOWN_ENDPOINT = "/shutdown" +"""Endpoint of reana-job-controller used to stop all the jobs.""" + JOB_CONTROLLER_NAME = "job-controller" """Default job controller container name.""" diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 25f7c0a3..f7b5b59d 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -107,7 +107,6 @@ def on_message(self, body, message): msg = body_dict["message"] if "progress" in msg: _update_run_progress(workflow_uuid, msg) - _update_job_progress(workflow_uuid, msg) # Caching: calculate input hash and store in JobCache if "caching_info" in msg: _update_job_cache(msg) @@ -228,22 +227,6 @@ def _update_run_progress(workflow_uuid, msg): Session.add(workflow) -def _update_job_progress(workflow_uuid, msg): - """Update job progress for jobs in received message.""" - for status_name, job_status in PROGRESS_STATUSES: - if status_name in msg["progress"]: - status_progress = msg["progress"][status_name] - for job_id in status_progress["job_ids"]: - try: - uuid.UUID(job_id) - except Exception: - continue - job = Session.query(Job).filter_by(id_=job_id).one_or_none() - if job: - job.workflow_uuid = workflow_uuid - job.status = job_status - - def _update_job_cache(msg): """Update caching information for finished job.""" cached_job = ( diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index 3e41c2e2..22a7d7ae 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -58,6 +58,8 @@ from reana_workflow_controller.config import ( # isort:skip IMAGE_PULL_SECRETS, + JOB_CONTROLLER_CONTAINER_PORT, + JOB_CONTROLLER_SHUTDOWN_ENDPOINT, REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT, REANA_KUBERNETES_JOBS_MEMORY_LIMIT, REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT, @@ -438,18 +440,6 @@ def _delete_k8s_job_quiet(self, job_name): def stop_batch_workflow_run(self): """Stop a batch workflow run along with all its dependent jobs.""" - jobs_to_delete = self.get_workflow_running_jobs() - - for job in jobs_to_delete: - job_id = job.backend_job_id - if self._delete_k8s_job_quiet(job_id): - job.status = JobStatus.stopped - Session.add(job) - - # Commit the session once all the jobs have been processed. - Session.commit() - - # Delete the workflow run batch job workflow_run_name = self._workflow_run_name_generator("batch") self._delete_k8s_job_quiet(workflow_run_name) @@ -575,6 +565,15 @@ def _create_job_spec( command=["/bin/bash", "-c"], args=self._create_job_controller_startup_cmd(user), ports=[], + # Make sure that all the jobs are stopped before the deletion of the run-batch pod + lifecycle=client.V1Lifecycle( + pre_stop=client.V1Handler( + http_get=client.V1HTTPGetAction( + port=JOB_CONTROLLER_CONTAINER_PORT, + path=JOB_CONTROLLER_SHUTDOWN_ENDPOINT, + ) + ) + ), ) job_controller_env_vars.extend( @@ -699,7 +698,7 @@ def _create_job_spec( def _create_job_controller_startup_cmd(self, user=None): """Create job controller startup cmd.""" - base_cmd = "flask run -h 0.0.0.0;" + base_cmd = "exec flask run -h 0.0.0.0;" if user: add_group_cmd = "groupadd -f -g {} {};".format( WORKFLOW_RUNTIME_USER_GID, WORKFLOW_RUNTIME_USER_GID @@ -711,7 +710,7 @@ def _create_job_controller_startup_cmd(self, user=None): WORKFLOW_RUNTIME_USER_UID, self.workflow.workspace_path, ) - run_app_cmd = 'su {} /bin/bash -c "{}"'.format(user, base_cmd) + run_app_cmd = 'exec su {} /bin/bash -c "{}"'.format(user, base_cmd) full_cmd = add_group_cmd + add_user_cmd + chown_workspace_cmd + run_app_cmd return [full_cmd] else: diff --git a/tests/test_workflow_run_manager.py b/tests/test_workflow_run_manager.py index f8411f1a..190a7e4f 100644 --- a/tests/test_workflow_run_manager.py +++ b/tests/test_workflow_run_manager.py @@ -110,25 +110,19 @@ def test_stop_workflow_backend_only_kubernetes( """Test deletion of workflows with only Kubernetes based jobs.""" workflow = sample_serial_workflow_in_db workflow.status = RunStatus.running - workflow_jobs = add_kubernetes_jobs_to_workflow(workflow) - backend_job_ids = [job.backend_job_id for job in workflow_jobs] with patch( "reana_workflow_controller.workflow_run_manager." "current_k8s_batchv1_api_client" ) as api_client: kwrm = KubernetesWorkflowRunManager(workflow) kwrm.stop_batch_workflow_run() - for delete_call in api_client.delete_namespaced_job.call_args_list: - job_id = delete_call.args[0] - if job_id in backend_job_ids: - del backend_job_ids[backend_job_ids.index(job_id)] - # Check that the status of the job with that ID in the database is set to stopped - assert ( - Job.query.filter_by(backend_job_id=job_id).one().status - == JobStatus.stopped - ) - - assert not backend_job_ids + # jobs are deleted by reana-job-controller, so this should be called + # only once to delete the run-batch pod + api_client.delete_namespaced_job.assert_called_once() + assert ( + api_client.delete_namespaced_job.call_args.args[0] + == f"reana-run-batch-{workflow.id_}" + ) def test_interactive_session_closure(sample_serial_workflow_in_db, session):