Skip to content

Commit

Permalink
handle for errors during conditional expression evaluation and operan…
Browse files Browse the repository at this point in the history
…d resoultion
  • Loading branch information
nathandf committed Dec 11, 2023
1 parent d85e878 commit 72c7375
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 33 deletions.
27 changes: 15 additions & 12 deletions src/engine/src/core/expressions/ConditionalExpressionEvaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
ConditionalExpression,
ConditionalExpressions
)

from core.expressions import OperandResolver
from errors.tasks import ConditionalExpressionEvalError, OperandResolutionError


class ConditionalExpressionEvaluator:
Expand All @@ -31,19 +31,22 @@ def evaluate_all(self, conditions: ConditionalExpressions):
return all(evaluations)

def evaluate(self, condition: ConditionalExpression):
operator = list(condition.keys())[0] # There will only ever be one key in a condition.
operands = condition[operator]
if operator in self._comparison_operators:
return self._comparison(operator, operands)
try:
operator = list(condition.keys())[0] # There will only ever be one key in a condition.
operands = condition[operator]
if operator in self._comparison_operators:
return self._comparison(operator, operands)

if operator in self._membership_operators:
return self._membership(operator, operands)

if operator == "not":
return not self.evaluate(operands)
if operator in self._membership_operators:
return self._membership(operator, operands)

if operator in self._logical_operators:
return self._logical(operator, operands)
if operator == "not":
return not self.evaluate(operands)

if operator in self._logical_operators:
return self._logical(operator, operands)
except OperandResolutionError as e:
raise ConditionalExpressionEvalError(e)

def _comparison(self, operator, operands):
resolved_operands = [
Expand Down
37 changes: 24 additions & 13 deletions src/engine/src/core/expressions/OperandResolver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from owe_python_sdk.schema import Operand

from core.workflows import ValueFromService
from errors.tasks import OperandResolutionError


class OperandResolver:
Expand All @@ -17,20 +18,30 @@ def resolve(self, operand: Operand):
# NOTE all operands should have only 1 key
key = list(operand.keys())[0]
if key == "task_output":
value = self._value_from_service.get_task_output_value_by_id(
task_id=operand[key].task_id,
_id=operand[key].output_id
)
return value
try:
value = self._value_from_service.get_task_output_value_by_id(
task_id=operand[key].task_id,
_id=operand[key].output_id
)

return value
except Exception:
raise OperandResolutionError(f"No output found for task '{operand[key].task_id}' with output id of '{operand[key].output_id}'")

if key == "args":
value = self._value_from_service.get_arg_value_by_key(
operand[key]
)
return value
try:
value = self._value_from_service.get_arg_value_by_key(
operand[key]
)
return value
except Exception:
raise OperandResolutionError(f"Error attempting to fetch value from args at key '{key}'")

if key == "env":
value = self._value_from_service.get_env_value_by_key(
operand[key]
)
return value
try:
value = self._value_from_service.get_env_value_by_key(
operand[key]
)
return value
except Exception:
raise OperandResolutionError(f"Error attempting to fetch value from env at key '{key}'")
24 changes: 16 additions & 8 deletions src/engine/src/core/workflows/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
MissingInitialTasksError,
InvalidDependenciesError,
CycleDetectedError,
ConditionalExpressionEvalError
)
from core.middleware.archivers import S3Archiver, IRODSArchiver
from conf.constants import BASE_WORK_DIR
Expand Down Expand Up @@ -298,26 +299,33 @@ def _prepare_task_fs(self, task):

@interceptable()
def _start_task(self, task):
# Check if any of the previous task was skipped. If any were and the current
# task's dependency specifies a can_skip to False for any of the skipped tasks,
# Check if any of the previous tasks were skipped. If yes, and the current
# task's dependency specifies a can_skip == False for any of the skipped tasks,
# this task will also be skipped
skip = False
for dep in task.depends_on:
if dep.id in self.state.skipped and dep.can_skip == False:
skip = True
break

# Default TaskResult is a task skipped. Will be overwritten if task not skipped
task_result = TaskResult(-1)

# Determine if the task should be skipped
error = None
if not skip:
# Evaluate the task's conditions if the previous task was not skipped
evaluator = self.container.load("ConditionalExpressionEvaluator")
skip = not evaluator.evaluate_all(task.conditions)
try:
# Evaluate the task's conditions if the previous task was not skipped
evaluator = self.container.load("ConditionalExpressionEvaluator")
skip = not evaluator.evaluate_all(task.conditions)
except ConditionalExpressionEvalError as e:
error = e
self.state.ctx.logger.error(e)
task_result = TaskResult(0, errors=[e])

# Default TaskResult is a task skipped. Will be overwritten if task not skipped
task_result = TaskResult(-1)

# Execute the task
if not skip:
if not skip and error == None:
# Log the task status
self.state.ctx.logger.info(self.t_str(task, "ACTIVE"))

Expand Down
6 changes: 6 additions & 0 deletions src/engine/src/errors/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,9 @@ class CycleDetectedError(WorkflowsBaseException):

class FailedTaskError(WorkflowsBaseException):
pass

class OperandResolutionError(WorkflowsBaseException):
pass

class ConditionalExpressionEvalError(WorkflowsBaseException):
pass

0 comments on commit 72c7375

Please sign in to comment.