From c8eda13fedf12085ddd12a5f6df112fa298f2bc6 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Mon, 8 Jan 2024 14:51:00 -0600 Subject: [PATCH] Add/Alter input furnishing functionality that is built upon using input files instead of envrionment variables --- .../src/core/tasks/executors/Function.py | 38 ++++++++++--------- .../runtime/execution_context.py | 22 +++++++---- .../src/owe_python_sdk/runtime/runtime.py | 5 ++- 3 files changed, 39 insertions(+), 26 deletions(-) diff --git a/src/engine/src/core/tasks/executors/Function.py b/src/engine/src/core/tasks/executors/Function.py index 1fd7fd50..fd42c88c 100644 --- a/src/engine/src/core/tasks/executors/Function.py +++ b/src/engine/src/core/tasks/executors/Function.py @@ -1,4 +1,4 @@ -import os, base64, time, shutil, inspect +import os, base64, time, shutil, inspect, json from kubernetes import client @@ -177,17 +177,21 @@ def _setup_container(self) -> ContainerDetails: name="_OWE_EXEC_DIR", value=os.path.join(self.task.container_exec_dir) ), + client.V1EnvVar( + name="_OWE_INPUT_SCHEMA", + value=json.dumps(self.task.input.dict()) + ) ] - # Convert defined workflow inputs into the function containers env vars with - # the open workflow engine input prefix - container_details.env = container_details.env + env + input_to_k8s_env_vars( - self.task.input, - self.ctx.pipeline.work_dir, - env=self.ctx.env, - args=self.ctx.args, - prefix="_OWE_WORKFLOW_INPUT_" - ) + # # Convert defined workflow inputs into the function containers env vars with + # # the open workflow engine input prefix + # container_details.env = container_details.env + env + input_to_k8s_env_vars( + # self.task.input, + # self.ctx.pipeline.work_dir, + # env=self.ctx.env, + # args=self.ctx.args, + # prefix="_OWE_WORKFLOW_INPUT_" + # ) return container_details @@ -196,13 +200,13 @@ def _write_entrypoint_file(self, file_path, code): file.write(base64.b64decode(code)) # FIXME - def _setup_linux_container(self): - # Create entrypoint file that will be mounted into the container via NFS mount. - # The code provided in the request is expected to be base64 encoded. Decode, then - # encode in UTF-8 - entrypoint_filename = "entrypoint.sh" - local_entrypoint_file_path = os.path.join(self.task.exec_dir, entrypoint_filename) - self._write_entrypoint_file(local_entrypoint_file_path, self.task.code) + # def _setup_linux_container(self): + # # Create entrypoint file that will be mounted into the container via NFS mount. + # # The code provided in the request is expected to be base64 encoded. Decode, then + # # encode in UTF-8 + # entrypoint_filename = "entrypoint.sh" + # local_entrypoint_file_path = os.path.join(self.task.exec_dir, entrypoint_filename) + # self._write_entrypoint_file(local_entrypoint_file_path, self.task.code) def _setup_python_container(self): # Create entrypoint file that will be mounted into the container via NFS mount. diff --git a/src/engine/src/owe_python_sdk/runtime/execution_context.py b/src/engine/src/owe_python_sdk/runtime/execution_context.py index 305f8ae4..02a6d8ef 100644 --- a/src/engine/src/owe_python_sdk/runtime/execution_context.py +++ b/src/engine/src/owe_python_sdk/runtime/execution_context.py @@ -10,16 +10,24 @@ def __init__(self, runtime: Runtime): self.output_dir = runtime.OUTPUT_DIR self.input_dir = runtime.INPUT_DIR self.exec_dir = runtime.EXEC_DIR + self.input_schema = runtime.INPUT_SCHEMA + self.input_ids = list(self.input_schema.keys()) - def get_input(self, key, default=None): - input_var = os.environ.get(INPUT_PREFIX + key, default=default) + def get_input(self, input_id, default=None): + contents = None + try: + with open(os.path.join(self.input_id, input_id), mode="r") as file: + contents = file.read() + except FileNotFoundError: + return default - return input_var + if contents == "": return None + + return contents def find_inputs(self, contains=None): - keys = list(os.environ.keys()) - if contains == None: return keys - ids = [key.replace(INPUT_PREFIX, "") for key in keys if contains in key and print(key) == None] + if contains == None: return self.input_ids + ids = [input_id for input_id in self.input_ids if contains in input_id] return ids def set_output(self, _id, value, encoding=None): @@ -42,7 +50,7 @@ def stderr(self, code: int, message): sys.exit(code) - def stdout(self, value, code=0): + def stdout(self, value): with open(self._runtime.STDOUT, "w") as file: if type(value) == dict: value = json.dumps(value) diff --git a/src/engine/src/owe_python_sdk/runtime/runtime.py b/src/engine/src/owe_python_sdk/runtime/runtime.py index e805d96a..fe82b021 100644 --- a/src/engine/src/owe_python_sdk/runtime/runtime.py +++ b/src/engine/src/owe_python_sdk/runtime/runtime.py @@ -1,4 +1,4 @@ -import os +import os, json from owe_python_sdk.constants import STDOUT, STDERR @@ -12,4 +12,5 @@ def __init__(self): self.PIPELINE_RUN_UUID = os.environ.get("_OWE_PIPELINE_RUN_UUID") self.TASK_ID = os.environ.get("_OWE_TASK_ID") self.STDOUT = os.path.join(self.OUTPUT_DIR, STDOUT) - self.STDERR = os.path.join(self.OUTPUT_DIR, STDERR) \ No newline at end of file + self.STDERR = os.path.join(self.OUTPUT_DIR, STDERR) + self.INPUT_SCHEMA = json.loads(os.environ.get("_OWE_INPUT_SCHEMA")) \ No newline at end of file