Skip to content

Commit

Permalink
rename interceptable->interruptable. Add task input file staging service
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Jan 5, 2024
1 parent 7ef7ae1 commit 685adcc
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 57 deletions.
9 changes: 7 additions & 2 deletions src/engine/src/core/ioc/IOCContainerFactory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from functools import partial

from core.ioc import IOCContainer
from core.state import ReactiveState
from core.daos import (
Expand All @@ -24,6 +22,7 @@
TaskRepository,
TemplateRepository
)
from core.tasks.TaskInputFileStagingService import TaskInputFileStagingService
from core.workflows import (
GraphValidator,
ValueFromService
Expand Down Expand Up @@ -119,6 +118,12 @@ def build(self):
# )
# )

container.register("TaskInputFileStagingService",
lambda: TaskInputFileStagingService(
container.load("ValueFromService")
)
)

container.register("TaskOutputRepository",
lambda: TaskOutputRepository(
container.load("TaskOutputMapper")
Expand Down
68 changes: 68 additions & 0 deletions src/engine/src/core/tasks/TaskInputFileStagingService.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import os

from core.workflows import ValueFromService
from owe_python_sdk.schema import Task
from errors.tasks import TaskInputStagingError


class TaskInputFileStagingService:
"""Responsible for creating the actual files that will be used as inputs
during a task execution. Possible sources of data for the input files may be
one or all of the following:
1) The value property of an input specification
2) An argument passed as part of a pipeline invocation
2) A variable from the envrionment with which the pipeline was invoked
2) An output file from another task upon which the staging task is dependent
"""
def __init__(
self,
value_from_service: ValueFromService
):
self.value_from_service = value_from_service

def stage(self, task: Task):
"""Iterates over all of the items in the task input dictionary, fetches
the values from their sources, then creates the files in the task's
working directory"""
for input_id, input_ in task.input.items():
if input_.value != None:
self._create_input_(task, input_id, input_.value)

value_from = input_.value_from
key = list(value_from.keys())[0] # NOTE Should only have 1 key
if key == "task_output":
try:
value = self._value_from_service.get_task_output_value_by_id(
task_id=value_from[key].task_id,
_id=value_from[key].output_id
)
except Exception:
raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}'")
if key == "args":
try:
value = self._value_from_service.get_arg_value_by_key(
value_from[key]
)
except Exception:
raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{key}'")
if key == "env":
try:
value = self._value_from_service.get_env_value_by_key(
value_from[key]
)
except Exception:
raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{key}'")

self._create_input_(task, input_id, value)

def _create_input_(self, task, input_id, value):
try:
with open(os.path.join(task.input_dir, input_id), mode="w") as file:
if value == None: value == ""
file.write(str(value))
except Exception as e:
raise TaskInputStagingError(f"Error while staging input: {e}")




1 change: 0 additions & 1 deletion src/engine/src/core/tasks/executors/Function.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from utils.k8s import flavor_to_k8s_resource_reqs, input_to_k8s_env_vars, gen_resource_name
from core.tasks import function_bootstrap
from core.repositories import GitCacheRepository
from errors import WorkflowTerminated


class ContainerDetails:
Expand Down
Loading

0 comments on commit 685adcc

Please sign in to comment.