diff --git a/src/api/specs/WorkflowsAPI.yaml b/src/api/specs/WorkflowsAPI.yaml index e2e3130a..f5566114 100644 --- a/src/api/specs/WorkflowsAPI.yaml +++ b/src/api/specs/WorkflowsAPI.yaml @@ -2517,6 +2517,78 @@ components: destination: $ref: '#/components/schemas/Destination' + Context: + type: object + required: + - type + properties: + branch: + type: string + credentials: + default: null + build_file_path: + type: string + sub_path: + type: string + type: + $ref: "#/components/schemas/EnumContextType" + url: + type: string + visibility: + $ref: "#/components/schemas/EnumContextVisibility" + + + Destination: + anyOf: + - $ref: '#/components/schemas/DockerhubDestination' + - $ref: '#/components/schemas/LocalDestination' + discriminator: + propertyName: type + mapping: + dockerhub: "#/components/schemas/DockerhubDestination" + local: "#/components/schemas/LocalDestination" + + BaseDestination: + type: object + required: + - type + properties: + type: + $ref: '#/components/schemas/EnumDestinationType' + + LocalDestination: + allOf: + - $ref: "#/components/schemas/BaseDestination" + - type: object + properties: + filename: + type: string + + RegistryDestination: + allOf: + - $ref: "#/components/schemas/BaseDestination" + - type: object + properties: + credentials: + $ref: '#/components/schemas/DockerhubCred' + tag: + type: string + url: + type: string + + DockerhubDestination: + allOf: + - $ref: '#/components/schemas/BaseDestination' + - $ref: '#/components/schemas/RegistryDestination' + + DockerhubCred: + type: object + properties: + token: + type: string + username: + type: string + RequestTask: allOf: - $ref: '#/components/schemas/BaseTask' @@ -2611,40 +2683,6 @@ components: type: boolean can_skip: type: boolean - - # --- Context --- - Context: - type: object - required: - - type - properties: - branch: - type: string - credentials: - default: null - build_file_path: - type: string - sub_path: - type: string - type: - $ref: "#/components/schemas/EnumContextType" - url: - type: string - visibility: - $ref: "#/components/schemas/EnumContextVisibility" - - # --- Destination --- - Destination: - type: object - properties: - credentials: - default: null - tag: - type: string - type: - $ref: '#/components/schemas/EnumDestinationType' - url: - type: string # --- Group and GroupUser --- GroupDetail: diff --git a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py index 2130b2d4..50e1079c 100644 --- a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py +++ b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py @@ -3,6 +3,7 @@ from backend.utils.parse_directives import parse_directives as parse from backend.conf.constants import WORKFLOW_EXECUTOR_ACCESS_TOKEN +from backend.serializers import TaskSerializer, PipelineSerializer class PipelineDispatchRequestBuilder: @@ -27,45 +28,31 @@ def build( # Convert tasks' schema to the schema expected by the workflow engine tasks_request = [] for task in tasks: - # Convert the task to a dict and add the execution profile property - # for all tasks. 
- preprocessed_task = self._preprocess_task(task) + # Convert the task to a dict + serialized_task = TaskSerializer.serialize(task) # Handle the task-specific schema conversions - task_request = getattr(self, f"_{task.type}")(preprocessed_task, task) + task_request = getattr(self, f"_{task.type}")(serialized_task, task) tasks_request.append(task_request) - # Get the archives for this pipeline - archives = [] - pipeline_archives = pipeline.archives.all() - for pipeline_archive in pipeline_archives: - # Fetch any credentials or identities for required to - # access this archive - # TODO Handle creds/identity for archives - archives.append(model_to_dict(pipeline_archive.archive)) - # Convert pipleline to a dict and build the request + # Serialize the pipeline and its tasks request = {} - - request["group"] = model_to_dict(group) - request["pipeline"] = model_to_dict(pipeline) - request["archives"] = archives - - # Move the execution profile props from the pipeline object to the - # execution profile property - request["pipeline"]["execution_profile"] = { - "max_exec_time": request["pipeline"]["max_exec_time"], - "duplicate_submission_policy": request["pipeline"]["duplicate_submission_policy"], - "max_retries": request["pipeline"]["max_retries"], - "invocation_mode": request["pipeline"]["invocation_mode"], - "retry_policy": request["pipeline"]["retry_policy"] - } - + request["pipeline"] = PipelineSerializer.serialize(pipeline) request["pipeline"]["tasks"] = tasks_request - # Populate the env for this request. Populate values from SK + # Populate the env for this request. request["env"] = request["pipeline"]["env"] + # Serialize the archives + request["archives"] = [] + pipeline_archives = pipeline.archives.all() + for pipeline_archive in pipeline_archives: + # Fetch any credentials or identities required to + # access this archive + # TODO Handle creds/identity for archives + request["archives"].append(model_to_dict(pipeline_archive.archive)) + req_args = {} for key in args: req_args[key] = args[key].dict() @@ -76,22 +63,26 @@ def build( "value": WORKFLOW_EXECUTOR_ACCESS_TOKEN }, "tapis_tenant_id": { - "value": request["group"]["tenant_id"] + "value": group.tenant_id }, "tapis_pipeline_owner": { "value": request["pipeline"]["owner"] }, + "tapis_workflows_group_id": { + "value": group.id + }, **req_args } request["meta"] = {} + # Properties to help uniquely identify a pipeline submission.
If the workflow # executor is currently running a pipeline with the same values as the # properties provided in "idempotency_key", the workflow executor # will then take the appropriate action dictated by the # pipeline.duplicate_submission_policy (allow, allow_terminate, deny) request["meta"]["idempotency_key"] = [ - "group.id", + "args.tapis_workflows_group_id", "args.tapis_tenant_id", "pipeline.id" ] @@ -106,36 +97,27 @@ def build( request["pipeline_run"]["name"] = name or uuid request["pipeline_run"]["description"] = description - # Parse the directives from the commit message - directives_request = {} - if commit != None: - directives_request = parse(commit) + # # Parse the directives from the commit message + # directives_request = {} + # if commit != None: + # directives_request = parse(commit) - if directives != None and len(directives) > 0: - directive_str = f"[{'|'.join([d for d in directives])}]" - directives_request = parse(directive_str) + # if directives != None and len(directives) > 0: + # directive_str = f"[{'|'.join([d for d in directives])}]" + # directives_request = parse(directive_str) - request["directives"] = directives_request + # request["directives"] = directives_request + + request["directives"] = {} return request def _image_build(self, task_request, task): - task_request["context"] = model_to_dict(task.context) - - # Resolve which context credentials to use if any provided context_creds = None if task.context.credentials != None: context_creds = task.context.credentials - - # Identity takes precedence over credentials placed directly in - # the context - if task.context.identity != None: - context_creds = task.context.identity.credentials - task_request["context"]["credentials"] = None if context_creds != None: - task_request["context"]["credentials"] = model_to_dict(context_creds) - # Get the context credentials data context_cred_data = self.secret_service.get_secret(context_creds.sk_id) task_request["context"]["credentials"] = context_cred_data @@ -146,18 +128,11 @@ def _image_build(self, task_request, task): del task_request["context"]["recipe_file_path"] # Destination credentials - task_request["destination"] = model_to_dict(task.destination) - destination_creds = None if task.destination.credentials != None: destination_creds = task.destination.credentials - if task.destination.identity != None: - destination_creds = task.destination.identity.credentials - if destination_creds != None: - task_request["destination"]["credentials"] = model_to_dict(destination_creds) - # Get the context credentials data destination_cred_data = self.secret_service.get_secret(destination_creds.sk_id) task_request["destination"]["credentials"] = destination_cred_data @@ -183,20 +158,4 @@ def _function(self, task_request, _): return task_request def _container_run(self, task_request, _): - return task_request - - def _preprocess_task(self, task): - # Convert to dict - task_request = model_to_dict(task) - - # Map task data model properties to the workflow engine expected schema for - # the execution profile - task_request["execution_profile"] = { - "flavor": task_request["flavor"], - "max_exec_time": task_request["max_exec_time"], - "max_retries": task_request["max_retries"], - "invocation_mode": task_request["invocation_mode"], - "retry_policy": task_request["retry_policy"] - } - return task_request \ No newline at end of file diff --git a/src/api/src/backend/serializers/ApplicationTaskSerializer.py b/src/api/src/backend/serializers/ApplicationTaskSerializer.py new file mode 
100644 index 00000000..2ad78320 --- /dev/null +++ b/src/api/src/backend/serializers/ApplicationTaskSerializer.py @@ -0,0 +1,11 @@ +from backend.serializers import BaseTaskSerializer + + +class ApplicationTaskSerializer: + @staticmethod + def serialize(model, base=None): + task = base if base != None else BaseTaskSerializer.serialize(model) + + task["image"] = model.image + + return task \ No newline at end of file diff --git a/src/api/src/backend/serializers/BaseTaskSerializer.py b/src/api/src/backend/serializers/BaseTaskSerializer.py index d9f75278..ac4f845c 100644 --- a/src/api/src/backend/serializers/BaseTaskSerializer.py +++ b/src/api/src/backend/serializers/BaseTaskSerializer.py @@ -18,6 +18,7 @@ def serialize(model): # Build execution profile task["execution_profile"] = { + "flavor": model.flavor, "invocation_mode": model.invocation_mode, "max_exec_time": model.max_exec_time, "max_retries": model.max_retries, diff --git a/src/api/src/backend/serializers/ContextSerializer.py b/src/api/src/backend/serializers/ContextSerializer.py new file mode 100644 index 00000000..af0136c5 --- /dev/null +++ b/src/api/src/backend/serializers/ContextSerializer.py @@ -0,0 +1,21 @@ +from backend.serializers.CredentialsSerializer import CredentialsSerializer +from backend.serializers.UUIDSerializer import UUIDSerializer + + +class ContextSerializer: + @staticmethod + def serialize(model): + if model == None: + return None + context = {} + context["branch"] = model.branch + context["credentials"] = CredentialsSerializer.serialize(model.credentials) + context["recipe_file_path"] = model.recipe_file_path + context["sub_path"] = model.sub_path + context["tag"] = model.tag + context["type"] = model.type + context["url"] = model.url + context["visibility"] = model.visibility + context["uuid"] = UUIDSerializer.serialize(model.uuid) + + return context \ No newline at end of file diff --git a/src/api/src/backend/serializers/CredentialsSerializer.py b/src/api/src/backend/serializers/CredentialsSerializer.py new file mode 100644 index 00000000..819208ab --- /dev/null +++ b/src/api/src/backend/serializers/CredentialsSerializer.py @@ -0,0 +1,17 @@ +from backend.serializers.UUIDSerializer import UUIDSerializer + + +class CredentialsSerializer: + @staticmethod + def serialize(model): + if model == None: + return None + + credential = {} + credential["sk_id"] = model.sk_id + credential["owner"] = model.owner + credential["created_at"] = model.created_at + credential["uuid"] = UUIDSerializer.serialize(model.uuid) + + + return credential \ No newline at end of file diff --git a/src/api/src/backend/serializers/DestinationSerializer.py b/src/api/src/backend/serializers/DestinationSerializer.py new file mode 100644 index 00000000..eba826f9 --- /dev/null +++ b/src/api/src/backend/serializers/DestinationSerializer.py @@ -0,0 +1,18 @@ +from backend.serializers.CredentialsSerializer import CredentialsSerializer +from backend.serializers.UUIDSerializer import UUIDSerializer + + +class DestinationSerializer: + @staticmethod + def serialize(model): + if model == None: + return None + destination = {} + destination["tag"] = model.tag + destination["type"] = model.type + destination["url"] = model.url + destination["filename"] = model.filename + destination["credentials"] = CredentialsSerializer.serialize(model.credentials) + destination["uuid"] = UUIDSerializer.serialize(model.uuid) + + return destination \ No newline at end of file diff --git a/src/api/src/backend/serializers/ImageBuildTaskSerializer.py 
b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py new file mode 100644 index 00000000..d999d0dd --- /dev/null +++ b/src/api/src/backend/serializers/ImageBuildTaskSerializer.py @@ -0,0 +1,16 @@ +from backend.serializers import BaseTaskSerializer +from backend.serializers.ContextSerializer import ContextSerializer +from backend.serializers.DestinationSerializer import DestinationSerializer +from pprint import pprint + + +class ImageBuildTaskSerializer: + @staticmethod + def serialize(model, base=None): + task = base if base != None else BaseTaskSerializer.serialize(model) + + task["builder"] = model.builder + task["context"] = ContextSerializer.serialize(model.context) + task["destination"] = DestinationSerializer.serialize(model.destination) + + return task \ No newline at end of file diff --git a/src/api/src/backend/serializers/PipelineSerializer.py b/src/api/src/backend/serializers/PipelineSerializer.py new file mode 100644 index 00000000..1bc31c5a --- /dev/null +++ b/src/api/src/backend/serializers/PipelineSerializer.py @@ -0,0 +1,42 @@ +from backend.serializers.UUIDSerializer import UUIDSerializer +from backend.serializers.TaskSerializer import TaskSerializer + + +class PipelineSerializer: + @staticmethod + def serialize(pipeline_model, task_models=None): + pipeline = {} + pipeline["id"] = pipeline_model.id + pipeline["description"] = pipeline_model.description + pipeline["created_at"] = pipeline_model.created_at + pipeline["updated_at"] = pipeline_model.updated_at + pipeline["env"] = pipeline_model.env + pipeline["params"] = pipeline_model.params + pipeline["uses"] = pipeline_model.uses + pipeline["enabled"] = pipeline_model.enabled + pipeline["owner"] = pipeline_model.owner + pipeline["execution_profile"] = { + "max_exec_time": pipeline_model.max_exec_time, + "max_retries": pipeline_model.max_retries, + "invocation_mode": pipeline_model.invocation_mode, + "lock_expiration_policy": pipeline_model.lock_expiration_policy, + "duplicate_submission_policy": pipeline_model.duplicate_submission_policy, + "retry_policy": pipeline_model.retry_policy + } + pipeline["uuid"] = UUIDSerializer.serialize(pipeline_model.uuid) + + # Serialize the task models + if task_models != None: + pipeline["tasks"] = [ TaskSerializer.serialize(t) for t in task_models ] + + pipeline["group"] = UUIDSerializer.serialize(pipeline_model.group.uuid) + + pipeline["current_run"] = None + if pipeline_model.current_run != None: + pipeline["current_run"] = UUIDSerializer.serialize(pipeline_model.current_run.uuid) + + pipeline["last_run"] = None + if pipeline_model.last_run != None: + pipeline["last_run"] = UUIDSerializer.serialize(pipeline_model.last_run.uuid) + + return pipeline \ No newline at end of file diff --git a/src/api/src/backend/serializers/RequestTaskSerializer.py b/src/api/src/backend/serializers/RequestTaskSerializer.py new file mode 100644 index 00000000..bb687966 --- /dev/null +++ b/src/api/src/backend/serializers/RequestTaskSerializer.py @@ -0,0 +1,18 @@ +from backend.serializers import BaseTaskSerializer +from backend.serializers.CredentialsSerializer import CredentialsSerializer + + +class RequestTaskSerializer: + @staticmethod + def serialize(model, base=None): + task = base if base != None else BaseTaskSerializer.serialize(model) + + task["auth"] = CredentialsSerializer.serialize(model.auth) + task["data"] = model.data + task["headers"] = model.headers + task["http_method"] = model.http_method + task["protocol"] = model.protocol + task["query_params"] = model.query_params + task["url"] = 
model.url + + return task \ No newline at end of file diff --git a/src/api/src/backend/serializers/TapisActorTaskSerializer.py b/src/api/src/backend/serializers/TapisActorTaskSerializer.py new file mode 100644 index 00000000..d0050ad5 --- /dev/null +++ b/src/api/src/backend/serializers/TapisActorTaskSerializer.py @@ -0,0 +1,13 @@ +from backend.serializers import BaseTaskSerializer + + +class TapisActorTaskSerializer: + @staticmethod + def serialize(model, base=None): + task = base if base != None else BaseTaskSerializer.serialize(model) + + task["tapis_actor_id"] = model.tapis_actor_id + task["poll"] = model.poll + task["tapis_actor_message"] = model.tapis_actor_message + + return task \ No newline at end of file diff --git a/src/api/src/backend/serializers/TapisJobTaskSerializer.py b/src/api/src/backend/serializers/TapisJobTaskSerializer.py new file mode 100644 index 00000000..78e5508d --- /dev/null +++ b/src/api/src/backend/serializers/TapisJobTaskSerializer.py @@ -0,0 +1,12 @@ +from backend.serializers import BaseTaskSerializer + + +class TapisJobTaskSerializer: + @staticmethod + def serialize(model, base=None): + task = base if base != None else BaseTaskSerializer.serialize(model) + + task["tapis_job_def"] = model.tapis_job_def + task["poll"] = model.poll + + return task \ No newline at end of file diff --git a/src/api/src/backend/serializers/TaskSerializer.py b/src/api/src/backend/serializers/TaskSerializer.py index 43e6ee3d..5e7607ff 100644 --- a/src/api/src/backend/serializers/TaskSerializer.py +++ b/src/api/src/backend/serializers/TaskSerializer.py @@ -1,6 +1,20 @@ from backend.serializers.BaseTaskSerializer import BaseTaskSerializer from backend.serializers.FunctionTaskSerializer import FunctionTaskSerializer -from backend.models import TASK_TYPE_FUNCTION +from backend.serializers.ImageBuildTaskSerializer import ImageBuildTaskSerializer +from backend.serializers.TapisActorTaskSerializer import TapisActorTaskSerializer +from backend.serializers.TapisJobTaskSerializer import TapisJobTaskSerializer +from backend.serializers.ApplicationTaskSerializer import ApplicationTaskSerializer +from backend.serializers.TemplateTaskSerializer import TemplateTaskSerializer +from backend.serializers.RequestTaskSerializer import RequestTaskSerializer +from backend.models import ( + TASK_TYPE_FUNCTION, + TASK_TYPE_APPLICATION, + TASK_TYPE_TEMPLATE, + TASK_TYPE_TAPIS_ACTOR, + TASK_TYPE_IMAGE_BUILD, + TASK_TYPE_REQUEST, + TASK_TYPE_TAPIS_JOB +) class TaskSerializer: @staticmethod @@ -10,4 +24,22 @@ def serialize(model): if model.type == TASK_TYPE_FUNCTION: return FunctionTaskSerializer.serialize(model, base=base) + if model.type == TASK_TYPE_TEMPLATE: + return TemplateTaskSerializer.serialize(model, base=base) + + if model.type == TASK_TYPE_IMAGE_BUILD: + return ImageBuildTaskSerializer.serialize(model, base=base) + + if model.type == TASK_TYPE_TAPIS_JOB: + return TapisJobTaskSerializer.serialize(model, base=base) + + if model.type == TASK_TYPE_TAPIS_ACTOR: + return TapisActorTaskSerializer.serialize(model, base=base) + + if model.type == TASK_TYPE_REQUEST: + return RequestTaskSerializer.serialize(model, base=base) + + if model.type == TASK_TYPE_APPLICATION: + return ApplicationTaskSerializer.serialize(model, base=base) + raise NotImplementedError(f"Task Serializer does not have a method for serializing tasks of type '{model.type}'") \ No newline at end of file diff --git a/src/api/src/backend/serializers/TemplateTaskSerializer.py b/src/api/src/backend/serializers/TemplateTaskSerializer.py new file mode
100644 index 00000000..b19aeb3f --- /dev/null +++ b/src/api/src/backend/serializers/TemplateTaskSerializer.py @@ -0,0 +1,11 @@ +from backend.serializers import BaseTaskSerializer + + +class TemplateTaskSerializer: + @staticmethod + def serialize(model, base=None): + # Returning only the base task because the only property relevant to + # the template task (the `uses` property) is handled there. + task = base if base != None else BaseTaskSerializer.serialize(model) + + return task \ No newline at end of file diff --git a/src/api/src/backend/serializers/__init__.py b/src/api/src/backend/serializers/__init__.py index c9e0e722..6ed429ca 100644 --- a/src/api/src/backend/serializers/__init__.py +++ b/src/api/src/backend/serializers/__init__.py @@ -4,4 +4,14 @@ from backend.serializers.BaseTaskSerializer import BaseTaskSerializer from backend.serializers.TaskSerializer import TaskSerializer from backend.serializers.TaskDTOSerializer import TaskDTOSerializer -from backend.serializers.FunctionTaskSerializer import FunctionTaskSerializer \ No newline at end of file +from backend.serializers.FunctionTaskSerializer import FunctionTaskSerializer +from backend.serializers.ImageBuildTaskSerializer import ImageBuildTaskSerializer +from backend.serializers.TapisActorTaskSerializer import TapisActorTaskSerializer +from backend.serializers.TapisJobTaskSerializer import TapisJobTaskSerializer +from backend.serializers.ApplicationTaskSerializer import ApplicationTaskSerializer +from backend.serializers.TemplateTaskSerializer import TemplateTaskSerializer +from backend.serializers.RequestTaskSerializer import RequestTaskSerializer +from backend.serializers.ContextSerializer import ContextSerializer +from backend.serializers.DestinationSerializer import DestinationSerializer +from backend.serializers.CredentialsSerializer import CredentialsSerializer +from backend.serializers.PipelineSerializer import PipelineSerializer \ No newline at end of file diff --git a/src/api/src/backend/services/TaskService.py b/src/api/src/backend/services/TaskService.py index 6758b212..00fc15d6 100644 --- a/src/api/src/backend/services/TaskService.py +++ b/src/api/src/backend/services/TaskService.py @@ -73,11 +73,15 @@ def create(self, pipeline, request): self.rollback() raise e - # Convert the input to jsonserializable + # Convert the input and output to JSON-serializable dicts _input = {} for key in request.input: _input[key] = request.input[key].dict() + output = {} + for key in request.output: + output[key] = request.output[key].dict() + # Prepare the uses property uses = getattr(request, "uses", None) if uses != None: @@ -111,7 +115,7 @@ def create(self, pipeline, request): input=_input, installer=getattr(request, "installer", None), id=request.id, - output=request.output, + output=output, packages=getattr(request, "packages", None), pipeline=pipeline, poll=getattr(request, "poll", None), diff --git a/src/api/src/backend/views/Pipelines.py b/src/api/src/backend/views/Pipelines.py index 7f2a267b..34246b17 100644 --- a/src/api/src/backend/views/Pipelines.py +++ b/src/api/src/backend/views/Pipelines.py @@ -2,6 +2,7 @@ from pydantic import ValidationError from django.db import DatabaseError, IntegrityError, OperationalError from django.forms import model_to_dict +from backend.utils import logger from backend.views.RestrictedAPIView import RestrictedAPIView from backend.views.http.responses.errors import ( @@ -25,6 +26,7 @@ from backend.services.GroupService import service as group_service from backend.errors.api import BadRequestError,
ServerError from backend.helpers import resource_url_builder +from backend.serializers import PipelineSerializer PIPELINE_TYPE_CI = "ci" @@ -38,42 +40,50 @@ class Pipelines(RestrictedAPIView): def get(self, request, group_id, pipeline_id=None): - # Get the group - group = group_service.get(group_id, request.tenant_id) - if group == None: - return NotFound(f"No group found with id '{group_id}'") - - # Check that the user belongs to the group - if not group_service.user_in_group(request.username, group_id, request.tenant_id): - return Forbidden(message="You do not have access to this group") - - # Get a list of all pipelines that belong to the user's groups - # if no id is provided - if pipeline_id == None: - return self.list(group) - - # Get the pipeline by the id provided in the path params - pipeline = PipelineModel.objects.filter( - id=pipeline_id, - group=group - ).prefetch_related("tasks").first() - - if pipeline == None: - return NotFound(f"Pipeline not found with id '{pipeline_id}'") - - # Get the pipeline tasks. - tasks = pipeline.tasks.all() - tasks_result = [ model_to_dict(task) for task in tasks ] + try: + # Get the group + group = group_service.get(group_id, request.tenant_id) + if group == None: + return NotFound(f"No group found with id '{group_id}'") + + # Check that the user belongs to the group + if not group_service.user_in_group(request.username, group_id, request.tenant_id): + return Forbidden(message="You do not have access to this group") + + # Get a list of all pipelines that belong to the user's groups + # if no id is provided + if pipeline_id == None: + return self.list(group) + + # Get the pipeline by the id provided in the path params + pipeline = PipelineModel.objects.filter( + id=pipeline_id, + group=group + ).prefetch_related("tasks").first() + + if pipeline == None: + return NotFound(f"Pipeline not found with id '{pipeline_id}'") + + # Get the pipeline tasks. 
+ tasks = pipeline.tasks.all() - # Convert model into a dict an - result = model_to_dict(pipeline) - result["tasks"] = tasks_result - - return BaseResponse(result=result) + # Convert pipeline and task models into a dict + result = PipelineSerializer.serialize(pipeline, tasks) + + return BaseResponse(result=result) + except Exception as e: + logger.exception(e.__cause__) + return ServerErrorResp(str(e)) def list(self, group): - pipelines = PipelineModel.objects.filter(group=group) - return ModelListResponse(pipelines) + try: + pipeline_models = PipelineModel.objects.filter(group=group) + result = [PipelineSerializer.serialize(p) for p in pipeline_models] + + return BaseResponse(result=result) + except Exception as e: + logger.exception(e.__cause__) + return ServerErrorResp(str(e)) def post(self, request, group_id, *_, **__): """Pipeline requests with type 'ci' are supported in order to make the diff --git a/src/api/src/backend/views/Tasks.py b/src/api/src/backend/views/Tasks.py index 3fd6f56a..088fbc3f 100644 --- a/src/api/src/backend/views/Tasks.py +++ b/src/api/src/backend/views/Tasks.py @@ -7,6 +7,7 @@ from backend.views.http.responses import BaseResponse, ResourceURLResponse from backend.views.http.responses.errors import BadRequest, Forbidden, NotFound, MethodNotAllowed, ServerError from backend.views.http.responses.models import ModelListResponse, ModelResponse +from backend.views.http.responses import BaseResponse from backend.services.TaskService import service as task_service from backend.services.GroupService import service as group_service from backend.serializers import TaskSerializer, TaskDTOSerializer @@ -17,44 +18,56 @@ class Tasks(RestrictedAPIView): def get(self, request, group_id, pipeline_id, task_id=None): - # Get the group - group = group_service.get(group_id, request.tenant_id) - if group == None: - return NotFound(f"No group found with id '{group_id}'") - - # Check that the user belongs to the group - if not group_service.user_in_group(request.username, group_id, request.tenant_id): - return Forbidden(message="You do not have access to this group") + try: + # Get the group + group = group_service.get(group_id, request.tenant_id) + if group == None: + return NotFound(f"No group found with id '{group_id}'") - # Get the pipline - pipeline = Pipeline.objects.filter( - group=group, - id=pipeline_id - ).first() + # Check that the user belongs to the group + if not group_service.user_in_group(request.username, group_id, request.tenant_id): + return Forbidden(message="You do not have access to this group") - # Return if BadRequest if no pipeline found - if pipeline == None: - return BadRequest(f"Pipline with id '{pipeline_id}' does not exist") - - if task_id == None: - return self.list(pipeline) + # Get the pipeline + pipeline = Pipeline.objects.filter( + group=group, + id=pipeline_id + ).first() - # Check that the user belongs to the group that is attached - # to this pipline - if not group_service.user_in_group(request.username, group_id, request.tenant_id): - return Forbidden(message="You cannot view tasks for this pipeline") + # Return a BadRequest if no pipeline found + if pipeline == None: + return BadRequest(f"Pipeline with id '{pipeline_id}' does not exist") + + if task_id == None: + return self.list(pipeline) - task = Task.objects.filter(pipeline=pipeline, id=task_id).first() + # Check that the user belongs to the group that is attached + # to this pipeline + if not group_service.user_in_group(request.username, group_id, request.tenant_id): + return
Forbidden(message="You cannot view tasks for this pipeline") - if task == None: - return NotFound(f"Task with id '{task_id}' does not exists for pipeline '{pipeline_id}'") + task = Task.objects.filter(pipeline=pipeline, id=task_id).first() - return ModelResponse(task) + if task == None: + return NotFound(f"Task with id '{task_id}' does not exist for pipeline '{pipeline_id}'") + + return ModelResponse(task) + except Exception as e: + logger.exception(e.__cause__) + return ServerError(str(e)) + def list(self, pipeline, *_, **__): + tasks = [] + try: + task_models = Task.objects.filter(pipeline=pipeline) + for task_model in task_models: + tasks.append(TaskSerializer.serialize(task_model)) + + return BaseResponse(result=tasks) + except Exception as e: + logger.exception(e.__cause__) + return ServerError(f"{e}") - def list(self, pipeline, *_, **__): - return ModelListResponse(Task.objects.filter(pipeline=pipeline)) - def post(self, request, group_id, pipeline_id, *_, **__): # Validate the request body if "type" not in self.request_body: @@ -101,6 +114,7 @@ def post(self, request, group_id, pipeline_id, *_, **__): except (IntegrityError, OperationalError) as e: return BadRequest(message=e.__cause__) except Exception as e: + logger.exception(e.__cause__) return ServerError(f"{e}") return ResourceURLResponse( @@ -183,6 +197,7 @@ def delete(self, request, group_id, pipeline_id, task_id): try: task.delete() except Exception as e: + logger.exception(e.__cause__) return ServerError(str(e)) return BaseResponse(message=f"Deleted task '{task_id}' on pipeline '{pipeline_id}") diff --git a/src/api/src/backend/views/http/requests.py b/src/api/src/backend/views/http/requests.py index 68ad3816..9e0672af 100644 --- a/src/api/src/backend/views/http/requests.py +++ b/src/api/src/backend/views/http/requests.py @@ -835,7 +835,6 @@ class WorkflowSubmissionRequest(BaseModel): archives: List[Archive] = [] env: Env = {} args: Args = {} - group: Group pipeline: Pipeline pipeline_run: PipelineRun meta: WorkflowSubmissionRequestMeta diff --git a/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py b/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py index 638408dd..7c58cc29 100644 --- a/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py +++ b/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py @@ -233,7 +233,7 @@ def _tail_output(self, task, filename, flag="rb", max_bytes=10000): with open(f"{task.output_dir}{filename.lstrip('/')}", flag) as file: file.seek(offset, os.SEEK_END) - return str(file.read()) + return file.read().decode("utf-8") def _tail_stdout(self, task): return self._tail_output(task, STDOUT) diff --git a/src/engine/src/core/tasks/executors/Application.py b/src/engine/src/core/tasks/executors/Application.py index 65dd7eb4..7cd2cda6 100644 --- a/src/engine/src/core/tasks/executors/Application.py +++ b/src/engine/src/core/tasks/executors/Application.py @@ -43,6 +43,7 @@ def execute(self): # Pod template and pod template spec template = client.V1PodTemplateSpec( spec=client.V1PodSpec( + automount_service_account_token=False, containers=[container], restart_policy="Never", volumes=volumes @@ -50,9 +51,9 @@ def execute(self): ) # Job spec - self.task.max_retries = 0 if self.task.max_retries < 0 else self.task.max_retries + self.task.execution_profile.max_retries = 0 if self.task.execution_profile.max_retries < 0 else
self.task.execution_profile.max_retries job_spec = client.V1PodSpec( - backoff_limit=(self.task.max_retries), template=template) + backoff_limit=(self.task.execution_profile.max_retries), template=template) # Job metadata metadata = client.V1ObjectMeta( diff --git a/src/engine/src/core/tasks/executors/Function.py b/src/engine/src/core/tasks/executors/Function.py index 2a70dd58..318883ab 100644 --- a/src/engine/src/core/tasks/executors/Function.py +++ b/src/engine/src/core/tasks/executors/Function.py @@ -73,9 +73,10 @@ def execute(self): namespace=KUBERNETES_NAMESPACE, ), spec=client.V1JobSpec( - backoff_limit=0 if self.task.max_retries < 0 else self.task.max_retries, + backoff_limit=0 if self.task.execution_profile.max_retries < 0 else self.task.execution_profile.max_retries, template=client.V1PodTemplateSpec( spec=client.V1PodSpec( + automount_service_account_token=False, containers=[ client.V1Container( name=job_name, diff --git a/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py b/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py index cb26201e..b17f51f1 100644 --- a/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py +++ b/src/engine/src/core/tasks/executors/builders/kaniko/Kaniko.py @@ -164,14 +164,17 @@ def _create_job(self): # Pod template and pod template spec template = client.V1PodTemplateSpec( spec=client.V1PodSpec( - containers=[container], restart_policy="Never", volumes=volumes + automount_service_account_token=False, + containers=[container], + restart_policy="Never", + volumes=volumes ) ) # Job spec - self.task.max_retries = 0 if self.task.max_retries < 0 else self.task.max_retries + self.task.execution_profile.max_retries = 0 if self.task.execution_profile.max_retries < 0 else self.task.execution_profile.max_retries job_spec = client.V1JobSpec( - backoff_limit=(self.task.max_retries), template=template) + backoff_limit=(self.task.execution_profile.max_retries), template=template) # Job metadata metadata = client.V1ObjectMeta( diff --git a/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py b/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py index 8161e363..b7d72e16 100644 --- a/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py +++ b/src/engine/src/core/tasks/executors/builders/singularity/Singularity.py @@ -123,6 +123,7 @@ def _create_job(self): # Pod template and pod template spec template = V1PodTemplateSpec( spec=V1PodSpec( + automount_service_account_token=False, containers=[container], restart_policy="Never", volumes=volumes @@ -131,13 +132,13 @@ def _create_job(self): # Job spec v1jobspec_props = {} - if self.task.max_exec_time > 0: - v1jobspec_props["active_deadline_seconds"] = self.task.max_exec_time + if self.task.execution_profile.max_exec_time > 0: + v1jobspec_props["active_deadline_seconds"] = self.task.execution_profile.max_exec_time - self.task.max_retries = 0 if self.task.max_retries < 0 else self.task.max_retries + self.task.execution_profile.max_retries = 0 if self.task.execution_profile.max_retries < 0 else self.task.execution_profile.max_retries job_spec = V1JobSpec( **v1jobspec_props, - backoff_limit=self.task.max_retries, + backoff_limit=self.task.execution_profile.max_retries, template=template ) diff --git a/src/engine/src/core/workflows/WorkflowExecutor.py b/src/engine/src/core/workflows/WorkflowExecutor.py index a3390729..856e66ae 100644 --- a/src/engine/src/core/workflows/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/WorkflowExecutor.py @@ 
-139,8 +139,8 @@ def __init__(self, _id=None, plugins=[]): self._set_initial_state() # Logging formatters. Makes logs more useful and pretty - def p_str(self, status): return f"{lbuf('[PIPELINE]')} {self.state.ctx.idempotency_key} {status} {self.state.ctx.pipeline.id}" - def t_str(self, task, status): return f"{lbuf('[TASK]')} {self.state.ctx.idempotency_key} {status} {self.state.ctx.pipeline.id}.{task.id}" + def p_log(self, status): return f"{lbuf('[PIPELINE]')} {self.state.ctx.idempotency_key} {status} {self.state.ctx.pipeline.id}" + def t_log(self, task, status): return f"{lbuf('[TASK]')} {self.state.ctx.idempotency_key} {status} {self.state.ctx.pipeline.id}.{task.id}" @interruptable() def start(self, ctx, threads): @@ -161,14 +161,14 @@ def start(self, ctx, threads): ) if not validated: - self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=err) + self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=err) return # Get the first tasks unstarted_threads = self._fetch_ready_tasks() # Log the pipeline status change - self.state.ctx.logger.info(self.p_str("ACTIVE")) + self.state.ctx.logger.info(self.p_log("ACTIVE")) # Publish the active event self.publish(Event(PIPELINE_ACTIVE, self.state.ctx)) @@ -177,7 +177,7 @@ def start(self, ctx, threads): self.state.ready_tasks += unstarted_threads except Exception as e: # Trigger the terminal state callback. - self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) + self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) @interruptable() def _staging(self, ctx): @@ -202,7 +202,7 @@ def _staging(self, ctx): # Prepare task objects and create the directory structure for task output and execution self._prepare_tasks() - self.state.ctx.logger.info(f'{self.p_str("STAGING")} {self.state.ctx.pipeline_run.uuid}') + self.state.ctx.logger.info(f'{self.p_log("STAGING")} {self.state.ctx.pipeline_run.uuid}') # Notification handlers are used to relay/persist updates about a pipeline run # and its tasks to some external resource @@ -216,9 +216,9 @@ def _staging(self, ctx): try: self._set_tasks(self.state.ctx.pipeline.tasks) except InvalidDependenciesError as e: - self._on_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) + self._handle_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) except Exception as e: - self._on_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) + self._handle_pipeline_terminal_state(PIPELINE_FAILED, message=str(e)) @interruptable() def _prepare_tasks(self): @@ -228,7 +228,7 @@ def _prepare_tasks(self): work detailed in the task definition.""" self.state.ctx.output = {} for task in self.state.ctx.pipeline.tasks: - self.state.ctx.logger.info(self.t_str(task, "STAGING")) + self.state.ctx.logger.info(self.t_log(task, "STAGING")) # Publish the task active event self.publish(Event(TASK_STAGING, self.state.ctx, task=task)) @@ -269,7 +269,7 @@ def _prepare_tasks(self): task = template_mapper.map(task, task.uses) except Exception as e: # Trigger the terminal state callback. 
- self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) + self._handle_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e)) # Add a key to the output for the task self.state.ctx.output[task.id] = None @@ -323,9 +323,8 @@ def _start_task(self, task): expression_error = True task_result = TaskResult(1, errors=[str(e)]) - # Execute the task + # Stage task inputs if not skip and not expression_error: - # Stage task inputs task_input_file_staging_service = self.container.load( "TaskInputFileStagingService" ) @@ -333,11 +332,11 @@ def _start_task(self, task): task_input_file_staging_service.stage(task) except TaskInputStagingError as e: # Get the next queued tasks if any - unstarted_threads = self._on_task_terminal_state( + unstarted_threads = self._handle_task_terminal_state( task, TaskResult(1, errors=[str(e)]) ) - self.state.ctx.logger.info(self.t_str(task, "FAILED")) + self.state.ctx.logger.info(self.t_log(task, "FAILED")) self.publish(Event(TASK_FAILED, self.state.ctx, task=task)) # NOTE Triggers hook _on_change_ready_task @@ -345,7 +344,7 @@ def _start_task(self, task): return # Log the task status - self.state.ctx.logger.info(self.t_str(task, "ACTIVE")) + self.state.ctx.logger.info(self.t_log(task, "ACTIVE")) # Publish the task active event self.publish(Event(TASK_ACTIVE, self.state.ctx, task=task)) @@ -356,8 +355,8 @@ def _start_task(self, task): self.state.ctx.pipeline_run.uuid, task ) - - # Run the task executor and get the task result + + # Execute the task task_result = executor.execute() # Set the output of the task @@ -373,21 +372,21 @@ def _start_task(self, task): task_result = TaskResult(1, errors=[str(e)]) # Get the next queued tasks if any - unstarted_threads = self._on_task_terminal_state(task, task_result) + unstarted_threads = self._handle_task_terminal_state(task, task_result) # NOTE Triggers hook _on_change_ready_task self.state.ready_tasks += unstarted_threads @interruptable() - def _on_task_terminal_state(self, task, task_result): + def _handle_task_terminal_state(self, task, task_result): # Determine the correct callback to use. - callback = self._on_task_completed + callback = self._handle_task_completed if task_result.skipped: - callback = self._on_task_skipped + callback = self._handle_task_skipped if not task_result.success and not task_result.skipped: - callback = self._on_task_failed + callback = self._handle_task_failed # Call the callback. Marks task as completed or failed. # Also publishes a TASK_COMPLETED or TASK_FAILED based on the result @@ -400,20 +399,20 @@ def _on_task_terminal_state(self, task, task_result): # during the initialization and execution of the task executor self._deregister_executor(self.state.ctx.pipeline_run.uuid, task) - # Run the on_pipeline_terminal_state callback if all tasks are complete. + # Run the handle_pipeline_terminal_state callback if all tasks are complete. if len(self.state.tasks) == len(self.state.finished): - self._on_pipeline_terminal_state(event=PIPELINE_COMPLETED) + self._handle_pipeline_terminal_state(event=PIPELINE_COMPLETED) return [] if task_result.status > 0 and task.can_fail == False: - self._on_pipeline_terminal_state(event=PIPELINE_FAILED) + self._handle_pipeline_terminal_state(event=PIPELINE_FAILED) return [] # Execute all possible queued tasks return self._fetch_ready_tasks() @interruptable() - def _on_pipeline_terminal_state(self, event=None, message=""): + def _handle_pipeline_terminal_state(self, event=None, message=""): # No event was provided. 
Determine if complete or failed from number # of failed tasks if event == None: @@ -433,7 +432,7 @@ def _on_pipeline_terminal_state(self, event=None, message=""): # - pipeline.execution_profile.success_condidition = ALL_TASKS_MUST_COMPLETE # - pipeline.execution_profile.success_condidition = ALL_TASKS_MUST_FAIL - self.state.ctx.logger.info(self.p_str(msg)) + self.state.ctx.logger.info(self.p_log(msg)) # Publish the event. Triggers the archivers if there are any on ...COMPLETE self.publish(Event(event, self.state.ctx)) @@ -443,9 +442,9 @@ def _on_pipeline_terminal_state(self, event=None, message=""): self._set_initial_state() @interruptable() - def _on_task_completed(self, task, task_result): + def _handle_task_completed(self, task, task_result): # Log the completion - self.state.ctx.logger.info(self.t_str(task, "COMPLETED")) + self.state.ctx.logger.info(self.t_log(task, "COMPLETED")) # Notify the subscribers that the task was completed self.publish(Event(TASK_COMPLETED, self.state.ctx, task=task, result=task_result)) @@ -456,9 +455,9 @@ def _on_task_completed(self, task, task_result): self.state.succeeded.append(task.id) @interruptable() - def _on_task_skipped(self, task, _): + def _handle_task_skipped(self, task, _): # Log the task active - self.state.ctx.logger.info(self.t_str(task, "SKIPPED")) + self.state.ctx.logger.info(self.t_log(task, "SKIPPED")) # Publish the task active event self.publish(Event(TASK_SKIPPED, self.state.ctx, task=task)) @@ -468,9 +467,9 @@ def _on_task_skipped(self, task, _): self.state.skipped.append(task.id) @interruptable() - def _on_task_failed(self, task, task_result): + def _handle_task_failed(self, task, task_result): # Log the failure - self.state.ctx.logger.info(self.t_str(task, f"FAILED: {task_result.errors}")) + self.state.ctx.logger.info(self.t_log(task, f"FAILED: {task_result.errors}")) # Notify the subscribers that the task was completed self.publish(Event(TASK_FAILED, self.state.ctx, task=task, result=task_result)) @@ -576,7 +575,7 @@ def _prepare_pipeline(self): def _prepare_pipeline_fs(self): """Creates all of the directories necessary to run the pipeline, store temp files, and cache data""" - server_logger.debug(self.p_str("PREPARING FILESYSTEM")) + server_logger.debug(self.p_log("PREPARING FILESYSTEM")) # Set the directories # References to paths on the nfs server @@ -705,7 +704,7 @@ def _deregister_executor(self, run_uuid, task): executor.cleanup() del self.state.executors[f"{run_uuid}.{task.id}"] # TODO use server logger below - # self.state.ctx.logger.debug(self.t_str(task, "EXECUTOR DEREGISTERED")) + # self.state.ctx.logger.debug(self.t_log(task, "EXECUTOR DEREGISTERED")) @interruptable() def _get_executor(self, run_uuid, task): @@ -745,7 +744,7 @@ def _setup_loggers(self): @interruptable(rollback="_reset_event_exchange") def _initialize_notification_handlers(self): - self.state.ctx.logger.debug(self.p_str("INITIALIZING NOTIFICATION HANDLERS")) + self.state.ctx.logger.debug(self.p_log("INITIALIZING NOTIFICATION HANDLERS")) # Initialize the notification event handlers from plugins. Notification event handlers are used to persist updates about the # pipeline and its tasks for plugin in self._plugins: @@ -768,7 +767,7 @@ def _initialize_archivers(self): # No archivers specified. 
Return if len(self.state.ctx.archives) < 1: return - self.state.ctx.logger.debug(self.p_str("INITIALIZING ARCHIVERS")) + self.state.ctx.logger.debug(self.p_log("INITIALIZING ARCHIVERS")) # TODO Handle for multiple archives self.state.ctx.archive = self.state.ctx.archives[0] @@ -797,7 +796,7 @@ def _initialize_archivers(self): ), None) if middleware == None: - self.state.ctx.logger.error(self.p_str(f"FAILED TO INITIALIZE ARCHIVER: No Archive Middleware found with type {self.state.ctx.archive.type}")) + self.state.ctx.logger.error(self.p_log(f"FAILED TO INITIALIZE ARCHIVER: No Archive Middleware found with type {self.state.ctx.archive.type}")) return try: @@ -847,7 +846,7 @@ def _on_change_state(self, state): return # Log the terminating status - self.state.ctx.logger.info(self.p_str("TERMINATING")) + self.state.ctx.logger.info(self.p_log("TERMINATING")) # Publish the termination event self.publish(Event(PIPELINE_TERMINATED, self.state.ctx)) diff --git a/src/engine/src/owe_python_sdk/TaskExecutor.py b/src/engine/src/owe_python_sdk/TaskExecutor.py index d06e7c85..2c812cfb 100644 --- a/src/engine/src/owe_python_sdk/TaskExecutor.py +++ b/src/engine/src/owe_python_sdk/TaskExecutor.py @@ -27,7 +27,6 @@ def __init__(self, task, ctx, exchange: EventExchange, plugins=[]): self.ctx = ctx self.task = task self.pipeline = self.ctx.pipeline - self.group = self.ctx.group self.directives = self.ctx.directives self.polling_interval = DEFAULT_POLLING_INTERVAL self._resources: list[Resource] = [] @@ -45,7 +44,7 @@ def __init__(self, task, ctx, exchange: EventExchange, plugins=[]): def _set_polling_interval(self, task): # Default is already the DEFAULT_POLLING_INTERVAL - if task.max_exec_time <= 0: return + if task.execution_profile.max_exec_time <= 0: return # TODO Replace line below. # Calculate the interval based on the max_exec_time of the task @@ -69,10 +68,10 @@ def _set_output(self, filename, value, flag="wb"): with open(f"{self.task.output_dir}{filename.lstrip('/')}", flag) as file: file.write(value) - def _stdout(self, value, flag="wb"): + def _stdout(self, value, flag="w"): self._set_output(STDOUT, value, flag=flag) - def _stderr(self, value, flag="wb"): + def _stderr(self, value, flag="w"): self._set_output(STDERR, value, flag=flag) def _get_task_output_files(self): diff --git a/src/engine/src/owe_python_sdk/schema.py b/src/engine/src/owe_python_sdk/schema.py index 68ad3816..9e0672af 100644 --- a/src/engine/src/owe_python_sdk/schema.py +++ b/src/engine/src/owe_python_sdk/schema.py @@ -835,7 +835,6 @@ class WorkflowSubmissionRequest(BaseModel): archives: List[Archive] = [] env: Env = {} args: Args = {} - group: Group pipeline: Pipeline pipeline_run: PipelineRun meta: WorkflowSubmissionRequestMeta
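
Usage sketch (illustrative only, not part of the diff): the dispatch request builder and the Pipelines/Tasks views now go through the same serializer entry points, so a pipeline and its tasks are serialized in one place. Assuming the Pipeline model is importable from backend.models (the import path here is an assumption), exercising the new serializers would look roughly like this:

from backend.models import Pipeline  # assumed import path for the Pipeline model
from backend.serializers import PipelineSerializer, TaskSerializer

# Mirror Pipelines.get(): serialize one pipeline together with its prefetched tasks
pipeline = Pipeline.objects.prefetch_related("tasks").first()
if pipeline is not None:
    result = PipelineSerializer.serialize(pipeline, pipeline.tasks.all())

    # TaskSerializer dispatches each task to its type-specific serializer
    # (image_build, tapis_job, request, etc.) and raises NotImplementedError
    # for task types it does not recognize
    tasks = [TaskSerializer.serialize(t) for t in pipeline.tasks.all()]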