Skip to content

Commit

Permalink
added HasMinorErrors exception as requested on Allow RushTI to ignore…
Browse files Browse the repository at this point in the history
… minor errors on OPT task type #86
  • Loading branch information
nicolasbisurgi authored and MariusWirtz committed Nov 1, 2024
1 parent f230aed commit ef04c6e
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 35 deletions.
66 changes: 35 additions & 31 deletions rushti.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,15 @@ def extract_task_from_line(line: str, task_class: Union[Type[Task], Type[Optimiz
task_id = line_arguments.pop("id")
predecessors = line_arguments.pop("predecessors", [])
require_predecessor_success = line_arguments.pop("require_predecessor_success", False)
succeed_on_minor_errors = line_arguments.pop("succeed_on_minor_errors", False)

return OptimizedTask(
task_id=task_id,
instance_name=line_arguments.pop("instance"),
process_name=line_arguments.pop("process"),
predecessors=predecessors,
require_predecessor_success=require_predecessor_success,
succeed_on_minor_errors=succeed_on_minor_errors,
parameters=line_arguments)
else:
return Task(
Expand All @@ -201,32 +203,30 @@ def extract_task_from_line(line: str, task_class: Union[Type[Task], Type[Optimiz

def parse_line_arguments(line: str) -> Dict[str, Any]:
line_arguments = {}
line = line.replace("\\", UNIQUE_STRING)

parts = shlex.split(line, posix=False)
# Use shlex to split the line with posix=True for proper escaping
parts = shlex.split(line, posix=True)

for part in parts:
if '=' not in part:
continue

# Split on the first '=' to get argument and value
argument, value = part.split('=', 1)
argument = argument.replace(UNIQUE_STRING, "\\")
value = value.replace(UNIQUE_STRING, "\\")

if argument.lower() in ["process", "instance", "id"]:
line_arguments[argument.lower()] = value.strip('"')
elif argument.lower() == "require_predecessor_success":
line_arguments[argument] = value.strip('"').lower() in TRUE_VALUES
elif argument.lower() == "predecessors":
predecessors = value.strip('"').split(",")

# Handle specific keys with logic
key_lower = argument.lower()
if key_lower in ["process", "instance", "id"]:
line_arguments[key_lower] = value
elif key_lower == "require_predecessor_success":
line_arguments[argument] = value.lower() in TRUE_VALUES
elif key_lower == "predecessors":
predecessors = value.split(",")
line_arguments[argument] = [] if predecessors[0] in ["", "0", 0] else predecessors
elif key_lower == "succeed_on_minor_errors":
line_arguments[argument] = value.lower() in TRUE_VALUES
else:
# Remove surrounding double quotes if present
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
# Unescape quotes within the value
value = value.replace('\\"', '"')
# Replace double backslashes with single backslashes
value = value.replace("\\\\", "\\")
# Store the argument-value pair as is
line_arguments[argument] = value

return line_arguments
Expand Down Expand Up @@ -255,7 +255,8 @@ def expand_task(
if isinstance(task, OptimizedTask):
result.append(OptimizedTask(task.id, task.instance_name, task.process_name, parameters=expanded_params,
predecessors=task.predecessors,
require_predecessor_success=task.require_predecessor_success))
require_predecessor_success=task.require_predecessor_success,
succeed_on_minor_errors = task.succeed_on_minor_errors))
elif isinstance(task, Task):
result.append(Task(task.instance_name, task.process_name, parameters=expanded_params))
return result
Expand Down Expand Up @@ -474,25 +475,28 @@ def get_ordered_tasks_and_waits(


def execute_process_with_retries(tm1: TM1Service, task: Task, retries: int):
attempt = 0
while attempt <= retries:
for attempt in range(retries + 1):
try:
# process runs and returns either success = True or False
# Execute the process and unpack results
success, status, error_log_file = tm1.processes.execute_with_return(
process_name=task.process_name,
process_name=task.process_name,
**task.parameters)

# Handle minor errors for OptimizedTask
if isinstance(task, OptimizedTask) and not success and task.succeed_on_minor_errors and status == 'HasMinorErrors':
success = True
logging.debug(f"{task.id} returned {status} but has succeed_on_minor_errors={task.succeed_on_minor_errors}")

if success:
return success, status, error_log_file, attempt

except Exception as e:
# process could not be executed (e.g. doesn't exist)
# If last attempt, then raise exception
if attempt == retries:
# Raise exception on the final attempt
raise e
else:
if success:
break
finally:
attempt += 1

return success, status, error_log_file, attempt - 1
# If all retries fail
return False, status, error_log_file, retries


def update_task_execution_results(func):
Expand Down
3 changes: 3 additions & 0 deletions tests/resources/tasks_opt_succeed_on_minors_error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
id="1" predecessors="" require_predecessor_success="1" succeed_on_minor_errors="0" instance="DEVOPS_DEV" process="rushti.process.return" pReturnCode=0
id="2" predecessors="1" require_predecessor_success="1" succeed_on_minor_errors="1" instance="DEVOPS_DEV" process="rushti.process.return" pReturnCode=1
id="3" predecessors="2" require_predecessor_success="1" instance="DEVOPS_DEV" process="rushti.process.return" pReturnCode=0
53 changes: 52 additions & 1 deletion tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def test_backslashes(self):
self.assertEqual(result, expected)

def test_complex_nested_quotes(self):
line = 'instance=tm1 process=process1 param1="outer \\"inner \\\\"deepest\\\\" inner\\" outer"'
line = r'instance=tm1 process=process1 param1="outer \"inner \\\"deepest\\\" inner\" outer"'
result = parse_line_arguments(line)
expected = {
'instance': 'tm1',
Expand All @@ -187,5 +187,56 @@ def test_predecessors_and_require_predecessor_success(self):
}
self.assertEqual(result, expected)


class TestSucceedOnMinorErrors(unittest.TestCase):
def test_default_value(self):
line = 'id=1 instance=tm1 process=process1 predecessors="2,3,4" require_predecessor_success="1"'
result = parse_line_arguments(line)
expected = {
'id': '1',
'instance': 'tm1',
'process': 'process1',
'predecessors': ['2', '3', '4'],
'require_predecessor_success': True
}
self.assertEqual(result, expected)

def test_explicit_false_value(self):
line = 'id=1 instance=tm1 process=process1 predecessors="2,3,4" require_predecessor_success="1" succeed_on_minor_errors="0"'
result = parse_line_arguments(line)
expected = {
'id': '1',
'instance': 'tm1',
'process': 'process1',
'predecessors': ['2', '3', '4'],
'require_predecessor_success': True,
'succeed_on_minor_errors': False
}
self.assertEqual(result, expected)

def test_explicit_true_value(self):
line = 'id=1 instance=tm1 process=process1 predecessors="" require_predecessor_success="" succeed_on_minor_errors="1"'
result = parse_line_arguments(line)
expected = {
'id': '1',
'instance': 'tm1',
'process': 'process1',
'predecessors': [],
'require_predecessor_success': False,
'succeed_on_minor_errors': True
}
self.assertEqual(result, expected)

def test_line_translation_with_succeed_on_minor_errors(self):
task = OptimizedTask("1", "tm1srv01", "process1", {"param1": "value1"}, [], False, succeed_on_minor_errors=True)
expected_line = 'id="1" predecessors="" require_predecessor_success="False" succeed_on_minor_errors="True" instance="tm1srv01" process="process1" param1="value1"\n'
self.assertEqual(task.translate_to_line(), expected_line)

def test_line_translation_without_succeed_on_minor_errors(self):
task = OptimizedTask("1", "tm1srv01", "process1", {"param1": "value1"}, [2,3,4], False)
expected_line = 'id="1" predecessors="2,3,4" require_predecessor_success="False" succeed_on_minor_errors="False" instance="tm1srv01" process="process1" param1="value1"\n'
self.assertEqual(task.translate_to_line(), expected_line)


if __name__ == '__main__':
unittest.main()
10 changes: 7 additions & 3 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ def translate_to_line(self):
class OptimizedTask(Task):
def __init__(self, task_id: str, instance_name: str, process_name: str, parameters: Dict[str, Any],
predecessors: List,
require_predecessor_success: bool):
require_predecessor_success: bool,
succeed_on_minor_errors: bool = False):
super().__init__(instance_name, process_name, parameters)
self.id = task_id
self.predecessors = predecessors
self.require_predecessor_success = require_predecessor_success
self.succeed_on_minor_errors = succeed_on_minor_errors
self.successors = list()

@property
Expand All @@ -65,11 +67,13 @@ def has_predecessors(self):
def has_successors(self):
return len(self.successors) > 0


def translate_to_line(self):
return 'id="{id}" predecessors="{predecessors}" require_predecessor_success="{require_predecessor_success}" instance="{instance}" process="{process}" {parameters}\n'.format(
return 'id="{id}" predecessors="{predecessors}" require_predecessor_success="{require_predecessor_success}" succeed_on_minor_errors="{succeed_on_minor_errors}" instance="{instance}" process="{process}" {parameters}\n'.format(
id=self.id,
predecessors=self.predecessors,
predecessors=",".join(map(str, self.predecessors)),
require_predecessor_success=self.require_predecessor_success,
succeed_on_minor_errors=self.succeed_on_minor_errors,
instance=self.instance_name,
process=self.process_name,
parameters=' '.join('{}="{}"'.format(parameter, value) for parameter, value in self.parameters.items()))
Expand Down

0 comments on commit ef04c6e

Please sign in to comment.