diff --git a/src/engine/src/contrib/tapis/middleware/event_handlers/archivers/TapisSystemArchiver.py b/src/engine/src/contrib/tapis/middleware/event_handlers/archivers/TapisSystemArchiver.py index b2bc25ea..78c29fe8 100644 --- a/src/engine/src/contrib/tapis/middleware/event_handlers/archivers/TapisSystemArchiver.py +++ b/src/engine/src/contrib/tapis/middleware/event_handlers/archivers/TapisSystemArchiver.py @@ -13,7 +13,7 @@ class TapisSystemArchiver(EventHandler): def handle(self, event: Event): if event.type in [PIPELINE_COMPLETED, PIPELINE_TERMINATED, PIPELINE_FAILED]: - event.payload.logger.info(f"[PIPELINE] {event.payload.pipeline.id} [ARCHIVING] {trunc_uuid(event.payload.pipeline.run_id)}") + event.payload.logger.info(f"[PIPELINE] {event.payload.pipeline.id} [ARCHIVING] {trunc_uuid(event.payload.pipeline_run.uuid)}") try: self.archive( event.payload.archive, @@ -22,13 +22,13 @@ def handle(self, event: Event): event.payload.logger ) except ArchiveError as e: - event.payload.logger.error(f"[PIPELINE] {event.payload.pipeline.id} [ERROR] {trunc_uuid(event.payload.pipeline.run_id)}: {e.message}") + event.payload.logger.error(f"[PIPELINE] {event.payload.pipeline.id} [ERROR] {trunc_uuid(event.payload.pipeline_run.uuid)}: {e.message}") return except Exception as e: - event.payload.logger.error(f"[PIPELINE] {event.payload.pipeline.id} [ERROR] {trunc_uuid(event.payload.pipeline.run_id)}: {e}") + event.payload.logger.error(f"[PIPELINE] {event.payload.pipeline.id} [ERROR] {trunc_uuid(event.payload.pipeline_run.uuid)}: {e}") return - event.payload.logger.info(f"[PIPELINE] {event.payload.pipeline.id} [ARCHIVING COMPLETED] {trunc_uuid(event.payload.pipeline.run_id)}") + event.payload.logger.info(f"[PIPELINE] {event.payload.pipeline.id} [ARCHIVING COMPLETED] {trunc_uuid(event.payload.pipeline_run.uuid)}") def archive(self, archive, pipeline, args, logger): diff --git a/src/engine/src/core/workflows/executors/WorkflowExecutor.py b/src/engine/src/core/workflows/executors/WorkflowExecutor.py index fad6f692..19eb049e 100644 --- a/src/engine/src/core/workflows/executors/WorkflowExecutor.py +++ b/src/engine/src/core/workflows/executors/WorkflowExecutor.py @@ -19,7 +19,8 @@ PIPELINE_STAGING, TASK_STAGING, TASK_ACTIVE, TASK_COMPLETED, TASK_FAILED ) from helpers.GraphValidator import GraphValidator # From shared -from helpers.GitCacheService import GitCacheService +from helpers.TemplateMapper import TemplateMapper +from helpers.TemplateRepository import TemplateRepository from errors.tasks import ( InvalidTaskTypeError, MissingInitialTasksError, @@ -123,8 +124,8 @@ def __init__(self, _id=None, plugins=[]): "executors": {}, "dependency_graph": {}, "is_dry_run": False, - "ctx": None, "ready_tasks": [], + "ctx": None, }, lock=self.lock ) @@ -168,14 +169,16 @@ def _staging(self, ctx): # Resets the workflow executor to its initial state self._set_initial_state() - # Validates and sets the context. All subsequent references to the context - # should be made via 'self.state.ctx' + # Sets the execution context to the ReactiveState of the WorkflowExecutor. + # All subsequent references to the context should be made via 'self.state.ctx' self._set_context(ctx) # Prepare the file system for this pipeline and handle pipeline templating self._prepare_pipeline() - # Publish the active event + # Publish the PIPELINE_STAGING event + # NOTE Events can only be published AFTER the '_prepare_pipeline' method is called + # because the directory structure in which the logs do not exists until it is called. self.publish(Event(PIPELINE_STAGING, self.state.ctx)) # Setup the server and the pipeline run loggers @@ -184,10 +187,7 @@ def _staging(self, ctx): # Prepare task objects and create the directory structure for task output and execution self._prepare_tasks() - # Set the run id - self.state.ctx.pipeline.run_id = self.state.ctx.pipeline_run.uuid - - self.state.ctx.logger.info(f'{self.p_str("STAGING")} {self.state.ctx.pipeline.run_id}') + self.state.ctx.logger.info(f'{self.p_str("STAGING")} {self.state.ctx.pipeline_run.uuid}') # Notification handlers are used to relay/persist updates about a pipeline run # and its tasks to some external resource @@ -205,8 +205,13 @@ def _staging(self, ctx): @interceptable() def _prepare_tasks(self): - # Create an execution_uuid for each task in the pipeline + """This function adds information about the pipeline context to the task + objects, prepares the file system for each task execution, handles task templating, + and generates and registers the task executors that will be called to perform the + work detailed in the task definition.""" + for task in self.state.ctx.pipeline.tasks: + # Create an execution_uuid for each task in the pipeline task.execution_uuid = str(uuid4()) # Paths to the workdir for the task inside the workflow engine container @@ -230,6 +235,11 @@ def _prepare_tasks(self): # Create the task's directories self._prepare_task_fs(task) + # Fetch task templates + if task.uses != None: + template_mapper = TemplateMapper(cache_dir=self.state.ctx.pipeline.git_cache_dir) + task = template_mapper.map(task, task.uses) + # Add a key to the output for the task self.state.ctx.output = {task.id: []} @@ -431,37 +441,13 @@ def _prepare_pipeline(self): # Create all of the directories needed for the pipeline to run and persist results and cache self._prepare_pipeline_fs() - # Clone git repository specified on the pipeline.uses if exists - git_cache_service = GitCacheService(cache_dir=self.state.ctx.pipeline.git_cache_dir) - if self.state.ctx.pipeline.uses != None: - git_cache_service.add_or_update( - self.state.ctx.pipeline.uses.source.url, - # NOTE Using the url as the directory to clone into is intentional - self.state.ctx.pipeline.uses.source.url - ) + template = TemplateRepository( + self.state.ctx.pipeline.uses, + cache_dir=self.state.ctx.pipeline.git_cache_dir + ) - template_root_dir = os.path.join( - self.state.ctx.pipeline.git_cache_dir, - self.state.ctx.pipeline.uses.source.url, - ) - - try: - # Open the owe-config.json file - with open(os.path.join(template_root_dir, "owe-config.json")) as file: - owe_config = json.loads(file.read()) - - # Open the etl pipeline schema.json - with open( - os.path.join( - template_root_dir, - owe_config.get(self.state.ctx.pipeline.uses.name).get("path") - ) - ) as file: - pipeline_template = json.loads(file.read()) - except Exception as e: - raise Exception(f"Templating configuration Error (owe-config.json): {str(e)}") + # TODO map the template props to the pipeline - @interceptable() def _prepare_pipeline_fs(self): """Creates all of the directories necessary to run the pipeline, store @@ -544,25 +530,25 @@ def _remove_from_queue(self, task): len(self.state.queue) == 0 or self.state.queue.pop(self.state.queue.index(task)) @interceptable() - def _register_executor(self, run_id, task, executor): - self.state.executors[f"{run_id}.{task.id}"] = executor + def _register_executor(self, run_uuid, task, executor): + self.state.executors[f"{run_uuid}.{task.id}"] = executor @interceptable() - def _get_executor(self, run_id, task, default=None): - return self.state.executors.get(f"{run_id}.{task.id}", None) + def _get_executor(self, run_uuid, task, default=None): + return self.state.executors.get(f"{run_uuid}.{task.id}", None) @interceptable() - def _deregister_executor(self, run_id, task): + def _deregister_executor(self, run_uuid, task): # Clean up the resources created by the task executor - executor = self._get_executor(run_id, task) + executor = self._get_executor(run_uuid, task) executor.cleanup() - del self.state.executors[f"{run_id}.{task.id}"] + del self.state.executors[f"{run_uuid}.{task.id}"] # TODO use server logger below # self.state.ctx.logger.debug(self.t_str(task, "EXECUTOR DEREGISTERED")) @interceptable() - def _get_executor(self, run_id, task): - return self.state.executors[f"{run_id}.{task.id}"] + def _get_executor(self, run_uuid, task): + return self.state.executors[f"{run_uuid}.{task.id}"] def _cleanup_run(self): # TODO use server logger below diff --git a/src/engine/src/helpers/TemplateMapper.py b/src/engine/src/helpers/TemplateMapper.py new file mode 100644 index 00000000..2c723989 --- /dev/null +++ b/src/engine/src/helpers/TemplateMapper.py @@ -0,0 +1,48 @@ +from typing import Union + +from .TemplateRepository import TemplateRepository +from ..owe_python_sdk.schema import ( + Uses, + Pipeline, + Task, + TemplateTask, + FunctionTask, + ApplicationTask, + ImageBuildTask, + RequestTask, + TapisJobTask, + TapisActorTask +) + +class TemplateMapper: + def __init__(self, cache_dir: str): + self.cache_dir = cache_dir + self.task_map_by_type = { + "function": FunctionTask, + "application": ApplicationTask, + "image_build": ImageBuildTask, + "request": RequestTask, + "tapis_job": TapisJobTask, + "tapis_actor": TapisActorTask + } + + def map(self, obj: Union[Pipeline, Task], uses: Uses): + # Clone git repository specified on the pipeline.uses if exists + template = TemplateRepository(uses, cache_dir=self.cache_dir) + + for attr in template.keys(): + # For pipelines only. Skip the tasks property as they should be handled + # seperately in another call to the map method of the Template ampper + if attr == "tasks": + continue + + # For task only. The template should specify the correct type. For all other properties, + # that are not specified, we use that which in enumerated in the template + if attr == "type": + obj.type = template.get(attr) + continue + + if getattr(obj, attr) == None: + setattr(obj, attr, template[attr]) + + return obj \ No newline at end of file diff --git a/src/engine/src/helpers/TemplateRepository.py b/src/engine/src/helpers/TemplateRepository.py new file mode 100644 index 00000000..c1379808 --- /dev/null +++ b/src/engine/src/helpers/TemplateRepository.py @@ -0,0 +1,34 @@ +import os,json + +from .GitCacheService import GitCacheService +from ..owe_python_sdk.schema import Uses + +def TemplateRepository(uses: Uses, cache_dir: str): + # Clone git repository specified on the pipeline.uses if exists + git_cache_service = GitCacheService(cache_dir=cache_dir) + + git_cache_service.add_or_update( + uses.source.url, + # NOTE Using the url as the directory to clone into is intentional + uses.source.url + ) + + template_root_dir = os.path.join(cache_dir, uses.source.url) + + try: + # Open the owe-config.json file + with open(os.path.join(template_root_dir, "owe-config.json")) as file: + owe_config = json.loads(file.read()) + + # Open the etl pipeline schema.json + with open( + os.path.join( + template_root_dir, + owe_config.get(uses.name).get("path") + ) + ) as file: + template = json.loads(file.read()) + except Exception as e: + raise Exception(f"Templating configuration Error (owe-config.json): {str(e)}") + + return template \ No newline at end of file