From d4dd890e42d39339c259522ab89434b8605e9951 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Tue, 10 Oct 2023 14:36:50 -0500 Subject: [PATCH] coordinate task dependencies in etl endpoint. --- src/api/src/backend/views/ETLPipelines.py | 36 +++++++++++++---------- src/api/src/backend/views/http/etl.py | 1 - 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/api/src/backend/views/ETLPipelines.py b/src/api/src/backend/views/ETLPipelines.py index 033b8518..fc129ae8 100644 --- a/src/api/src/backend/views/ETLPipelines.py +++ b/src/api/src/backend/views/ETLPipelines.py @@ -130,8 +130,16 @@ def post(self, request, group_id, *_, **__): "type": "string", "value": body.local_inbox.manifest_path }, - "LOCAL_OUTBOX_SYSTEM_ID": { + "LOCAL_INBOX_MANIFEST_GENERATION_POLICY": { + "type": "string", + "value": body.local_inbox.manifest_generation_policy + }, + "LOCAL_INBOX_MANIFEST_PRIORITY": { "type": "string", + "value": body.local_inbox.manifest_priority + }, + "LOCAL_OUTBOX_SYSTEM_ID": { + "type": "string", "value": body.local_outbox.system_id }, "LOCAL_OUTBOX_DATA_PATH": { @@ -142,6 +150,14 @@ def post(self, request, group_id, *_, **__): "type": "string", "value": body.local_outbox.manifest_path }, + "LOCAL_OUTBOX_MANIFEST_GENERATION_POLICY": { + "type": "string", + "value": body.local_outbox.manifest_generation_policy + }, + "LOCAL_OUTBOX_MANIFEST_PRIORITY": { + "type": "string", + "value": body.local_outbox.manifest_priority + }, "GLOBUS_SOURCE_ENDPOINT_ID": { "type": "string", "value": body.local_outbox.globus_endpoint_id @@ -212,27 +228,15 @@ def post(self, request, group_id, *_, **__): # Update the dependecies of the gen-outbound-manifests task to # include the last tapis job task - gen_outbound_manifests_task = next(filter(lambda t: t.id == "gen-outbound-manifests", )) + gen_outbound_manifests_task = next(filter(lambda t: t.id == "gen-outbound-manifests", tasks)) gen_outbound_manifests_task.depends_on.append( TaskDependency(id=last_task_id) ) # Add the tasks to the database - for i, job in enumerate(request.jobs, start=1): - task_id = f"job-{i}" + for task in tasks: try: - tasks.append( - task_service.create( - pipeline, - { - "id": task_id, - "type": "tapis_job", - "tapis_job_def": job, - "dependencies": [{"id": last_task_id}] - } - ) - ) - last_task_id = task_id + task_service.create(pipeline, task) except (ValidationError, BadRequestError) as e: pipeline.delete() task_service.delete(tasks) diff --git a/src/api/src/backend/views/http/etl.py b/src/api/src/backend/views/http/etl.py index 0d8d7f4f..45d5ab97 100644 --- a/src/api/src/backend/views/http/etl.py +++ b/src/api/src/backend/views/http/etl.py @@ -60,7 +60,6 @@ class TapisETLPipeline(Pipeline): GlobusRemoteInbox, S3RemoteInbox ] - followup_tasks: List[Dict] = [] @validator("jobs") def one_or_more_jobs(cls, value):