From 222878894e1f0f79a60b82ae31bc647276cc9b89 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Fri, 16 Aug 2024 10:25:24 -0500
Subject: [PATCH 01/38] Serialize task output on task create

---
 src/api/src/backend/services/TaskService.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/api/src/backend/services/TaskService.py b/src/api/src/backend/services/TaskService.py
index 6758b212..00fc15d6 100644
--- a/src/api/src/backend/services/TaskService.py
+++ b/src/api/src/backend/services/TaskService.py
@@ -73,11 +73,15 @@ def create(self, pipeline, request):
             self.rollback()
             raise e
 
-        # Convert the input to jsonserializable
+        # Convert the input and output to JSON serializable dicts
         _input = {}
         for key in request.input:
             _input[key] = request.input[key].dict()
 
+        output = {}
+        for key in request.output:
+            output[key] = request.output[key].dict()
+
         # Prepare the uses property
         uses = getattr(request, "uses", None)
         if uses != None:
@@ -111,7 +115,7 @@ def create(self, pipeline, request):
             input=_input,
             installer=getattr(request, "installer", None),
             id=request.id,
-            output=request.output,
+            output=output,
             packages=getattr(request, "packages", None),
             pipeline=pipeline,
             poll=getattr(request, "poll", None),
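
The `.dict()` calls above are needed because the input/output values arriving on the request are pydantic model instances, which the JSON encoder cannot handle directly. A minimal sketch of the failure mode, using a hypothetical `TaskOutput` model in place of the real spec type:

    import json
    from pydantic import BaseModel

    class TaskOutput(BaseModel):  # hypothetical stand-in for the real output spec
        type: str
        value: str = None

    out = TaskOutput(type="string", value="hello")
    # json.dumps({"my_output": out}) -> TypeError: Object of type TaskOutput
    #                                   is not JSON serializable
    json.dumps({"my_output": out.dict()})  # works: .dict() yields a plain dict
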
From 4619f941557fc239f73bf02a2c6cc44bed1115f2 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 13:20:04 -0500
Subject: [PATCH 02/38] Add task model serializers

---
 .../serializers/ApplicationTaskSerializer.py  | 11 ++++++
 .../backend/serializers/ContextSerializer.py  | 19 +++++++++++
 .../serializers/CredentialsSerializer.py      | 16 +++++++++
 .../serializers/DestinationSerializer.py      | 16 +++++++++
 .../serializers/ImageBuildTaskSerializer.py   | 15 ++++++++
 .../serializers/RequestTaskSerializer.py      | 18 ++++++++++
 .../serializers/TapisActorTaskSerializer.py   | 13 +++++++
 .../serializers/TapisJobTaskSerializer.py     | 12 +++++++
 .../src/backend/serializers/TaskSerializer.py | 34 ++++++++++++++++++-
 .../serializers/TemplateTaskSerializer.py     | 11 ++++++
 src/api/src/backend/serializers/__init__.py   | 11 +++++-
 src/api/src/backend/views/Tasks.py            | 13 +++++--
 12 files changed, 185 insertions(+), 4 deletions(-)
 create mode 100644 src/api/src/backend/serializers/ApplicationTaskSerializer.py
 create mode 100644 src/api/src/backend/serializers/ContextSerializer.py
 create mode 100644 src/api/src/backend/serializers/CredentialsSerializer.py
 create mode 100644 src/api/src/backend/serializers/DestinationSerializer.py
 create mode 100644 src/api/src/backend/serializers/ImageBuildTaskSerializer.py
 create mode 100644 src/api/src/backend/serializers/RequestTaskSerializer.py
 create mode 100644 src/api/src/backend/serializers/TapisActorTaskSerializer.py
 create mode 100644 src/api/src/backend/serializers/TapisJobTaskSerializer.py
 create mode 100644 src/api/src/backend/serializers/TemplateTaskSerializer.py

diff --git a/src/api/src/backend/serializers/ApplicationTaskSerializer.py b/src/api/src/backend/serializers/ApplicationTaskSerializer.py
new file mode 100644
index 00000000..2ad78320
--- /dev/null
+++ b/src/api/src/backend/serializers/ApplicationTaskSerializer.py
@@ -0,0 +1,11 @@
+from backend.serializers import BaseTaskSerializer
+
+
+class ApplicationTaskSerializer:
+    @staticmethod
+    def serialize(model, base=None):
+        task = base if base != None else BaseTaskSerializer.serialize(model)
+
+        task["image"] = model.image
+
+        return task
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/ContextSerializer.py b/src/api/src/backend/serializers/ContextSerializer.py
new file mode 100644
index 00000000..6e45a3e3
--- /dev/null
+++ b/src/api/src/backend/serializers/ContextSerializer.py
@@ -0,0 +1,19 @@
+from backend.serializers.CredentialsSerializer import CredentialsSerializer
+from backend.serializers.UUIDSerializer import UUIDSerializer
+
+
+class ContextSerializer:
+    @staticmethod
+    def serialize(model):
+        context = {}
+        context["branch"] = model
+        context["credentials"] = CredentialsSerializer.serialize(model.credentials)
+        context["recipe_file_path"] = model.recipe_file_path
+        context["sub_path"] = model.sub_path
+        context["tag"] = model.tag
+        context["type"] = model.type
+        context["url"] = model.url
+        context["visibility"] = model.visibility
+        context["uuid"] = UUIDSerializer.serialize(model.uuid)
+
+        return context
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/CredentialsSerializer.py b/src/api/src/backend/serializers/CredentialsSerializer.py
new file mode 100644
index 00000000..79aad317
--- /dev/null
+++ b/src/api/src/backend/serializers/CredentialsSerializer.py
@@ -0,0 +1,16 @@
+from backend.serializers.UUIDSerializer import UUIDSerializer
+
+
+class CredentialsSerializer:
+    @staticmethod
+    def serialize(model):
+
+        credential = {}
+
+        credential["sk_id"] = model.sk_id
+        credential["owner"] = model.owner
+        credential["created_at"] = model.created_at
+        credential["uuid"] = UUIDSerializer.serialize(model.uuid)
+
+
+        return credential
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/DestinationSerializer.py b/src/api/src/backend/serializers/DestinationSerializer.py
new file mode 100644
index 00000000..a1574d35
--- /dev/null
+++ b/src/api/src/backend/serializers/DestinationSerializer.py
@@ -0,0 +1,16 @@
+from backend.serializers.CredentialsSerializer import CredentialsSerializer
+from backend.serializers.UUIDSerializer import UUIDSerializer
+
+
+class DestinationSerializer:
+    @staticmethod
+    def serialize(model):
+        destination = {}
+        destination["tag"] = model.tag,
+        destination["type"] = model.type,
+        destination["url"] = model.url,
+        destination["filename"] = model.filename,
+        destination["credentials"] = CredentialsSerializer.serialize(model.credentials)
+        destination["uuid"] = UUIDSerializer.serialize(model.uuid)
+
+        return destination
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
new file mode 100644
index 00000000..ae2e144e
--- /dev/null
+++ b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
@@ -0,0 +1,15 @@
+from backend.serializers import BaseTaskSerializer
+from backend.serializers.ContextSerializer import ContextSerializer
+from backend.serializers.DestinationSerializer import DestinationSerializer
+
+
+class ImageBuildTaskSerializer:
+    @staticmethod
+    def serialize(model, base=None):
+        task = base if base != None else BaseTaskSerializer.serialize(model)
+
+        task["builder"] = model.builder
+        task["context"] = ContextSerializer.serialize(model.context)
+        task["destination"] = DestinationSerializer(model.destination)
+
+        return task
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/RequestTaskSerializer.py b/src/api/src/backend/serializers/RequestTaskSerializer.py
new file mode 100644
index 00000000..bb687966
--- /dev/null
+++ b/src/api/src/backend/serializers/RequestTaskSerializer.py
@@ -0,0 +1,18 @@
+from backend.serializers import BaseTaskSerializer
+from backend.serializers.CredentialsSerializer import CredentialsSerializer
+
+
+class RequestTaskSerializer:
+    @staticmethod
+    def serialize(model, base=None):
+        task = base if base != None else BaseTaskSerializer.serialize(model)
+
+        task["auth"] = CredentialsSerializer.serialize(model.auth)
+        task["data"] = model.data
+        task["headers"] = model.headers
+        task["http_method"] = model.http_method
+        task["protocol"] = model.protocol
+        task["query_params"] = model.query_params
+        task["url"] = model.url
+
+        return task
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/TapisActorTaskSerializer.py b/src/api/src/backend/serializers/TapisActorTaskSerializer.py
new file mode 100644
index 00000000..d0050ad5
--- /dev/null
+++ b/src/api/src/backend/serializers/TapisActorTaskSerializer.py
@@ -0,0 +1,13 @@
+from backend.serializers import BaseTaskSerializer
+
+
+class TapisActorTaskSerializer:
+    @staticmethod
+    def serialize(model, base=None):
+        task = base if base != None else BaseTaskSerializer.serialize(model)
+
+        task["tapis_actor_id"] = model.tapis_actor_id
+        task["poll"] = model.poll
+        task["tapis_actor_message"] = model.tapis_actor_message
+
+        return task
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/TapisJobTaskSerializer.py b/src/api/src/backend/serializers/TapisJobTaskSerializer.py
new file mode 100644
index 00000000..78e5508d
--- /dev/null
+++ b/src/api/src/backend/serializers/TapisJobTaskSerializer.py
@@ -0,0 +1,12 @@
+from backend.serializers import BaseTaskSerializer
+
+
+class TapisJobTaskSerializer:
+    @staticmethod
+    def serialize(model, base=None):
+        task = base if base != None else BaseTaskSerializer.serialize(model)
+
+        task["tapis_job_def"] = model.tapis_job_def
+        task["poll"] = model.poll
+
+        return task
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/TaskSerializer.py b/src/api/src/backend/serializers/TaskSerializer.py
index 43e6ee3d..5e7607ff 100644
--- a/src/api/src/backend/serializers/TaskSerializer.py
+++ b/src/api/src/backend/serializers/TaskSerializer.py
@@ -1,6 +1,20 @@
 from backend.serializers.BaseTaskSerializer import BaseTaskSerializer
 from backend.serializers.FunctionTaskSerializer import FunctionTaskSerializer
-from backend.models import TASK_TYPE_FUNCTION
+from backend.serializers.ImageBuildTaskSerializer import ImageBuildTaskSerializer
+from backend.serializers.TapisActorTaskSerializer import TapisActorTaskSerializer
+from backend.serializers.TapisJobTaskSerializer import TapisJobTaskSerializer
+from backend.serializers.ApplicationTaskSerializer import ApplicationTaskSerializer
+from backend.serializers.TemplateTaskSerializer import TemplateTaskSerializer
+from backend.serializers.RequestTaskSerializer import RequestTaskSerializer
+from backend.models import (
+    TASK_TYPE_FUNCTION,
+    TASK_TYPE_APPLICATION,
+    TASK_TYPE_TEMPLATE,
+    TASK_TYPE_TAPIS_ACTOR,
+    TASK_TYPE_IMAGE_BUILD,
+    TASK_TYPE_REQUEST,
+    TASK_TYPE_TAPIS_JOB
+)
 
 class TaskSerializer:
     @staticmethod
@@ -10,4 +24,22 @@ def serialize(model):
         if model.type == TASK_TYPE_FUNCTION:
             return FunctionTaskSerializer.serialize(model, base=base)
 
+        if model.type == TASK_TYPE_TEMPLATE:
+            return TemplateTaskSerializer.serialize(model, base=base)
+
+        if model.type == TASK_TYPE_IMAGE_BUILD:
+            return ImageBuildTaskSerializer.serialize(model, base=base)
+
+        if model.type == TASK_TYPE_TAPIS_JOB:
+            return TapisJobTaskSerializer.serialize(model, base=base)
+
+        if model.type == TASK_TYPE_TAPIS_ACTOR:
+            return TapisActorTaskSerializer.serialize(model, base=base)
+
+        if model.type == TASK_TYPE_REQUEST:
+            return RequestTaskSerializer.serialize(model, base=base)
+
+        if model.type == TASK_TYPE_APPLICATION:
+            return ApplicationTaskSerializer.serialize(model, base=base)
+
         raise NotImplementedError(f"Task Serializer does not have a method for serializing tasks of type '{model.type}'")
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/TemplateTaskSerializer.py b/src/api/src/backend/serializers/TemplateTaskSerializer.py
new file mode 100644
index 00000000..b19aeb3f
--- /dev/null
+++ b/src/api/src/backend/serializers/TemplateTaskSerializer.py
@@ -0,0 +1,11 @@
+from backend.serializers import BaseTaskSerializer
+
+
+class TemplateTaskSerializer:
+    @staticmethod
+    def serialize(model, base=None):
+        # Returning only the base task because the only property relevant to
+        # the template task (the `uses` property) is handled there.
+        task = base if base != None else BaseTaskSerializer.serialize(model)
+
+        return task
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/__init__.py b/src/api/src/backend/serializers/__init__.py
index c9e0e722..16ae9984 100644
--- a/src/api/src/backend/serializers/__init__.py
+++ b/src/api/src/backend/serializers/__init__.py
@@ -4,4 +4,13 @@
 from backend.serializers.BaseTaskSerializer import BaseTaskSerializer
 from backend.serializers.TaskSerializer import TaskSerializer
 from backend.serializers.TaskDTOSerializer import TaskDTOSerializer
-from backend.serializers.FunctionTaskSerializer import FunctionTaskSerializer
\ No newline at end of file
+from backend.serializers.FunctionTaskSerializer import FunctionTaskSerializer
+from backend.serializers.ImageBuildTaskSerializer import ImageBuildTaskSerializer
+from backend.serializers.TapisActorTaskSerializer import TapisActorTaskSerializer
+from backend.serializers.TapisJobTaskSerializer import TapisJobTaskSerializer
+from backend.serializers.ApplicationTaskSerializer import ApplicationTaskSerializer
+from backend.serializers.TemplateTaskSerializer import TemplateTaskSerializer
+from backend.serializers.RequestTaskSerializer import RequestTaskSerializer
+from backend.serializers.ContextSerializer import ContextSerializer
+from backend.serializers.DestinationSerializer import DestinationSerializer
+from backend.serializers.CredentialsSerializer import CredentialsSerializer
\ No newline at end of file

diff --git a/src/api/src/backend/views/Tasks.py b/src/api/src/backend/views/Tasks.py
index 3fd6f56a..0c626150 100644
--- a/src/api/src/backend/views/Tasks.py
+++ b/src/api/src/backend/views/Tasks.py
@@ -7,6 +7,7 @@
 from backend.views.http.responses import BaseResponse, ResourceURLResponse
 from backend.views.http.responses.errors import BadRequest, Forbidden, NotFound, MethodNotAllowed, ServerError
 from backend.views.http.responses.models import ModelListResponse, ModelResponse
+from backend.views.http.responses import BaseResponse
 from backend.services.TaskService import service as task_service
 from backend.services.GroupService import service as group_service
 from backend.serializers import TaskSerializer, TaskDTOSerializer
@@ -52,8 +53,16 @@ def get(self, request, group_id, pipeline_id, task_id=None):
 
         return ModelResponse(task)
 
-    def list(self, pipeline, *_, **__):
-        return ModelListResponse(Task.objects.filter(pipeline=pipeline))
+    def list(self, pipeline, *_, **__):
+        task_models = Task.objects.filter(pipeline=pipeline)
+        tasks = []
+        try:
+            for task_model in task_models:
+                tasks.append(TaskSerializer.serialize(task_model))
+        except Exception as e:
+            return ServerError(f"{e}")
+
+        return BaseResponse(result=tasks)
 
     def post(self, request, group_id, pipeline_id, *_, **__):
         # Validate the request body
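
With the type dispatch above in place, a view can serialize any task row without knowing its concrete type. A minimal usage sketch (assuming `task_model` is a fetched `Task` instance):

    from backend.serializers import TaskSerializer

    task_dict = TaskSerializer.serialize(task_model)  # dispatches on task_model.type
    # Types without a registered serializer raise NotImplementedError instead of
    # silently returning partial data.
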
From 30d805393b52999fd7c5d863901bb09f4c7901ca Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 13:52:39 -0500
Subject: [PATCH 03/38] Fix credential serializer when credentials model is
 None

---
 src/api/src/backend/serializers/CredentialsSerializer.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/api/src/backend/serializers/CredentialsSerializer.py b/src/api/src/backend/serializers/CredentialsSerializer.py
index 79aad317..71c3fc04 100644
--- a/src/api/src/backend/serializers/CredentialsSerializer.py
+++ b/src/api/src/backend/serializers/CredentialsSerializer.py
@@ -4,9 +4,10 @@
 class CredentialsSerializer:
     @staticmethod
     def serialize(model):
-
         credential = {}
-
+        if model == None:
+            return None
+
         credential["sk_id"] = model.sk_id
         credential["owner"] = model.owner
         credential["created_at"] = model.created_at

From 3da772cbe80623650e551a2932962394c84bc4f3 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 14:17:45 -0500
Subject: [PATCH 04/38] Log exceptions in listTask, Fix ImageBuildSerializer

---
 src/api/src/backend/serializers/ImageBuildTaskSerializer.py | 2 +-
 src/api/src/backend/views/Tasks.py                          | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
index ae2e144e..21834455 100644
--- a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
+++ b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
@@ -10,6 +10,6 @@ def serialize(model, base=None):
 
         task["builder"] = model.builder
         task["context"] = ContextSerializer.serialize(model.context)
-        task["destination"] = DestinationSerializer(model.destination)
+        task["destination"] = DestinationSerializer.serialize(model.destination)
 
         return task
\ No newline at end of file

diff --git a/src/api/src/backend/views/Tasks.py b/src/api/src/backend/views/Tasks.py
index 0c626150..1b2ecb9c 100644
--- a/src/api/src/backend/views/Tasks.py
+++ b/src/api/src/backend/views/Tasks.py
@@ -110,6 +110,7 @@ def post(self, request, group_id, pipeline_id, *_, **__):
         except (IntegrityError, OperationalError) as e:
             return BadRequest(message=e.__cause__)
         except Exception as e:
+            logger.exception(e.__cause__)
             return ServerError(f"{e}")
 
         return ResourceURLResponse(
@@ -192,6 +193,7 @@ def delete(self, request, group_id, pipeline_id, task_id):
         try:
             task.delete()
         except Exception as e:
+            logger.exception(e.__cause__)
             return ServerError(str(e))
 
         return BaseResponse(message=f"Deleted task '{task_id}' on pipeline '{pipeline_id}")

From dfeb56060a8fa865e16ad39b5f7dc82b351345ee Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 14:48:09 -0500
Subject: [PATCH 05/38] debug

---
 src/api/src/backend/views/Tasks.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/api/src/backend/views/Tasks.py b/src/api/src/backend/views/Tasks.py
index 1b2ecb9c..0c94b938 100644
--- a/src/api/src/backend/views/Tasks.py
+++ b/src/api/src/backend/views/Tasks.py
@@ -60,6 +60,7 @@ def list(self, pipeline, *_, **__):
             for task_model in task_models:
                 tasks.append(TaskSerializer.serialize(task_model))
         except Exception as e:
+            logger.exception(e.__cause__)
             return ServerError(f"{e}")
 
         return BaseResponse(result=tasks)

From 4a3ede046ad4f613c13a9fd4e2dcf442913f19be Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 15:05:49 -0500
Subject: [PATCH 06/38] debug

---
 src/api/src/backend/views/Tasks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/api/src/backend/views/Tasks.py b/src/api/src/backend/views/Tasks.py
index 0c94b938..d1e55880 100644
--- a/src/api/src/backend/views/Tasks.py
+++ b/src/api/src/backend/views/Tasks.py
@@ -54,9 +54,9 @@ def get(self, request, group_id, pipeline_id, task_id=None):
 
     def list(self, pipeline, *_, **__):
-        task_models = Task.objects.filter(pipeline=pipeline)
         tasks = []
         try:
+            task_models = Task.objects.filter(pipeline=pipeline)
             for task_model in task_models:
                 tasks.append(TaskSerializer.serialize(task_model))
         except Exception as e:

From 2c1d03cdcfe8e1f6c1672615893ad3e53fa61bd3 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 15:19:36 -0500
Subject: [PATCH 07/38] Add exception handling in get and list tasks

---
 src/api/src/backend/views/Tasks.py | 54 ++++++++++++++++--------------
 1 file changed, 29 insertions(+), 25 deletions(-)

diff --git a/src/api/src/backend/views/Tasks.py b/src/api/src/backend/views/Tasks.py
index d1e55880..b810e76b 100644
--- a/src/api/src/backend/views/Tasks.py
+++ b/src/api/src/backend/views/Tasks.py
@@ -18,37 +18,41 @@ class Tasks(RestrictedAPIView):
     def get(self, request, group_id, pipeline_id, task_id=None):
-        # Get the group
-        group = group_service.get(group_id, request.tenant_id)
-        if group == None:
-            return NotFound(f"No group found with id '{group_id}'")
+        try:
+            # Get the group
+            group = group_service.get(group_id, request.tenant_id)
+            if group == None:
+                return NotFound(f"No group found with id '{group_id}'")
 
-        # Check that the user belongs to the group
-        if not group_service.user_in_group(request.username, group_id, request.tenant_id):
-            return Forbidden(message="You do not have access to this group")
+            # Check that the user belongs to the group
+            if not group_service.user_in_group(request.username, group_id, request.tenant_id):
+                return Forbidden(message="You do not have access to this group")
 
-        # Get the pipline
-        pipeline = Pipeline.objects.filter(
-            group=group,
-            id=pipeline_id
-        ).first()
+            # Get the pipeline
+            pipeline = Pipeline.objects.filter(
+                group=group,
+                id=pipeline_id
+            ).first()
 
-        # Return if BadRequest if no pipeline found
-        if pipeline == None:
-            return BadRequest(f"Pipline with id '{pipeline_id}' does not exist")
-
-        if task_id == None:
-            return self.list(pipeline)
+            # Return a BadRequest if no pipeline found
+            if pipeline == None:
+                return BadRequest(f"Pipeline with id '{pipeline_id}' does not exist")
+
+            if task_id == None:
+                return self.list(pipeline)
 
-        # Check that the user belongs to the group that is attached
-        # to this pipline
-        if not group_service.user_in_group(request.username, group_id, request.tenant_id):
-            return Forbidden(message="You cannot view tasks for this pipeline")
+            # Check that the user belongs to the group that is attached
+            # to this pipeline
+            if not group_service.user_in_group(request.username, group_id, request.tenant_id):
+                return Forbidden(message="You cannot view tasks for this pipeline")
 
-        task = Task.objects.filter(pipeline=pipeline, id=task_id).first()
+            task = Task.objects.filter(pipeline=pipeline, id=task_id).first()
 
-        if task == None:
-            return NotFound(f"Task with id '{task_id}' does not exist for pipeline '{pipeline_id}'")
+            if task == None:
+                return NotFound(f"Task with id '{task_id}' does not exist for pipeline '{pipeline_id}'")
+        except Exception as e:
+            logger.exception(e.__cause__)
+            return ServerError(str(e))
 
         return ModelResponse(task)
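
A note on the logging pattern used above: `logger.exception` records the active traceback on its own, while `__cause__` is only populated when an exception is raised with `raise ... from ...` and is `None` otherwise. A small illustration of the distinction:

    import logging

    logging.basicConfig()
    logger = logging.getLogger(__name__)

    try:
        try:
            {}["missing"]
        except KeyError as err:
            raise RuntimeError("lookup failed") from err  # sets __cause__
    except RuntimeError as e:
        print(repr(e.__cause__))  # KeyError('missing')
        logger.exception(e)       # logs the message plus the full traceback
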
From 71f2139539a0a43d60cc6fb107152c6ab152ab44 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 15:48:22 -0500
Subject: [PATCH 08/38] Return list task result in try block

---
 src/api/src/backend/views/Tasks.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/api/src/backend/views/Tasks.py b/src/api/src/backend/views/Tasks.py
index b810e76b..088fbc3f 100644
--- a/src/api/src/backend/views/Tasks.py
+++ b/src/api/src/backend/views/Tasks.py
@@ -50,25 +50,24 @@ def get(self, request, group_id, pipeline_id, task_id=None):
 
             if task == None:
                 return NotFound(f"Task with id '{task_id}' does not exist for pipeline '{pipeline_id}'")
+
+            return ModelResponse(task)
         except Exception as e:
             logger.exception(e.__cause__)
             return ServerError(str(e))
 
-        return ModelResponse(task)
-
-
     def list(self, pipeline, *_, **__):
         tasks = []
         try:
             task_models = Task.objects.filter(pipeline=pipeline)
             for task_model in task_models:
                 tasks.append(TaskSerializer.serialize(task_model))
+
+            return BaseResponse(result=tasks)
         except Exception as e:
             logger.exception(e.__cause__)
             return ServerError(f"{e}")
 
-        return BaseResponse(result=tasks)
-
     def post(self, request, group_id, pipeline_id, *_, **__):
         # Validate the request body
         if "type" not in self.request_body:

From 0a1ad1bb7b68ce23b641a55c57e4835b1e46b4bb Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 16:03:19 -0500
Subject: [PATCH 09/38] Fix bug in ContextSerializer

---
 src/api/src/backend/serializers/ContextSerializer.py     | 4 +++-
 src/api/src/backend/serializers/CredentialsSerializer.py | 2 +-
 src/api/src/backend/serializers/DestinationSerializer.py | 2 ++
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/api/src/backend/serializers/ContextSerializer.py b/src/api/src/backend/serializers/ContextSerializer.py
index 6e45a3e3..af0136c5 100644
--- a/src/api/src/backend/serializers/ContextSerializer.py
+++ b/src/api/src/backend/serializers/ContextSerializer.py
@@ -5,8 +5,10 @@
 class ContextSerializer:
     @staticmethod
     def serialize(model):
+        if model == None:
+            return None
         context = {}
-        context["branch"] = model
+        context["branch"] = model.branch
         context["credentials"] = CredentialsSerializer.serialize(model.credentials)
         context["recipe_file_path"] = model.recipe_file_path
         context["sub_path"] = model.sub_path

diff --git a/src/api/src/backend/serializers/CredentialsSerializer.py b/src/api/src/backend/serializers/CredentialsSerializer.py
index 71c3fc04..819208ab 100644
--- a/src/api/src/backend/serializers/CredentialsSerializer.py
+++ b/src/api/src/backend/serializers/CredentialsSerializer.py
@@ -4,10 +4,10 @@
 class CredentialsSerializer:
     @staticmethod
     def serialize(model):
-        credential = {}
         if model == None:
             return None
 
+        credential = {}
         credential["sk_id"] = model.sk_id
         credential["owner"] = model.owner
         credential["created_at"] = model.created_at

diff --git a/src/api/src/backend/serializers/DestinationSerializer.py b/src/api/src/backend/serializers/DestinationSerializer.py
index a1574d35..e4950745 100644
--- a/src/api/src/backend/serializers/DestinationSerializer.py
+++ b/src/api/src/backend/serializers/DestinationSerializer.py
@@ -5,6 +5,8 @@
 class DestinationSerializer:
     @staticmethod
     def serialize(model):
+        if model == None:
+            return None
         destination = {}
         destination["tag"] = model.tag,
         destination["type"] = model.type,

From c76b001684de381494f8b1560b87110a5a19a528 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 16:15:57 -0500
Subject: [PATCH 10/38] Add flavor to task serializer

---
 src/api/src/backend/serializers/BaseTaskSerializer.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/api/src/backend/serializers/BaseTaskSerializer.py b/src/api/src/backend/serializers/BaseTaskSerializer.py
index d9f75278..894eda8c 100644
--- a/src/api/src/backend/serializers/BaseTaskSerializer.py
+++ b/src/api/src/backend/serializers/BaseTaskSerializer.py
@@ -18,6 +18,7 @@ def serialize(model):
 
         # Build execution profile
         task["execution_profile"] = {
+            "flavor": model.falvor,
             "invocation_mode": model.invocation_mode,
             "max_exec_time": model.max_exec_time,
             "max_retries": model.max_retries,

From 3c59aaea3bef52dc31e90dd68c852a3e5b74c76b Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Wed, 28 Aug 2024 16:25:00 -0500
Subject: [PATCH 11/38] Add flavor to task serializer

---
 src/api/src/backend/serializers/BaseTaskSerializer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/api/src/backend/serializers/BaseTaskSerializer.py b/src/api/src/backend/serializers/BaseTaskSerializer.py
index 894eda8c..ac4f845c 100644
--- a/src/api/src/backend/serializers/BaseTaskSerializer.py
+++ b/src/api/src/backend/serializers/BaseTaskSerializer.py
@@ -18,7 +18,7 @@ def serialize(model):
 
         # Build execution profile
         task["execution_profile"] = {
-            "flavor": model.falvor,
+            "flavor": model.flavor,
             "invocation_mode": model.invocation_mode,
             "max_exec_time": model.max_exec_time,
             "max_retries": model.max_retries,
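
With the misspelled model attribute corrected, the serialized execution profile carries the flavor alongside the existing fields. Roughly the dict a serialized task ends up with — the values below are illustrative, not taken from the source:

    task["execution_profile"] = {
        "flavor": "c1sml",           # illustrative flavor value
        "invocation_mode": "async",  # illustrative
        "max_exec_time": 3600,
        "max_retries": 0,
    }
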
pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run) + + return pipeline \ No newline at end of file diff --git a/src/api/src/backend/views/Pipelines.py b/src/api/src/backend/views/Pipelines.py index 7f2a267b..a49a5c7b 100644 --- a/src/api/src/backend/views/Pipelines.py +++ b/src/api/src/backend/views/Pipelines.py @@ -2,6 +2,7 @@ from pydantic import ValidationError from django.db import DatabaseError, IntegrityError, OperationalError from django.forms import model_to_dict +from backend.utils import logger from backend.views.RestrictedAPIView import RestrictedAPIView from backend.views.http.responses.errors import ( @@ -25,6 +26,7 @@ from backend.services.GroupService import service as group_service from backend.errors.api import BadRequestError, ServerError from backend.helpers import resource_url_builder +from backend.serializers import PipelineSerializer PIPELINE_TYPE_CI = "ci" @@ -38,42 +40,48 @@ class Pipelines(RestrictedAPIView): def get(self, request, group_id, pipeline_id=None): - # Get the group - group = group_service.get(group_id, request.tenant_id) - if group == None: - return NotFound(f"No group found with id '{group_id}'") - - # Check that the user belongs to the group - if not group_service.user_in_group(request.username, group_id, request.tenant_id): - return Forbidden(message="You do not have access to this group") - - # Get a list of all pipelines that belong to the user's groups - # if no id is provided - if pipeline_id == None: - return self.list(group) - - # Get the pipeline by the id provided in the path params - pipeline = PipelineModel.objects.filter( - id=pipeline_id, - group=group - ).prefetch_related("tasks").first() - - if pipeline == None: - return NotFound(f"Pipeline not found with id '{pipeline_id}'") - - # Get the pipeline tasks. - tasks = pipeline.tasks.all() - tasks_result = [ model_to_dict(task) for task in tasks ] + try: + # Get the group + group = group_service.get(group_id, request.tenant_id) + if group == None: + return NotFound(f"No group found with id '{group_id}'") + + # Check that the user belongs to the group + if not group_service.user_in_group(request.username, group_id, request.tenant_id): + return Forbidden(message="You do not have access to this group") + + # Get a list of all pipelines that belong to the user's groups + # if no id is provided + if pipeline_id == None: + return self.list(group) + + # Get the pipeline by the id provided in the path params + pipeline = PipelineModel.objects.filter( + id=pipeline_id, + group=group + ).prefetch_related("tasks").first() + + if pipeline == None: + return NotFound(f"Pipeline not found with id '{pipeline_id}'") + + # Get the pipeline tasks. 
+ tasks = pipeline.tasks.all() - # Convert model into a dict an - result = model_to_dict(pipeline) - result["tasks"] = tasks_result - - return BaseResponse(result=result) + # Convert pipeline and task models into a dict + result = PipelineSerializer.serialize(pipeline, tasks) + + return BaseResponse(result=result) + except Exception as e: + logger.exception(e.__cause__) + return ServerErrorResp(str(e)) def list(self, group): - pipelines = PipelineModel.objects.filter(group=group) - return ModelListResponse(pipelines) + try: + pipelines = PipelineModel.objects.filter(group=group) + return ModelListResponse(pipelines) + except Exception as e: + logger.exception(e.__cause__) + return ServerErrorResp(str(e)) def post(self, request, group_id, *_, **__): """Pipeline requests with type 'ci' are supported in order to make the From 3eb141735a1b55f744bcc49b004cb6467f7c07d2 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Thu, 29 Aug 2024 10:22:31 -0500 Subject: [PATCH 13/38] Serialize pipeline models in listModels, make task_models optional in PipelineSerializer --- src/api/src/backend/serializers/PipelineSerializer.py | 7 +++++-- src/api/src/backend/views/Pipelines.py | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/api/src/backend/serializers/PipelineSerializer.py b/src/api/src/backend/serializers/PipelineSerializer.py index 8a9266eb..4273418d 100644 --- a/src/api/src/backend/serializers/PipelineSerializer.py +++ b/src/api/src/backend/serializers/PipelineSerializer.py @@ -4,7 +4,7 @@ class PipelineSerializer: @staticmethod - def serialize(pipeline_model, task_models): + def serialize(pipeline_model, task_models=None): pipeline = {} pipeline["id"] = pipeline_model.id @@ -25,7 +25,10 @@ def serialize(pipeline_model, task_models): "retry_policy": pipeline_model.retry_policy } pipeline["uuid"] = UUIDSerializer.serialize(pipeline_model.uuid) - pipeline["tasks"] = [ TaskSerializer.serialize(t) for t in task_models ] + + # Serialize the task models + if task_models != None: + pipeline["tasks"] = [ TaskSerializer.serialize(t) for t in task_models ] # TODO Remove when certain these properties are not needed pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group) diff --git a/src/api/src/backend/views/Pipelines.py b/src/api/src/backend/views/Pipelines.py index a49a5c7b..34246b17 100644 --- a/src/api/src/backend/views/Pipelines.py +++ b/src/api/src/backend/views/Pipelines.py @@ -77,8 +77,10 @@ def get(self, request, group_id, pipeline_id=None): def list(self, group): try: - pipelines = PipelineModel.objects.filter(group=group) - return ModelListResponse(pipelines) + pipeline_models = PipelineModel.objects.filter(group=group) + result = [PipelineSerializer.serialize(p) for p in pipeline_models] + + return BaseResponse(result=result) except Exception as e: logger.exception(e.__cause__) return ServerErrorResp(str(e)) From 378cd3792b8eb3748e98142ed04475c8f04eb6bb Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Thu, 29 Aug 2024 10:30:07 -0500 Subject: [PATCH 14/38] Properly import PipelineSerializer --- src/api/src/backend/serializers/PipelineSerializer.py | 1 - src/api/src/backend/serializers/__init__.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api/src/backend/serializers/PipelineSerializer.py b/src/api/src/backend/serializers/PipelineSerializer.py index 4273418d..86b9bad1 100644 --- a/src/api/src/backend/serializers/PipelineSerializer.py +++ b/src/api/src/backend/serializers/PipelineSerializer.py @@ -5,7 +5,6 @@ class 
PipelineSerializer: @staticmethod def serialize(pipeline_model, task_models=None): - pipeline = {} pipeline["id"] = pipeline_model.id pipeline["description"] = pipeline_model.description diff --git a/src/api/src/backend/serializers/__init__.py b/src/api/src/backend/serializers/__init__.py index 16ae9984..6ed429ca 100644 --- a/src/api/src/backend/serializers/__init__.py +++ b/src/api/src/backend/serializers/__init__.py @@ -13,4 +13,5 @@ from backend.serializers.RequestTaskSerializer import RequestTaskSerializer from backend.serializers.ContextSerializer import ContextSerializer from backend.serializers.DestinationSerializer import DestinationSerializer -from backend.serializers.CredentialsSerializer import CredentialsSerializer \ No newline at end of file +from backend.serializers.CredentialsSerializer import CredentialsSerializer +from backend.serializers.PipelineSerializer import PipelineSerializer \ No newline at end of file From c9335c9ef8fb728450050136f0c3873406884649 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Thu, 29 Aug 2024 10:36:44 -0500 Subject: [PATCH 15/38] remove group, last_run, current_run from serialized pipeline --- src/api/src/backend/serializers/PipelineSerializer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/api/src/backend/serializers/PipelineSerializer.py b/src/api/src/backend/serializers/PipelineSerializer.py index 86b9bad1..695a014d 100644 --- a/src/api/src/backend/serializers/PipelineSerializer.py +++ b/src/api/src/backend/serializers/PipelineSerializer.py @@ -30,8 +30,8 @@ def serialize(pipeline_model, task_models=None): pipeline["tasks"] = [ TaskSerializer.serialize(t) for t in task_models ] # TODO Remove when certain these properties are not needed - pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group) - pipeline["current_run"] = UUIDSerializer.serialize(pipeline_model.current_run) - pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run) + # pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group) + # pipeline["current_run"] = UUIDSerializer.serialize(pipeline_model.current_run) + # pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run) return pipeline \ No newline at end of file From 329eba97af9d08a150aa681e363c83d5a02204ef Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Thu, 29 Aug 2024 17:08:31 -0500 Subject: [PATCH 16/38] Add group back to pipeline serializer --- src/api/src/backend/serializers/PipelineSerializer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/src/backend/serializers/PipelineSerializer.py b/src/api/src/backend/serializers/PipelineSerializer.py index 695a014d..cb340ac8 100644 --- a/src/api/src/backend/serializers/PipelineSerializer.py +++ b/src/api/src/backend/serializers/PipelineSerializer.py @@ -30,7 +30,7 @@ def serialize(pipeline_model, task_models=None): pipeline["tasks"] = [ TaskSerializer.serialize(t) for t in task_models ] # TODO Remove when certain these properties are not needed - # pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group) + pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group) # pipeline["current_run"] = UUIDSerializer.serialize(pipeline_model.current_run) # pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run) From b2fd8739c246f5b89c86a42af44d20f47d562d39 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Thu, 29 Aug 2024 17:09:24 -0500 Subject: [PATCH 17/38] with debug --- src/api/src/backend/serializers/PipelineSerializer.py | 2 ++ 1 file 
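
A quick sketch of how the two call sites use the serializer after these changes — the detail view passes the prefetched task models, while the list view omits them so no "tasks" key is emitted (assuming fetched model instances):

    from backend.serializers import PipelineSerializer

    # Detail view: include serialized tasks
    result = PipelineSerializer.serialize(pipeline, pipeline.tasks.all())

    # List view: summaries only; serialize(p) leaves out the "tasks" key
    result = [PipelineSerializer.serialize(p) for p in pipeline_models]
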
From b2fd8739c246f5b89c86a42af44d20f47d562d39 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Thu, 29 Aug 2024 17:09:24 -0500
Subject: [PATCH 17/38] with debug

---
 src/api/src/backend/serializers/PipelineSerializer.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/api/src/backend/serializers/PipelineSerializer.py b/src/api/src/backend/serializers/PipelineSerializer.py
index cb340ac8..012a6c11 100644
--- a/src/api/src/backend/serializers/PipelineSerializer.py
+++ b/src/api/src/backend/serializers/PipelineSerializer.py
@@ -30,6 +30,8 @@ def serialize(pipeline_model, task_models=None):
             pipeline["tasks"] = [ TaskSerializer.serialize(t) for t in task_models ]
 
         # TODO Remove when certain these properties are not needed
+        print(type(pipeline_model.group))
+        print(pipeline_model.group)
         pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group)
         # pipeline["current_run"] = UUIDSerializer.serialize(pipeline_model.current_run)
         # pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run)

From 5d294c0e1f236743c027f06f614332d10cd0d8ad Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Thu, 29 Aug 2024 17:43:33 -0500
Subject: [PATCH 18/38] Fix group serialization in pipeline serializer

---
 src/api/src/backend/serializers/PipelineSerializer.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/api/src/backend/serializers/PipelineSerializer.py b/src/api/src/backend/serializers/PipelineSerializer.py
index 012a6c11..60b4a4ed 100644
--- a/src/api/src/backend/serializers/PipelineSerializer.py
+++ b/src/api/src/backend/serializers/PipelineSerializer.py
@@ -30,9 +30,7 @@ def serialize(pipeline_model, task_models=None):
             pipeline["tasks"] = [ TaskSerializer.serialize(t) for t in task_models ]
 
         # TODO Remove when certain these properties are not needed
-        print(type(pipeline_model.group))
-        print(pipeline_model.group)
-        pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group)
+        pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group.uuid)
         # pipeline["current_run"] = UUIDSerializer.serialize(pipeline_model.current_run)
         # pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run)

From ecd8659420496b5ab8461d6f6ede739de23576ae Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Tue, 3 Sep 2024 10:37:12 -0500
Subject: [PATCH 19/38] debug destination serializer

---
 src/api/specs/WorkflowsAPI.yaml               | 106 ++++++++++++------
 .../serializers/ImageBuildTaskSerializer.py   |   3 +
 2 files changed, 75 insertions(+), 34 deletions(-)

diff --git a/src/api/specs/WorkflowsAPI.yaml b/src/api/specs/WorkflowsAPI.yaml
index e2e3130a..f5566114 100644
--- a/src/api/specs/WorkflowsAPI.yaml
+++ b/src/api/specs/WorkflowsAPI.yaml
@@ -2517,6 +2517,78 @@ components:
         destination:
           $ref: '#/components/schemas/Destination'
 
+    Context:
+      type: object
+      required:
+        - type
+      properties:
+        branch:
+          type: string
+        credentials:
+          default: null
+        build_file_path:
+          type: string
+        sub_path:
+          type: string
+        type:
+          $ref: "#/components/schemas/EnumContextType"
+        url:
+          type: string
+        visibility:
+          $ref: "#/components/schemas/EnumContextVisibility"
+
+
+    Destination:
+      anyOf:
+        - $ref: '#/components/schemas/DockerhubDestination'
+        - $ref: '#/components/schemas/LocalDestination'
+      discriminator:
+        propertyName: type
+        mapping:
+          dockerhub: "#/components/schemas/DockerhubDestination"
+          local: "#/components/schemas/LocalDestination"
+
+    BaseDestination:
+      type: object
+      required:
+        - type
+      properties:
+        type:
+          $ref: '#/components/schemas/EnumDestinationType'
+
+    LocalDestination:
+      allOf:
+        - $ref: "#/components/schemas/BaseDestination"
+        - type: object
+          properties:
+            filename:
+              type: string
+
+    RegistryDestination:
+      allOf:
+        - $ref: "#/components/schemas/BaseDestination"
+        - type: object
+          properties:
+            credentials:
+              $ref: '#/components/schemas/DockerhubCred'
+            tag:
+              type: string
+            url:
+              type: string
+
+    DockerhubDestination:
+      allOf:
+        - $ref: '#/components/schemas/BaseDestination'
+        - $ref: '#/components/schemas/RegistryDestination'
+
+    DockerhubCred:
+      type: object
+      properties:
+        token:
+          type: string
+        username:
+          type: string
+
     RequestTask:
       allOf:
         - $ref: '#/components/schemas/BaseTask'
@@ -2611,40 +2683,6 @@ components:
           type: boolean
         can_skip:
           type: boolean
-
-    # --- Context ---
-    Context:
-      type: object
-      required:
-        - type
-      properties:
-        branch:
-          type: string
-        credentials:
-          default: null
-        build_file_path:
-          type: string
-        sub_path:
-          type: string
-        type:
-          $ref: "#/components/schemas/EnumContextType"
-        url:
-          type: string
-        visibility:
-          $ref: "#/components/schemas/EnumContextVisibility"
-
-    # --- Destination ---
-    Destination:
-      type: object
-      properties:
-        credentials:
-          default: null
-        tag:
-          type: string
-        type:
-          $ref: '#/components/schemas/EnumDestinationType'
-        url:
-          type: string
 
     # --- Group and GroupUser ---
     GroupDetail:

diff --git a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
index 21834455..a7207c5d 100644
--- a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
+++ b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
@@ -1,6 +1,7 @@
 from backend.serializers import BaseTaskSerializer
 from backend.serializers.ContextSerializer import ContextSerializer
 from backend.serializers.DestinationSerializer import DestinationSerializer
+from pprint import pprint
 
 
 class ImageBuildTaskSerializer:
@@ -11,5 +12,7 @@ def serialize(model, base=None):
         task["builder"] = model.builder
         task["context"] = ContextSerializer.serialize(model.context)
         task["destination"] = DestinationSerializer.serialize(model.destination)
+
+        pprint(task["destination"])
 
         return task
\ No newline at end of file

From 1b4c9106f929d68533b1b4240fe2fbae52e09df9 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Tue, 3 Sep 2024 11:03:53 -0500
Subject: [PATCH 20/38] debug destination serializer

---
 src/api/src/backend/serializers/DestinationSerializer.py    | 5 +++++
 src/api/src/backend/serializers/ImageBuildTaskSerializer.py | 3 ---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/api/src/backend/serializers/DestinationSerializer.py b/src/api/src/backend/serializers/DestinationSerializer.py
index e4950745..801ba565 100644
--- a/src/api/src/backend/serializers/DestinationSerializer.py
+++ b/src/api/src/backend/serializers/DestinationSerializer.py
@@ -14,5 +14,10 @@ def serialize(model):
         destination["filename"] = model.filename,
         destination["credentials"] = CredentialsSerializer.serialize(model.credentials)
         destination["uuid"] = UUIDSerializer.serialize(model.uuid)
+
+        print("tag", model.tag, type(model.tag))
+        print("type", model.type, type(model.type))
+        print("url", model.url, type(model.url))
+        print("filename", model.filename, type(model.filename))
 
         return destination
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
index a7207c5d..21834455 100644
--- a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
+++ b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
@@ -1,7 +1,6 @@
 from backend.serializers import BaseTaskSerializer
 from backend.serializers.ContextSerializer import ContextSerializer
 from backend.serializers.DestinationSerializer import DestinationSerializer
-from pprint import pprint
 
 
 class ImageBuildTaskSerializer:
@@ -12,7 +11,5 @@ def serialize(model, base=None):
         task["builder"] = model.builder
         task["context"] = ContextSerializer.serialize(model.context)
         task["destination"] = DestinationSerializer.serialize(model.destination)
-
-        pprint(task["destination"])
 
         return task
\ No newline at end of file

From 57523ef4835d59b8520547c487bcb4d556ca3283 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Tue, 3 Sep 2024 12:57:43 -0500
Subject: [PATCH 21/38] more debug

---
 src/api/src/backend/serializers/DestinationSerializer.py    | 1 +
 src/api/src/backend/serializers/ImageBuildTaskSerializer.py | 6 ++++++
 2 files changed, 7 insertions(+)

diff --git a/src/api/src/backend/serializers/DestinationSerializer.py b/src/api/src/backend/serializers/DestinationSerializer.py
index 801ba565..74b90f4b 100644
--- a/src/api/src/backend/serializers/DestinationSerializer.py
+++ b/src/api/src/backend/serializers/DestinationSerializer.py
@@ -19,5 +19,6 @@ def serialize(model):
         print("type", model.type, type(model.type))
         print("url", model.url, type(model.url))
         print("filename", model.filename, type(model.filename))
+        print("destination", destination)
 
         return destination
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
index 21834455..a0c672f9 100644
--- a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
+++ b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
@@ -1,6 +1,7 @@
 from backend.serializers import BaseTaskSerializer
 from backend.serializers.ContextSerializer import ContextSerializer
 from backend.serializers.DestinationSerializer import DestinationSerializer
+from pprint import pprint
 
 
 class ImageBuildTaskSerializer:
@@ -11,5 +12,10 @@ def serialize(model, base=None):
         task["builder"] = model.builder
         task["context"] = ContextSerializer.serialize(model.context)
         task["destination"] = DestinationSerializer.serialize(model.destination)
+
+        print("after dest serializer: ")
+        pprint(task.destination)
+        print("task")
+        pprint(task)
 
         return task
\ No newline at end of file

From 0d978baa7d4b1027836c6ab0917ed32757265168 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Tue, 3 Sep 2024 13:06:56 -0500
Subject: [PATCH 22/38] Fix destination serializer

---
 .../backend/serializers/DestinationSerializer.py   | 14 ++++----------
 .../backend/serializers/ImageBuildTaskSerializer.py |  5 -----
 2 files changed, 4 insertions(+), 15 deletions(-)

diff --git a/src/api/src/backend/serializers/DestinationSerializer.py b/src/api/src/backend/serializers/DestinationSerializer.py
index 74b90f4b..eba826f9 100644
--- a/src/api/src/backend/serializers/DestinationSerializer.py
+++ b/src/api/src/backend/serializers/DestinationSerializer.py
@@ -8,17 +8,11 @@ def serialize(model):
         if model == None:
             return None
         destination = {}
-        destination["tag"] = model.tag,
-        destination["type"] = model.type,
-        destination["url"] = model.url,
-        destination["filename"] = model.filename,
+        destination["tag"] = model.tag
+        destination["type"] = model.type
+        destination["url"] = model.url
+        destination["filename"] = model.filename
         destination["credentials"] = CredentialsSerializer.serialize(model.credentials)
         destination["uuid"] = UUIDSerializer.serialize(model.uuid)
-
-        print("tag", model.tag, type(model.tag))
-        print("type", model.type, type(model.type))
-        print("url", model.url, type(model.url))
-        print("filename", model.filename, type(model.filename))
-        print("destination", destination)
 
         return destination
\ No newline at end of file

diff --git a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
index a0c672f9..d999d0dd 100644
--- a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
+++ b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py
@@ -12,10 +12,5 @@ def serialize(model, base=None):
         task["builder"] = model.builder
         task["context"] = ContextSerializer.serialize(model.context)
         task["destination"] = DestinationSerializer.serialize(model.destination)
-
-        print("after dest serializer: ")
-        pprint(task.destination)
-        print("task")
-        pprint(task)
 
         return task
\ No newline at end of file
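
The bug the debug patches were chasing is visible in the removed lines above: a trailing comma makes the right-hand side a one-element tuple, so every destination field was being serialized as a tuple rather than a string. A minimal reproduction:

    destination = {}
    destination["tag"] = "latest",   # stray trailing comma
    print(destination["tag"])        # ('latest',)  -- a tuple

    destination["tag"] = "latest"    # comma removed
    print(destination["tag"])        # latest
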
From 114959f587cac694ae5a01de4d763ef0141d7b89 Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Tue, 3 Sep 2024 15:01:04 -0500
Subject: [PATCH 23/38] Use Task serializer in pipeline dispatch request
 builder

---
 .../helpers/PipelineDispatchRequestBuilder.py | 67 ++++++-------------
 src/api/src/backend/views/http/requests.py    |  1 -
 src/engine/src/owe_python_sdk/schema.py       |  1 -
 3 files changed, 19 insertions(+), 50 deletions(-)

diff --git a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
index 2130b2d4..f9c08877 100644
--- a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
+++ b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
@@ -3,6 +3,7 @@
 
 from backend.utils.parse_directives import parse_directives as parse
 from backend.conf.constants import WORKFLOW_EXECUTOR_ACCESS_TOKEN
+from backend.serializers import TaskSerializer
 
 
 class PipelineDispatchRequestBuilder:
@@ -27,12 +28,11 @@ def build(
         # Convert tasks' schema to the schema expected by the workflow engine
         tasks_request = []
         for task in tasks:
-            # Convert the task to a dict and add the execution profile property
-            # for all tasks.
-            preprocessed_task = self._preprocess_task(task)
+            # Convert the task to a dict
+            serialized_task = TaskSerializer.serialze(task)
 
             # Handle the task-specific schema conversions
-            task_request = getattr(self, f"_{task.type}")(preprocessed_task, task)
+            task_request = getattr(self, f"_{task.type}")(serialized_task, task)
             tasks_request.append(task_request)
 
         # Get the archives for this pipeline
@@ -47,7 +47,6 @@ def build(
 
         # Convert pipleline to a dict and build the request
         request = {}
-        request["group"] = model_to_dict(group)
         request["pipeline"] = model_to_dict(pipeline)
         request["archives"] = archives
 
@@ -85,13 +84,17 @@ def build(
             }
 
         request["meta"] = {}
+
+        # Add the group to the meta so it can be used to determine the idemp key
+        request["meta"]["group"] = model_to_dict(group)
+
         # Properties to help uniquely identify a pipeline submission. If the workflow
         # executor is currently running a pipeline with the same values as the
         # properties provided in "idempotency_key", the workflow executor
         # will then take the appropriate action dictated by the
         # pipeline.duplicate_submission_policy (allow, allow_terminate, deny)
         request["meta"]["idempotency_key"] = [
-            "group.id",
+            "meta.group.id",
             "args.tapis_tenant_id",
             "pipeline.id"
         ]
@@ -106,36 +109,27 @@ def build(
         request["pipeline_run"]["name"] = name or uuid
         request["pipeline_run"]["description"] = description
 
-        # Parse the directives from the commit message
-        directives_request = {}
-        if commit != None:
-            directives_request = parse(commit)
+        # # Parse the directives from the commit message
+        # directives_request = {}
+        # if commit != None:
+        #     directives_request = parse(commit)
 
-        if directives != None and len(directives) > 0:
-            directive_str = f"[{'|'.join([d for d in directives])}]"
-            directives_request = parse(directive_str)
+        # if directives != None and len(directives) > 0:
+        #     directive_str = f"[{'|'.join([d for d in directives])}]"
+        #     directives_request = parse(directive_str)
 
-        request["directives"] = directives_request
+        # request["directives"] = directives_request
+
+        request["directives"] = {}
 
         return request
 
     def _image_build(self, task_request, task):
-        task_request["context"] = model_to_dict(task.context)
-
-        # Resolve which context credentials to use if any provided
         context_creds = None
         if task.context.credentials != None:
             context_creds = task.context.credentials
 
-        # Identity takes precedence over credentials placed directly in
-        # the context
-        if task.context.identity != None:
-            context_creds = task.context.identity.credentials
 
         task_request["context"]["credentials"] = None
         if context_creds != None:
-            task_request["context"]["credentials"] = model_to_dict(context_creds)
-
             # Get the context credentials data
             context_cred_data = self.secret_service.get_secret(context_creds.sk_id)
             task_request["context"]["credentials"] = context_cred_data
@@ -146,18 +140,11 @@ def _image_build(self, task_request, task):
         del task_request["context"]["recipe_file_path"]
 
         # Destination credentials
-        task_request["destination"] = model_to_dict(task.destination)
-
         destination_creds = None
         if task.destination.credentials != None:
             destination_creds = task.destination.credentials
 
-        if task.destination.identity != None:
-            destination_creds = task.destination.identity.credentials
-
         if destination_creds != None:
-            task_request["destination"]["credentials"] = model_to_dict(destination_creds)
-
             # Get the context credentials data
             destination_cred_data = self.secret_service.get_secret(destination_creds.sk_id)
             task_request["destination"]["credentials"] = destination_cred_data
@@ -183,20 +170,4 @@ def _function(self, task_request, _):
         return task_request
 
     def _container_run(self, task_request, _):
-        return task_request
-
-    def _preprocess_task(self, task):
-        # Convert to dict
-        task_request = model_to_dict(task)
-
-        # Map task data model properties to the workflow engine expected schema for
-        # the execution profile
-        task_request["execution_profile"] = {
-            "flavor": task_request["flavor"],
-            "max_exec_time": task_request["max_exec_time"],
-            "max_retries": task_request["max_retries"],
-            "invocation_mode": task_request["invocation_mode"],
-            "retry_policy": task_request["retry_policy"]
-        }
-
         return task_request
\ No newline at end of file

diff --git a/src/api/src/backend/views/http/requests.py b/src/api/src/backend/views/http/requests.py
index 68ad3816..9e0672af 100644
--- a/src/api/src/backend/views/http/requests.py
+++ b/src/api/src/backend/views/http/requests.py
@@ -835,7 +835,6 @@ class WorkflowSubmissionRequest(BaseModel):
     archives: List[Archive] = []
     env: Env = {}
     args: Args = {}
-    group: Group
     pipeline: Pipeline
     pipeline_run: PipelineRun
     meta: WorkflowSubmissionRequestMeta

diff --git a/src/engine/src/owe_python_sdk/schema.py b/src/engine/src/owe_python_sdk/schema.py
index 68ad3816..9e0672af 100644
--- a/src/engine/src/owe_python_sdk/schema.py
+++ b/src/engine/src/owe_python_sdk/schema.py
@@ -835,7 +835,6 @@ class WorkflowSubmissionRequest(BaseModel):
     archives: List[Archive] = []
     env: Env = {}
     args: Args = {}
-    group: Group
     pipeline: Pipeline
     pipeline_run: PipelineRun
     meta: WorkflowSubmissionRequestMeta
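
The builder's per-type hooks rely on `getattr` dispatch: `getattr(self, f"_{task.type}")` resolves a method such as `_image_build` or `_function` from the task's type string. A stripped-down sketch of the pattern (class and values illustrative):

    class Builder:
        def build_task(self, task_type, task_request):
            # "image_build" -> self._image_build, "function" -> self._function
            handler = getattr(self, f"_{task_type}")
            return handler(task_request)

        def _function(self, task_request):
            return task_request

    Builder().build_task("function", {"id": "my-task"})  # -> {'id': 'my-task'}
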
From 6a1f22a3f1a0e8d342a2be15dfb9968b2f4d1d6b Mon Sep 17 00:00:00 2001
From: Nathan Freeman
Date: Tue, 3 Sep 2024 15:37:39 -0500
Subject: [PATCH 24/38] Use pipeline serializer in request builder, refactor.

---
 .../helpers/PipelineDispatchRequestBuilder.py | 38 +++++++------------
 1 file changed, 13 insertions(+), 25 deletions(-)

diff --git a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
index f9c08877..1a17aa05 100644
--- a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
+++ b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
@@ -3,7 +3,7 @@
 
 from backend.utils.parse_directives import parse_directives as parse
 from backend.conf.constants import WORKFLOW_EXECUTOR_ACCESS_TOKEN
-from backend.serializers import TaskSerializer
+from backend.serializers import TaskSerializer, PipelineSerializer
 
 
 class PipelineDispatchRequestBuilder:
@@ -35,36 +35,24 @@ def build(
             task_request = getattr(self, f"_{task.type}")(serialized_task, task)
             tasks_request.append(task_request)
 
-        # Get the archives for this pipeline
-        archives = []
-        pipeline_archives = pipeline.archives.all()
-        for pipeline_archive in pipeline_archives:
-            # Fetch any credentials or identities for required to
-            # access this archive
-            # TODO Handle creds/identity for archives
-            archives.append(model_to_dict(pipeline_archive.archive))
-
-        # Convert pipleline to a dict and build the request
+        # Serialize the pipeline and its tasks
         request = {}
-        request["pipeline"] = model_to_dict(pipeline)
-        request["archives"] = archives
-
-        # Move the execution profile props from the pipeline object to the
-        # execution profile property
-        request["pipeline"]["execution_profile"] = {
-            "max_exec_time": request["pipeline"]["max_exec_time"],
-            "duplicate_submission_policy": request["pipeline"]["duplicate_submission_policy"],
-            "max_retries": request["pipeline"]["max_retries"],
-            "invocation_mode": request["pipeline"]["invocation_mode"],
-            "retry_policy": request["pipeline"]["retry_policy"]
-        }
-
+        request["pipeline"] = PipelineSerializer.serialize(pipeline)
         request["pipeline"]["tasks"] = tasks_request
 
-        # Populate the env for this request. Populate values from SK
+        # Populate the env for this request.
         request["env"] = request["pipeline"]["env"]
 
+        # Serialize the archives
+        request["archives"] = []
+        pipeline_archives = pipeline.archives.all()
+        for pipeline_archive in pipeline_archives:
+            # Fetch any credentials or identities required to
+            # access this archive
+            # TODO Handle creds/identity for archives
+            request["archives"].append(model_to_dict(pipeline_archive.archive))
+
         req_args = {}
         for key in args:
             req_args[key] = args[key].dict()
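
The `idempotency_key` entries are dot-paths into the request document; the workflow executor resolves each path and combines the values to detect duplicate submissions. A rough sketch of that resolution — the executor's actual implementation is not shown in this series, so this is an assumption about how the paths are consumed:

    from functools import reduce

    def resolve(path, request):
        # "pipeline.id" -> request["pipeline"]["id"]
        return reduce(lambda node, key: node[key], path.split("."), request)

    # Note: args entries hold {"value": ...} dicts, so a real resolver would
    # likely also read the "value" field for args.* paths.
    key = tuple(resolve(p, request) for p in request["meta"]["idempotency_key"])
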
request["env"] = request["pipeline"]["env"] + # Serialize the archives + request["archives"] = [] + pipeline_archives = pipeline.archives.all() + for pipeline_archive in pipeline_archives: + # Fetch any credentials or identities for required to + # access this archive + # TODO Handle creds/identity for archives + request["archives"].append(model_to_dict(pipeline_archive.archive)) + req_args = {} for key in args: req_args[key] = args[key].dict() From 0fdd3796c3b55118ce9be7e67ec4215aecf43337 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 11:21:45 -0500 Subject: [PATCH 25/38] Bugfix: misspelling in PDRB --- src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py index 1a17aa05..3afdd4e8 100644 --- a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py +++ b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py @@ -29,7 +29,7 @@ def build( tasks_request = [] for task in tasks: # Convert the task to a dict - serialized_task = TaskSerializer.serialze(task) + serialized_task = TaskSerializer.serialize(task) # Handle the task-specific schema conversions task_request = getattr(self, f"_{task.type}")(serialized_task, task) From f7a97e9a9f696e77cdc50fdf8e9f5156b1c0daba Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 11:57:35 -0500 Subject: [PATCH 26/38] Fix ref before assignment in request builder --- src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py index 3afdd4e8..c37785ba 100644 --- a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py +++ b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py @@ -63,7 +63,7 @@ def build( "value": WORKFLOW_EXECUTOR_ACCESS_TOKEN }, "tapis_tenant_id": { - "value": request["group"]["tenant_id"] + "value": group.tenant_id }, "tapis_pipeline_owner": { "value": request["pipeline"]["owner"] From 0e737a101feada6a141f9be30e4c25cd22d34025 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 12:22:24 -0500 Subject: [PATCH 27/38] Move group_id to args in pipeline dispatch request builder --- .../src/backend/helpers/PipelineDispatchRequestBuilder.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py index c37785ba..50e1079c 100644 --- a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py +++ b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py @@ -68,21 +68,21 @@ def build( "tapis_pipeline_owner": { "value": request["pipeline"]["owner"] }, + "tapis_workflows_group_id": { + "value": group.id + }, **req_args } request["meta"] = {} - # Add the group to the meta so it can be used to determine the idemp key - request["meta"]["group"] = model_to_dict(group) - # Properties to help uniquely identity a pipeline submission. 
If the workflow # executor is currently running a pipeline with the same values as the # properties provided in "idempotency_key", the workflow executor # will then take the appropriate action dictated by the # pipeline.duplicate_submission_policy (allow, allow_terminate, deny) request["meta"]["idempotency_key"] = [ - "meta.group.id", + "args.tapis_workflows_group_id", "args.tapis_tenant_id", "pipeline.id" ] From a64bf1986fd99006e8a08674d0797eded0611f98 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 12:34:52 -0500 Subject: [PATCH 28/38] Add last and current run back to pipeline serializer --- src/api/src/backend/serializers/PipelineSerializer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api/src/backend/serializers/PipelineSerializer.py b/src/api/src/backend/serializers/PipelineSerializer.py index 60b4a4ed..d6602252 100644 --- a/src/api/src/backend/serializers/PipelineSerializer.py +++ b/src/api/src/backend/serializers/PipelineSerializer.py @@ -31,7 +31,7 @@ def serialize(pipeline_model, task_models=None): # TODO Remove when certain these properties are not needed pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group.uuid) - # pipeline["current_run"] = UUIDSerializer.serialize(pipeline_model.current_run) - # pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run) + pipeline["current_run"] = UUIDSerializer.serialize(pipeline_model.current_run.uuid) + pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run.uuid) return pipeline \ No newline at end of file From 3d23af141781642d551c61f322561000d2d3f6f2 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 12:36:59 -0500 Subject: [PATCH 29/38] Remove all references to groups --- src/engine/src/owe_python_sdk/TaskExecutor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/engine/src/owe_python_sdk/TaskExecutor.py b/src/engine/src/owe_python_sdk/TaskExecutor.py index d06e7c85..037db081 100644 --- a/src/engine/src/owe_python_sdk/TaskExecutor.py +++ b/src/engine/src/owe_python_sdk/TaskExecutor.py @@ -27,7 +27,6 @@ def __init__(self, task, ctx, exchange: EventExchange, plugins=[]): self.ctx = ctx self.task = task self.pipeline = self.ctx.pipeline - self.group = self.ctx.group self.directives = self.ctx.directives self.polling_interval = DEFAULT_POLLING_INTERVAL self._resources: list[Resource] = [] From db39fd2a388ef1c982222f5325c8adcd8e54fc33 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 12:47:39 -0500 Subject: [PATCH 30/38] Fix pipeline serializer --- src/api/src/backend/serializers/PipelineSerializer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/api/src/backend/serializers/PipelineSerializer.py b/src/api/src/backend/serializers/PipelineSerializer.py index d6602252..1bc31c5a 100644 --- a/src/api/src/backend/serializers/PipelineSerializer.py +++ b/src/api/src/backend/serializers/PipelineSerializer.py @@ -29,9 +29,14 @@ def serialize(pipeline_model, task_models=None): if task_models != None: pipeline["tasks"] = [ TaskSerializer.serialize(t) for t in task_models ] - # TODO Remove when certain these properties are not needed pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group.uuid) - pipeline["current_run"] = UUIDSerializer.serialize(pipeline_model.current_run.uuid) - pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run.uuid) + + pipeline["current_run"] = None + if pipeline_model.current_run != None: + pipeline["current_run"] = 
UUIDSerializer.serialize(pipeline_model.current_run.uuid) + + pipeline["last_run"] = None + if pipeline_model.last_run != None: + pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run.uuid) return pipeline \ No newline at end of file From d30bf9166dd00db728fffe6bbd055436661971bd Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 13:04:27 -0500 Subject: [PATCH 31/38] Remove old references to exec profile properties --- .../core/tasks/executors/builders/singularity/Singularity.py | 4 ++-- src/engine/src/owe_python_sdk/TaskExecutor.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py b/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py index 8161e363..e7f3948a 100644 --- a/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py +++ b/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py @@ -131,8 +131,8 @@ def _create_job(self): # Job spec v1jobspec_props = {} - if self.task.max_exec_time > 0: - v1jobspec_props["active_deadline_seconds"] = self.task.max_exec_time + if self.task.execution_profile.max_exec_time > 0: + v1jobspec_props["active_deadline_seconds"] = self.task.execution_profile.max_exec_time self.task.max_retries = 0 if self.task.max_retries < 0 else self.task.max_retries job_spec = V1JobSpec( diff --git a/src/engine/src/owe_python_sdk/TaskExecutor.py b/src/engine/src/owe_python_sdk/TaskExecutor.py index 037db081..43fa3d95 100644 --- a/src/engine/src/owe_python_sdk/TaskExecutor.py +++ b/src/engine/src/owe_python_sdk/TaskExecutor.py @@ -44,7 +44,7 @@ def __init__(self, task, ctx, exchange: EventExchange, plugins=[]): def _set_polling_interval(self, task): # Default is already the DEFAULT_POLLING_INTERVAL - if task.max_exec_time <= 0: return + if task.execution_profile.max_exec_time <= 0: return # TODO Replace line below. 
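That TODO is worth sketching. If the intent is to derive the polling cadence from the task's declared lifetime rather than always using the default, the replacement could clamp an interval computed from the execution profile — the 100-sample heuristic and the bounds below are assumptions for illustration, not code from this series (DEFAULT_POLLING_INTERVAL is the constant the surrounding module already imports):

    def _set_polling_interval(self, task):
        # Default is already DEFAULT_POLLING_INTERVAL; only adjust when the
        # task declares a positive max_exec_time.
        max_exec_time = task.execution_profile.max_exec_time
        if max_exec_time <= 0:
            return
        # Aim for roughly 100 status checks over the task's lifetime,
        # clamped between one second and the default interval.
        self.polling_interval = max(1, min(DEFAULT_POLLING_INTERVAL, max_exec_time // 100))

The hunk's own comment describing that calculation follows: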
# Calculate the interval based on the max_exec_time of the task From 6e35643055247bea18584675dbe2283fdb6cae6d Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 13:31:20 -0500 Subject: [PATCH 32/38] Fix old refs to task execution profile props that existed directly on the task object --- src/engine/src/core/tasks/executors/Application.py | 4 ++-- src/engine/src/core/tasks/executors/Function.py | 2 +- src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py | 4 ++-- .../core/tasks/executors/builders/singularity/Singularity.py | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/engine/src/core/tasks/executors/Application.py b/src/engine/src/core/tasks/executors/Application.py index 65dd7eb4..a0f55849 100644 --- a/src/engine/src/core/tasks/executors/Application.py +++ b/src/engine/src/core/tasks/executors/Application.py @@ -50,9 +50,9 @@ def execute(self): ) # Job spec - self.task.max_retries = 0 if self.task.max_retries < 0 else self.task.max_retries + self.task.execution_profile.max_retries = 0 if self.task.execution_profile.max_retries < 0 else self.task.execution_profile.max_retries job_spec = client.V1PodSpec( - backoff_limit=(self.task.max_retries), template=template) + backoff_limit=(self.task.execution_profile.max_retries), template=template) # Job metadata metadata = client.V1ObjectMeta( diff --git a/src/engine/src/core/tasks/executors/Function.py b/src/engine/src/core/tasks/executors/Function.py index 2a70dd58..9ebece67 100644 --- a/src/engine/src/core/tasks/executors/Function.py +++ b/src/engine/src/core/tasks/executors/Function.py @@ -73,7 +73,7 @@ def execute(self): namespace=KUBERNETES_NAMESPACE, ), spec=client.V1JobSpec( - backoff_limit=0 if self.task.max_retries < 0 else self.task.max_retries, + backoff_limit=0 if self.task.execution_profile.max_retries < 0 else self.task.execution_profile.max_retries, template=client.V1PodTemplateSpec( spec=client.V1PodSpec( containers=[ diff --git a/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py b/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py index cb26201e..4281d988 100644 --- a/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py +++ b/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py @@ -169,9 +169,9 @@ def _create_job(self): ) # Job spec - self.task.max_retries = 0 if self.task.max_retries < 0 else self.task.max_retries + self.task.execution_profile.max_retries = 0 if self.task.execution_profile.max_retries < 0 else self.task.execution_profile.max_retries job_spec = client.V1JobSpec( - backoff_limit=(self.task.max_retries), template=template) + backoff_limit=(self.task.execution_profile.max_retries), template=template) # Job metadata metadata = client.V1ObjectMeta( diff --git a/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py b/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py index e7f3948a..a3509c3c 100644 --- a/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py +++ b/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py @@ -134,10 +134,10 @@ def _create_job(self): if self.task.execution_profile.max_exec_time > 0: v1jobspec_props["active_deadline_seconds"] = self.task.execution_profile.max_exec_time - self.task.max_retries = 0 if self.task.max_retries < 0 else self.task.max_retries + self.task.execution_profile.max_retries = 0 if self.task.execution_profile.max_retries < 0 else self.task.execution_profile.max_retries job_spec = V1JobSpec( **v1jobspec_props, 
- backoff_limit=self.task.max_retries, + backoff_limit=self.task.execution_profile.max_retries, template=template ) From d2afaf9429ec40475fa280d83d7f683b54638d66 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 15:32:25 -0500 Subject: [PATCH 33/38] Write stdout and stderr in task executor as string and not bytes --- src/engine/src/owe_python_sdk/TaskExecutor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/engine/src/owe_python_sdk/TaskExecutor.py b/src/engine/src/owe_python_sdk/TaskExecutor.py index 43fa3d95..2c812cfb 100644 --- a/src/engine/src/owe_python_sdk/TaskExecutor.py +++ b/src/engine/src/owe_python_sdk/TaskExecutor.py @@ -68,10 +68,10 @@ def _set_output(self, filename, value, flag="wb"): with open(f"{self.task.output_dir}{filename.lstrip('/')}", flag) as file: file.write(value) - def _stdout(self, value, flag="wb"): + def _stdout(self, value, flag="w"): self._set_output(STDOUT, value, flag=flag) - def _stderr(self, value, flag="wb"): + def _stderr(self, value, flag="w"): self._set_output(STDERR, value, flag=flag) def _get_task_output_files(self): From 7dabe07d8a09a03542195dabd9aa00bd63d4b237 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Wed, 4 Sep 2024 16:14:48 -0500 Subject: [PATCH 34/38] Decode the bytes to UTF8 in Tapis notification middleware --- .../event_handlers/notifications/TapisWorkflowsAPIBackend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py b/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py index 638408dd..7c58cc29 100644 --- a/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py +++ b/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py @@ -233,7 +233,7 @@ def _tail_output(self, task, filename, flag="rb", max_bytes=10000): with open(f"{task.output_dir}{filename.lstrip('/')}", flag) as file: file.seek(offset, os.SEEK_END) - return str(file.read()) + return file.read().decode("utf-8") def _tail_stdout(self, task): return self._tail_output(task, STDOUT) From c836db31c3e7a6d820a6f5e2bdd923f008c5e0d8 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Thu, 5 Sep 2024 13:39:59 -0500 Subject: [PATCH 35/38] Remove token from job pods --- src/engine/src/core/tasks/executors/Application.py | 1 + src/engine/src/core/tasks/executors/Function.py | 1 + .../src/core/tasks/executors/builders/kaniko/Kaniko.py | 5 ++++- .../core/tasks/executors/builders/singularity/Singularity.py | 1 + 4 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/engine/src/core/tasks/executors/Application.py b/src/engine/src/core/tasks/executors/Application.py index a0f55849..7cd2cda6 100644 --- a/src/engine/src/core/tasks/executors/Application.py +++ b/src/engine/src/core/tasks/executors/Application.py @@ -43,6 +43,7 @@ def execute(self): # Pod template and pod template spec template = client.V1PodTemplateSpec( spec=client.V1PodSpec( + automount_service_account_token=False, containers=[container], restart_policy="Never", volumes=volumes diff --git a/src/engine/src/core/tasks/executors/Function.py b/src/engine/src/core/tasks/executors/Function.py index 9ebece67..318883ab 100644 --- a/src/engine/src/core/tasks/executors/Function.py +++ b/src/engine/src/core/tasks/executors/Function.py @@ -76,6 +76,7 @@ def execute(self): backoff_limit=0 if self.task.execution_profile.max_retries < 0 else 
self.task.execution_profile.max_retries, template=client.V1PodTemplateSpec( spec=client.V1PodSpec( + automount_service_account_token=False, containers=[ client.V1Container( name=job_name, diff --git a/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py b/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py index 4281d988..b17f51f1 100644 --- a/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py +++ b/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py @@ -164,7 +164,10 @@ def _create_job(self): # Pod template and pod template spec template = client.V1PodTemplateSpec( spec=client.V1PodSpec( - containers=[container], restart_policy="Never", volumes=volumes + automount_service_account_token=False, + containers=[container], + restart_policy="Never", + volumes=volumes ) ) diff --git a/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py b/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py index a3509c3c..b7d72e16 100644 --- a/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py +++ b/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py @@ -123,6 +123,7 @@ def _create_job(self): # Pod template and pod template spec template = V1PodTemplateSpec( spec=V1PodSpec( + automount_service_account_token=False, containers=[container], restart_policy="Never", volumes=volumes From 188199dc61d128d50610b70599f1efe4755507f1 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Thu, 5 Sep 2024 14:31:08 -0500 Subject: [PATCH 36/38] Debug --- .../src/core/workflows/WorkflowExecutor.py | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/engine/src/core/workflows/WorkflowExecutor.py b/src/engine/src/core/workflows/WorkflowExecutor.py index a3390729..81250ba8 100644 --- a/src/engine/src/core/workflows/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/WorkflowExecutor.py @@ -139,8 +139,8 @@ def __init__(self, _id=None, plugins=[]): self._set_initial_state() # Logging formatters. 
Makes logs more useful and pretty - def p_str(self, status): return f"{lbuf('[PIPELINE]')} {self.state.ctx.idempotency_key} {status} {self.state.ctx.pipeline.id}" - def t_str(self, task, status): return f"{lbuf('[TASK]')} {self.state.ctx.idempotency_key} {status} {self.state.ctx.pipeline.id}.{task.id}" + def p_log(self, status): return f"{lbuf('[PIPELINE]')} {self.state.ctx.idempotency_key} {status} {self.state.ctx.pipeline.id}" + def t_log(self, task, status): return f"{lbuf('[TASK]')} {self.state.ctx.idempotency_key} {status} {self.state.ctx.pipeline.id}.{task.id}" @interruptable() def start(self, ctx, threads): @@ -168,7 +168,7 @@ def start(self, ctx, threads): unstarted_threads = self._fetch_ready_tasks() # Log the pipeline status change - self.state.ctx.logger.info(self.p_str("ACTIVE")) + self.state.ctx.logger.info(self.p_log("ACTIVE")) # Publish the active event self.publish(Event(PIPELINE_ACTIVE, self.state.ctx)) @@ -202,7 +202,7 @@ def _staging(self, ctx): # Prepare task objects and create the directory structure for task output and execution self._prepare_tasks() - self.state.ctx.logger.info(f'{self.p_str("STAGING")} {self.state.ctx.pipeline_run.uuid}') + self.state.ctx.logger.info(f'{self.p_log("STAGING")} {self.state.ctx.pipeline_run.uuid}') # Notification handlers are used to relay/persist updates about a pipeline run # and its tasks to some external resource @@ -228,7 +228,7 @@ def _prepare_tasks(self): work detailed in the task definition.""" self.state.ctx.output = {} for task in self.state.ctx.pipeline.tasks: - self.state.ctx.logger.info(self.t_str(task, "STAGING")) + self.state.ctx.logger.info(self.t_log(task, "STAGING")) # Publish the task active event self.publish(Event(TASK_STAGING, self.state.ctx, task=task)) @@ -323,9 +323,8 @@ def _start_task(self, task): expression_error = True task_result = TaskResult(1, errors=[str(e)]) - # Execute the task + # Stage task inputs if not skip and not expression_error: - # Stage task inputs task_input_file_staging_service = self.container.load( "TaskInputFileStagingService" ) @@ -337,7 +336,7 @@ def _start_task(self, task): task, TaskResult(1, errors=[str(e)]) ) - self.state.ctx.logger.info(self.t_str(task, "FAILED")) + self.state.ctx.logger.info(self.t_log(task, "FAILED")) self.publish(Event(TASK_FAILED, self.state.ctx, task=task)) # NOTE Triggers hook _on_change_ready_task @@ -345,7 +344,7 @@ def _start_task(self, task): return # Log the task status - self.state.ctx.logger.info(self.t_str(task, "ACTIVE")) + self.state.ctx.logger.info(self.t_log(task, "ACTIVE")) # Publish the task active event self.publish(Event(TASK_ACTIVE, self.state.ctx, task=task)) @@ -356,8 +355,8 @@ def _start_task(self, task): self.state.ctx.pipeline_run.uuid, task ) - - # Run the task executor and get the task result + + # Execute the task task_result = executor.execute() # Set the output of the task @@ -402,6 +401,7 @@ def _on_task_terminal_state(self, task, task_result): # Run the on_pipeline_terminal_state callback if all tasks are complete. if len(self.state.tasks) == len(self.state.finished): + print("PIPELINE SHOWING AS COMPLETED") self._on_pipeline_terminal_state(event=PIPELINE_COMPLETED) return [] @@ -433,7 +433,7 @@ def _on_pipeline_terminal_state(self, event=None, message=""): # - pipeline.execution_profile.success_condidition = ALL_TASKS_MUST_COMPLETE # - pipeline.execution_profile.success_condidition = ALL_TASKS_MUST_FAIL - self.state.ctx.logger.info(self.p_str(msg)) + self.state.ctx.logger.info(self.p_log(msg)) # Publish the event. 
Triggers the archivers if there are any on ...COMPLETE self.publish(Event(event, self.state.ctx)) @@ -445,7 +445,7 @@ def _on_pipeline_terminal_state(self, event=None, message=""): @interruptable() def _on_task_completed(self, task, task_result): # Log the completion - self.state.ctx.logger.info(self.t_str(task, "COMPLETED")) + self.state.ctx.logger.info(self.t_log(task, "COMPLETED")) # Notify the subscribers that the task was completed self.publish(Event(TASK_COMPLETED, self.state.ctx, task=task, result=task_result)) @@ -458,7 +458,7 @@ def _on_task_completed(self, task, task_result): @interruptable() def _on_task_skipped(self, task, _): # Log the task active - self.state.ctx.logger.info(self.t_str(task, "SKIPPED")) + self.state.ctx.logger.info(self.t_log(task, "SKIPPED")) # Publish the task active event self.publish(Event(TASK_SKIPPED, self.state.ctx, task=task)) @@ -470,7 +470,7 @@ def _on_task_skipped(self, task, _): @interruptable() def _on_task_failed(self, task, task_result): # Log the failure - self.state.ctx.logger.info(self.t_str(task, f"FAILED: {task_result.errors}")) + self.state.ctx.logger.info(self.t_log(task, f"FAILED: {task_result.errors}")) # Notify the subscribers that the task was completed self.publish(Event(TASK_FAILED, self.state.ctx, task=task, result=task_result)) @@ -576,7 +576,7 @@ def _prepare_pipeline(self): def _prepare_pipeline_fs(self): """Creates all of the directories necessary to run the pipeline, store temp files, and cache data""" - server_logger.debug(self.p_str("PREPARING FILESYSTEM")) + server_logger.debug(self.p_log("PREPARING FILESYSTEM")) # Set the directories # References to paths on the nfs server @@ -705,7 +705,7 @@ def _deregister_executor(self, run_uuid, task): executor.cleanup() del self.state.executors[f"{run_uuid}.{task.id}"] # TODO use server logger below - # self.state.ctx.logger.debug(self.t_str(task, "EXECUTOR DEREGISTERED")) + # self.state.ctx.logger.debug(self.t_log(task, "EXECUTOR DEREGISTERED")) @interruptable() def _get_executor(self, run_uuid, task): @@ -745,7 +745,7 @@ def _setup_loggers(self): @interruptable(rollback="_reset_event_exchange") def _initialize_notification_handlers(self): - self.state.ctx.logger.debug(self.p_str("INITIALIZING NOTIFICATION HANDLERS")) + self.state.ctx.logger.debug(self.p_log("INITIALIZING NOTIFICATION HANDLERS")) # Initialize the notification event handlers from plugins. Notification event handlers are used to persist updates about the # pipeline and its tasks for plugin in self._plugins: @@ -768,7 +768,7 @@ def _initialize_archivers(self): # No archivers specified. 
Return if len(self.state.ctx.archives) < 1: return - self.state.ctx.logger.debug(self.p_str("INITIALIZING ARCHIVERS")) + self.state.ctx.logger.debug(self.p_log("INITIALIZING ARCHIVERS")) # TODO Handle for multiple archives self.state.ctx.archive = self.state.ctx.archives[0] @@ -797,7 +797,7 @@ def _initialize_archivers(self): ), None) if middleware == None: - self.state.ctx.logger.error(self.p_str(f"FAILED TO INITIALIZE ARCHIVER: No Archive Middleware found with type {self.state.ctx.archive.type}")) + self.state.ctx.logger.error(self.p_log(f"FAILED TO INITIALIZE ARCHIVER: No Archive Middleware found with type {self.state.ctx.archive.type}")) return try: @@ -847,7 +847,7 @@ def _on_change_state(self, state): return # Log the terminating status - self.state.ctx.logger.info(self.p_str("TERMINATING")) + self.state.ctx.logger.info(self.p_log("TERMINATING")) # Publish the termination event self.publish(Event(PIPELINE_TERMINATED, self.state.ctx)) From 5f2ee22ed754a4483ad7399d77a5a827294fd5ac Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Thu, 5 Sep 2024 17:06:01 -0500 Subject: [PATCH 37/38] debug --- .../src/core/workflows/WorkflowExecutor.py | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/src/engine/src/core/workflows/WorkflowExecutor.py b/src/engine/src/core/workflows/WorkflowExecutor.py index 81250ba8..5ab4e236 100644 --- a/src/engine/src/core/workflows/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/WorkflowExecutor.py @@ -161,7 +161,7 @@ def start(self, ctx, threads): ) if not validated: - self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=err) + self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=err) return # Get the first tasks @@ -177,7 +177,7 @@ def start(self, ctx, threads): self.state.ready_tasks += unstarted_threads except Exception as e: # Trigger the terminal state callback. - self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) + self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) @interruptable() def _staging(self, ctx): @@ -216,9 +216,9 @@ def _staging(self, ctx): try: self._set_tasks(self.state.ctx.pipeline.tasks) except InvalidDependenciesError as e: - self._on_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) + self._handle_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) except Exception as e: - self._on_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) + self._handle_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) @interruptable() def _prepare_tasks(self): @@ -269,7 +269,7 @@ def _prepare_tasks(self): task = template_mapper.map(task, task.uses) except Exception as e: # Trigger the terminal state callback. 
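The renames in this patch also draw a clearer line in the executor's vocabulary: _on_change_* methods remain reactive hooks fired by state observers (the earlier hunks' "NOTE Triggers hook _on_change_ready_task" comments point at exactly that mechanism), while _handle_* methods are called imperatively at known points in the control flow. A minimal sketch of the convention — the bodies here are illustrative, not the module's actual code:

    class ExecutorSketch:
        # Reactive: invoked by a state observer whenever ready_tasks changes.
        def _on_change_ready_task(self, ready_tasks):
            for thread in ready_tasks:
                thread.start()

        # Imperative: control flow calls this directly once a task result
        # is known, and it fans out to the right terminal-state handler.
        def _handle_task_terminal_state(self, task, task_result):
            if task_result.skipped:
                return self._handle_task_skipped(task, task_result)
            if task_result.success:
                return self._handle_task_completed(task, task_result)
            return self._handle_task_failed(task, task_result)

The hunk resumes with exactly that pipeline-level callback being renamed: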
- self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) + self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) # Add a key to the output for the task self.state.ctx.output[task.id] = None @@ -332,7 +332,7 @@ def _start_task(self, task): task_input_file_staging_service.stage(task) except TaskInputStagingError as e: # Get the next queued tasks if any - unstarted_threads = self._on_task_terminal_state( + unstarted_threads = self._handle_task_terminal_state( task, TaskResult(1, errors=[str(e)]) ) @@ -372,21 +372,21 @@ def _start_task(self, task): task_result = TaskResult(1, errors=[str(e)]) # Get the next queued tasks if any - unstarted_threads = self._on_task_terminal_state(task, task_result) + unstarted_threads = self._handle_task_terminal_state(task, task_result) # NOTE Triggers hook _on_change_ready_task self.state.ready_tasks += unstarted_threads @interruptable() - def _on_task_terminal_state(self, task, task_result): + def _handle_task_terminal_state(self, task, task_result): # Determine the correct callback to use. - callback = self._on_task_completed + callback = self._handle_task_completed if task_result.skipped: - callback = self._on_task_skipped + callback = self._handle_task_skipped if not task_result.success and not task_result.skipped: - callback = self._on_task_failed + callback = self._handle_task_failed # Call the callback. Marks task as completed or failed. # Also publishes a TASK_COMPLETED or TASK_FAILED based on the result @@ -399,21 +399,22 @@ def _on_task_terminal_state(self, task, task_result): # during the initialization and execution of the task executor self._deregister_executor(self.state.ctx.pipeline_run.uuid, task) - # Run the on_pipeline_terminal_state callback if all tasks are complete. + # Run the handle_pipeline_terminal_state callback if all tasks are complete. if len(self.state.tasks) == len(self.state.finished): - print("PIPELINE SHOWING AS COMPLETED") - self._on_pipeline_terminal_state(event=PIPELINE_COMPLETED) + print("*********** PIPELINE COMPLETED") + self._handle_pipeline_terminal_state(event=PIPELINE_COMPLETED) return [] if task_result.status > 0 and task.can_fail == False: - self._on_pipeline_terminal_state(event=PIPELINE_FAILED) + print("*********** PIPELINE FAILED") + self._handle_pipeline_terminal_state(event=PIPELINE_FAILED) return [] # Execute all possible queued tasks return self._fetch_ready_tasks() @interruptable() - def _on_pipeline_terminal_state(self, event=None, message=""): + def _handle_pipeline_terminal_state(self, event=None, message=""): # No event was provided. 
Determine if complete or failed from number # of failed tasks if event == None: @@ -443,7 +444,7 @@ def _on_pipeline_terminal_state(self, event=None, message=""): self._set_initial_state() @interruptable() - def _on_task_completed(self, task, task_result): + def _handle_task_completed(self, task, task_result): # Log the completion self.state.ctx.logger.info(self.t_log(task, "COMPLETED")) @@ -456,7 +457,7 @@ def _on_task_completed(self, task, task_result): self.state.succeeded.append(task.id) @interruptable() - def _on_task_skipped(self, task, _): + def _handle_task_skipped(self, task, _): # Log the task active self.state.ctx.logger.info(self.t_log(task, "SKIPPED")) @@ -468,7 +469,7 @@ def _on_task_skipped(self, task, _): self.state.skipped.append(task.id) @interruptable() - def _on_task_failed(self, task, task_result): + def _handle_task_failed(self, task, task_result): # Log the failure self.state.ctx.logger.info(self.t_log(task, f"FAILED: {task_result.errors}")) From d02b932005566b47419c3b19f0d618d4c6491dbd Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Fri, 6 Sep 2024 10:07:25 -0500 Subject: [PATCH 38/38] Remove debug --- src/engine/src/core/workflows/WorkflowExecutor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/engine/src/core/workflows/WorkflowExecutor.py b/src/engine/src/core/workflows/WorkflowExecutor.py index 5ab4e236..856e66ae 100644 --- a/src/engine/src/core/workflows/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/WorkflowExecutor.py @@ -401,12 +401,10 @@ def _handle_task_terminal_state(self, task, task_result): # Run the handle_pipeline_terminal_state callback if all tasks are complete. if len(self.state.tasks) == len(self.state.finished): - print("*********** PIPELINE COMPLETED") self._handle_pipeline_terminal_state(event=PIPELINE_COMPLETED) return [] if task_result.status > 0 and task.can_fail == False: - print("*********** PIPELINE FAILED") self._handle_pipeline_terminal_state(event=PIPELINE_FAILED) return []
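A closing note on the diagnostics pattern the last two patches add and then strip out: bare print() calls always reach stdout and cannot be filtered by log level, so if this path needs instrumentation again, the executor's contextual logger is the natural home for it. A hedged sketch reusing names from the hunks above (the DEBUG-level routing is a suggestion, not something this series implements):

    # Same branch as the final hunk, but with the temporary diagnostic
    # routed through the run's logger at DEBUG level so it disappears
    # under the normal INFO threshold instead of needing a removal patch.
    if len(self.state.tasks) == len(self.state.finished):
        self.state.ctx.logger.debug(self.p_log("ALL TASKS FINISHED"))
        self._handle_pipeline_terminal_state(event=PIPELINE_COMPLETED)
        return []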