Skip to content

Commit

Permalink
Use STDERR/OUT constants. Add input directory to tasks. Register task…
Browse files Browse the repository at this point in the history
… executors in task prep phase
  • Loading branch information
nathandf committed Sep 28, 2023
1 parent ddb352e commit d3b3d2d
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
TASK_ACTIVE, TASK_ARCHIVING, TASK_BACKOFF, TASK_COMPLETED, TASK_FAILED,
TASK_PENDING, TASK_SUSPENDED, TASK_TERMINATED, TASK_SKIPPED
)
from owe_python_sdk.constants import STDERR, STDOUT


class TapisWorkflowsAPIBackend(EventHandler):
Expand Down Expand Up @@ -222,7 +223,7 @@ def _tail_output(self, task, filename, flag="rb", max_bytes=10000):
return str(file.read())

def _tail_stdout(self, task):
return self._tail_output(task, ".stdout")
return self._tail_output(task, STDOUT)

def _tail_stderr(self, task):
return self._tail_output(task, ".stderr")
return self._tail_output(task, STDERR)
32 changes: 21 additions & 11 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from core.tasks.TaskExecutorFactory import task_executor_factory as factory
from owe_python_sdk.TaskResult import TaskResult
from owe_python_sdk.constants import STDERR, STDOUT
from owe_python_sdk.middleware.ArchiveMiddleware import ArchiveMiddleware
from owe_python_sdk.events import (
Event,
Expand Down Expand Up @@ -65,7 +66,7 @@ class WorkflowExecutor(Worker, EventPublisher):
When initialized, the WorkflowExecutor creates an EventExchange to which
EventPublishers can publish Events. EventHandlers can then be registered to the
EventExchange and handle the Events generated by the EventPublishers. The
WorkflowExecutor itself—as well as every TaskExecutor it spawns—are capable
WorkflowExecutor itself--as well as every TaskExecutor it spawns--are capable
of publishing Events to this exchange. Each WorkflowExecutor initialized by
the Server is persistent, meaning that it is used throughout the lifetime
of the Workflow Executor Server. After each run of a workflow, the
Expand Down Expand Up @@ -207,9 +208,10 @@ def _prepare_tasks(self):
# Paths to the workdir for the task inside the workflow engine container
task.work_dir = f"{self.state.ctx.pipeline.work_dir}{task.id}/"
task.exec_dir = f"{task.work_dir}src/"
task.input_dir = f"{task.work_dir}input/"
task.output_dir = f"{task.work_dir}output/"
task.stdout = f"{task.output_dir}.stdout"
task.stderr = f"{task.output_dir}.stderr"
task.stdout = f"{task.output_dir}{STDOUT}"
task.stderr = f"{task.output_dir}{STDERR}"

# Paths to the workdir for the task inside the job container
task.container_work_dir = "/mnt/open-workflow-engine/pipeline/task"
Expand All @@ -218,6 +220,7 @@ def _prepare_tasks(self):
# Paths to the workdir inside the nfs-server container
task.nfs_work_dir = f"{self.state.ctx.pipeline.nfs_work_dir}{task.id}/"
task.nfs_exec_dir = f"{task.nfs_work_dir}src/"
task.nfs_input_dir = f"{task.nfs_work_dir}input/"
task.nfs_output_dir = f"{task.nfs_work_dir}output/"

# Create the task's directories
Expand All @@ -226,6 +229,12 @@ def _prepare_tasks(self):
# Add a key to the output for the task
self.state.ctx.output = {task.id: []}

# Resolve (build) the task executor
executor = factory.build(task, self.state.ctx, self.exchange, plugins=self._plugins)

# Register the task executor
self._register_executor(self.state.ctx.pipeline_run.uuid, task, executor)

@interceptable()
def _prepare_task_fs(self, task):
# Create the base directory for all files and output created during this task execution
Expand All @@ -237,6 +246,9 @@ def _prepare_task_fs(self, task):
# Create the output dir in which the output of the task execution will be stored
os.makedirs(task.output_dir, exist_ok=True)

# Create the input dir in which the inputs to tasks will be staged
os.makedirs(task.input_dir, exist_ok=True)

# Create the stdout and stderr files
Path(task.stdout).touch()
Path(task.stderr).touch()
Expand All @@ -252,11 +264,8 @@ def _start_task(self, task):

try:
if not self.state.is_dry_run:
# Resolve the task executor and execute the task
executor = factory.build(task, self.state.ctx, self.exchange, plugins=self._plugins)

# Register the task executor
self._register_executor(self.state.ctx.pipeline_run.uuid, task, executor)
# Get the task executor
executor = self._get_executor(self.state.ctx.pipeline_run.uuid, task)

task_result = executor.execute()

Expand Down Expand Up @@ -293,8 +302,6 @@ def _on_task_terminal_state(self, task, task_result):

# Deregister the task executor. This cleans up the resources that were created
# during the initialization and execution of the task executor
# TODO NOTE the line below will throw an exception if the task
# fails before registering the executor
self._deregister_executor(self.state.ctx.pipeline_run.uuid, task)

# Run the on_pipeline_terminal_state callback if all tasks are complete.
Expand Down Expand Up @@ -494,9 +501,12 @@ def _remove_from_queue(self, task):

@interceptable()
def _register_executor(self, run_id, task, executor):
# TODO Might register an executor after termination in case of race condition
self.state.executors[f"{run_id}.{task.id}"] = executor

@interceptable()
def _get_executor(self, run_id, task, default=None):
return self.state.executors.get(f"{run_id}.{task.id}", None)

@interceptable()
def _deregister_executor(self, run_id, task):
# Clean up the resources created by the task executor
Expand Down
5 changes: 3 additions & 2 deletions src/engine/src/owe_python_sdk/TaskExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from owe_python_sdk.events import EventPublisher, EventExchange, Event
from owe_python_sdk.TaskResult import TaskResult
from owe_python_sdk.TaskOutputFile import TaskOutputFile
from owe_python_sdk.constants import STDERR, STDOUT
from utils import lbuffer_str as lbuf
from core.resources import Resource, ResourceType
from conf.constants import (
Expand Down Expand Up @@ -69,10 +70,10 @@ def _set_output(self, filename, value, flag="wb"):
file.write(value)

def _stdout(self, value, flag="wb"):
self._set_output(".stdout", value, flag=flag)
self._set_output(STDOUT, value, flag=flag)

def _stderr(self, value, flag="wb"):
self._set_output(".stderr", value, flag=flag)
self._set_output(STDERR, value, flag=flag)

def _get_task_output_files(self):
return os.listdir(self.task.output_dir)
Expand Down
3 changes: 2 additions & 1 deletion src/engine/src/owe_python_sdk/runtime/execution_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class ExecutionContext:
def __init__(self, runtime: Runtime):
self._runtime = runtime
self.output_dir = runtime.OUTPUT_DIR
self.input_dir = runtime.INPUT_DIR
self.exec_dir = runtime.EXEC_DIR

def get_input(self, key, default=None):
Expand All @@ -34,7 +35,7 @@ def stderr(self, code: int, message):

sys.exit(code)

def stdout(self, value):
def stdout(self, value, code=0):
with open(self._runtime.STDOUT, "w") as file:
if type(value) == dict:
value = json.dumps(value)
Expand Down
1 change: 1 addition & 0 deletions src/engine/src/owe_python_sdk/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

class Runtime:
def __init__(self):
self.INPUT_DIR = os.environ.get("OWE_INPUT_DIR")
self.OUTPUT_DIR = os.environ.get("OWE_OUTPUT_DIR")
self.EXEC_DIR = os.environ.get("OWE_EXEC_DIR")
self.STDOUT = os.path.join(self.OUTPUT_DIR, STDOUT)
Expand Down

0 comments on commit d3b3d2d

Please sign in to comment.