diff --git a/src/engine/src/core/tasks/executors/Function.py b/src/engine/src/core/tasks/executors/Function.py index a6347889..7384448f 100644 --- a/src/engine/src/core/tasks/executors/Function.py +++ b/src/engine/src/core/tasks/executors/Function.py @@ -1,4 +1,4 @@ -import os, base64, time, shutil +import os, base64, time, shutil, inspect from kubernetes import client @@ -13,6 +13,7 @@ from core.resources import JobResource from utils import get_flavor from utils.k8s import flavor_to_k8s_resource_reqs, input_to_k8s_env_vars, gen_resource_name +from helpers import function_bootstrap from helpers.GitCacheService import GitCacheService from errors import WorkflowTerminated @@ -209,19 +210,21 @@ def _setup_python_container(self): # Create entrypoint file that will be mounted into the container via NFS mount. # The code provided in the request is expected to be base64 encoded. Decode, then # encode in UTF-8 - env = [] - entrypoint_filename = self.task.entrypoint.lstrip("/") - if self.task.entrypoint == None: - entrypoint_filename = "entrypoint.py" - local_entrypoint_file_path = os.path.join(self.task.exec_dir, entrypoint_filename) - self._write_entrypoint_file(local_entrypoint_file_path, self.task.code) - - env.append( - client.V1EnvVar( + entrypoint_filename = "entrypoint.py" + local_entrypoint_file_path = os.path.join(self.task.exec_dir, entrypoint_filename) + entrypoint_env_var = client.V1EnvVar( + name="_OWE_ENTRYPOINT_FILE_PATH", + value=os.path.join(self.task.container_exec_dir, entrypoint_filename) + ) + if self.task.entrypoint != None: + self.task.code = base64.encode(inspect.getsource(function)) + entrypoint_env_var = client.V1EnvVar( name="_OWE_ENTRYPOINT_FILE_PATH", - value=os.path.join(self.task.container_exec_dir, entrypoint_filename) + value=os.path.join(self.task.container_exec_dir, self.task.entrypoint.lstrip("/")) ) - ) + + + self._write_entrypoint_file(local_entrypoint_file_path, self.task.code) # Create requirements file that will be mounted into the functions container # via NFS mount. This file will be used with the specified installer to install @@ -262,12 +265,12 @@ def _setup_python_container(self): owe_python_sdk_local_path = os.path.join(self.task.work_dir, "src/owe_python_sdk") shutil.copytree(OWE_PYTHON_SDK_DIR, owe_python_sdk_local_path, dirs_exist_ok=True) - # entrypoint_cmd = f"python3 {entrypoint_py} 2> {stderr} 1> {stdout}" - entrypoint_cmd = f"sleep 60m" + entrypoint_cmd = f"python3 {entrypoint_py} 2> {stderr} 1> {stdout}" args = [f"{install_cmd} {entrypoint_cmd}"] return ContainerDetails( image=self.task.runtime, command=command, - args=args + args=args, + env=[entrypoint_env_var] ) \ No newline at end of file diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py index cd9b851b..2f03f1db 100644 --- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py @@ -230,9 +230,9 @@ def _prepare_tasks(self): # Paths to the workdir for the task inside the job container task.container_work_dir = "/mnt/open-workflow-engine/pipeline/task" - task.container_exec_dir = f"{task.container_work_dir}/src" - task.container_input_dir = f"{task.container_work_dir}/input" - task.container_output_dir = f"{task.container_work_dir}/output" + task.container_exec_dir = f"{task.container_work_dir}/src/" + task.container_input_dir = f"{task.container_work_dir}/input/" + task.container_output_dir = f"{task.container_work_dir}/output/" # Paths to the workdir inside the nfs-server container task.nfs_work_dir = f"{self.state.ctx.pipeline.nfs_work_dir}{task.id}/" diff --git a/src/engine/src/helpers/function_bootstrap.py b/src/engine/src/helpers/function_bootstrap.py new file mode 100644 index 00000000..467efe0e --- /dev/null +++ b/src/engine/src/helpers/function_bootstrap.py @@ -0,0 +1,5 @@ +import os, sys + + +sys.path.append("/mnt/open-workflow-engine/pipeline/task/src/owe_python_sdk") +exec(open(os.envrion.get("_OWE_ENTRYPOINT_FILE_PATH")).read()) \ No newline at end of file