diff --git a/rushti.py b/rushti.py index e921b97..ce75a79 100644 --- a/rushti.py +++ b/rushti.py @@ -14,7 +14,7 @@ from itertools import product from logging.config import fileConfig from pathlib import Path -from typing import List, Union, Dict, Tuple +from typing import List, Union, Dict, Tuple, Type, Any import keyring @@ -170,31 +170,66 @@ def decrypt_password(encrypted_password: str) -> str: def extract_task_or_wait_from_line(line: str) -> Union[Task, Wait]: - """ Translate one line from txt file into arguments for execution: instance, process, parameters - :param line: Arguments for execution. E.g. instance="tm1srv01" process="Bedrock.Server.Wait" pWaitSec=2 - :return: instance_name, process_name, parameters - """ if line.strip().lower() == 'wait': return Wait() - line_arguments = dict() + return extract_task_from_line(line, task_class=Task) + +def extract_task_from_line_type_opt(line: str) -> OptimizedTask: + return extract_task_from_line(line, task_class=OptimizedTask) + +def extract_task_from_line(line: str, task_class: Union[Type[Task], Type[OptimizedTask]]) -> Union[Task, OptimizedTask]: + line_arguments = parse_line_arguments(line) + + if task_class == OptimizedTask: + task_id = line_arguments.pop("id") + predecessors = line_arguments.pop("predecessors", []) + require_predecessor_success = line_arguments.pop("require_predecessor_success", False) + + return OptimizedTask( + task_id=task_id, + instance_name=line_arguments.pop("instance"), + process_name=line_arguments.pop("process"), + predecessors=predecessors, + require_predecessor_success=require_predecessor_success, + parameters=line_arguments) + else: + return Task( + instance_name=line_arguments.pop("instance"), + process_name=line_arguments.pop("process"), + parameters=line_arguments) + +def parse_line_arguments(line: str) -> Dict[str, Any]: + line_arguments = {} line = line.replace("\\", UNIQUE_STRING) - for pair in shlex.split(line): - param, value = pair.split("=") - param = param.replace(UNIQUE_STRING, "\\") + + parts = shlex.split(line, posix=False) + + for part in parts: + if '=' not in part: + continue + argument, value = part.split('=', 1) + argument = argument.replace(UNIQUE_STRING, "\\") value = value.replace(UNIQUE_STRING, "\\") - # if instance or process, needs to be case insensitive - if param.lower() == "process" or param.lower() == "instance": - line_arguments[param.lower()] = value.strip('"').strip() - # parameters (e.g. pWaitSec) are case sensitive in TM1 REST API ! + if argument.lower() in ["process", "instance", "id"]: + line_arguments[argument.lower()] = value.strip('"') + elif argument.lower() == "require_predecessor_success": + line_arguments[argument] = value.strip('"').lower() in TRUE_VALUES + elif argument.lower() == "predecessors": + predecessors = value.strip('"').split(",") + line_arguments[argument] = [] if predecessors[0] in ["", "0", 0] else predecessors else: - line_arguments[param] = value.strip('"').strip() - return Task( - instance_name=line_arguments.pop("instance"), - process_name=line_arguments.pop("process"), - parameters=line_arguments) + # Remove surrounding double quotes if present + if value.startswith('"') and value.endswith('"'): + value = value[1:-1] + # Unescape quotes within the value + value = value.replace('\\"', '"') + # Replace double backslashes with single backslashes + value = value.replace("\\\\", "\\") + line_arguments[argument] = value + return line_arguments def expand_task( tm1_services: Dict[str, TM1Service], @@ -211,8 +246,8 @@ def expand_task( member_properties=['Name'], parent_properties=None, element_properties=None) - except: - raise RuntimeError(f"Failed to execute MDX '{mdx}'") + except Exception as e: + raise RuntimeError(f"Failed to execute MDX '{mdx}': {str(e)}") list_params.append([(param[:-1], element[0]["Name"]) for element in elements]) else: list_params.append([(param, value)]) @@ -226,48 +261,6 @@ def expand_task( return result -def extract_task_from_line_type_opt(line: str) -> OptimizedTask: - """ Translate one line from txt file type 'opt' into arguments for execution - :param: line: Arguments for execution. E.g. id="5" predecessors="2,3" require_predecessor_success="1" instance="tm1srv01" - process="Bedrock.Server.Wait" pWaitSec=5 - :return: attributes - """ - line_arguments = dict() - line = line.replace("\\", UNIQUE_STRING) - for pair in shlex.split(line): - argument, value = pair.split("=") - argument = argument.replace(UNIQUE_STRING, "\\") - value = value.replace(UNIQUE_STRING, "\\") - - # if instance or process, needs to be case insensitive - if argument.lower() == "process" or argument.lower() == "instance" or argument.lower() == "id": - line_arguments[argument.lower()] = value.strip('"').strip() - - elif argument.lower() == "require_predecessor_success": - line_arguments["require_predecessor_success"] = value.strip('"').strip().lower() in TRUE_VALUES - - # Convert string attribute value into list - elif argument.lower() == "predecessors": - predecessors = value.strip('"').strip().split(",") - # "", "0" and 0 is understood as 'no predecessor' - if predecessors[0] in ["", "0", 0]: - line_arguments[argument] = [] - else: - line_arguments[argument] = predecessors - - # parameters (e.g. pWaitSec) are case sensitive in TM1 REST API ! - else: - line_arguments[argument] = value.strip('"').strip() - - return OptimizedTask( - task_id=line_arguments.pop("id"), - instance_name=line_arguments.pop("instance"), - process_name=line_arguments.pop("process"), - predecessors=line_arguments.pop("predecessors"), - require_predecessor_success=line_arguments.pop("require_predecessor_success", False), - parameters=line_arguments) - - def extract_ordered_tasks_and_waits_from_file_type_norm( file_path: str, expand: bool = False, diff --git a/tests/tests.py b/tests/tests.py index b6ce36f..cee3286 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,7 +1,7 @@ import unittest from rushti import deduce_levels_of_tasks, extract_tasks_from_file_type_opt, \ - extract_ordered_tasks_and_waits_from_file_type_opt + extract_ordered_tasks_and_waits_from_file_type_opt, parse_line_arguments from utils import OptimizedTask, Wait @@ -128,3 +128,64 @@ def test_extract_lines_from_file_type_opt_multi_task_per_id(self): self.assertEqual(expected_task.parameters, ordered_task.parameters) self.assertEqual(expected_task.predecessors, ordered_task.predecessors) self.assertEqual(expected_task.require_predecessor_success, ordered_task.require_predecessor_success) + + +class TestParseLineArguments(unittest.TestCase): + + def test_basic_arguments(self): + line = 'instance=tm1 process=process1 param1="value1" param2="value 2"' + result = parse_line_arguments(line) + expected = { + 'instance': 'tm1', + 'process': 'process1', + 'param1': 'value1', + 'param2': 'value 2' + } + self.assertEqual(result, expected) + + def test_nested_double_quotes(self): + line = 'instance=tm1 process=process1 param1="value with \\"quotes\\"" param2="simple"' + result = parse_line_arguments(line) + expected = { + 'instance': 'tm1', + 'process': 'process1', + 'param1': 'value with "quotes"', + 'param2': 'simple' + } + self.assertEqual(result, expected) + + def test_backslashes(self): + line = r'instance=tm1 process=process1 param1="value\\with\\backslashes" param2="normal"' + result = parse_line_arguments(line) + expected = { + 'instance': 'tm1', + 'process': 'process1', + 'param1': r'value\with\backslashes', + 'param2': 'normal' + } + self.assertEqual(result, expected) + + def test_complex_nested_quotes(self): + line = 'instance=tm1 process=process1 param1="outer \\"inner \\\\"deepest\\\\" inner\\" outer"' + result = parse_line_arguments(line) + expected = { + 'instance': 'tm1', + 'process': 'process1', + 'param1': 'outer "inner \\"deepest\\" inner" outer' + } + self.assertEqual(result, expected) + + def test_predecessors_and_require_predecessor_success(self): + line = 'id=1 instance=tm1 process=process1 predecessors="2,3,4" require_predecessor_success="true"' + result = parse_line_arguments(line) + expected = { + 'id': '1', + 'instance': 'tm1', + 'process': 'process1', + 'predecessors': ['2', '3', '4'], + 'require_predecessor_success': True + } + self.assertEqual(result, expected) + +if __name__ == '__main__': + unittest.main()