diff --git a/src/api/src/backend/views/ETLPipelines.py b/src/api/src/backend/views/ETLPipelines.py index fb06abb8..35b9a970 100644 --- a/src/api/src/backend/views/ETLPipelines.py +++ b/src/api/src/backend/views/ETLPipelines.py @@ -20,8 +20,7 @@ TemplateTask, TapisJobTask, TaskDependency, - TaskInputSpec, - TaskOutputRef + TaskInputSpec ) from backend.views.http.etl import TapisETLPipeline from backend.models import ( @@ -233,6 +232,7 @@ def post(self, request, group_id, *_, **__): # Create a tapis job task for each job provided in the request. tasks = [] + job_tasks = [] try: for i, job in enumerate(body.jobs, start=1): # Set up the conditions for the job to run @@ -250,37 +250,36 @@ def post(self, request, group_id, *_, **__): conditions.append(no_op_condition) task_id = f"etl-job-{i}" - tasks.append( - TapisJobTask(**{ + tapis_job_task = TapisJobTask(**{ "id": task_id, "type": "tapis_job", "tapis_job_def": job, "depends_on": [{"id": last_task_id}], "conditions": conditions }) - ) + tasks.append(tapis_job_task) + job_tasks.append(tapis_job_task) last_task_id = task_id # Add the tasks from the template to the tasks list tasks.extend([TemplateTask(**task) for task in pipeline_template.get("tasks")]) - # Update the dependecies of the update-inbound-manifest task to - # include the last tapis job task - update_inbound_manifest_task = next(filter(lambda t: t.id == "update-inbound-manifest", tasks)) - update_inbound_manifest_task.depends_on.append( - TaskDependency(id=last_task_id, can_fail=True) - ) + # Update the dependecies of the status-reduce task to + # include all tapis-job tasks + status_reduce_task = next(filter(lambda t: t.id == "status-reduce", tasks)) + for job_task in job_tasks: + status_reduce_task.depends_on.append( + TaskDependency(id=job_task.id, can_fail=True) + ) - # Update the input of the update inbound manifest task to include - # the status output from the last tapis job task - update_inbound_manifest_task.input["LAST_TASK_STATUS"] = TaskInputSpec( - value_from={ - "task_output": { - "task_id": last_task_id, - "output_id": "STATUS" + status_reduce_task.input[f"{job_task.id}_JOB_STATUS"] = TaskInputSpec( + value_from={ + "task_output": { + "task_id": job_task.id, + "output_id": "STATUS" + } } - } - ) + ) except ValidationError as e: # Delete the pipeline pipeline.delete() diff --git a/src/engine/src/contrib/tapis/executors/TapisJob.py b/src/engine/src/contrib/tapis/executors/TapisJob.py index a9df4342..0f9153d6 100644 --- a/src/engine/src/contrib/tapis/executors/TapisJob.py +++ b/src/engine/src/contrib/tapis/executors/TapisJob.py @@ -99,5 +99,5 @@ def execute(self): return self._task_result(1, errors=[f"Job '{job.name}' ended with status {job.status}. Last Message: {job.lastMessage}"]) except Exception as e: - self._set_output("STATUS", json.dumps(None), flag="w") + self._set_output("STATUS", "FAILED", flag="w") return self._task_result(1, errors=[str(e)]) \ No newline at end of file