Skip to content

Commit

Permalink
TemplateMapper creates new task or pipeline and maps properties to or…
Browse files Browse the repository at this point in the history
…iginal if missing
  • Loading branch information
nathandf committed Oct 18, 2023
1 parent a761c64 commit cad6345
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
8 changes: 4 additions & 4 deletions src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def _prepare_tasks(self):
objects, prepares the file system for each task execution, handles task templating,
and generates and registers the task executors that will be called to perform the
work detailed in the task definition."""

self.state.ctx.output = {}
for task in self.state.ctx.pipeline.tasks:
# Create an execution_uuid for each task in the pipeline
task.execution_uuid = str(uuid4())
Expand Down Expand Up @@ -241,7 +241,7 @@ def _prepare_tasks(self):
task = template_mapper.map(task, task.uses)

# Add a key to the output for the task
self.state.ctx.output = {task.id: []}
self.state.ctx.output[task.id] = None

# Resolve and register the task executor
executor = factory.build(task, self.state.ctx, self.exchange, plugins=self._plugins)
Expand Down Expand Up @@ -284,10 +284,10 @@ def _start_task(self, task):

self.state.ctx.output = {
**self.state.ctx.output,
**task_result.output
**{task.id: task_result.output}
}
else:
task_result = TaskResult(0, data={"task": task.id})
task_result = TaskResult(0, output={task.id: None})
except InvalidTaskTypeError as e:
self.state.ctx.logger.error(str(e))
task_result = TaskResult(1, errors=[str(e)])
Expand Down
37 changes: 33 additions & 4 deletions src/engine/src/helpers/TemplateMapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,28 @@ def __init__(self, cache_dir: str):
"tapis_actor": TapisActorTask
}

def map(self, obj: Union[Pipeline, Task], uses: Uses):
def map(self, obj: Union[Pipeline, Task], uses: Uses) -> Union[Pipeline, Task]:
"""This method takes an object(Pipeline or Task object), and updates its
attributes with the values found in the templates
IMPORTANT NOTE:
The original object is modified and returned by this function, not a copy.
"""

# Clone git repository specified on the pipeline.uses if exists
template = TemplateRepository(uses, cache_dir=self.cache_dir)

# Resolve which class the final object should have
obj_class = Pipeline
if not issubclass(obj.__class__, Pipeline):
obj_class = self.task_map_by_type.get(obj.get("type"), None)

# Raise exception if no class could be resolved from the template
if obj_class == None:
raise Exception(f"Invalid Template: Unable to resolve object type from Template. Task template object 'type' property must be one of [{self.task_map_by_type.keys()}]")

dict_obj = obj.dict()

for attr in template.keys():
# For pipelines only. Skip the tasks property as they should be handled
# seperately in another call to the map method of the Template ampper
Expand All @@ -39,10 +57,21 @@ def map(self, obj: Union[Pipeline, Task], uses: Uses):
# For task only. The template should specify the correct type. For all other properties,
# that are not specified, we use that which in enumerated in the template
if attr == "type":
obj.type = template.get(attr)
dict_obj["type"] = template.get(attr)
continue

if obj.get(attr, None) == None:
dict_obj[attr] = template[attr]

new_obj = obj_class(**dict_obj)

for attr in vars(new_obj):
if attr == "tasks":
continue

if getattr(obj, attr) == None:
setattr(obj, attr, template[attr])
updated_value = getattr(new_obj, attr)
original_value = getattr(obj, attr)
if original_value != updated_value:
setattr(obj, attr, updated_value)

return obj

0 comments on commit cad6345

Please sign in to comment.