Skip to content

Commit

Permalink
add destination path to remote inbox
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Nov 10, 2023
1 parent 8dee7c1 commit 5ba782c
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 5 deletions.
4 changes: 4 additions & 0 deletions src/api/src/backend/views/ETLPipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ def post(self, request, group_id, *_, **__):
"type": "string",
"value": body.remote_inbox.globus_endpoint_id
},
"GLOBUS_DESTINATION_PATH": {
"type": "string",
"value": body.remote_inbox.globus_destination_path
},
"GLOBUS_CLIENT_ID": {
"type": "string",
"value": body.remote_inbox.globus_client_id
Expand Down
1 change: 1 addition & 0 deletions src/api/src/backend/views/http/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class GlobusRemoteInbox(BaseModel):
globus_auth: GlobusAuth
globus_endpoint_id: str
globus_client_id: str
globus_destination_path: str

class S3Auth(BaseModel):
access_key: str
Expand Down
12 changes: 8 additions & 4 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ def _prepare_tasks(self):
# Fetch task templates
if task.uses != None:
template_mapper = TemplateMapper(cache_dir=self.state.ctx.pipeline.git_cache_dir)
task = template_mapper.map(task, task.uses)
try:
task = template_mapper.map(task, task.uses)
except Exception as e:
# Trigger the terminal state callback.
self._on_pipeline_terminal_state(event=PIPELINE_FAILED, message=str(e))

# Add a key to the output for the task
self.state.ctx.output[task.id] = None
Expand Down Expand Up @@ -337,15 +341,15 @@ def _on_task_terminal_state(self, task, task_result):
return self._fetch_ready_tasks()

@interceptable()
def _on_pipeline_terminal_state(self, event=None):
def _on_pipeline_terminal_state(self, event=None, message=""):
# No event was provided. Determine if complete or failed from number
# of failed tasks
if event == None:
event = PIPELINE_FAILED if len(self.state.failed) > 0 else PIPELINE_COMPLETED

msg = "COMPLETED"
if event == PIPELINE_FAILED: msg = "FAILED"
elif event == PIPELINE_TERMINATED: msg = "TERMINATED"
if event == PIPELINE_FAILED: msg = "FAILED" + f" {message}"
elif event == PIPELINE_TERMINATED: msg = "TERMINATED" + f" {message}"

self.state.ctx.logger.info(self.p_str(msg))

Expand Down
5 changes: 4 additions & 1 deletion src/engine/src/helpers/TemplateMapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def map(self, obj: Union[Pipeline, Task], uses: Uses) -> Union[Pipeline, Task]:
"""

# Clone git repository specified on the pipeline.uses if exists
template = self.template_repo.get_by_uses(uses)
try:
template = self.template_repo.get_by_uses(uses)
except Exception as e:
raise Exception(f"Template mapping error: {e}")

# Resolve which class the final object should have
obj_class = Pipeline
Expand Down

0 comments on commit 5ba782c

Please sign in to comment.