Skip to content

Commit

Permalink
Merge branch 'dev' into staging
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Sep 6, 2024
2 parents e8bf5b5 + d02b932 commit ceb147a
Show file tree
Hide file tree
Showing 27 changed files with 480 additions and 230 deletions.
106 changes: 72 additions & 34 deletions src/api/specs/WorkflowsAPI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2517,6 +2517,78 @@ components:
destination:
$ref: '#/components/schemas/Destination'

Context:
type: object
required:
- type
properties:
branch:
type: string
credentials:
default: null
build_file_path:
type: string
sub_path:
type: string
type:
$ref: "#/components/schemas/EnumContextType"
url:
type: string
visibility:
$ref: "#/components/schemas/EnumContextVisibility"


Destination:
anyOf:
- $ref: '#/components/schemas/DockerhubDestination'
- $ref: '#/components/schemas/LocalDestination'
discriminator:
propertyName: type
mapping:
dockerhub: "#/components/schemas/DockerhubDestination"
local: "#/components/schemas/LocalDestination"

BaseDestination:
type: object
required:
- type
properties:
type:
$ref: '#/components/schemas/EnumDestinationType'

LocalDestination:
allOf:
- $ref: "#/components/schemas/BaseDestination"
- type: object
properties:
filename:
type: string

RegistryDestination:
allOf:
- $ref: "#/components/schemas/BaseDestination"
- type: object
properties:
credentials:
$ref: '#/components/schemas/DockerhubCred'
tag:
type: string
url:
type: string

DockerhubDestination:
allOf:
- $ref: '#/components/schemas/BaseDestination'
- $ref: '#/components/schemas/RegistryDestination'

DockerhubCred:
type: object
properties:
token:
type: string
username:
type: string

RequestTask:
allOf:
- $ref: '#/components/schemas/BaseTask'
Expand Down Expand Up @@ -2611,40 +2683,6 @@ components:
type: boolean
can_skip:
type: boolean

# --- Context ---
Context:
type: object
required:
- type
properties:
branch:
type: string
credentials:
default: null
build_file_path:
type: string
sub_path:
type: string
type:
$ref: "#/components/schemas/EnumContextType"
url:
type: string
visibility:
$ref: "#/components/schemas/EnumContextVisibility"

# --- Destination ---
Destination:
type: object
properties:
credentials:
default: null
tag:
type: string
type:
$ref: '#/components/schemas/EnumDestinationType'
url:
type: string

# --- Group and GroupUser ---
GroupDetail:
Expand Down
105 changes: 32 additions & 73 deletions src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from backend.utils.parse_directives import parse_directives as parse
from backend.conf.constants import WORKFLOW_EXECUTOR_ACCESS_TOKEN
from backend.serializers import TaskSerializer, PipelineSerializer


class PipelineDispatchRequestBuilder:
Expand All @@ -27,45 +28,31 @@ def build(
# Convert tasks' schema to the schema expected by the workflow engine
tasks_request = []
for task in tasks:
# Convert the task to a dict and add the execution profile property
# for all tasks.
preprocessed_task = self._preprocess_task(task)
# Convert the task to a dict
serialized_task = TaskSerializer.serialize(task)

# Handle the task-specific schema conversions
task_request = getattr(self, f"_{task.type}")(preprocessed_task, task)
task_request = getattr(self, f"_{task.type}")(serialized_task, task)
tasks_request.append(task_request)

# Get the archives for this pipeline
archives = []
pipeline_archives = pipeline.archives.all()
for pipeline_archive in pipeline_archives:
# Fetch any credentials or identities for required to
# access this archive
# TODO Handle creds/identity for archives
archives.append(model_to_dict(pipeline_archive.archive))

# Convert pipleline to a dict and build the request
# Serialize the pipeline and it's tasks
request = {}

request["group"] = model_to_dict(group)
request["pipeline"] = model_to_dict(pipeline)
request["archives"] = archives

# Move the execution profile props from the pipeline object to the
# execution profile property
request["pipeline"]["execution_profile"] = {
"max_exec_time": request["pipeline"]["max_exec_time"],
"duplicate_submission_policy": request["pipeline"]["duplicate_submission_policy"],
"max_retries": request["pipeline"]["max_retries"],
"invocation_mode": request["pipeline"]["invocation_mode"],
"retry_policy": request["pipeline"]["retry_policy"]
}

request["pipeline"] = PipelineSerializer.serialize(pipeline)
request["pipeline"]["tasks"] = tasks_request

# Populate the env for this request. Populate values from SK
# Populate the env for this request.
request["env"] = request["pipeline"]["env"]

# Serialize the archives
request["archives"] = []
pipeline_archives = pipeline.archives.all()
for pipeline_archive in pipeline_archives:
# Fetch any credentials or identities for required to
# access this archive
# TODO Handle creds/identity for archives
request["archives"].append(model_to_dict(pipeline_archive.archive))

req_args = {}
for key in args:
req_args[key] = args[key].dict()
Expand All @@ -76,22 +63,26 @@ def build(
"value": WORKFLOW_EXECUTOR_ACCESS_TOKEN
},
"tapis_tenant_id": {
"value": request["group"]["tenant_id"]
"value": group.tenant_id
},
"tapis_pipeline_owner": {
"value": request["pipeline"]["owner"]
},
"tapis_workflows_group_id": {
"value": group.id
},
**req_args
}

request["meta"] = {}

# Properties to help uniquely identity a pipeline submission. If the workflow
# executor is currently running a pipeline with the same values as the
# properties provided in "idempotency_key", the workflow executor
# will then take the appropriate action dictated by the
# pipeline.duplicate_submission_policy (allow, allow_terminate, deny)
request["meta"]["idempotency_key"] = [
"group.id",
"args.tapis_workflows_group_id",
"args.tapis_tenant_id",
"pipeline.id"
]
Expand All @@ -106,36 +97,27 @@ def build(
request["pipeline_run"]["name"] = name or uuid
request["pipeline_run"]["description"] = description

# Parse the directives from the commit message
directives_request = {}
if commit != None:
directives_request = parse(commit)
# # Parse the directives from the commit message
# directives_request = {}
# if commit != None:
# directives_request = parse(commit)

if directives != None and len(directives) > 0:
directive_str = f"[{'|'.join([d for d in directives])}]"
directives_request = parse(directive_str)
# if directives != None and len(directives) > 0:
# directive_str = f"[{'|'.join([d for d in directives])}]"
# directives_request = parse(directive_str)

request["directives"] = directives_request
# request["directives"] = directives_request

request["directives"] = {}

return request

def _image_build(self, task_request, task):
task_request["context"] = model_to_dict(task.context)

# Resolve which context credentials to use if any provided
context_creds = None
if task.context.credentials != None:
context_creds = task.context.credentials

# Identity takes precedence over credentials placed directly in
# the context
if task.context.identity != None:
context_creds = task.context.identity.credentials

task_request["context"]["credentials"] = None
if context_creds != None:
task_request["context"]["credentials"] = model_to_dict(context_creds)

# Get the context credentials data
context_cred_data = self.secret_service.get_secret(context_creds.sk_id)
task_request["context"]["credentials"] = context_cred_data
Expand All @@ -146,18 +128,11 @@ def _image_build(self, task_request, task):
del task_request["context"]["recipe_file_path"]

# Destination credentials
task_request["destination"] = model_to_dict(task.destination)

destination_creds = None
if task.destination.credentials != None:
destination_creds = task.destination.credentials

if task.destination.identity != None:
destination_creds = task.destination.identity.credentials

if destination_creds != None:
task_request["destination"]["credentials"] = model_to_dict(destination_creds)

# Get the context credentials data
destination_cred_data = self.secret_service.get_secret(destination_creds.sk_id)
task_request["destination"]["credentials"] = destination_cred_data
Expand All @@ -183,20 +158,4 @@ def _function(self, task_request, _):
return task_request

def _container_run(self, task_request, _):
return task_request

def _preprocess_task(self, task):
# Convert to dict
task_request = model_to_dict(task)

# Map task data model properties to the workflow engine expected schema for
# the execution profile
task_request["execution_profile"] = {
"flavor": task_request["flavor"],
"max_exec_time": task_request["max_exec_time"],
"max_retries": task_request["max_retries"],
"invocation_mode": task_request["invocation_mode"],
"retry_policy": task_request["retry_policy"]
}

return task_request
11 changes: 11 additions & 0 deletions src/api/src/backend/serializers/ApplicationTaskSerializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from backend.serializers import BaseTaskSerializer


class ApplicationTaskSerializer:
@staticmethod
def serialize(model, base=None):
task = base if base != None else BaseTaskSerializer.serialize(model)

task["image"] = model.image

return task
1 change: 1 addition & 0 deletions src/api/src/backend/serializers/BaseTaskSerializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def serialize(model):

# Build execution profile
task["execution_profile"] = {
"flavor": model.flavor,
"invocation_mode": model.invocation_mode,
"max_exec_time": model.max_exec_time,
"max_retries": model.max_retries,
Expand Down
21 changes: 21 additions & 0 deletions src/api/src/backend/serializers/ContextSerializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from backend.serializers.CredentialsSerializer import CredentialsSerializer
from backend.serializers.UUIDSerializer import UUIDSerializer


class ContextSerializer:
@staticmethod
def serialize(model):
if model == None:
return None
context = {}
context["branch"] = model.branch
context["credentials"] = CredentialsSerializer.serialize(model.credentials)
context["recipe_file_path"] = model.recipe_file_path
context["sub_path"] = model.sub_path
context["tag"] = model.tag
context["type"] = model.type
context["url"] = model.url
context["visibility"] = model.visibility
context["uuid"] = UUIDSerializer.serialize(model.uuid)

return context
17 changes: 17 additions & 0 deletions src/api/src/backend/serializers/CredentialsSerializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from backend.serializers.UUIDSerializer import UUIDSerializer


class CredentialsSerializer:
@staticmethod
def serialize(model):
if model == None:
return None

credential = {}
credential["sk_id"] = model.sk_id
credential["owner"] = model.owner
credential["created_at"] = model.created_at
credential["uuid"] = UUIDSerializer.serialize(model.uuid)


return credential
18 changes: 18 additions & 0 deletions src/api/src/backend/serializers/DestinationSerializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from backend.serializers.CredentialsSerializer import CredentialsSerializer
from backend.serializers.UUIDSerializer import UUIDSerializer


class DestinationSerializer:
@staticmethod
def serialize(model):
if model == None:
return None
destination = {}
destination["tag"] = model.tag
destination["type"] = model.type
destination["url"] = model.url
destination["filename"] = model.filename
destination["credentials"] = CredentialsSerializer.serialize(model.credentials)
destination["uuid"] = UUIDSerializer.serialize(model.uuid)

return destination
Loading

0 comments on commit ceb147a

Please sign in to comment.