Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(run-manager): call shutdown endpoint before workflow stop (#559) #559

Merged
Merged 3 commits into the base branch from the feature branch
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@
JOB_CONTROLLER_CONTAINER_PORT = 5000
"""Default container port for REANA Job Controller sidecar."""

# Used as the path of the container's preStop HTTP hook so that all jobs are
# shut down before the run-batch pod is deleted.
JOB_CONTROLLER_SHUTDOWN_ENDPOINT = "/shutdown"
"""Endpoint of reana-job-controller used to stop all the jobs."""

JOB_CONTROLLER_NAME = "job-controller"
"""Default job controller container name."""

Expand Down
17 changes: 0 additions & 17 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def on_message(self, body, message):
msg = body_dict["message"]
if "progress" in msg:
_update_run_progress(workflow_uuid, msg)
_update_job_progress(workflow_uuid, msg)
# Caching: calculate input hash and store in JobCache
if "caching_info" in msg:
_update_job_cache(msg)
Expand Down Expand Up @@ -228,22 +227,6 @@ def _update_run_progress(workflow_uuid, msg):
Session.add(workflow)


def _update_job_progress(workflow_uuid, msg):
    """Update job progress for jobs referenced in a received message.

    :param workflow_uuid: UUID of the workflow the jobs belong to.
    :param msg: Message dict whose ``progress`` mapping associates status
        names with ``{"job_ids": [...]}`` entries.
    """
    for status_name, job_status in PROGRESS_STATUSES:
        if status_name not in msg["progress"]:
            continue
        status_progress = msg["progress"][status_name]
        for job_id in status_progress["job_ids"]:
            # Skip identifiers that are not valid UUIDs. ``uuid.UUID`` raises
            # ValueError for malformed strings and TypeError/AttributeError
            # for non-string inputs; catching only those avoids masking
            # unrelated failures the original broad ``except Exception`` hid.
            try:
                uuid.UUID(job_id)
            except (ValueError, TypeError, AttributeError):
                continue
            job = Session.query(Job).filter_by(id_=job_id).one_or_none()
            if job:
                job.workflow_uuid = workflow_uuid
                job.status = job_status


def _update_job_cache(msg):
"""Update caching information for finished job."""
cached_job = (
Expand Down
27 changes: 13 additions & 14 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@

from reana_workflow_controller.config import ( # isort:skip
IMAGE_PULL_SECRETS,
JOB_CONTROLLER_CONTAINER_PORT,
JOB_CONTROLLER_SHUTDOWN_ENDPOINT,
REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT,
Expand Down Expand Up @@ -438,18 +440,6 @@ def _delete_k8s_job_quiet(self, job_name):

def stop_batch_workflow_run(self):
"""Stop a batch workflow run along with all its dependent jobs."""
jobs_to_delete = self.get_workflow_running_jobs()

for job in jobs_to_delete:
job_id = job.backend_job_id
if self._delete_k8s_job_quiet(job_id):
job.status = JobStatus.stopped
Session.add(job)

# Commit the session once all the jobs have been processed.
Session.commit()

# Delete the workflow run batch job
workflow_run_name = self._workflow_run_name_generator("batch")
self._delete_k8s_job_quiet(workflow_run_name)

Expand Down Expand Up @@ -575,6 +565,15 @@ def _create_job_spec(
command=["/bin/bash", "-c"],
args=self._create_job_controller_startup_cmd(user),
ports=[],
# Make sure that all the jobs are stopped before the deletion of the run-batch pod
lifecycle=client.V1Lifecycle(
pre_stop=client.V1Handler(
http_get=client.V1HTTPGetAction(
port=JOB_CONTROLLER_CONTAINER_PORT,
path=JOB_CONTROLLER_SHUTDOWN_ENDPOINT,
)
)
),
)

job_controller_env_vars.extend(
Expand Down Expand Up @@ -699,7 +698,7 @@ def _create_job_spec(

def _create_job_controller_startup_cmd(self, user=None):
"""Create job controller startup cmd."""
base_cmd = "flask run -h 0.0.0.0;"
base_cmd = "exec flask run -h 0.0.0.0;"
if user:
add_group_cmd = "groupadd -f -g {} {};".format(
WORKFLOW_RUNTIME_USER_GID, WORKFLOW_RUNTIME_USER_GID
Expand All @@ -711,7 +710,7 @@ def _create_job_controller_startup_cmd(self, user=None):
WORKFLOW_RUNTIME_USER_UID,
self.workflow.workspace_path,
)
run_app_cmd = 'su {} /bin/bash -c "{}"'.format(user, base_cmd)
run_app_cmd = 'exec su {} /bin/bash -c "{}"'.format(user, base_cmd)
full_cmd = add_group_cmd + add_user_cmd + chown_workspace_cmd + run_app_cmd
return [full_cmd]
else:
Expand Down
20 changes: 7 additions & 13 deletions tests/test_workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,19 @@ def test_stop_workflow_backend_only_kubernetes(
"""Test deletion of workflows with only Kubernetes based jobs."""
workflow = sample_serial_workflow_in_db
workflow.status = RunStatus.running
workflow_jobs = add_kubernetes_jobs_to_workflow(workflow)
backend_job_ids = [job.backend_job_id for job in workflow_jobs]
with patch(
"reana_workflow_controller.workflow_run_manager."
"current_k8s_batchv1_api_client"
) as api_client:
kwrm = KubernetesWorkflowRunManager(workflow)
kwrm.stop_batch_workflow_run()
for delete_call in api_client.delete_namespaced_job.call_args_list:
job_id = delete_call.args[0]
if job_id in backend_job_ids:
del backend_job_ids[backend_job_ids.index(job_id)]
# Check that the status of the job with that ID in the database is set to stopped
assert (
Job.query.filter_by(backend_job_id=job_id).one().status
== JobStatus.stopped
)

assert not backend_job_ids
# jobs are deleted by reana-job-controller, so this should be called
# only once to delete the run-batch pod
api_client.delete_namespaced_job.assert_called_once()
assert (
api_client.delete_namespaced_job.call_args.args[0]
== f"reana-run-batch-{workflow.id_}"
)


def test_interactive_session_closure(sample_serial_workflow_in_db, session):
Expand Down
Loading