Skip to content

Commit

Permalink
add status-reduce task to etl pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Dec 15, 2023
1 parent 290220b commit 95611c9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 21 deletions.
39 changes: 19 additions & 20 deletions src/api/src/backend/views/ETLPipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
TemplateTask,
TapisJobTask,
TaskDependency,
TaskInputSpec,
TaskOutputRef
TaskInputSpec
)
from backend.views.http.etl import TapisETLPipeline
from backend.models import (
Expand Down Expand Up @@ -233,6 +232,7 @@ def post(self, request, group_id, *_, **__):

# Create a tapis job task for each job provided in the request.
tasks = []
job_tasks = []
try:
for i, job in enumerate(body.jobs, start=1):
# Set up the conditions for the job to run
Expand All @@ -250,37 +250,36 @@ def post(self, request, group_id, *_, **__):
conditions.append(no_op_condition)

task_id = f"etl-job-{i}"
tasks.append(
TapisJobTask(**{
tapis_job_task = TapisJobTask(**{
"id": task_id,
"type": "tapis_job",
"tapis_job_def": job,
"depends_on": [{"id": last_task_id}],
"conditions": conditions
})
)
tasks.append(tapis_job_task)
job_tasks.append(tapis_job_task)
last_task_id = task_id

# Add the tasks from the template to the tasks list
tasks.extend([TemplateTask(**task) for task in pipeline_template.get("tasks")])

# Update the dependencies of the update-inbound-manifest task to
# include the last tapis job task
update_inbound_manifest_task = next(filter(lambda t: t.id == "update-inbound-manifest", tasks))
update_inbound_manifest_task.depends_on.append(
TaskDependency(id=last_task_id, can_fail=True)
)
# Update the dependencies of the status-reduce task to
# include all tapis-job tasks
status_reduce_task = next(filter(lambda t: t.id == "status-reduce", tasks))
for job_task in job_tasks:
status_reduce_task.depends_on.append(
TaskDependency(id=job_task.id, can_fail=True)
)

# Update the input of the update inbound manifest task to include
# the status output from the last tapis job task
update_inbound_manifest_task.input["LAST_TASK_STATUS"] = TaskInputSpec(
value_from={
"task_output": {
"task_id": last_task_id,
"output_id": "STATUS"
status_reduce_task.input[f"{job_task.id}_JOB_STATUS"] = TaskInputSpec(
value_from={
"task_output": {
"task_id": job_task.id,
"output_id": "STATUS"
}
}
}
)
)
except ValidationError as e:
# Delete the pipeline
pipeline.delete()
Expand Down
2 changes: 1 addition & 1 deletion src/engine/src/contrib/tapis/executors/TapisJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,5 @@ def execute(self):
return self._task_result(1, errors=[f"Job '{job.name}' ended with status {job.status}. Last Message: {job.lastMessage}"])

except Exception as e:
self._set_output("STATUS", json.dumps(None), flag="w")
self._set_output("STATUS", "FAILED", flag="w")
return self._task_result(1, errors=[str(e)])

0 comments on commit 95611c9

Please sign in to comment.