Commit
Add termination endpoint workflows
nathandf committed Nov 5, 2024
1 parent 82487cb commit 41b162a
Showing 11 changed files with 423 additions and 184 deletions.
29 changes: 29 additions & 0 deletions src/api/specs/WorkflowsAPI.yaml
@@ -2085,6 +2085,35 @@ paths:
$ref: '#/components/schemas/RespPipelineLock'

'/v3/workflows/groups/{group_id}/pipelines/{pipeline_id}/runs/{pipeline_run_uuid}':
post:
tags:
- PipelineRuns
summary: Terminate a running pipeline
description: Terminate a running pipeline
operationId: terminatePipeline
parameters:
- name: group_id
in: path
required: true
schema:
$ref: '#/components/schemas/ID'
- name: pipeline_id
in: path
required: true
schema:
$ref: '#/components/schemas/ID'
- name: pipeline_run_uuid
in: path
required: true
schema:
type: string
responses:
'200':
description: Pipeline terminated.
content:
application/json:
schema:
$ref: '#/components/schemas/RespPipelineRun'
get:
tags:
- PipelineRuns
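For reference, a client call to this new termination endpoint might look like the sketch below (Python with the requests library; the base URL, auth header, and identifiers are placeholders, not taken from this commit):

import requests

# Placeholder values; substitute the deployment's base URL, a valid token, and real ids.
base_url = "https://tacc.tapis.io"
group_id = "my-group"
pipeline_id = "my-pipeline"
pipeline_run_uuid = "00000000-0000-0000-0000-000000000000"

resp = requests.post(
    f"{base_url}/v3/workflows/groups/{group_id}/pipelines/{pipeline_id}/runs/{pipeline_run_uuid}",
    headers={"X-Tapis-Token": "<jwt>"},  # assumed Tapis-style auth header
)
print(resp.status_code, resp.json())  # 200 with a RespPipelineRun body on success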
29 changes: 9 additions & 20 deletions src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
@@ -1,7 +1,6 @@
from uuid import uuid4
from django.forms import model_to_dict

from backend.utils.parse_directives import parse_directives as parse
from backend.conf.constants import WORKFLOW_EXECUTOR_ACCESS_TOKEN
from backend.serializers import TaskSerializer, PipelineSerializer

@@ -19,6 +18,7 @@ def build(
description=None,
commit=None,
directives=None,
run=None,
args={}
):
# Get the pipeline tasks, their contexts, destinations, and respective
@@ -89,26 +89,15 @@

request["meta"]["origin"] = base_url # Origin of the request

request["pipeline_run"] = {}
request["pipeline_run"] = run if run else {}
if not run:
uuid = uuid4()
request["pipeline_run"]["uuid"] = uuid
request["pipeline_run"]["name"] = name or uuid
request["pipeline_run"]["description"] = description

# Generate the uuid for this pipeline run
uuid = uuid4()
request["pipeline_run"]["uuid"] = uuid
request["pipeline_run"]["name"] = name or uuid
request["pipeline_run"]["description"] = description

# # Parse the directives from the commit message
# directives_request = {}
# if commit != None:
# directives_request = parse(commit)

# if directives != None and len(directives) > 0:
# directive_str = f"[{'|'.join([d for d in directives])}]"
# directives_request = parse(directive_str)

# request["directives"] = directives_request

request["directives"] = {}
# If no directives are provided, default to a RUN directive for this run
request["directives"] = directives if directives else {"RUN": run.uuid}

return request

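To make the new run and directives parameters concrete, here is a rough, illustrative sketch (plain dicts, placeholder uuid) of the termination fragment the builder assembles when an existing run is passed in; the real run value is a PipelineRun object, shown here as a dict for brevity:

existing_run_uuid = "11111111-1111-1111-1111-111111111111"  # placeholder

termination_fragment = {
    # The existing run is passed through unchanged instead of minting a new uuid/name.
    "pipeline_run": {"uuid": existing_run_uuid, "name": "example-run", "description": None},
    # The caller supplies the directive explicitly; commit-message parsing no longer happens here.
    "directives": {"TERMINATE_RUN": [existing_run_uuid]},
}
print(termination_fragment)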
65 changes: 47 additions & 18 deletions src/api/src/backend/models.py
@@ -64,17 +64,58 @@
(TASK_PROTOCOL_FTPS, "ftps"),
]

FUNCTION_TASK_RUNTIME_PYTHON_LATEST = EnumRuntimeEnvironment.PythonLatest
FUNCTION_TASK_RUNTIME_PYTHON_SLIM = EnumRuntimeEnvironment.PythonSlim
FUNCTION_TASK_RUNTIME_PYTHON312 = EnumRuntimeEnvironment.Python312
FUNCTION_TASK_RUNTIME_PYTHON312_SLIM = EnumRuntimeEnvironment.Python312Slim
FUNCTION_TASK_RUNTIME_PYTHON311 = EnumRuntimeEnvironment.Python311
FUNCTION_TASK_RUNTIME_PYTHON311_SLIM = EnumRuntimeEnvironment.Python311Slim
FUNCTION_TASK_RUNTIME_PYTHON310 = EnumRuntimeEnvironment.Python10
FUNCTION_TASK_RUNTIME_PYTHON310_SLIM = EnumRuntimeEnvironment.Python10Slim
FUNCTION_TASK_RUNTIME_PYTHON39 = EnumRuntimeEnvironment.Python39
FUNCTION_TASK_RUNTIME_PYTHON39_SLIM = EnumRuntimeEnvironment.Python39Slim
FUNCTION_TASK_RUNTIME_PYTHON38 = EnumRuntimeEnvironment.Python38
FUNCTION_TASK_RUNTIME_PYTHON38_SLIM = EnumRuntimeEnvironment.Python38Slim
FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW_LATEST = EnumRuntimeEnvironment.TensorflowLatest
FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW_LATEST_GPU = EnumRuntimeEnvironment.TensorflowLatestGPU
FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW2120 = EnumRuntimeEnvironment.Tensorflow2120
FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW2120_GPU = EnumRuntimeEnvironment.Tensorflow2120GPU
FUNCTION_TASK_RUNTIME_PYTHON_PYTORCH_LATEST = EnumRuntimeEnvironment.PytorchLatest
FUNCTION_TASK_RUNTIME_PYTHON_PYTORCH_LATEST_GPU = EnumRuntimeEnvironment.HuggingfaceTranformersPytorchGPULatest
FUNCTION_TASK_RUNTIME_PYTHON_HUGGINGFACE_TRANSFORMERS_PYTORCH_GPU4292 = EnumRuntimeEnvironment.HuggingfaceTranformersPytorchGPU4292
FUNCTION_TASK_RUNTIME_PYTHON_SINGULARITY = EnumRuntimeEnvironment.PythonSingularity
FUNCTION_TASK_RUNTIME_PYTHON_PYGEOFLOOD = EnumRuntimeEnvironment.PyGeoFlood
FUNCTION_TASK_RUNTIMES = [
(FUNCTION_TASK_RUNTIME_PYTHON_LATEST, EnumRuntimeEnvironment.PythonLatest),
(FUNCTION_TASK_RUNTIME_PYTHON_SLIM, EnumRuntimeEnvironment.PythonSlim),
(FUNCTION_TASK_RUNTIME_PYTHON312, EnumRuntimeEnvironment.Python312),
(FUNCTION_TASK_RUNTIME_PYTHON312_SLIM, EnumRuntimeEnvironment.Python312Slim),
(FUNCTION_TASK_RUNTIME_PYTHON311, EnumRuntimeEnvironment.Python311),
(FUNCTION_TASK_RUNTIME_PYTHON311_SLIM, EnumRuntimeEnvironment.Python311Slim),
(FUNCTION_TASK_RUNTIME_PYTHON310, EnumRuntimeEnvironment.Python10),
(FUNCTION_TASK_RUNTIME_PYTHON310_SLIM, EnumRuntimeEnvironment.Python10Slim),
(FUNCTION_TASK_RUNTIME_PYTHON39, EnumRuntimeEnvironment.Python39),
(FUNCTION_TASK_RUNTIME_PYTHON_SINGULARITY, EnumRuntimeEnvironment.PythonSingularity)
(FUNCTION_TASK_RUNTIME_PYTHON39_SLIM, EnumRuntimeEnvironment.Python39Slim),
(FUNCTION_TASK_RUNTIME_PYTHON38, EnumRuntimeEnvironment.Python38),
(FUNCTION_TASK_RUNTIME_PYTHON38_SLIM, EnumRuntimeEnvironment.Python38Slim),
(FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW_LATEST, EnumRuntimeEnvironment.TensorflowLatest),
(FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW_LATEST_GPU, EnumRuntimeEnvironment.TensorflowLatestGPU),
(FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW2120, EnumRuntimeEnvironment.Tensorflow2120),
(FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW2120_GPU, EnumRuntimeEnvironment.Tensorflow2120GPU),
(FUNCTION_TASK_RUNTIME_PYTHON_PYTORCH_LATEST, EnumRuntimeEnvironment.PytorchLatest),
(FUNCTION_TASK_RUNTIME_PYTHON_PYTORCH_LATEST_GPU, EnumRuntimeEnvironment.HuggingfaceTranformersPytorchGPULatest),
(FUNCTION_TASK_RUNTIME_PYTHON_HUGGINGFACE_TRANSFORMERS_PYTORCH_GPU4292, EnumRuntimeEnvironment.HuggingfaceTranformersPytorchGPU4292),
(FUNCTION_TASK_RUNTIME_PYTHON_SINGULARITY, EnumRuntimeEnvironment.PythonSingularity),
(FUNCTION_TASK_RUNTIME_PYTHON_PYGEOFLOOD, EnumRuntimeEnvironment.PyGeoFlood),
]

FUNCTION_TASK_INSTALLERS = [
(EnumInstaller.Pip, EnumInstaller.Pip)
]

TASK_FLAVOR_C1_TINY = EnumTaskFlavor.C1_TINY
TASK_FLAVOR_C1_XXSML = EnumTaskFlavor.C1_XXSML
TASK_FLAVOR_C1_XSML = EnumTaskFlavor.C1_XSML
TASK_FLAVOR_C1_SML = EnumTaskFlavor.C1_SML
TASK_FLAVOR_C1_MED = EnumTaskFlavor.C1_MED
TASK_FLAVOR_C1_LRG = EnumTaskFlavor.C1_LRG
@@ -85,6 +126,9 @@
TASK_FLAVOR_G1_NVD_LRG = EnumTaskFlavor.G1_NVD_LRG

TASK_FLAVORS = [
(TASK_FLAVOR_C1_TINY, EnumTaskFlavor.C1_TINY),
(TASK_FLAVOR_C1_XXSML, EnumTaskFlavor.C1_XXSML),
(TASK_FLAVOR_C1_XSML, EnumTaskFlavor.C1_XSML),
(TASK_FLAVOR_C1_SML, EnumTaskFlavor.C1_SML),
(TASK_FLAVOR_C1_MED, EnumTaskFlavor.C1_MED),
(TASK_FLAVOR_C1_LRG, EnumTaskFlavor.C1_LRG),
@@ -452,10 +496,10 @@ class PipelineRun(models.Model):

class Secret(models.Model):
id = models.CharField(max_length=128)
tenant_id = models.CharField(max_length=128)
description = models.TextField(null=True)
sk_secret_name = models.CharField(max_length=128, unique=True)
owner = models.CharField(max_length=64)
sk_secret_name = models.CharField(max_length=128, unique=True)
tenant_id = models.CharField(max_length=128)
uuid = models.UUIDField(primary_key=True, default=uuid.uuid4)

class Meta:
@@ -542,10 +586,6 @@ class Meta:

def clean(self):
errors = {}

# Validate runtimes
(success, error) = self.validate_function_task_installers()
if not success: errors = {**errors, "invalid-runtime-installer": error}

# Validate packages schema
(success, error) = self.validate_packages_schema()
@@ -554,17 +594,6 @@ def clean(self):
if errors:
raise ValidationError(errors)

def validate_function_task_installers(self) -> Tuple[bool, str]:
installer_runtime_mapping = {
FUNCTION_TASK_RUNTIME_PYTHON39: [EnumInstaller.Pip],
FUNCTION_TASK_RUNTIME_PYTHON_SINGULARITY: [EnumInstaller.Pip]
}

installers_for_runtime = installer_runtime_mapping.get(self.runtime, None)
if installers_for_runtime == None: return (False, f"Invalid runtime '{self.runtime}'")
if self.installer not in installers_for_runtime:
return (False, f"Installer '{self.installer}' for runtime {self.runtime}")

def validate_packages_schema(self) -> Tuple[bool, str]:
if type(self.packages) != list:
return (False, f"Invalid installer: Installer '{self.installer}' for runtime {self.runtime}")
35 changes: 18 additions & 17 deletions src/api/src/backend/services/PipelineDispatcher.py
@@ -1,10 +1,10 @@
import json, logging
import json

from uuid import UUID

from django.db import IntegrityError, DatabaseError, OperationalError
from django.utils import timezone

from backend.utils import logger
from backend.services.MessageBroker import service as broker
from backend.models import Pipeline, PipelineRun, RUN_STATUS_SUBMITTED
from backend.errors.api import ServerError
@@ -14,20 +14,21 @@ class PipelineDispatcher:
def __init__(self):
self.error = None

def dispatch(self, service_request: dict, pipeline):
def dispatch(self, service_request: dict, pipeline, pipeline_run=None):

now = timezone.now()
try:
# Create the pipeline run object
pipeline_run = PipelineRun.objects.create(
name=service_request["pipeline_run"]["name"],
description=service_request["pipeline_run"]["description"],
pipeline=pipeline,
status=RUN_STATUS_SUBMITTED,
uuid=service_request["pipeline_run"]["uuid"],
started_at=now,
last_modified=now
)
# Create the pipeline run object if one was not provided
if pipeline_run == None:
pipeline_run = PipelineRun.objects.create(
name=service_request["pipeline_run"]["name"],
description=service_request["pipeline_run"]["description"],
pipeline=pipeline,
status=RUN_STATUS_SUBMITTED,
uuid=service_request["pipeline_run"]["uuid"],
started_at=now,
last_modified=now
)

# Update the pipeline object with the pipeline run
pipeline = Pipeline.objects.filter(pk=pipeline.uuid).first()
@@ -43,11 +44,10 @@ def dispatch(self, service_request: dict, pipeline):
service_request["pipeline"].update(service_request_update)

except (IntegrityError, DatabaseError, OperationalError) as e:
message = f"Failed to create PipelineRun: {e.__cause__}"
logging.error(message)
logger.exception(e.__cause__)
raise ServerError(message=message)
except Exception as e:
logging.error(str(e))
logger.exception(e.__cause__)
raise ServerError(message=str(e))

try:
@@ -57,7 +57,8 @@ def dispatch(self, service_request: dict, pipeline):
)
except Exception as e: # TODO use exact exception
message = f"Failed publish the service request to the message broker: {e.__cause__}"
logging.error(message)
logger.error(message)
logger.exception(e.__cause__)
raise ServerError(message=message)

return pipeline_run
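The intent of the new pipeline_run parameter is to skip run creation when a run is supplied; a standalone sketch of that guard (plain Python with illustrative names, not the Django service itself):

def resolve_run(run, create_run):
    # Only create a new run record when the caller did not supply one.
    if run is None:
        run = create_run()
    return run

existing = {"uuid": "1111"}
assert resolve_run(existing, create_run=lambda: {"uuid": "new"}) is existing
assert resolve_run(None, create_run=lambda: {"uuid": "new"})["uuid"] == "new"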
95 changes: 93 additions & 2 deletions src/api/src/backend/views/PipelineRuns.py
@@ -3,18 +3,105 @@

from backend.views.RestrictedAPIView import RestrictedAPIView
from backend.views.http.responses.BaseResponse import BaseResponse
from backend.views.http.responses.models import ModelResponse, ModelListResponse
from backend.views.http.responses.errors import (
ServerError,
ServerError as ServerErrorResp,
Forbidden,
NotFound,
BadRequest
)
from backend.services.GroupService import service as group_service
from backend.models import PipelineRun, Pipeline
from backend.helpers.PipelineDispatchRequestBuilder import PipelineDispatchRequestBuilder
from backend.services.PipelineDispatcher import service as pipeline_dispatcher
from backend.errors.api import ServerError
from backend.utils import logger
from backend.services.CredentialsService import service as credentials_service


request_builder = PipelineDispatchRequestBuilder(credentials_service)

class PipelineRuns(RestrictedAPIView):
def post(self, request, group_id, pipeline_id, pipeline_run_uuid):
try:
# Get the group
group = group_service.get(group_id, request.tenant_id)
if group == None:
return NotFound(f"No group found with id '{group_id}'")

# Check that the user belongs to the group
if not group_service.user_in_group(request.username, group_id, request.tenant_id):
return Forbidden(message="You do not have access to this group")

# Find a pipeline that matches the request data
pipeline = Pipeline.objects.filter(
id=pipeline_id,
group=group
).prefetch_related(
"group",
"archives",
"archives__archive",
"tasks",
"tasks__context",
"tasks__context__credentials",
"tasks__context__identity",
"tasks__destination",
"tasks__destination__credentials",
"tasks__destination__identity",
).first()

# Return NotFound if no pipeline was found
if pipeline == None:
return NotFound(f"Pipeline '{pipeline_id}' does not exist")

# Return BadRequest if no run with the given uuid exists
pipeline_run = PipelineRun.objects.filter(
pipeline=pipeline,
uuid=pipeline_run_uuid
).first()

if not pipeline_run:
return BadRequest(f"PipelineRun with uuid '{pipeline_run_uuid}' does not exist")

try:
# Build the pipeline dispatch request
pipeline_dispatch_request = request_builder.build(
request.base_url,
group,
pipeline,
directives={"TEMINATE_RUN": [pipeline_run_uuid]},
args={},
run=pipeline_run,
)
# Dispatch the request
pipeline_dispatcher.dispatch(pipeline_dispatch_request, pipeline, pipeline_run=pipeline_run)
except ServerError as e:
return ServerErrorResp(message=str(e))
except Exception as e:
return ServerErrorResp(message=str(e))


# Format the started_at and last_modified timestamps
run = model_to_dict(pipeline_run)

run["started_at"] = run["started_at"].strftime("%Y-%m-%d %H:%M:%S") if run["started_at"] else None
run["last_modified"] = run["last_modified"].strftime("%Y-%m-%d %H:%M:%S") if run["last_modified"] else None

return BaseResponse(
status=200,
success=True,
message="success",
result=run
)

# TODO catch the specific error thrown by the group service
except (DatabaseError, IntegrityError, OperationalError) as e:
logger.exception(e.__cause__)
return ServerErrorResp(message=e.__cause__)
except Exception as e:
logger.exception(e.__cause__)
return ServerErrorResp(message=e)


def get(self, request, group_id, pipeline_id, pipeline_run_uuid=None, *_, **__):
try:
# Get the group
@@ -62,8 +149,10 @@ def get(self, request, group_id, pipeline_id, pipeline_run_uuid=None, *_, **__)

# TODO catch the specific error thrown by the group service
except (DatabaseError, IntegrityError, OperationalError) as e:
logger.exception(e.__cause__)
return ServerErrorResp(message=e.__cause__)
except Exception as e:
logger.exception(e.__cause__)
return ServerErrorResp(message=e)


@@ -85,8 +174,10 @@ def list(self, pipeline):
result=runs
)
except (DatabaseError, IntegrityError, OperationalError) as e:
logger.exception(e.__cause__)
return ServerErrorResp(message=e.__cause__)
except Exception as e:
logger.exception(e.__cause__)
return ServerErrorResp(message=e)


