Skip to content

Commit

Permalink
Remove references to code converting inputs to k8s env vars
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Jan 9, 2024
1 parent 2badeeb commit 9778d15
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 102 deletions.
9 changes: 1 addition & 8 deletions src/engine/src/core/tasks/executors/Function.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
)
from core.resources import JobResource
from utils import get_flavor
from utils.k8s import flavor_to_k8s_resource_reqs, input_to_k8s_env_vars, gen_resource_name
from utils.k8s import flavor_to_k8s_resource_reqs, gen_resource_name
from core.tasks import function_bootstrap
from core.repositories import GitCacheRepository

Expand Down Expand Up @@ -188,13 +188,6 @@ def _setup_container(self) -> ContainerDetails:
# Convert defined workflow inputs into the function containers env vars with
# the open workflow engine input prefix
container_details.env = container_details.env + env
# + input_to_k8s_env_vars(
# self.task.input,
# self.ctx.pipeline.work_dir,
# env=self.ctx.env,
# args=self.ctx.args,
# prefix="_OWE_WORKFLOW_INPUT_"
# )

return container_details

Expand Down
94 changes: 0 additions & 94 deletions src/engine/src/utils/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,100 +25,6 @@ def flavor_to_limits(flavor: Flavor):

return {"cpu": flavor.cpu, "memory": flavor.memory, **gpu_specs}

def input_to_k8s_env_vars(_inputs, pipeline_work_dir, env={}, args={}, prefix=""):
k8senvvars = []
for input_id, _input in _inputs.items():
# Use input[input_id].value if provided
value = _input.value
if value != None:
k8senvvars.append(
V1EnvVar(
name=f"{prefix}{input_id}",
value=value
),
)
continue

# Raise exception if both value and value_from are None
value_from = _input.value_from
if value_from == None and value == None:
raise Exception(f"Invalid input value for '{input_id}'. Value cannot be null/None")

k8senvvar_value_source_key = None
k8senvvar_value_source = None
k8senvvar_value = None

if value_from.get("env", None) != None:
k8senvvar_value_source = "env"
key = value_from.get("env")
k8senvvar_value = get_value_from_env(env, key)
if k8senvvar_value == None:
raise Exception(f"No value found for environment variable '{value_from.get('env')}'")
k8senvvar_value_source_key = key
elif value_from.get("args", None) != None:
k8senvvar_value_source = "args"
key = value_from.get("args")
k8senvvar_value = get_value_from_args(args, key)
k8senvvar_value_source_key = key
elif value_from.get("task_output", None) != None:
k8senvvar_value_source = "task_output"
task_id = value_from.get("task_output").task_id
output_id = value_from.get("task_output").output_id

# Get the value from the output of a previous task
previous_task_output_path = os.path.join(
pipeline_work_dir,
task_id,
"output",
output_id
)

# Check that the specified output file exists
if not os.path.isfile(previous_task_output_path): raise Exception(f"Error: output '{output_id}' for task {task_id} not found")

# Grab the value of the output from the file
contents = None
with open(previous_task_output_path, "r") as file:
contents = file.read()

# TODO Consider allowing None
# Raise exception if output is None
if contents == None: raise Exception(f"Error: Output '{output_id}' for task '{task_id}' is null")

# Coerce the value of the contents into the value specified in the input
input_type = _input.type
try:
if input_type == "string":
k8senvvar_value = str(contents)
elif input_type == "int":
k8senvvar_value = int(contents)
elif input_type == "float":
k8senvvar_value = float(contents)
elif input_type == "binary":
pass
else:
raise Exception(f"Error: Unsupported input type. Cannot coerce task output '{output_id}' for task '{task_id}' to type {input_type}")
except Exception as e:
raise Exception(f"Error: Could not coerce output '{output_id}' for task '{task_id}' to {input_type}. Details: {e.message}")

else:
raise Exception("Invalid 'value_from' type: Must be oneOf type [env, args, task_output]")

if k8senvvar_value == None:
source_key_error_message = ""
if k8senvvar_value_source_key:
source_key_error_message = f" at key '{k8senvvar_value_source_key}'"
raise Exception(f"Value for input {input_id} not found in source {k8senvvar_value_source}{source_key_error_message}")

k8senvvars.append(
V1EnvVar(
name=f"{prefix}{input_id}",
value=k8senvvar_value
),
)

return k8senvvars

def get_value_from_env(env, key):
value = env.get(key, None)
if value == None: return None
Expand Down

0 comments on commit 9778d15

Please sign in to comment.