Commit

add GitCacheService

nathandf committed Oct 16, 2023
1 parent 926a6fb commit 4cae145
Showing 4 changed files with 72 additions and 8 deletions.
3 changes: 1 addition & 2 deletions src/api/src/backend/views/ETLPipelines.py
@@ -14,10 +14,9 @@
ServerError as ServerErrorResp,
UnprocessableEntity
)
from backend.views.http.responses import BaseResponse, ResourceURLResponse
from backend.views.http.responses import ResourceURLResponse
from backend.views.http.requests import (
Uses,
GitRepository,
TemplateTask,
TapisJobTask,
TaskDependency
2 changes: 1 addition & 1 deletion src/engine/src/core/tasks/TaskExecutorFactory.py
@@ -33,7 +33,7 @@ def build(self, task, ctx, exchange: EventExchange, plugins=[]):
# No task executors found with for the provided task type so
# raise an error
raise InvalidTaskTypeError(
f"Task '{task.name}' uses task type '{task.type}' which does not exist.",
f"Task '{task.id}' uses task type '{task.type}' which does not exist.",
hint=f"Update Task with id=={task.id} to have one of the following types: [image_build, container_run, request]",
)

51 changes: 46 additions & 5 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
@@ -1,4 +1,4 @@
import os, logging
import os, logging, json

from threading import Thread, Lock
from uuid import uuid4
@@ -19,6 +19,7 @@
TASK_ACTIVE, TASK_COMPLETED, TASK_FAILED
)
from helpers.GraphValidator import GraphValidator # From shared
from helpers.GitCacheService import GitCacheService
from errors.tasks import (
InvalidTaskTypeError,
MissingInitialTasksError,
@@ -171,8 +172,8 @@ def _staging(self, ctx):
# should be made via 'self.state.ctx'
self._set_context(ctx)

# Prepare the file system for this pipeline
self._prepare_pipeline_fs()
# Prepare the file system for this pipeline and handle pipeline templating
self._prepare_pipeline()

# Setup the server and the pipeline run loggers
self._setup_loggers()
@@ -385,7 +386,7 @@ def _set_tasks(self, tasks):
invalid_deps += 1
invalid_deps_message = (
invalid_deps_message
+ f"#{invalid_deps} An task cannot be dependent on itself: {task.id} | "
+ f"#{invalid_deps} A task cannot be dependent on itself: {task.id} | "
)
if dep.id not in task_ids:
invalid_deps += 1
@@ -421,6 +422,42 @@ def _set_tasks(self, tasks):

# Add all tasks to the queue
self.state.queue = [ task for task in self.state.tasks ]

    @interceptable()
    def _prepare_pipeline(self):
        # Create all of the directories the pipeline needs to run, persist results, and cache artifacts
        self._prepare_pipeline_fs()

        # Clone the git repository specified in pipeline.uses, if one exists
git_cache_service = GitCacheService(cache_dir=self.state.ctx.pipeline.git_cache_dir)
if self.state.ctx.pipeline.uses != None:
git_cache_service.add_or_update(
self.state.ctx.pipeline.uses.source.url,
# NOTE Using the url as the directory to clone into is intentional
self.state.ctx.pipeline.uses.source.url
)

template_root_dir = os.path.join(
self.state.ctx.pipeline.git_cache_dir,
self.state.ctx.pipeline.uses.source.url,
)

try:
# Open the owe-config.json file
with open(os.path.join(template_root_dir, "owe-config.json")) as file:
owe_config = json.loads(file.read())

# Open the etl pipeline schema.json
with open(
os.path.join(
template_root_dir,
owe_config.get(self.state.ctx.pipeline.uses.name).get("path")
)
) as file:
pipeline_template = json.loads(file.read())
except Exception as e:
raise Exception(f"Templating configuration Error (owe-config.json): {str(e)}")


@interceptable()
def _prepare_pipeline_fs(self):
@@ -434,6 +471,7 @@ def _prepare_pipeline_fs(self):
self.state.ctx.pipeline.nfs_cache_dir = f"{self.state.ctx.pipeline.nfs_root_dir}cache/"
self.state.ctx.pipeline.nfs_docker_cache_dir = f"{self.state.ctx.pipeline.nfs_cache_dir}docker"
self.state.ctx.pipeline.nfs_singularity_cache_dir = f"{self.state.ctx.pipeline.nfs_cache_dir}singularity"
self.state.ctx.pipeline.nfs_git_cache_dir = f"{self.state.ctx.pipeline.nfs_cache_dir}git"
self.state.ctx.pipeline.nfs_work_dir = f"{self.state.ctx.pipeline.nfs_root_dir}runs/{self.state.ctx.pipeline_run.uuid}/"

# The pipeline root dir. All files and directories produced by a workflow
@@ -453,6 +491,10 @@ def _prepare_pipeline_fs(self):
self.state.ctx.pipeline.singularity_cache_dir = f"{self.state.ctx.pipeline.cache_dir}singularity"
os.makedirs(f"{self.state.ctx.pipeline.singularity_cache_dir}", exist_ok=True)

        # Create the git cache dir
self.state.ctx.pipeline.git_cache_dir = f"{self.state.ctx.pipeline.cache_dir}git"
os.makedirs(f"{self.state.ctx.pipeline.git_cache_dir}", exist_ok=True)

# The directory for this particular run of the workflow
self.state.ctx.pipeline.work_dir = f"{self.state.ctx.pipeline.root_dir}runs/{self.state.ctx.pipeline_run.uuid}/"
os.makedirs(self.state.ctx.pipeline.work_dir, exist_ok=True)
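
Note that the cache and run directories above are assembled by f-string concatenation, so the base directories are expected to end with a trailing slash. A small illustration with hypothetical base values:

import os

pipeline_root_dir = "/workflows/pipelines/my-pipeline/"     # hypothetical; note the trailing slash
pipeline_run_uuid = "4f6b2c1e-aaaa-bbbb-cccc-123456789abc"  # hypothetical run uuid

cache_dir = f"{pipeline_root_dir}cache/"
git_cache_dir = f"{cache_dir}git"   # -> /workflows/pipelines/my-pipeline/cache/git
work_dir = f"{pipeline_root_dir}runs/{pipeline_run_uuid}/"

os.makedirs(git_cache_dir, exist_ok=True)
os.makedirs(work_dir, exist_ok=True)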
@@ -629,7 +671,6 @@ def _set_initial_state(self):

@interceptable()
def _set_context(self, ctx):
# TODO validate the ctx here. Maybe pydantic
self.state.ctx = ctx

def _reset_event_exchange(self):
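
For reference, the template resolution added in _prepare_pipeline above works roughly as follows. This is a minimal sketch, assuming a hypothetical owe-config.json in which each template name maps to an object with a "path" key (the actual schema is not part of this commit); the directory and URL values are placeholders.

import json, os

# Hypothetical values; in the executor these come from self.state.ctx.pipeline.
git_cache_dir = "/workflows/pipelines/my-pipeline/cache/git"
uses_url = "https://github.com/example-org/owe-templates.git"   # pipeline.uses.source.url
uses_name = "etl-pipeline"                                       # pipeline.uses.name

# The clone is keyed by the repository URL, matching GitCacheService.add_or_update(url, url).
template_root_dir = os.path.join(git_cache_dir, uses_url)

# Assumed owe-config.json shape: {"<template name>": {"path": "relative/path/to/template.json"}}
with open(os.path.join(template_root_dir, "owe-config.json")) as file:
    owe_config = json.load(file)

with open(os.path.join(template_root_dir, owe_config[uses_name]["path"])) as file:
    pipeline_template = json.load(file)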
24 changes: 24 additions & 0 deletions src/engine/src/helpers/GitCacheService.py
@@ -0,0 +1,24 @@
import os

import git


class GitCacheService:
    def __init__(self, cache_dir):
        self._cache_dir = cache_dir

    def add(self, url: str, directory: str):
        # Clone the repository into a subdirectory of the cache
        git.Repo.clone_from(url, os.path.join(self._cache_dir, directory.lstrip("/")))

    def repo_exists(self, directory: str) -> bool:
        # Check for the clone inside the cache dir (same path used by add)
        return os.path.exists(os.path.join(self._cache_dir, directory.lstrip("/")))

    def update(self, directory: str):
        # Pull the latest changes for an already-cached repository
        git.cmd.Git(os.path.join(self._cache_dir, directory.lstrip("/"))).pull()

    def add_or_update(self, url: str, directory: str):
        if not self.repo_exists(directory):
            self.add(url, directory)
            return

        self.update(directory)
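
A short usage sketch of the new GitCacheService, mirroring how WorkflowExecutor._prepare_pipeline drives it; the cache directory and repository URL below are hypothetical placeholders.

from helpers.GitCacheService import GitCacheService

# Hypothetical cache location; the executor passes pipeline.git_cache_dir here.
cache = GitCacheService(cache_dir="/workflows/pipelines/my-pipeline/cache/git")

# Hypothetical repository; the executor passes pipeline.uses.source.url.
url = "https://github.com/example-org/owe-templates.git"

# The URL doubles as the cache key, so the first call clones into
# <cache_dir>/<url> and subsequent calls pull the existing clone.
cache.add_or_update(url, url)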
