diff --git a/src/api/src/backend/views/ETLPipelines.py b/src/api/src/backend/views/ETLPipelines.py index c138fecd..37ac4f58 100644 --- a/src/api/src/backend/views/ETLPipelines.py +++ b/src/api/src/backend/views/ETLPipelines.py @@ -14,10 +14,9 @@ ServerError as ServerErrorResp, UnprocessableEntity ) -from backend.views.http.responses import BaseResponse, ResourceURLResponse +from backend.views.http.responses import ResourceURLResponse from backend.views.http.requests import ( Uses, - GitRepository, TemplateTask, TapisJobTask, TaskDependency diff --git a/src/engine/src/core/tasks/TaskExecutorFactory.py b/src/engine/src/core/tasks/TaskExecutorFactory.py index eedb881b..d52322a3 100644 --- a/src/engine/src/core/tasks/TaskExecutorFactory.py +++ b/src/engine/src/core/tasks/TaskExecutorFactory.py @@ -33,7 +33,7 @@ def build(self, task, ctx, exchange: EventExchange, plugins=[]): # No task executors found with for the provided task type so # raise an error raise InvalidTaskTypeError( - f"Task '{task.name}' uses task type '{task.type}' which does not exist.", + f"Task '{task.id}' uses task type '{task.type}' which does not exist.", hint=f"Update Task with id=={task.id} to have one of the following types: [image_build, container_run, request]", ) diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py index 8d120178..3bb81986 100644 --- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py @@ -1,4 +1,4 @@ -import os, logging +import os, logging, json from threading import Thread, Lock from uuid import uuid4 @@ -19,6 +19,7 @@ TASK_ACTIVE, TASK_COMPLETED, TASK_FAILED ) from helpers.GraphValidator import GraphValidator # From shared +from helpers.GitCacheService import GitCacheService from errors.tasks import ( InvalidTaskTypeError, MissingInitialTasksError, @@ -171,8 +172,8 @@ def _staging(self, ctx): # should be made via 
'self.state.ctx' self._set_context(ctx) - # Prepare the file system for this pipeline - self._prepare_pipeline_fs() + # Prepare the file system for this pipeline and handle pipeline templating + self._prepare_pipeline() # Setup the server and the pipeline run loggers self._setup_loggers() @@ -385,7 +386,7 @@ def _set_tasks(self, tasks): invalid_deps += 1 invalid_deps_message = ( invalid_deps_message - + f"#{invalid_deps} An task cannot be dependent on itself: {task.id} | " + + f"#{invalid_deps} A task cannot be dependent on itself: {task.id} | " ) if dep.id not in task_ids: invalid_deps += 1 @@ -421,6 +422,45 @@ def _set_tasks(self, tasks): # Add all tasks to the queue self.state.queue = [ task for task in self.state.tasks ] + + @interceptable() + def _prepare_pipeline(self): + # Create all of the directories needed for the pipeline to run and persist results and cache + self._prepare_pipeline_fs() + + # Nothing to clone or template when the pipeline does not specify a remote definition + if self.state.ctx.pipeline.uses is None: + return + + # Clone git repository specified on the pipeline.uses + git_cache_service = GitCacheService(cache_dir=self.state.ctx.pipeline.git_cache_dir) + git_cache_service.add_or_update( + self.state.ctx.pipeline.uses.source.url, + # NOTE Using the url as the directory to clone into is intentional + self.state.ctx.pipeline.uses.source.url + ) + + template_root_dir = os.path.join( + self.state.ctx.pipeline.git_cache_dir, + self.state.ctx.pipeline.uses.source.url, + ) + + try: + # Open the owe-config.json file + with open(os.path.join(template_root_dir, "owe-config.json")) as file: + owe_config = json.loads(file.read()) + + # Open the etl pipeline schema.json + with open( + os.path.join( + template_root_dir, + owe_config.get(self.state.ctx.pipeline.uses.name).get("path") + ) + ) as file: + pipeline_template = json.loads(file.read()) + except Exception as e: + raise Exception(f"Templating configuration error: {str(e)}") + @interceptable() def _prepare_pipeline_fs(self): @@ -434,6 +471,7 @@ def 
_prepare_pipeline_fs(self): self.state.ctx.pipeline.nfs_cache_dir = f"{self.state.ctx.pipeline.nfs_root_dir}cache/" self.state.ctx.pipeline.nfs_docker_cache_dir = f"{self.state.ctx.pipeline.nfs_cache_dir}docker" self.state.ctx.pipeline.nfs_singularity_cache_dir = f"{self.state.ctx.pipeline.nfs_cache_dir}singularity" + self.state.ctx.pipeline.nfs_git_cache_dir = f"{self.state.ctx.pipeline.nfs_cache_dir}git" self.state.ctx.pipeline.nfs_work_dir = f"{self.state.ctx.pipeline.nfs_root_dir}runs/{self.state.ctx.pipeline_run.uuid}/" # The pipeline root dir. All files and directories produced by a workflow @@ -453,6 +491,10 @@ def _prepare_pipeline_fs(self): self.state.ctx.pipeline.singularity_cache_dir = f"{self.state.ctx.pipeline.cache_dir}singularity" os.makedirs(f"{self.state.ctx.pipeline.singularity_cache_dir}", exist_ok=True) + # Create the github cache dir + self.state.ctx.pipeline.git_cache_dir = f"{self.state.ctx.pipeline.cache_dir}git" + os.makedirs(f"{self.state.ctx.pipeline.git_cache_dir}", exist_ok=True) + # The directory for this particular run of the workflow self.state.ctx.pipeline.work_dir = f"{self.state.ctx.pipeline.root_dir}runs/{self.state.ctx.pipeline_run.uuid}/" os.makedirs(self.state.ctx.pipeline.work_dir, exist_ok=True) @@ -629,7 +671,6 @@ def _set_initial_state(self): @interceptable() def _set_context(self, ctx): - # TODO validate the ctx here. 
Maybe pydantic self.state.ctx = ctx def _reset_event_exchange(self): diff --git a/src/engine/src/helpers/GitCacheService.py b/src/engine/src/helpers/GitCacheService.py new file mode 100644 index 00000000..79a3a511 --- /dev/null +++ b/src/engine/src/helpers/GitCacheService.py @@ -0,0 +1,28 @@ +import os + +import git + + +class GitCacheService: + def __init__(self, cache_dir): + self._cache_dir = cache_dir + + def _repo_path(self, directory): + # All repos live under the cache dir; strip a leading "/" so join stays inside it + return os.path.join(self._cache_dir, directory.lstrip("/")) + + def add(self, url: str, directory: str): + git.Repo.clone_from(url, self._repo_path(directory)) + + def repo_exists(self, directory): + return os.path.exists(self._repo_path(directory)) + + def update(self, directory): + git.cmd.Git(self._repo_path(directory)).pull() + + def add_or_update(self, url, directory): + if not self.repo_exists(directory): + self.add(url, directory) + return + + self.update(directory)