Skip to content

Commit

Permalink
add pipeline and task 'staging' status. update spec. add migration
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Oct 16, 2023
1 parent 99324e4 commit a9136f2
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 17 deletions.
3 changes: 2 additions & 1 deletion src/api/specs/WorkflowsAPI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3047,8 +3047,9 @@ components:
EnumRunStatus:
type: string
enum:
- active
- staging
- submitted
- active
- pending
- backoff
- completed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.1.2 on 2023-10-16 18:36

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('backend', '0026_delete_event'),
]

operations = [
migrations.AlterField(
model_name='pipelinerun',
name='status',
field=models.CharField(choices=[('pending', 'pending'), ('staging', 'staging'), ('active', 'active'), ('backoff', 'backoff'), ('completed', 'completed'), ('failed', 'failed'), ('suspended', 'suspended'), ('archiving', 'archiving'), ('terminated', 'terminated'), ('submitted', 'submitted'), ('deferred', 'deferred')], default='submitted', max_length=16),
),
migrations.AlterField(
model_name='taskexecution',
name='status',
field=models.CharField(choices=[('pending', 'pending'), ('staging', 'staging'), ('active', 'active'), ('backoff', 'backoff'), ('completed', 'completed'), ('failed', 'failed'), ('suspended', 'suspended'), ('archiving', 'archiving'), ('terminated', 'terminated'), ('skipped', 'skipped')], default='pending', max_length=16),
),
]
4 changes: 2 additions & 2 deletions src/engine/src/contrib/tapis/TapisPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
PIPELINE_ACTIVE, PIPELINE_COMPLETED, PIPELINE_ARCHIVING, PIPELINE_FAILED,
PIPELINE_STAGING, PIPELINE_SUSPENDED, PIPELINE_TERMINATED, TASK_ACTIVE,
TASK_ARCHIVING, TASK_BACKOFF, TASK_COMPLETED, TASK_FAILED, TASK_PENDING,
TASK_SUSPENDED, TASK_TERMINATED
TASK_SUSPENDED, TASK_TERMINATED, TASK_STAGING, TASK_SKIPPED
)


Expand Down Expand Up @@ -34,7 +34,7 @@ def __init__(self, name):
PIPELINE_ACTIVE, PIPELINE_COMPLETED, PIPELINE_ARCHIVING, PIPELINE_FAILED,
PIPELINE_STAGING, PIPELINE_SUSPENDED, PIPELINE_TERMINATED, TASK_ACTIVE,
TASK_ARCHIVING, TASK_BACKOFF, TASK_COMPLETED, TASK_FAILED, TASK_PENDING,
TASK_SUSPENDED, TASK_TERMINATED
TASK_SUSPENDED, TASK_TERMINATED, TASK_STAGING, TASK_SKIPPED
]
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
PIPELINE_ACTIVE, PIPELINE_ARCHIVING, PIPELINE_COMPLETED, PIPELINE_FAILED,
PIPELINE_STAGING, PIPELINE_SUSPENDED, PIPELINE_TERMINATED, PIPELINE_SKIPPED,
TASK_ACTIVE, TASK_ARCHIVING, TASK_BACKOFF, TASK_COMPLETED, TASK_FAILED,
TASK_PENDING, TASK_SUSPENDED, TASK_TERMINATED, TASK_SKIPPED
TASK_PENDING, TASK_STAGING, TASK_SUSPENDED, TASK_TERMINATED, TASK_SKIPPED
)
from owe_python_sdk.constants import STDERR, STDOUT

Expand All @@ -31,6 +31,7 @@ def __init__(self, ctx):
TASK_COMPLETED: self._task_completed,
TASK_FAILED: self._task_failed,
TASK_PENDING: self._task_pending,
TASK_STAGING: self._task_staging,
TASK_SUSPENDED: self._task_suspended,
TASK_TERMINATED: self._task_terminated,
# TASK_SKIPPED: self._task_skipped
Expand Down Expand Up @@ -196,7 +197,16 @@ def _task_pending(self, event):
pipeline_run_uuid=event.payload.pipeline_run.uuid,
task_execution_uuid=event.task.execution_uuid,
status="pending",
last_message="Task waiting to be executed",
last_message="Task awaiting execution",
**self._kwargs
)

def _task_staging(self, event):
self.service_client.workflows.updateTaskExecutionStatus(
pipeline_run_uuid=event.payload.pipeline_run.uuid,
task_execution_uuid=event.task.execution_uuid,
status="staging",
last_message="Workflow executor preparing to execute task",
**self._kwargs
)

Expand Down
5 changes: 4 additions & 1 deletion src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from owe_python_sdk.events.types import (
PIPELINE_ACTIVE, PIPELINE_COMPLETED, PIPELINE_FAILED, PIPELINE_TERMINATED,
TASK_ACTIVE, TASK_COMPLETED, TASK_FAILED
PIPELINE_STAGING, TASK_STAGING, TASK_ACTIVE, TASK_COMPLETED, TASK_FAILED
)
from helpers.GraphValidator import GraphValidator # From shared
from helpers.GitCacheService import GitCacheService
Expand Down Expand Up @@ -175,6 +175,9 @@ def _staging(self, ctx):
# Prepare the file system for this pipeline and handle pipeline templating
self._prepare_pipeline()

# Publish the active event
self.publish(Event(PIPELINE_STAGING, self.state.ctx))

# Setup the server and the pipeline run loggers
self._setup_loggers()

Expand Down
21 changes: 11 additions & 10 deletions src/engine/src/owe_python_sdk/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
PIPELINE_TERMINATED = 7
TASK_ACTIVE = 8
TASK_PENDING = 9
TASK_COMPLETED = 10
TASK_FAILED = 11
TASK_SUSPENDED = 12
TASK_ARCHIVING = 13 # NOTE Currently only archiving at the pipeline level. When implemented, remove this comment.
TASK_TERMINATED = 14
TASK_BACKOFF = 15
ARCHIVING_FAILED = 16
ARCHIVING_COMPLETED = 17
TASK_SKIPPED = 18
PIPELINE_SKIPPED = 19
TASK_STAGING = 10
TASK_COMPLETED = 11
TASK_FAILED = 12
TASK_SUSPENDED = 13
TASK_ARCHIVING = 14 # NOTE Currently only archiving at the pipeline level. When implemented, remove this comment.
TASK_TERMINATED = 15
TASK_BACKOFF = 16
ARCHIVING_FAILED = 17
ARCHIVING_COMPLETED = 18
TASK_SKIPPED = 19
PIPELINE_SKIPPED = 20
3 changes: 2 additions & 1 deletion src/shared/OWESpec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3047,8 +3047,9 @@ components:
EnumRunStatus:
type: string
enum:
- active
- staging
- submitted
- active
- pending
- backoff
- completed
Expand Down

0 comments on commit a9136f2

Please sign in to comment.