Skip to content

Commit

Permalink
use git cache instead of clone jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Oct 23, 2023
1 parent 7291168 commit 30a0551
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 99 deletions.
103 changes: 10 additions & 93 deletions src/engine/src/core/tasks/executors/Function.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from core.resources import JobResource
from utils import get_flavor
from utils.k8s import flavor_to_k8s_resource_reqs, input_to_k8s_env_vars, gen_resource_name
from helpers.GitCacheService import GitCacheService
from errors import WorkflowTerminated


Expand Down Expand Up @@ -52,16 +53,16 @@ def __init__(self, task, ctx, exchange, plugins=[]):

def execute(self):
job_name = gen_resource_name(prefix="fn")

# Prepares the file system for the Function task by clone
# git repsoitories specified by the user in the request
init_jobs_success = True
if len(self.task.git_repositories) > 0:
init_jobs_task_result = self._run_git_clone_jobs(job_name)
init_jobs_success = init_jobs_task_result.success

if not init_jobs_success:
return init_jobs_task_result
# git repsoitories specified in the request
try:
for repo in self.task.git_respositories:
git_cache_service = GitCacheService(
cache_dir=os.path.join(self.task.exec_dir, repo.directory)
)
git_cache_service.add(repo.url, branch=repo.branch)
except Exception as e:
return self._task_result(1, errors=[str(e)])

# Set up the container details for the task's specified runtime
container_details = self._setup_container()
Expand Down Expand Up @@ -141,90 +142,6 @@ def execute(self):

return self._task_result(0 if self._job_succeeded(job) else 1)

def _run_git_clone_jobs(self, job_name):
job_name = job_name.replace("wf-fn", "wf-fn-init")
for i, repo in enumerate(self.task.git_repositories):
# Create the command for the container. Add the branch to
# the command if specified
command = ["git", "clone"]
if repo.branch != None: command += ["-b", repo.branch]

command += [
repo.url,
os.path.join("src", repo.directory)
]

try:
job = self.batch_v1_api.create_namespaced_job(
namespace=KUBERNETES_NAMESPACE,
body=client.V1Job(
metadata=client.V1ObjectMeta(
labels=dict(job=job_name + str(i)),
name=job_name + str(i),
namespace=KUBERNETES_NAMESPACE,
),
spec=client.V1JobSpec(
backoff_limit=0,
template=client.V1PodTemplateSpec(
spec=client.V1PodSpec(
containers=[
client.V1Container(
name=job_name + str(i),
image="alpine/git:latest",
command=command,
volume_mounts=[
client.V1VolumeMount(
name="task-workdir",
mount_path=self.task.container_work_dir
)
],
working_dir=self.task.container_work_dir,
resources=flavor_to_k8s_resource_reqs(get_flavor("c1tiny"))
)
],
restart_policy="Never",
volumes=[
client.V1Volume(
name="task-workdir",
nfs=client.V1NFSVolumeSource(
server=WORKFLOW_NFS_SERVER,
path=self.task.nfs_work_dir
),
)
]
)
)
)
)
)
# Register the job to be deleted after execution
self._register_resource(JobResource(job=job))
except Exception as e:
self.ctx.logger.error(e)
self._stderr(str(e), "w")
return self._task_result(1, errors=[str(e)])

try:
while not self._job_in_terminal_state(job):
if self.terminating:
self.ctx.logger.error("Workflow Terminated")
self.cleanup(terminating=True)
self._stderr("Workflow Terminated", "w")
return self._task_result(2, errors=["Workflow Terminated"])

job = self.batch_v1_api.read_namespaced_job(
job.metadata.name,
KUBERNETES_NAMESPACE
)

time.sleep(self.polling_interval)
except Exception as e:
self.ctx.logger.error(str(e))
self._stderr(str(e), "w")
return self._task_result(1, errors=[str(e)])

return self._task_result(0 if self._job_succeeded(job) else 1)

def _setup_container(self) -> ContainerDetails:
if self.task.runtime in self.runtimes["python"]:
container_details = self._setup_python_container()
Expand Down
15 changes: 9 additions & 6 deletions src/engine/src/helpers/GitCacheService.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ class GitCacheService:
def __init__(self, cache_dir):
self._cache_dir = cache_dir

def add(self, url: str, directory: str):
git.Repo.clone_from(url, os.path.join(self._cache_dir, directory.lstrip("/")))
def add(self, url: str, directory: str, branch=None):
kwargs = {}
if branch != None:
kwargs["branch"] = branch
git.Repo.clone_from(url, os.path.join(self._cache_dir, directory.lstrip("/")), **kwargs)

def repo_exists(self, path):
return os.path.exists(os.path.join(self._cache_dir, path.lstrip("/")))

def update(self, directory):
def update(self, directory, branch=None):
git.cmd.Git(os.path.join(self._cache_dir, directory.lstrip("/"))).pull()

def add_or_update(self, url, directory):
def add_or_update(self, url, directory, branch=None):
if not self.repo_exists(directory):
self.add(url, directory)
self.add(url, directory, branch)
return

self.update(directory)
self.update(directory, branch)

0 comments on commit 30a0551

Please sign in to comment.