Skip to content

Commit

Permalink
endpoint for creating etl pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Oct 10, 2023
1 parent 634b5d2 commit f3785f4
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 62 deletions.
6 changes: 5 additions & 1 deletion src/api/src/backend/conf/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@
"notifications",
"sk",
"streams"
]
]

# Versioned name of the workflow-engine template used to build Tapis ETL
# pipelines; also used as the lookup key into the repo's owe-config.json.
LATEST_TAPIS_ETL_PIPELINE_TEMPLATE_NAME = "tapis/etl-pipeline@v1beta"
# Git repository containing the OWE (Open Workflow Engine) pipeline and task
# template definitions that ETL pipelines are generated from.
TAPIS_ETL_TEMPLATE_REPO_URL = "https://github.com/tapis-project/tapis-owe-templates.git"
# Branch of the template repository to clone when creating a pipeline.
TAPIS_ETL_TEMPLATE_REPO_BRANCH = "master"
198 changes: 140 additions & 58 deletions src/api/src/backend/views/ETLPipelines.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,43 @@
import os, json

from typing import List
from pydantic import ValidationError
from django.db import DatabaseError, IntegrityError, OperationalError
from django.forms import model_to_dict
from git import Repo

from backend.views.RestrictedAPIView import RestrictedAPIView
from backend.views.http.responses.errors import (
Conflict,
BadRequest,
NotFound,
Forbidden,
ServerError as ServerErrorResp
ServerError as ServerErrorResp,
UnprocessableEntity
)
from backend.views.http.responses import BaseResponse, ResourceURLResponse
from backend.views.http.requests import ImageBuildTask
from backend.views.http.requests import (
Uses,
GitRepository,
TemplateTask,
TapisJobTask,
TaskDependency
)
from backend.views.http.etl import TapisETLPipeline
from backend.models import (
Pipeline as PipelineModel,
Archive,
PipelineArchive,
TASK_TYPE_IMAGE_BUILD
PipelineArchive
)
from backend.services.TaskService import service as task_service
from backend.services.GroupService import service as group_service
from backend.errors.api import BadRequestError, ServerError
from backend.helpers import resource_url_builder
from backend.conf.constants import (
LATEST_TAPIS_ETL_PIPELINE_TEMPLATE_NAME,
TAPIS_ETL_TEMPLATE_REPO_BRANCH,
TAPIS_ETL_TEMPLATE_REPO_URL
)


class ETLPipelines(RestrictedAPIView):
Expand All @@ -38,16 +52,20 @@ def post(self, request, group_id, *_, **__):
if not group_service.user_in_group(request.username, group_id, request.tenant_id):
return Forbidden(message="You do not have access to this group")

# Git repository that contains the pipeline and task definitions for the
# tapis etl pipeline
uses = Uses(
name=LATEST_TAPIS_ETL_PIPELINE_TEMPLATE_NAME,
source=GitRepository(
url=TAPIS_ETL_TEMPLATE_REPO_URL,
branch=TAPIS_ETL_TEMPLATE_REPO_BRANCH
)
)

# Validate the request body based on the type of pipeline specified
prepared_request = self.prepare(
TapisETLPipeline,
uses={
"name": "tapis/etl-pipeline@v1beta",
"source": {
"url": "https://github.com/tapis-project/tapis-owe-templates.git",
"branch": "master"
}
}
uses=uses
)

# Return the failure view instance if validation failed
Expand All @@ -61,24 +79,83 @@ def post(self, request, group_id, *_, **__):
if PipelineModel.objects.filter(id=body.id, group=group).exists():
return Conflict(f"A Pipeline already exists with the id '{body.id}'")

# Check that the pipeline contains at least 1 tapis job definition
if len(body.jobs) < 1:
return BadRequest("An ETL pipeline must contain at least 1 Tapis Job definition")
# Clone the git repository that contains the pipeline and task definitions that will be used
tapis_owe_templates_dir = "/tmp/git/tapis-owe-templates"
try:
Repo.clone(uses.source.url, tapis_owe_templates_dir)
except Exception as e:
return ServerErrorResp(f"Error cloning the Tapis OWE Template repository: {str(e)}")

try:
# Open the owe-config.json file
with open(os.path.join(tapis_owe_templates_dir, "owe-config.json")) as file:
owe_config = json.loads(file.read())

# Open the etl pipeline schema.json
with open(
os.path.join(
tapis_owe_templates_dir,
owe_config.get(LATEST_TAPIS_ETL_PIPELINE_TEMPLATE_NAME).get("path")
)
) as file:
pipeline_template = json.loads(file.read())
except Exception as e:
return UnprocessableEntity(f"Configuration Error (owe-config.json): {str(e)}")


# Create the pipeline
try:
pipeline = PipelineModel.objects.create(
id=body.id,
description=getattr(body, "description", None) or pipeline_template.get("description"),
group=group,
owner=request.username,
uses=request.uses.dict(),
max_exec_time=body.execution_profile.max_exec_time,
invocation_mode=body.execution_profile.invocation_mode,
max_retries=body.execution_profile.max_retries,
retry_policy=body.execution_profile.retry_policy,
duplicate_submission_policy=body.execution_profile.duplicate_submission_policy,
env=body.dict()["env"],
params=body.dict()["params"]
**body.execution_profile,
duplicate_submission_policy=(
pipeline_template
.get("execution_profile")
.get("duplicate_submission_policy")
),
env={
**pipeline_template.env,
"LOCAL_INBOX_SYSTEM_ID": {
"type": "string",
"value": body.local_inbox.system_id
},
"LOCAL_INBOX_DATA_PATH": {
"type": "string",
"value": body.local_inbox.data_path
},
"LOCAL_INBOX_MANIFEST_PATH": {
"type": "string",
"value": body.local_inbox.manifest_path
},
"LOCAL_OUTBOX_SYSTEM_ID": {
"type": "string",
"value": body.local_outbox.system_id
},
"LOCAL_OUTBOX_DATA_PATH": {
"type": "string",
"value": body.local_outbox.data_path
},
"LOCAL_OUTBOX_MANIFEST_PATH": {
"type": "string",
"value": body.local_outbox.manifest_path
},
"GLOBUS_SOURCE_ENDPOINT_ID": {
"type": "string",
"value": body.local_outbox.globus_endpoint_id
},
"GLOBUS_DESTINATION_ENDPOINT_ID": {
"type": "string",
"value": body.remote_inbox.globus_endpoint_id
},
"GLOBUS_CLIENT_ID": {
"type": "string",
"value": body.remote_inbox.globus_client_id
}
},
params=pipeline_template.get("params")
)
except (IntegrityError, OperationalError) as e:
return BadRequest(message=e.__cause__)
Expand Down Expand Up @@ -113,46 +190,49 @@ def post(self, request, group_id, *_, **__):
[pipeline_archive.delete() for pipeline_archive in pipeline_archives]
return BadRequest(message=e.__cause__)

# The first tapis job should be dependent on the gen-inbound-manifests task
last_task_id = "gen-inbound-manifests"

return fn(request, body, pipeline)

def build_ci_pipeline(self, request, body, pipeline):
try:
# Build an task_request from the pipeline request body
task_request = ImageBuildTask(
id="build",
builder=body.builder,
cache=body.cache,
description="Build an image from a repository and push it to an image registry",
destination=body.destination,
context=body.context,
pipeline_id=pipeline.id,
type=TASK_TYPE_IMAGE_BUILD
# Create a tapis job task for each job provided in the request.
tasks = []
for i, job in enumerate(request.jobs, start=1):
task_id = f"etl-job-{i}"
tasks.append(
TapisJobTask({
"id": task_id,
"type": "tapis_job",
"tapis_job_def": job,
"dependencies": [{"id": last_task_id}]
})
)
last_task_id = task_id

# Create 'build' task
task_service.create(pipeline, task_request)
except (ValidationError, BadRequestError) as e:
pipeline.delete()
return BadRequest(message=e)
except (IntegrityError, OperationalError, DatabaseError) as e:
pipeline.delete()
return BadRequest(message=e.__cause__)
except Exception as e:
pipeline.delete()
return ServerErrorResp(message=e)
# Add the tasks from the template to the tasks list
tasks.extend([TemplateTask(**task) for task in pipeline_template.tasks])

return ResourceURLResponse(
url=resource_url_builder(request.url.replace("/ci", "/pipelines"), pipeline.id))

def build_workflow_pipeline(self, request, body, pipeline):
# Create tasks
# TODO Use the TaskService to delete Tasks so it can delete
# all the secrets/credentials associated with it
tasks = []
for task_request in body.tasks:
# Update the dependencies of the gen-outbound-manifests task to
# include the last tapis job task
gen_outbound_manifests_task = next(filter(lambda t: t.id == "gen-outbound-manifests", ))
gen_outbound_manifests_task.depends_on.append(
TaskDependency(id=last_task_id)
)

# Add the tasks to the database
for i, job in enumerate(request.jobs, start=1):
task_id = f"job-{i}"
try:
tasks.append(task_service.create(pipeline, task_request))
tasks.append(
task_service.create(
pipeline,
{
"id": task_id,
"type": "tapis_job",
"tapis_job_def": job,
"dependencies": [{"id": last_task_id}]
}
)
)
last_task_id = task_id
except (ValidationError, BadRequestError) as e:
pipeline.delete()
task_service.delete(tasks)
Expand All @@ -169,4 +249,6 @@ def build_workflow_pipeline(self, request, body, pipeline):
return ServerErrorResp(message=e)

return ResourceURLResponse(
url=resource_url_builder(request.url, pipeline.id))
url=resource_url_builder(request.url, pipeline.id)
)

12 changes: 9 additions & 3 deletions src/api/src/backend/views/http/etl.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum
from typing import List, Dict, Union

from pydantic import BaseModel
from pydantic import BaseModel, validator

from .requests import _EnumMeta, Pipeline

Expand Down Expand Up @@ -55,9 +55,15 @@ class TapisETLPipeline(Pipeline):
remote_outbox: Dict = None
local_inbox: LocalInbox
jobs: List[Dict]
local_outbox: LocalOutbox
local_outbox: GlobusLocalOutbox
remote_inbox: Union[
GlobusRemoteInbox,
S3RemoteInbox
]
followup_tasks: List[Dict] = []
followup_tasks: List[Dict] = []

@validator("jobs")
def one_or_more_jobs(cls, value):
    """Ensure the pipeline defines at least one Tapis Job.

    Raises:
        ValueError: if the ``jobs`` list is empty.
    """
    # An ETL pipeline with no jobs would have nothing to execute between
    # the inbound- and outbound-manifest tasks.
    if len(value) < 1:
        raise ValueError("An ETL pipeline must contain at least 1 Tapis Job definition")
    # BUG FIX: pydantic validators must return the (possibly transformed)
    # value. The original omitted this, which silently replaced a valid
    # non-empty `jobs` list with None on the validated model.
    return value

0 comments on commit f3785f4

Please sign in to comment.