Skip to content

Commit

Permalink
bootstrap user defined function task entrypoint to fix sdk import issues
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Oct 26, 2023
1 parent ffe3a9f commit 5f0e434
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 18 deletions.
33 changes: 18 additions & 15 deletions src/engine/src/core/tasks/executors/Function.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os, base64, time, shutil
import os, base64, time, shutil, inspect

from kubernetes import client

Expand All @@ -13,6 +13,7 @@
from core.resources import JobResource
from utils import get_flavor
from utils.k8s import flavor_to_k8s_resource_reqs, input_to_k8s_env_vars, gen_resource_name
from helpers import function_bootstrap
from helpers.GitCacheService import GitCacheService
from errors import WorkflowTerminated

Expand Down Expand Up @@ -209,19 +210,21 @@ def _setup_python_container(self):
# Create entrypoint file that will be mounted into the container via NFS mount.
# The code provided in the request is expected to be base64 encoded. Decode, then
# encode in UTF-8
env = []
entrypoint_filename = self.task.entrypoint.lstrip("/")
if self.task.entrypoint == None:
entrypoint_filename = "entrypoint.py"
local_entrypoint_file_path = os.path.join(self.task.exec_dir, entrypoint_filename)
self._write_entrypoint_file(local_entrypoint_file_path, self.task.code)

env.append(
client.V1EnvVar(
entrypoint_filename = "entrypoint.py"
local_entrypoint_file_path = os.path.join(self.task.exec_dir, entrypoint_filename)
entrypoint_env_var = client.V1EnvVar(
name="_OWE_ENTRYPOINT_FILE_PATH",
value=os.path.join(self.task.container_exec_dir, entrypoint_filename)
)
if self.task.entrypoint != None:
self.task.code = base64.encode(inspect.getsource(function))
entrypoint_env_var = client.V1EnvVar(
name="_OWE_ENTRYPOINT_FILE_PATH",
value=os.path.join(self.task.container_exec_dir, entrypoint_filename)
value=os.path.join(self.task.container_exec_dir, self.task.entrypoint.lstrip("/"))
)
)


self._write_entrypoint_file(local_entrypoint_file_path, self.task.code)

# Create requirements file that will be mounted into the functions container
# via NFS mount. This file will be used with the specified installer to install
Expand Down Expand Up @@ -262,12 +265,12 @@ def _setup_python_container(self):
owe_python_sdk_local_path = os.path.join(self.task.work_dir, "src/owe_python_sdk")
shutil.copytree(OWE_PYTHON_SDK_DIR, owe_python_sdk_local_path, dirs_exist_ok=True)

# entrypoint_cmd = f"python3 {entrypoint_py} 2> {stderr} 1> {stdout}"
entrypoint_cmd = f"sleep 60m"
entrypoint_cmd = f"python3 {entrypoint_py} 2> {stderr} 1> {stdout}"
args = [f"{install_cmd} {entrypoint_cmd}"]

return ContainerDetails(
image=self.task.runtime,
command=command,
args=args
args=args,
env=[entrypoint_env_var]
)
6 changes: 3 additions & 3 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ def _prepare_tasks(self):

# Paths to the workdir for the task inside the job container
task.container_work_dir = "/mnt/open-workflow-engine/pipeline/task"
task.container_exec_dir = f"{task.container_work_dir}/src"
task.container_input_dir = f"{task.container_work_dir}/input"
task.container_output_dir = f"{task.container_work_dir}/output"
task.container_exec_dir = f"{task.container_work_dir}/src/"
task.container_input_dir = f"{task.container_work_dir}/input/"
task.container_output_dir = f"{task.container_work_dir}/output/"

# Paths to the workdir inside the nfs-server container
task.nfs_work_dir = f"{self.state.ctx.pipeline.nfs_work_dir}{task.id}/"
Expand Down
5 changes: 5 additions & 0 deletions src/engine/src/helpers/function_bootstrap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import os, sys


sys.path.append("/mnt/open-workflow-engine/pipeline/task/src/owe_python_sdk")
exec(open(os.envrion.get("_OWE_ENTRYPOINT_FILE_PATH")).read())

0 comments on commit 5f0e434

Please sign in to comment.