From a9136f2219bf31ef62c2b95bacb840bf735db561 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Mon, 16 Oct 2023 14:52:49 -0500 Subject: [PATCH] add pipeline and task 'staging' status. update spec. add migration --- src/api/specs/WorkflowsAPI.yaml | 3 ++- ...nerun_status_alter_taskexecution_status.py | 23 +++++++++++++++++++ src/engine/src/contrib/tapis/TapisPlugin.py | 4 ++-- .../notifications/TapisWorkflowsAPIBackend.py | 14 +++++++++-- .../workflows/executors/WorkflowExecutor.py | 5 +++- src/engine/src/owe_python_sdk/events/types.py | 21 +++++++++-------- src/shared/OWESpec.yaml | 3 ++- 7 files changed, 56 insertions(+), 17 deletions(-) create mode 100644 src/api/src/backend/migrations/0027_alter_pipelinerun_status_alter_taskexecution_status.py diff --git a/src/api/specs/WorkflowsAPI.yaml b/src/api/specs/WorkflowsAPI.yaml index 2e5a44b5..772c9cd8 100644 --- a/src/api/specs/WorkflowsAPI.yaml +++ b/src/api/specs/WorkflowsAPI.yaml @@ -3047,8 +3047,9 @@ components: EnumRunStatus: type: string enum: - - active + - staging - submitted + - active - pending - backoff - completed diff --git a/src/api/src/backend/migrations/0027_alter_pipelinerun_status_alter_taskexecution_status.py b/src/api/src/backend/migrations/0027_alter_pipelinerun_status_alter_taskexecution_status.py new file mode 100644 index 00000000..fda7408d --- /dev/null +++ b/src/api/src/backend/migrations/0027_alter_pipelinerun_status_alter_taskexecution_status.py @@ -0,0 +1,23 @@ +# Generated by Django 4.1.2 on 2023-10-16 18:36 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('backend', '0026_delete_event'), + ] + + operations = [ + migrations.AlterField( + model_name='pipelinerun', + name='status', + field=models.CharField(choices=[('pending', 'pending'), ('staging', 'staging'), ('active', 'active'), ('backoff', 'backoff'), ('completed', 'completed'), ('failed', 'failed'), ('suspended', 'suspended'), ('archiving', 'archiving'), ('terminated', 'terminated'), ('submitted', 'submitted'), ('deferred', 'deferred')], default='submitted', max_length=16), + ), + migrations.AlterField( + model_name='taskexecution', + name='status', + field=models.CharField(choices=[('pending', 'pending'), ('staging', 'staging'), ('active', 'active'), ('backoff', 'backoff'), ('completed', 'completed'), ('failed', 'failed'), ('suspended', 'suspended'), ('archiving', 'archiving'), ('terminated', 'terminated'), ('skipped', 'skipped')], default='pending', max_length=16), + ), + ] \ No newline at end of file diff --git a/src/engine/src/contrib/tapis/TapisPlugin.py b/src/engine/src/contrib/tapis/TapisPlugin.py index c785dcb6..21c598ea 100644 --- a/src/engine/src/contrib/tapis/TapisPlugin.py +++ b/src/engine/src/contrib/tapis/TapisPlugin.py @@ -6,7 +6,7 @@ PIPELINE_ACTIVE, PIPELINE_COMPLETED, PIPELINE_ARCHIVING, PIPELINE_FAILED, PIPELINE_STAGING, PIPELINE_SUSPENDED, PIPELINE_TERMINATED, TASK_ACTIVE, TASK_ARCHIVING, TASK_BACKOFF, TASK_COMPLETED, TASK_FAILED, TASK_PENDING, - TASK_SUSPENDED, TASK_TERMINATED + TASK_SUSPENDED, TASK_TERMINATED, TASK_STAGING, TASK_SKIPPED ) @@ -34,7 +34,7 @@ def __init__(self, name): PIPELINE_ACTIVE, PIPELINE_COMPLETED, PIPELINE_ARCHIVING, PIPELINE_FAILED, PIPELINE_STAGING, PIPELINE_SUSPENDED, PIPELINE_TERMINATED, TASK_ACTIVE, TASK_ARCHIVING, TASK_BACKOFF, TASK_COMPLETED, TASK_FAILED, TASK_PENDING, - TASK_SUSPENDED, TASK_TERMINATED + TASK_SUSPENDED, TASK_TERMINATED, TASK_STAGING, TASK_SKIPPED ] ) ) diff --git a/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py b/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py index ab4d41b9..37018975 100644 --- a/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py +++ b/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py @@ -8,7 +8,7 @@ PIPELINE_ACTIVE, PIPELINE_ARCHIVING, PIPELINE_COMPLETED, PIPELINE_FAILED, PIPELINE_STAGING, PIPELINE_SUSPENDED, PIPELINE_TERMINATED, PIPELINE_SKIPPED, TASK_ACTIVE, TASK_ARCHIVING, TASK_BACKOFF, TASK_COMPLETED, TASK_FAILED, - TASK_PENDING, TASK_SUSPENDED, TASK_TERMINATED, TASK_SKIPPED + TASK_PENDING, TASK_STAGING, TASK_SUSPENDED, TASK_TERMINATED, TASK_SKIPPED ) from owe_python_sdk.constants import STDERR, STDOUT @@ -31,6 +31,7 @@ def __init__(self, ctx): TASK_COMPLETED: self._task_completed, TASK_FAILED: self._task_failed, TASK_PENDING: self._task_pending, + TASK_STAGING: self._task_staging, TASK_SUSPENDED: self._task_suspended, TASK_TERMINATED: self._task_terminated, # TASK_SKIPPED: self._task_skipped @@ -196,7 +197,16 @@ def _task_pending(self, event): pipeline_run_uuid=event.payload.pipeline_run.uuid, task_execution_uuid=event.task.execution_uuid, status="pending", - last_message="Task waiting to be executed", + last_message="Task awaiting execution", + **self._kwargs + ) + + def _task_staging(self, event): + self.service_client.workflows.updateTaskExecutionStatus( + pipeline_run_uuid=event.payload.pipeline_run.uuid, + task_execution_uuid=event.task.execution_uuid, + status="staging", + last_message="Workflow executor preparing to execute task", **self._kwargs ) diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py index 3bb81986..fad6f692 100644 --- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py @@ -16,7 +16,7 @@ ) from owe_python_sdk.events.types import ( PIPELINE_ACTIVE, PIPELINE_COMPLETED, PIPELINE_FAILED, PIPELINE_TERMINATED, - TASK_ACTIVE, TASK_COMPLETED, TASK_FAILED + PIPELINE_STAGING, TASK_STAGING, TASK_ACTIVE, TASK_COMPLETED, TASK_FAILED ) from helpers.GraphValidator import GraphValidator # From shared from helpers.GitCacheService import GitCacheService @@ -175,6 +175,9 @@ def _staging(self, ctx): # Prepare the file system for this pipeline and handle pipeline templating self._prepare_pipeline() + # Publish the active event + self.publish(Event(PIPELINE_STAGING, self.state.ctx)) + # Setup the server and the pipeline run loggers self._setup_loggers() diff --git a/src/engine/src/owe_python_sdk/events/types.py b/src/engine/src/owe_python_sdk/events/types.py index c8e2f56a..bd9f0df2 100644 --- a/src/engine/src/owe_python_sdk/events/types.py +++ b/src/engine/src/owe_python_sdk/events/types.py @@ -7,13 +7,14 @@ PIPELINE_TERMINATED = 7 TASK_ACTIVE = 8 TASK_PENDING = 9 -TASK_COMPLETED = 10 -TASK_FAILED = 11 -TASK_SUSPENDED = 12 -TASK_ARCHIVING = 13 # NOTE Currently only archiving at the pipeline level. When implemented, remove this comment. -TASK_TERMINATED = 14 -TASK_BACKOFF = 15 -ARCHIVING_FAILED = 16 -ARCHIVING_COMPLETED = 17 -TASK_SKIPPED = 18 -PIPELINE_SKIPPED = 19 \ No newline at end of file +TASK_STAGING = 10 +TASK_COMPLETED = 11 +TASK_FAILED = 12 +TASK_SUSPENDED = 13 +TASK_ARCHIVING = 14 # NOTE Currently only archiving at the pipeline level. When implemented, remove this comment. +TASK_TERMINATED = 15 +TASK_BACKOFF = 16 +ARCHIVING_FAILED = 17 +ARCHIVING_COMPLETED = 18 +TASK_SKIPPED = 19 +PIPELINE_SKIPPED = 20 \ No newline at end of file diff --git a/src/shared/OWESpec.yaml b/src/shared/OWESpec.yaml index 2e5a44b5..772c9cd8 100644 --- a/src/shared/OWESpec.yaml +++ b/src/shared/OWESpec.yaml @@ -3047,8 +3047,9 @@ components: EnumRunStatus: type: string enum: - - active + - staging - submitted + - active - pending - backoff - completed