diff --git a/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py b/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py
index 311f5093..f8e221ca 100644
--- a/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py
+++ b/src/engine/src/contrib/tapis/middleware/event_handlers/notifications/TapisWorkflowsAPIBackend.py
@@ -10,6 +10,7 @@
     TASK_ACTIVE, TASK_ARCHIVING, TASK_BACKOFF, TASK_COMPLETED, TASK_FAILED,
     TASK_PENDING, TASK_SUSPENDED, TASK_TERMINATED, TASK_SKIPPED
 )
+from owe_python_sdk.constants import STDERR, STDOUT


 class TapisWorkflowsAPIBackend(EventHandler):
@@ -222,7 +223,7 @@ def _tail_output(self, task, filename, flag="rb", max_bytes=10000):
         return str(file.read())

     def _tail_stdout(self, task):
-        return self._tail_output(task, ".stdout")
+        return self._tail_output(task, STDOUT)

     def _tail_stderr(self, task):
-        return self._tail_output(task, ".stderr")
\ No newline at end of file
+        return self._tail_output(task, STDERR)
\ No newline at end of file
diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py
index bc792c75..3bcf39ba 100644
--- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py
+++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py
@@ -6,6 +6,7 @@

 from core.tasks.TaskExecutorFactory import task_executor_factory as factory
 from owe_python_sdk.TaskResult import TaskResult
+from owe_python_sdk.constants import STDERR, STDOUT
 from owe_python_sdk.middleware.ArchiveMiddleware import ArchiveMiddleware
 from owe_python_sdk.events import (
     Event,
@@ -65,7 +66,7 @@ class WorkflowExecutor(Worker, EventPublisher):
     When initialized, the WorkflowExecutor creates an EventExchange to which
     EventPublishers can publish Events. EventHandlers can then be registered
     to the EventExchange and handle the Events generated by the EventPublishers. The
-    WorkflowExecutor itself–as well as every TaskExecutor it spawns–are capable
+    WorkflowExecutor itself--as well as every TaskExecutor it spawns--is capable
     of publishing Events to this exchange. Each WorkflowExecutor initialized by
     the Server is persitent, meaning that it is used throughout the lifetime of
     the Workflow Executor Server. After each run of a workflow, the
@@ -207,9 +208,10 @@ def _prepare_tasks(self):
             # Paths to the workdir for the task inside the workflow engine container
             task.work_dir = f"{self.state.ctx.pipeline.work_dir}{task.id}/"
             task.exec_dir = f"{task.work_dir}src/"
+            task.input_dir = f"{task.work_dir}input/"
             task.output_dir = f"{task.work_dir}output/"
-            task.stdout = f"{task.output_dir}.stdout"
-            task.stderr = f"{task.output_dir}.stderr"
+            task.stdout = f"{task.output_dir}{STDOUT}"
+            task.stderr = f"{task.output_dir}{STDERR}"

             # Paths to the workdir for the task inside the job container
             task.container_work_dir = "/mnt/open-workflow-engine/pipeline/task"
@@ -218,6 +220,7 @@ def _prepare_tasks(self):
             # Paths to the workdir inside the nfs-server container
             task.nfs_work_dir = f"{self.state.ctx.pipeline.nfs_work_dir}{task.id}/"
             task.nfs_exec_dir = f"{task.nfs_work_dir}src/"
+            task.nfs_input_dir = f"{task.nfs_work_dir}input/"
             task.nfs_output_dir = f"{task.nfs_work_dir}output/"

             # Create the task's directories
@@ -226,6 +229,12 @@ def _prepare_tasks(self):
             # Add a key to the output for the task
             self.state.ctx.output = {task.id: []}

+            # Resolve the task executor
+            executor = factory.build(task, self.state.ctx, self.exchange, plugins=self._plugins)
+
+            # Register the task executor
+            self._register_executor(self.state.ctx.pipeline_run.uuid, task, executor)
+
     @interceptable()
     def _prepare_task_fs(self, task):
         # Create the base directory for all files and output created during this task execution
@@ -237,6 +246,9 @@ def _prepare_task_fs(self, task):
         # Create the output dir in which the output of the task execution will be stored
         os.makedirs(task.output_dir, exist_ok=True)

+        # Create the input dir in which the task's inputs will be staged
+        os.makedirs(task.input_dir, exist_ok=True)
+
         # Create the stdout and stderr files
         Path(task.stdout).touch()
         Path(task.stderr).touch()
@@ -252,11 +264,8 @@ def _start_task(self, task):

         try:
             if not self.state.is_dry_run:
-                # Resolve the task executor and execute the task
-                executor = factory.build(task, self.state.ctx, self.exchange, plugins=self._plugins)
-
-                # Register the task executor
-                self._register_executor(self.state.ctx.pipeline_run.uuid, task, executor)
+                # Get the task executor registered during _prepare_tasks
+                executor = self._get_executor(self.state.ctx.pipeline_run.uuid, task)

                 task_result = executor.execute()

@@ -293,8 +302,6 @@ def _on_task_terminal_state(self, task, task_result):

         # Deregister the task executor. This cleans up the resources that were created
         # during the initialization and execution of the task executor
-        # TODO NOTE the line below will throw and exception if task
-        # fails before registering the executor
         self._deregister_executor(self.state.ctx.pipeline_run.uuid, task)

         # Run the on_pipeline_terminal_state callback if all tasks are complete.
@@ -494,9 +501,12 @@ def _remove_from_queue(self, task):

     @interceptable()
     def _register_executor(self, run_id, task, executor):
-        # TODO Might register an executor after termination in case of race condition
         self.state.executors[f"{run_id}.{task.id}"] = executor

+    @interceptable()
+    def _get_executor(self, run_id, task, default=None):
+        return self.state.executors.get(f"{run_id}.{task.id}", default)
+
     @interceptable()
     def _deregister_executor(self, run_id, task):
         # Clean up the resources created by the task executor
diff --git a/src/engine/src/owe_python_sdk/TaskExecutor.py b/src/engine/src/owe_python_sdk/TaskExecutor.py
index a8bb0aba..6a537999 100644
--- a/src/engine/src/owe_python_sdk/TaskExecutor.py
+++ b/src/engine/src/owe_python_sdk/TaskExecutor.py
@@ -6,6 +6,7 @@
 from owe_python_sdk.events import EventPublisher, EventExchange, Event
 from owe_python_sdk.TaskResult import TaskResult
 from owe_python_sdk.TaskOutputFile import TaskOutputFile
+from owe_python_sdk.constants import STDERR, STDOUT
 from utils import lbuffer_str as lbuf
 from core.resources import Resource, ResourceType
 from conf.constants import (
@@ -69,10 +70,10 @@ def _set_output(self, filename, value, flag="wb"):
             file.write(value)

     def _stdout(self, value, flag="wb"):
-        self._set_output(".stdout", value, flag=flag)
+        self._set_output(STDOUT, value, flag=flag)

     def _stderr(self, value, flag="wb"):
-        self._set_output(".stderr", value, flag=flag)
+        self._set_output(STDERR, value, flag=flag)

     def _get_task_output_files(self):
         return os.listdir(self.task.output_dir)
diff --git a/src/engine/src/owe_python_sdk/runtime/execution_context.py b/src/engine/src/owe_python_sdk/runtime/execution_context.py
index 523b1ab8..0c2a49c8 100644
--- a/src/engine/src/owe_python_sdk/runtime/execution_context.py
+++ b/src/engine/src/owe_python_sdk/runtime/execution_context.py
@@ -8,6 +8,7 @@ class ExecutionContext:
     def __init__(self, runtime: Runtime):
         self._runtime = runtime
         self.output_dir = runtime.OUTPUT_DIR
+        self.input_dir = runtime.INPUT_DIR
         self.exec_dir = runtime.EXEC_DIR

     def get_input(self, key, default=None):
@@ -34,7 +35,7 @@ def stderr(self, code: int, message):

         sys.exit(code)

-    def stdout(self, value):
+    def stdout(self, value, code=0):
         with open(self._runtime.STDOUT, "w") as file:
             if type(value) == dict:
                 value = json.dumps(value)
diff --git a/src/engine/src/owe_python_sdk/runtime/runtime.py b/src/engine/src/owe_python_sdk/runtime/runtime.py
index 1c31da6b..d51b9414 100644
--- a/src/engine/src/owe_python_sdk/runtime/runtime.py
+++ b/src/engine/src/owe_python_sdk/runtime/runtime.py
@@ -5,6 +5,7 @@

 class Runtime:
     def __init__(self):
+        self.INPUT_DIR = os.environ.get("OWE_INPUT_DIR")
         self.OUTPUT_DIR = os.environ.get("OWE_OUTPUT_DIR")
         self.EXEC_DIR = os.environ.get("OWE_EXEC_DIR")
         self.STDOUT = os.path.join(self.OUTPUT_DIR, STDOUT)
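
Note: the recurring substitution in this patch replaces the hard-coded ".stdout"/".stderr" filename literals with shared constants. For readers without the rest of the tree, a minimal sketch of what owe_python_sdk/constants.py presumably defines, inferred solely from the literals this diff replaces (the actual module may contain more):

# owe_python_sdk/constants.py -- hypothetical sketch; the values are
# inferred from the ".stdout"/".stderr" literals that STDOUT/STDERR
# replace throughout this diff
STDOUT = ".stdout"
STDERR = ".stderr"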
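
The WorkflowExecutor changes split the executor lifecycle into three steps: build and register in _prepare_tasks, look up in _start_task, and deregister in _on_task_terminal_state, which resolves the removed TODO about deregistering an executor that a failing task never registered. A standalone sketch of that registry pattern; ExecutorRegistry and its method names are illustrative only, since the engine stores executors directly on self.state.executors:

# Illustrative sketch of the register/get/deregister pattern used above
class ExecutorRegistry:
    def __init__(self):
        self._executors = {}  # maps "run_id.task_id" -> executor

    def register(self, run_id, task_id, executor):
        self._executors[f"{run_id}.{task_id}"] = executor

    def get(self, run_id, task_id, default=None):
        # Mirrors _get_executor: a plain dict lookup with a default, so a
        # lookup for a task that failed before registration returns None
        # instead of raising KeyError
        return self._executors.get(f"{run_id}.{task_id}", default)

    def deregister(self, run_id, task_id):
        # The engine's _deregister_executor also cleans up the resources
        # the executor created; this sketch only drops the reference
        self._executors.pop(f"{run_id}.{task_id}", None)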
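
On the SDK side, the new input directory is plumbed through an OWE_INPUT_DIR environment variable in Runtime and surfaced as ExecutionContext.input_dir. A hedged end-to-end sketch follows; in a real task container the engine sets these variables itself, and the module import paths and directory values here are assumptions based on the repo layout and the container_work_dir shown in the diff:

# Hypothetical usage sketch -- the env-var names come from runtime.py above;
# constructing Runtime/ExecutionContext by hand is for illustration only
import os
from owe_python_sdk.runtime.runtime import Runtime                      # assumed path
from owe_python_sdk.runtime.execution_context import ExecutionContext   # assumed path

# Normally set by the engine when it launches the task container
os.environ["OWE_INPUT_DIR"] = "/mnt/open-workflow-engine/pipeline/task/input"
os.environ["OWE_OUTPUT_DIR"] = "/mnt/open-workflow-engine/pipeline/task/output"
os.environ["OWE_EXEC_DIR"] = "/mnt/open-workflow-engine/pipeline/task/src"

ctx = ExecutionContext(Runtime())
print(ctx.input_dir)   # where the engine stages the task's inputs
print(ctx.output_dir)  # where task output (including .stdout/.stderr) lands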