Skip to content

Commit

Permalink
coordinate task dependencies in etl endpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Oct 10, 2023
1 parent f3785f4 commit d4dd890
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 17 deletions.
36 changes: 20 additions & 16 deletions src/api/src/backend/views/ETLPipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,16 @@ def post(self, request, group_id, *_, **__):
"type": "string",
"value": body.local_inbox.manifest_path
},
"LOCAL_OUTBOX_SYSTEM_ID": {
"LOCAL_INBOX_MANIFEST_GENERATION_POLICY": {
"type": "string",
"value": body.local_inbox.manifest_generation_policy
},
"LOCAL_INBOX_MANIFEST_PRIORITY": {
"type": "string",
"value": body.local_inbox.manifest_priority
},
"LOCAL_OUTBOX_SYSTEM_ID": {
"type": "string",
"value": body.local_outbox.system_id
},
"LOCAL_OUTBOX_DATA_PATH": {
Expand All @@ -142,6 +150,14 @@ def post(self, request, group_id, *_, **__):
"type": "string",
"value": body.local_outbox.manifest_path
},
"LOCAL_OUTBOX_MANIFEST_GENERATION_POLICY": {
"type": "string",
"value": body.local_outbox.manifest_generation_policy
},
"LOCAL_OUTBOX_MANIFEST_PRIORITY": {
"type": "string",
"value": body.local_outbox.manifest_priority
},
"GLOBUS_SOURCE_ENDPOINT_ID": {
"type": "string",
"value": body.local_outbox.globus_endpoint_id
Expand Down Expand Up @@ -212,27 +228,15 @@ def post(self, request, group_id, *_, **__):

# Update the dependecies of the gen-outbound-manifests task to
# include the last tapis job task
gen_outbound_manifests_task = next(filter(lambda t: t.id == "gen-outbound-manifests", ))
gen_outbound_manifests_task = next(filter(lambda t: t.id == "gen-outbound-manifests", tasks))
gen_outbound_manifests_task.depends_on.append(
TaskDependency(id=last_task_id)
)

# Add the tasks to the database
for i, job in enumerate(request.jobs, start=1):
task_id = f"job-{i}"
for task in tasks:
try:
tasks.append(
task_service.create(
pipeline,
{
"id": task_id,
"type": "tapis_job",
"tapis_job_def": job,
"dependencies": [{"id": last_task_id}]
}
)
)
last_task_id = task_id
task_service.create(pipeline, task)
except (ValidationError, BadRequestError) as e:
pipeline.delete()
task_service.delete(tasks)
Expand Down
1 change: 0 additions & 1 deletion src/api/src/backend/views/http/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class TapisETLPipeline(Pipeline):
GlobusRemoteInbox,
S3RemoteInbox
]
followup_tasks: List[Dict] = []

@validator("jobs")
def one_or_more_jobs(cls, value):
Expand Down

0 comments on commit d4dd890

Please sign in to comment.