Skip to content

Commit

Permalink
Update and sync schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Oct 9, 2023
1 parent 11a40e2 commit 21d2e25
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/api/src/backend/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from backend.views.UpdatePipelineRunStatus import UpdatePipelineRunStatus
from backend.views.UpdateTaskExecutionStatus import UpdateTaskExecutionStatus
from backend.views.CreateTaskExecution import CreateTaskExecution
from backend.views.ETLPipelines import ETLPipelines


urlpatterns = [
Expand All @@ -47,6 +48,7 @@

# Pipelines
path("groups/<str:group_id>/ci", Pipelines.as_view(), name="ci"),
path("groups/<str:group_id>/etl", ETLPipelines.as_view(), name="etl"),
path("groups/<str:group_id>/pipelines", Pipelines.as_view(), name="pipelines"),
path("groups/<str:group_id>/pipelines/<str:pipeline_id>", Pipelines.as_view(), name="pipeline"),
path("groups/<str:group_id>/pipelines/<str:pipeline_id>/owner/<str:username>", ChangePipelineOwner.as_view(), name="changePipelineOwner"),
Expand Down
200 changes: 200 additions & 0 deletions src/api/src/backend/views/ETLPipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
from typing import List
from pydantic import ValidationError
from django.db import DatabaseError, IntegrityError, OperationalError
from django.forms import model_to_dict

from backend.views.RestrictedAPIView import RestrictedAPIView
from backend.views.http.responses.errors import (
Conflict,
BadRequest,
NotFound,
Forbidden,
ServerError as ServerErrorResp
)
from backend.views.http.responses.models import ModelListResponse
from backend.views.http.responses import BaseResponse, ResourceURLResponse
from backend.views.http.requests import WorkflowPipeline, CIPipeline, ImageBuildTask
from backend.views.http.etl import TapisETLPipeline
from backend.models import (
Pipeline,
Archive,
PipelineArchive,
TASK_TYPE_IMAGE_BUILD
)
from backend.services.TaskService import service as task_service
from backend.services.GroupService import service as group_service
from backend.errors.api import BadRequestError, ServerError
from backend.helpers import resource_url_builder


class ETLPipelines(RestrictedAPIView):
    def post(self, request, group_id, *_, **__):
        """Create an ETL pipeline for a group.

        Validates the request body against the TapisETLPipeline schema,
        creates the Pipeline record, links the requested archives, then
        dispatches to a type-specific builder method.

        Returns an HTTP response view instance: NotFound / Forbidden /
        Conflict / BadRequest / ServerError on failure, otherwise the
        builder method's success response.
        """
        # Get the group
        group = group_service.get(group_id, request.tenant_id)
        if group == None:
            return NotFound(f"No group found with id '{group_id}'")

        # Check that the user belongs to the group
        if not group_service.user_in_group(request.username, group_id, request.tenant_id):
            return Forbidden(message="You do not have access to this group")

        # Validate the request body based on the type of pipeline specified
        prepared_request = self.prepare(TapisETLPipeline)

        # Return the failure view instance if validation failed
        if not prepared_request.is_valid:
            return prepared_request.failure_view

        # Get the JSON encoded body from the validation result
        body = prepared_request.body

        # NOTE(review): the TapisETLPipeline schema (views/http/etl.py) does
        # not appear to declare 'id', 'execution_profile', 'env', 'params',
        # or 'archive_ids' -- confirm the schema, otherwise the attribute
        # accesses below fail at runtime.

        # Check that the id of the pipeline is unique
        if Pipeline.objects.filter(id=body.id, group=group).exists():
            return Conflict(f"A Pipeline already exists with the id '{body.id}'")

        # Create the pipeline
        try:
            pipeline = Pipeline.objects.create(
                id=body.id,
                group=group,
                owner=request.username,
                max_exec_time=body.execution_profile.max_exec_time,
                invocation_mode=body.execution_profile.invocation_mode,
                max_retries=body.execution_profile.max_retries,
                retry_policy=body.execution_profile.retry_policy,
                duplicate_submission_policy=body.execution_profile.duplicate_submission_policy,
                env=body.dict()["env"],
                params=body.dict()["params"]
            )
        except (IntegrityError, OperationalError) as e:
            return BadRequest(message=e.__cause__)
        except Exception as e:
            return ServerErrorResp(f"{e}")

        # Fetch the archives specified in the request then create relations
        # between them and the pipeline
        pipeline_archives = []
        try:
            # Prevent duplicate pipeline archives by casting id array to 'set'
            for archive_id in set(body.archive_ids):
                # Fetch the archive object
                archive = Archive.objects.filter(group=group, id=archive_id).first()

                # Return bad request if archive not found
                if archive == None:
                    pipeline.delete()
                    return BadRequest(message=f"Archive not found with an id of '{archive_id}' and group_id '{group.id}'")

                pipeline_archives.append(
                    PipelineArchive.objects.create(
                        pipeline=pipeline,
                        archive=archive
                    )
                )
        except (IntegrityError, OperationalError, DatabaseError) as e:
            # Delete the pipeline
            pipeline.delete()

            # Delete the pipeline archive relationships that were just created
            [pipeline_archive.delete() for pipeline_archive in pipeline_archives]
            return BadRequest(message=e.__cause__)

        # Fetch the function for building the pipeline according to its type.
        # NOTE(review): PIPELINE_TYPE_MAPPING is neither defined nor imported
        # in this module, so this line raises NameError as written. It also
        # assumes 'body.type' exists on TapisETLPipeline, which the visible
        # schema does not declare. This looks copied from the Pipelines view
        # -- confirm the intended dispatch for ETL pipelines.
        fn = getattr(self, PIPELINE_TYPE_MAPPING[body.type])

        return fn(request, body, pipeline)

    def delete(self, request, group_id, pipeline_id, *_, **__):
        """Delete a pipeline and all of its tasks. Owner-only.

        NOTE(review): the URL registered for this view
        ("groups/<str:group_id>/etl") does not capture a 'pipeline_id'
        path parameter -- confirm how this handler is meant to be reached.
        """
        # Get the group
        group = group_service.get(group_id, request.tenant_id)
        if group == None:
            return NotFound(f"No group found with id '{group_id}'")

        # Check that the user belongs to the group
        if not group_service.user_in_group(request.username, group_id, request.tenant_id):
            return Forbidden(message="You do not have access to this group")

        # Get the pipeline by the id provided in the path params
        pipeline = Pipeline.objects.filter(
            id=pipeline_id,
            group=group
        ).prefetch_related("tasks").first()


        if pipeline == None:
            return NotFound(f"Pipeline not found with id '{pipeline_id}'")

        # Delete operation only allowed by owner
        if request.username != pipeline.owner:
            return Forbidden(message="Only the owner of this pipeline can delete it")

        # Delete the pipeline
        try:
            # Delete the tasks
            tasks = pipeline.tasks.all()
            task_service.delete(tasks)
        except (DatabaseError, OperationalError) as e:
            return ServerErrorResp(message=e.__cause__)
        except ServerError as e:
            return ServerErrorResp(message=e)

        pipeline.delete()

        # NOTE(review): 'tasks' is a lazy QuerySet; if it was not evaluated
        # before the deletions above, len(tasks) re-runs the query here and
        # likely reports 0 -- consider capturing the count before deleting.
        msg = f"Pipeline '{pipeline_id}' deleted. {len(tasks)} task(s) deleted."
        return BaseResponse(message=msg, result=msg)

    def build_ci_pipeline(self, request, body, pipeline):
        """Create the single 'build' image-build task for a CI-type pipeline.

        NOTE(review): reads body.builder/cache/destination/context, none of
        which the visible TapisETLPipeline schema declares -- confirm.
        """
        try:
            # Build a task_request from the pipeline request body
            task_request = ImageBuildTask(
                id="build",
                builder=body.builder,
                cache=body.cache,
                description="Build an image from a repository and push it to an image registry",
                destination=body.destination,
                context=body.context,
                pipeline_id=pipeline.id,
                type=TASK_TYPE_IMAGE_BUILD
            )

            # Create 'build' task
            task_service.create(pipeline, task_request)
        except (ValidationError, BadRequestError) as e:
            pipeline.delete()
            return BadRequest(message=e)
        except (IntegrityError, OperationalError, DatabaseError) as e:
            pipeline.delete()
            return BadRequest(message=e.__cause__)
        except Exception as e:
            pipeline.delete()
            return ServerErrorResp(message=e)

        # Point the client at the canonical pipelines URL (assumes the
        # request URL contains "/ci").
        return ResourceURLResponse(
            url=resource_url_builder(request.url.replace("/ci", "/pipelines"), pipeline.id))

    def build_workflow_pipeline(self, request, body, pipeline):
        """Create one task per entry in body.tasks for a workflow pipeline."""
        # Create tasks
        # TODO Use the TaskService to delete Tasks so it can delete
        # all the secrets/credentials associated with it
        tasks = []
        for task_request in body.tasks:
            try:
                tasks.append(task_service.create(pipeline, task_request))
            except (ValidationError, BadRequestError) as e:
                pipeline.delete()
                task_service.delete(tasks)
                return BadRequest(message=e)
            except (IntegrityError, OperationalError, DatabaseError) as e:
                pipeline.delete()
                task_service.delete(tasks)
                return BadRequest(message=e.__cause__)
            # NOTE(review): unlike the other failure branches, this one does
            # not delete the pipeline or the already-created tasks -- confirm
            # whether that is intentional.
            except ServerError as e:
                return ServerErrorResp(message=e)
            except Exception as e:
                task_service.delete(tasks)
                pipeline.delete()
                return ServerErrorResp(message=e)

        return ResourceURLResponse(
            url=resource_url_builder(request.url, pipeline.id))
17 changes: 15 additions & 2 deletions src/api/src/backend/views/http/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class LocalIOBox(BaseModel):
manifest_generation_policy: EnumManifestGenerationPolicy
manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest
manifests_path: str = None
exclude_pattern: str = None
exclude_pattern: List[str] = []

class LocalInbox(LocalIOBox):
manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.OnePerFile
Expand All @@ -42,9 +42,22 @@ class GlobusRemoteInbox(BaseModel):
globus_endpoint_id: str
globus_client_id: str

class S3Auth(BaseModel):
    """Credentials for authenticating against an S3-compatible object store.

    Must subclass pydantic BaseModel: as a plain class these annotations are
    never validated or populated, and using it as the field type of
    S3RemoteInbox.s3_auth breaks pydantic model construction. This also
    matches the sibling `S3Auth(BaseModel)` declared in
    backend/views/http/requests.py.
    """
    access_key: str
    access_secret: str

class S3RemoteInbox(BaseModel):
    """Remote inbox backed by an S3-compatible bucket."""
    s3_auth: S3Auth  # credentials for the bucket; see S3Auth
    url: str  # endpoint URL of the S3-compatible service -- TODO confirm exact format expected
    bucket: str  # bucket name

class TapisETLPipeline(BaseModel):
    """Request schema for creating a Tapis ETL pipeline.

    NOTE(review): the ETLPipelines view also reads id, execution_profile,
    env, params, archive_ids, and type from the validated body -- none are
    declared here; confirm whether they are inherited or missing.
    """
    remote_outbox: Dict = None  # free-form for now; presumably a remote outbox spec -- TODO confirm
    local_inbox: LocalInbox
    jobs: List[Dict]  # Tapis job definitions to run between inbox and outbox -- TODO confirm
    local_outbox: LocalOutbox
    # NOTE(review): pydantic tries Union members in order, so a payload that
    # coerces into GlobusRemoteInbox will never be read as S3RemoteInbox --
    # confirm the two schemas cannot be confused.
    remote_inbox: Union[
        GlobusRemoteInbox,
        S3RemoteInbox
    ]
    follow_tasks: List[Dict] = []
16 changes: 10 additions & 6 deletions src/api/src/backend/views/http/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ class ValueFromSecretRef(BaseModel):
class Spec(BaseModel):
required: bool = False
type: EnumTaskIOTypes

class SpecWithValue(Spec):
value: Value = None
value_from: ValueFromAll = None

Expand Down Expand Up @@ -272,24 +274,24 @@ def value_from_type_validation(cls, value):
)
return value

class TaskInputSpec(Spec):
class TaskInputSpec(SpecWithValue):
value: Value = None
value_from: ValueFromAll = None

class EnvSpec(Spec):
class EnvSpec(SpecWithValue):
value: Value = None
value_from: EnvSpecValueFrom = None

Env = Dict[str, EnvSpec]

class ParamSpec(Spec):
class ParamSpec(SpecWithValue):
type: EnumTaskIOTypes
value: Value = None
value_from: ParamSpecValueFrom = None

Params = Dict[str, ParamSpec]

KeyVal = Dict[str, Spec]
PipelineParams = Dict[str, Spec]
################## /Common ###################

class S3Auth(BaseModel):
Expand Down Expand Up @@ -524,6 +526,7 @@ class GitRepository(BaseModel):
class ClonedGitRepository(GitRepository):
directory: str


class Uses(BaseModel):
source: GitRepository
name: str = None
Expand Down Expand Up @@ -686,11 +689,12 @@ class Pipeline(BaseModel):
]
] = []
execution_profile: PipelineExecutionProfile = PipelineExecutionProfile(
max_exec_time=DEFAULT_MAX_EXEC_TIME*3)
max_exec_time=DEFAULT_MAX_EXEC_TIME*3
)
cron: str = None
archive_ids: List[str] = []
env: Env = {}
params: Params = {}
params: PipelineParams = {}

# NOTE This pre validation transformer is for backwards-compatibility
# Previous pipelines did not have environments or params
Expand Down
1 change: 1 addition & 0 deletions src/engine/src/conf/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
KANIKO_IMAGE_TAG = "latest"

FLAVORS = {
# EnumTaskFlavor.C1_XTINY: {"cpu": "1", "memory": ".01G", "disk": "1GB"},
EnumTaskFlavor.C1_TINY: {"cpu": "1", "memory": ".5G", "disk": "20GB"},
EnumTaskFlavor.C1_XXSML: {"cpu": "1", "memory": "1G", "disk": "20GB"},
EnumTaskFlavor.C1_XSML: {"cpu": "1", "memory": "2G", "disk": "20GB"},
Expand Down
20 changes: 13 additions & 7 deletions src/engine/src/owe_python_sdk/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ class ValueFromSecretRef(BaseModel):
class Spec(BaseModel):
required: bool = False
type: EnumTaskIOTypes

class SpecWithValue(Spec):
value: Value = None
value_from: ValueFromAll = None

Expand Down Expand Up @@ -272,24 +274,24 @@ def value_from_type_validation(cls, value):
)
return value

class TaskInputSpec(Spec):
class TaskInputSpec(SpecWithValue):
value: Value = None
value_from: ValueFromAll = None

class EnvSpec(Spec):
class EnvSpec(SpecWithValue):
value: Value = None
value_from: EnvSpecValueFrom = None

Env = Dict[str, EnvSpec]

class ParamSpec(Spec):
class ParamSpec(SpecWithValue):
type: EnumTaskIOTypes
value: Value = None
value_from: ParamSpecValueFrom = None

Params = Dict[str, ParamSpec]

KeyVal = Dict[str, Spec]
PipelineParams = Dict[str, Spec]
################## /Common ###################

class S3Auth(BaseModel):
Expand Down Expand Up @@ -524,6 +526,7 @@ class GitRepository(BaseModel):
class ClonedGitRepository(GitRepository):
directory: str


class Uses(BaseModel):
source: GitRepository
name: str = None
Expand Down Expand Up @@ -686,11 +689,12 @@ class Pipeline(BaseModel):
]
] = []
execution_profile: PipelineExecutionProfile = PipelineExecutionProfile(
max_exec_time=DEFAULT_MAX_EXEC_TIME*3)
max_exec_time=DEFAULT_MAX_EXEC_TIME*3
)
cron: str = None
archive_ids: List[str] = []
env: Env = {}
params: Params = {}
params: PipelineParams = {}

# NOTE This pre validation transformer is for backwards-compatibility
# Previous pipelines did not have environments or params
Expand Down Expand Up @@ -816,4 +820,6 @@ def __init__(
self.is_valid = is_valid
self.body = body
self.message = message
self.failure_view = failure_view
self.failure_view = failure_view


1 change: 1 addition & 0 deletions src/shared/TapisServiceAPIGateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def __init__(self,

self.client = None
try:
# NOTE FIXME Remove custom_spec_dict when the updated workflows openapi spec added to tapipy 10/4/23
self.client = get_service_tapis_client(tenants=tenants, jwt=jwt, custom_spec_dict={"workflows": "local: /src/conf/OWESpec.yaml"})
except Exception as e:
logging.error(f'Could not instantiate tapisservice client. Exception: {e}')
Expand Down

0 comments on commit 21d2e25

Please sign in to comment.