diff --git a/src/api/src/backend/urls.py b/src/api/src/backend/urls.py index e6ce02b7..43bd0eef 100644 --- a/src/api/src/backend/urls.py +++ b/src/api/src/backend/urls.py @@ -21,6 +21,7 @@ from backend.views.UpdatePipelineRunStatus import UpdatePipelineRunStatus from backend.views.UpdateTaskExecutionStatus import UpdateTaskExecutionStatus from backend.views.CreateTaskExecution import CreateTaskExecution +from backend.views.ETLPipelines import ETLPipelines urlpatterns = [ @@ -47,6 +48,7 @@ # Pipelines path("groups//ci", Pipelines.as_view(), name="ci"), + path("groups//etl", ETLPipelines.as_view(), name="etl"), path("groups//pipelines", Pipelines.as_view(), name="pipelines"), path("groups//pipelines/", Pipelines.as_view(), name="pipeline"), path("groups//pipelines//owner/", ChangePipelineOwner.as_view(), name="changePipelineOwner"), diff --git a/src/api/src/backend/views/ETLPipelines.py b/src/api/src/backend/views/ETLPipelines.py new file mode 100644 index 00000000..b2b2b54a --- /dev/null +++ b/src/api/src/backend/views/ETLPipelines.py @@ -0,0 +1,200 @@ +from typing import List +from pydantic import ValidationError +from django.db import DatabaseError, IntegrityError, OperationalError +from django.forms import model_to_dict + +from backend.views.RestrictedAPIView import RestrictedAPIView +from backend.views.http.responses.errors import ( + Conflict, + BadRequest, + NotFound, + Forbidden, + ServerError as ServerErrorResp +) +from backend.views.http.responses.models import ModelListResponse +from backend.views.http.responses import BaseResponse, ResourceURLResponse +from backend.views.http.requests import WorkflowPipeline, CIPipeline, ImageBuildTask +from backend.views.http.etl import TapisETLPipeline +from backend.models import ( + Pipeline, + Archive, + PipelineArchive, + TASK_TYPE_IMAGE_BUILD +) +from backend.services.TaskService import service as task_service +from backend.services.GroupService import service as group_service +from backend.errors.api import BadRequestError, ServerError +from backend.helpers import resource_url_builder + + +class ETLPipelines(RestrictedAPIView): + def post(self, request, group_id, *_, **__): + """ETL Pipelines""" + # Get the group + group = group_service.get(group_id, request.tenant_id) + if group == None: + return NotFound(f"No group found with id '{group_id}'") + + # Check that the user belongs to the group + if not group_service.user_in_group(request.username, group_id, request.tenant_id): + return Forbidden(message="You do not have access to this group") + + # Validate the request body based on the type of pipeline specified + prepared_request = self.prepare(TapisETLPipeline) + + # Return the failure view instance if validation failed + if not prepared_request.is_valid: + return prepared_request.failure_view + + # Get the JSON encoded body from the validation result + body = prepared_request.body + + # Check that the id of the pipeline is unique + if Pipeline.objects.filter(id=body.id, group=group).exists(): + return Conflict(f"A Pipeline already exists with the id '{body.id}'") + + # Create the pipeline + try: + pipeline = Pipeline.objects.create( + id=body.id, + group=group, + owner=request.username, + max_exec_time=body.execution_profile.max_exec_time, + invocation_mode=body.execution_profile.invocation_mode, + max_retries=body.execution_profile.max_retries, + retry_policy=body.execution_profile.retry_policy, + duplicate_submission_policy=body.execution_profile.duplicate_submission_policy, + env=body.dict()["env"], + params=body.dict()["params"] + ) + except (IntegrityError, OperationalError) as e: + return BadRequest(message=e.__cause__) + except Exception as e: + return ServerErrorResp(f"{e}") + + # Fetch the archives specified in the request then create relations + # between them and the pipline + pipeline_archives = [] + try: + # Prevent duplicate pipeline archives by casting id array to 'set' + for archive_id in set(body.archive_ids): + # Fetch the archive object + archive = Archive.objects.filter(group=group, id=archive_id).first() + + # Return bad request if archive not found + if archive == None: + pipeline.delete() + return BadRequest(message=f"Archive not found with an id of '{archive_id}' and group_id '{group.id}'") + + pipeline_archives.append( + PipelineArchive.objects.create( + pipeline=pipeline, + archive=archive + ) + ) + except (IntegrityError, OperationalError, DatabaseError) as e: + # Delete the pipeline + pipeline.delete() + + # Delete the pipeline archive relationships that were just created + [pipeline_archive.delete() for pipeline_archive in pipeline_archives] + return BadRequest(message=e.__cause__) + + # Fetch the function for building the pipeline according to its type. + fn = getattr(self, PIPELINE_TYPE_MAPPING[body.type]) + + return fn(request, body, pipeline) + + def delete(self, request, group_id, pipeline_id, *_, **__): + # Get the group + group = group_service.get(group_id, request.tenant_id) + if group == None: + return NotFound(f"No group found with id '{group_id}'") + + # Check that the user belongs to the group + if not group_service.user_in_group(request.username, group_id, request.tenant_id): + return Forbidden(message="You do not have access to this group") + + # Get the pipeline by the id provided in the path params + pipeline = Pipeline.objects.filter( + id=pipeline_id, + group=group + ).prefetch_related("tasks").first() + + + if pipeline == None: + return NotFound(f"Pipeline not found with id '{pipeline_id}'") + + # Delete operation only allowed by owner + if request.username != pipeline.owner: + return Forbidden(message="Only the owner of this pipeline can delete it") + + # Delete the pipeline + try: + # Delete the tasks + tasks = pipeline.tasks.all() + task_service.delete(tasks) + except (DatabaseError, OperationalError) as e: + return ServerErrorResp(message=e.__cause__) + except ServerError as e: + return ServerErrorResp(message=e) + + pipeline.delete() + + msg = f"Pipeline '{pipeline_id}' deleted. {len(tasks)} task(s) deleted." + return BaseResponse(message=msg, result=msg) + + def build_ci_pipeline(self, request, body, pipeline): + try: + # Build an task_request from the pipeline request body + task_request = ImageBuildTask( + id="build", + builder=body.builder, + cache=body.cache, + description="Build an image from a repository and push it to an image registry", + destination=body.destination, + context=body.context, + pipeline_id=pipeline.id, + type=TASK_TYPE_IMAGE_BUILD + ) + + # Create 'build' task + task_service.create(pipeline, task_request) + except (ValidationError, BadRequestError) as e: + pipeline.delete() + return BadRequest(message=e) + except (IntegrityError, OperationalError, DatabaseError) as e: + pipeline.delete() + return BadRequest(message=e.__cause__) + except Exception as e: + pipeline.delete() + return ServerErrorResp(message=e) + + return ResourceURLResponse( + url=resource_url_builder(request.url.replace("/ci", "/pipelines"), pipeline.id)) + + def build_workflow_pipeline(self, request, body, pipeline): + # Create tasks + # TODO Use the TaskService to delete Tasks so it can delete + # all the secrets/credentials associated with it + tasks = [] + for task_request in body.tasks: + try: + tasks.append(task_service.create(pipeline, task_request)) + except (ValidationError, BadRequestError) as e: + pipeline.delete() + task_service.delete(tasks) + return BadRequest(message=e) + except (IntegrityError, OperationalError, DatabaseError) as e: + pipeline.delete() + task_service.delete(tasks) + return BadRequest(message=e.__cause__) + except ServerError as e: + return ServerErrorResp(message=e) + except Exception as e: + task_service.delete(tasks) + pipeline.delete() + return ServerErrorResp(message=e) + + return ResourceURLResponse( + url=resource_url_builder(request.url, pipeline.id)) diff --git a/src/api/src/backend/views/http/etl.py b/src/api/src/backend/views/http/etl.py index e00ff813..e56d4f6b 100644 --- a/src/api/src/backend/views/http/etl.py +++ b/src/api/src/backend/views/http/etl.py @@ -21,7 +21,7 @@ class LocalIOBox(BaseModel): manifest_generation_policy: EnumManifestGenerationPolicy manifest_priority: EnumManifestPriority = EnumManifestPriority.Oldest manifests_path: str = None - exclude_pattern: str = None + exclude_pattern: List[str] = [] class LocalInbox(LocalIOBox): manifest_generation_policy: EnumManifestGenerationPolicy = EnumManifestGenerationPolicy.OnePerFile @@ -42,9 +42,22 @@ class GlobusRemoteInbox(BaseModel): globus_endpoint_id: str globus_client_id: str +class S3Auth: + access_key: str + access_secret: str + +class S3RemoteInbox(BaseModel): + s3_auth: S3Auth + url: str + bucket: str + class TapisETLPipeline(BaseModel): remote_outbox: Dict = None local_inbox: LocalInbox jobs: List[Dict] local_outbox: LocalOutbox - remote_inbox: GlobusRemoteInbox \ No newline at end of file + remote_inbox: Union[ + GlobusRemoteInbox, + S3RemoteInbox + ] + follow_tasks: List[Dict] = [] \ No newline at end of file diff --git a/src/api/src/backend/views/http/requests.py b/src/api/src/backend/views/http/requests.py index 57fe634e..4bad8fa1 100644 --- a/src/api/src/backend/views/http/requests.py +++ b/src/api/src/backend/views/http/requests.py @@ -225,6 +225,8 @@ class ValueFromSecretRef(BaseModel): class Spec(BaseModel): required: bool = False type: EnumTaskIOTypes + +class SpecWithValue(Spec): value: Value = None value_from: ValueFromAll = None @@ -272,24 +274,24 @@ def value_from_type_validation(cls, value): ) return value -class TaskInputSpec(Spec): +class TaskInputSpec(SpecWithValue): value: Value = None value_from: ValueFromAll = None -class EnvSpec(Spec): +class EnvSpec(SpecWithValue): value: Value = None value_from: EnvSpecValueFrom = None Env = Dict[str, EnvSpec] -class ParamSpec(Spec): +class ParamSpec(SpecWithValue): type: EnumTaskIOTypes value: Value = None value_from: ParamSpecValueFrom = None Params = Dict[str, ParamSpec] -KeyVal = Dict[str, Spec] +PipelineParams = Dict[str, Spec] ################## /Common ################### class S3Auth(BaseModel): @@ -524,6 +526,7 @@ class GitRepository(BaseModel): class ClonedGitRepository(GitRepository): directory: str + class Uses(BaseModel): source: GitRepository name: str = None @@ -686,11 +689,12 @@ class Pipeline(BaseModel): ] ] = [] execution_profile: PipelineExecutionProfile = PipelineExecutionProfile( - max_exec_time=DEFAULT_MAX_EXEC_TIME*3) + max_exec_time=DEFAULT_MAX_EXEC_TIME*3 + ) cron: str = None archive_ids: List[str] = [] env: Env = {} - params: Params = {} + params: PipelineParams = {} # NOTE This pre validation transformer is for backwards-compatibility # Previous pipelines did not have environments or parmas diff --git a/src/engine/src/conf/constants.py b/src/engine/src/conf/constants.py index 55a0d397..d9b2b6d0 100644 --- a/src/engine/src/conf/constants.py +++ b/src/engine/src/conf/constants.py @@ -84,6 +84,7 @@ KANIKO_IMAGE_TAG = "latest" FLAVORS = { + # EnumTaskFlavor.C1_XTINY: {"cpu": "1", "memory": ".01G", "disk": "1GB"}, EnumTaskFlavor.C1_TINY: {"cpu": "1", "memory": ".5G", "disk": "20GB"}, EnumTaskFlavor.C1_XXSML: {"cpu": "1", "memory": "1G", "disk": "20GB"}, EnumTaskFlavor.C1_XSML: {"cpu": "1", "memory": "2G", "disk": "20GB"}, diff --git a/src/engine/src/owe_python_sdk/schema.py b/src/engine/src/owe_python_sdk/schema.py index 3a38c99d..4bad8fa1 100644 --- a/src/engine/src/owe_python_sdk/schema.py +++ b/src/engine/src/owe_python_sdk/schema.py @@ -225,6 +225,8 @@ class ValueFromSecretRef(BaseModel): class Spec(BaseModel): required: bool = False type: EnumTaskIOTypes + +class SpecWithValue(Spec): value: Value = None value_from: ValueFromAll = None @@ -272,24 +274,24 @@ def value_from_type_validation(cls, value): ) return value -class TaskInputSpec(Spec): +class TaskInputSpec(SpecWithValue): value: Value = None value_from: ValueFromAll = None -class EnvSpec(Spec): +class EnvSpec(SpecWithValue): value: Value = None value_from: EnvSpecValueFrom = None Env = Dict[str, EnvSpec] -class ParamSpec(Spec): +class ParamSpec(SpecWithValue): type: EnumTaskIOTypes value: Value = None value_from: ParamSpecValueFrom = None Params = Dict[str, ParamSpec] -KeyVal = Dict[str, Spec] +PipelineParams = Dict[str, Spec] ################## /Common ################### class S3Auth(BaseModel): @@ -524,6 +526,7 @@ class GitRepository(BaseModel): class ClonedGitRepository(GitRepository): directory: str + class Uses(BaseModel): source: GitRepository name: str = None @@ -686,11 +689,12 @@ class Pipeline(BaseModel): ] ] = [] execution_profile: PipelineExecutionProfile = PipelineExecutionProfile( - max_exec_time=DEFAULT_MAX_EXEC_TIME*3) + max_exec_time=DEFAULT_MAX_EXEC_TIME*3 + ) cron: str = None archive_ids: List[str] = [] env: Env = {} - params: Params = {} + params: PipelineParams = {} # NOTE This pre validation transformer is for backwards-compatibility # Previous pipelines did not have environments or parmas @@ -816,4 +820,6 @@ def __init__( self.is_valid = is_valid self.body = body self.message = message - self.failure_view = failure_view \ No newline at end of file + self.failure_view = failure_view + + diff --git a/src/shared/TapisServiceAPIGateway.py b/src/shared/TapisServiceAPIGateway.py index f6b7a86a..961021ee 100644 --- a/src/shared/TapisServiceAPIGateway.py +++ b/src/shared/TapisServiceAPIGateway.py @@ -12,6 +12,7 @@ def __init__(self, self.client = None try: + # NOTE FIXME Remove custom_spec_dict when the updated workflows openapi spec added to tapipy 10/4/23 self.client = get_service_tapis_client(tenants=tenants, jwt=jwt, custom_spec_dict={"workflows": "local: /src/conf/OWESpec.yaml"}) except Exception as e: logging.error(f'Could not instantiate tapisservice client. Exception: {e}')