Skip to content

Commit

Permalink
Add/Alter input furnishing functionality that is built upon using inp…
Browse files Browse the repository at this point in the history
…ut files instead of envrionment variables
  • Loading branch information
nathandf committed Jan 8, 2024
1 parent eed8cd6 commit c8eda13
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
38 changes: 21 additions & 17 deletions src/engine/src/core/tasks/executors/Function.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os, base64, time, shutil, inspect
import os, base64, time, shutil, inspect, json

from kubernetes import client

Expand Down Expand Up @@ -177,17 +177,21 @@ def _setup_container(self) -> ContainerDetails:
name="_OWE_EXEC_DIR",
value=os.path.join(self.task.container_exec_dir)
),
client.V1EnvVar(
name="_OWE_INPUT_SCHEMA",
value=json.dumps(self.task.input.dict())
)
]

# Convert defined workflow inputs into the function containers env vars with
# the open workflow engine input prefix
container_details.env = container_details.env + env + input_to_k8s_env_vars(
self.task.input,
self.ctx.pipeline.work_dir,
env=self.ctx.env,
args=self.ctx.args,
prefix="_OWE_WORKFLOW_INPUT_"
)
# # Convert defined workflow inputs into the function containers env vars with
# # the open workflow engine input prefix
# container_details.env = container_details.env + env + input_to_k8s_env_vars(
# self.task.input,
# self.ctx.pipeline.work_dir,
# env=self.ctx.env,
# args=self.ctx.args,
# prefix="_OWE_WORKFLOW_INPUT_"
# )

return container_details

Expand All @@ -196,13 +200,13 @@ def _write_entrypoint_file(self, file_path, code):
file.write(base64.b64decode(code))

# FIXME
def _setup_linux_container(self):
# Create entrypoint file that will be mounted into the container via NFS mount.
# The code provided in the request is expected to be base64 encoded. Decode, then
# encode in UTF-8
entrypoint_filename = "entrypoint.sh"
local_entrypoint_file_path = os.path.join(self.task.exec_dir, entrypoint_filename)
self._write_entrypoint_file(local_entrypoint_file_path, self.task.code)
# def _setup_linux_container(self):
# # Create entrypoint file that will be mounted into the container via NFS mount.
# # The code provided in the request is expected to be base64 encoded. Decode, then
# # encode in UTF-8
# entrypoint_filename = "entrypoint.sh"
# local_entrypoint_file_path = os.path.join(self.task.exec_dir, entrypoint_filename)
# self._write_entrypoint_file(local_entrypoint_file_path, self.task.code)

def _setup_python_container(self):
# Create entrypoint file that will be mounted into the container via NFS mount.
Expand Down
22 changes: 15 additions & 7 deletions src/engine/src/owe_python_sdk/runtime/execution_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,24 @@ def __init__(self, runtime: Runtime):
self.output_dir = runtime.OUTPUT_DIR
self.input_dir = runtime.INPUT_DIR
self.exec_dir = runtime.EXEC_DIR
self.input_schema = runtime.INPUT_SCHEMA
self.input_ids = list(self.input_schema.keys())

def get_input(self, key, default=None):
input_var = os.environ.get(INPUT_PREFIX + key, default=default)
def get_input(self, input_id, default=None):
contents = None
try:
with open(os.path.join(self.input_id, input_id), mode="r") as file:
contents = file.read()
except FileNotFoundError:
return default

return input_var
if contents == "": return None

return contents

def find_inputs(self, contains=None):
keys = list(os.environ.keys())
if contains == None: return keys
ids = [key.replace(INPUT_PREFIX, "") for key in keys if contains in key and print(key) == None]
if contains == None: return self.input_ids
ids = [input_id for input_id in self.input_ids if contains in input_id]
return ids

def set_output(self, _id, value, encoding=None):
Expand All @@ -42,7 +50,7 @@ def stderr(self, code: int, message):

sys.exit(code)

def stdout(self, value, code=0):
def stdout(self, value):
with open(self._runtime.STDOUT, "w") as file:
if type(value) == dict:
value = json.dumps(value)
Expand Down
5 changes: 3 additions & 2 deletions src/engine/src/owe_python_sdk/runtime/runtime.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
import os, json

from owe_python_sdk.constants import STDOUT, STDERR

Expand All @@ -12,4 +12,5 @@ def __init__(self):
self.PIPELINE_RUN_UUID = os.environ.get("_OWE_PIPELINE_RUN_UUID")
self.TASK_ID = os.environ.get("_OWE_TASK_ID")
self.STDOUT = os.path.join(self.OUTPUT_DIR, STDOUT)
self.STDERR = os.path.join(self.OUTPUT_DIR, STDERR)
self.STDERR = os.path.join(self.OUTPUT_DIR, STDERR)
self.INPUT_SCHEMA = json.loads(os.environ.get("_OWE_INPUT_SCHEMA"))

0 comments on commit c8eda13

Please sign in to comment.