diff --git a/src/engine/src/core/tasks/executors/Function.py b/src/engine/src/core/tasks/executors/Function.py index a673c859..d55ce9e1 100644 --- a/src/engine/src/core/tasks/executors/Function.py +++ b/src/engine/src/core/tasks/executors/Function.py @@ -12,7 +12,7 @@ ) from core.resources import JobResource from utils import get_flavor -from utils.k8s import flavor_to_k8s_resource_reqs, input_to_k8s_env_vars, gen_resource_name +from utils.k8s import flavor_to_k8s_resource_reqs, gen_resource_name from core.tasks import function_bootstrap from core.repositories import GitCacheRepository @@ -188,13 +188,6 @@ def _setup_container(self) -> ContainerDetails: # Convert defined workflow inputs into the function containers env vars with # the open workflow engine input prefix container_details.env = container_details.env + env - # + input_to_k8s_env_vars( - # self.task.input, - # self.ctx.pipeline.work_dir, - # env=self.ctx.env, - # args=self.ctx.args, - # prefix="_OWE_WORKFLOW_INPUT_" - # ) return container_details diff --git a/src/engine/src/utils/k8s.py b/src/engine/src/utils/k8s.py index 449bc081..5f06a028 100644 --- a/src/engine/src/utils/k8s.py +++ b/src/engine/src/utils/k8s.py @@ -25,100 +25,6 @@ def flavor_to_limits(flavor: Flavor): return {"cpu": flavor.cpu, "memory": flavor.memory, **gpu_specs} -def input_to_k8s_env_vars(_inputs, pipeline_work_dir, env={}, args={}, prefix=""): - k8senvvars = [] - for input_id, _input in _inputs.items(): - # Use input[input_id].value if provided - value = _input.value - if value != None: - k8senvvars.append( - V1EnvVar( - name=f"{prefix}{input_id}", - value=value - ), - ) - continue - - # Raise exception if both value and value_from are None - value_from = _input.value_from - if value_from == None and value == None: - raise Exception(f"Invalid input value for '{input_id}'. Value cannot be null/None") - - k8senvvar_value_source_key = None - k8senvvar_value_source = None - k8senvvar_value = None - - if value_from.get("env", None) != None: - k8senvvar_value_source = "env" - key = value_from.get("env") - k8senvvar_value = get_value_from_env(env, key) - if k8senvvar_value == None: - raise Exception(f"No value found for environment variable '{value_from.get('env')}'") - k8senvvar_value_source_key = key - elif value_from.get("args", None) != None: - k8senvvar_value_source = "args" - key = value_from.get("args") - k8senvvar_value = get_value_from_args(args, key) - k8senvvar_value_source_key = key - elif value_from.get("task_output", None) != None: - k8senvvar_value_source = "task_output" - task_id = value_from.get("task_output").task_id - output_id = value_from.get("task_output").output_id - - # Get the value from the output of a previous task - previous_task_output_path = os.path.join( - pipeline_work_dir, - task_id, - "output", - output_id - ) - - # Check that the specified output file exists - if not os.path.isfile(previous_task_output_path): raise Exception(f"Error: output '{output_id}' for task {task_id} not found") - - # Grab the value of the output from the file - contents = None - with open(previous_task_output_path, "r") as file: - contents = file.read() - - # TODO Consider allowing None - # Raise exception if output is None - if contents == None: raise Exception(f"Error: Output '{output_id}' for task '{task_id}' is null") - - # Coerce the value of the contents into the value specified in the input - input_type = _input.type - try: - if input_type == "string": - k8senvvar_value = str(contents) - elif input_type == "int": - k8senvvar_value = int(contents) - elif input_type == "float": - k8senvvar_value = float(contents) - elif input_type == "binary": - pass - else: - raise Exception(f"Error: Unsupported input type. Cannot coerce task output '{output_id}' for task '{task_id}' to type {input_type}") - except Exception as e: - raise Exception(f"Error: Could not coerce output '{output_id}' for task '{task_id}' to {input_type}. Details: {e.message}") - - else: - raise Exception("Invalid 'value_from' type: Must be oneOf type [env, args, task_output]") - - if k8senvvar_value == None: - source_key_error_message = "" - if k8senvvar_value_source_key: - source_key_error_message = f" at key '{k8senvvar_value_source_key}'" - raise Exception(f"Value for input {input_id} not found in source {k8senvvar_value_source}{source_key_error_message}") - - k8senvvars.append( - V1EnvVar( - name=f"{prefix}{input_id}", - value=k8senvvar_value - ), - ) - - return k8senvvars - def get_value_from_env(env, key): value = env.get(key, None) if value == None: return None