From 1c2ce5f0a710d36209b26444c7e585a637c5c331 Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Wed, 19 Jul 2023 16:22:15 -0500 Subject: [PATCH] Added expandable task support Added support for commented lines Fixed repeated task validations --- Tasks_type_classic_mdx.txt | 16 +++++++ Tasks_type_optimized_mdx.txt | 10 +++++ rushti.py | 85 ++++++++++++++++++++++++++---------- utils.py | 37 ++++++++++++++-- 4 files changed, 123 insertions(+), 25 deletions(-) create mode 100644 Tasks_type_classic_mdx.txt create mode 100644 Tasks_type_optimized_mdx.txt diff --git a/Tasks_type_classic_mdx.txt b/Tasks_type_classic_mdx.txt new file mode 100644 index 0000000..538c33c --- /dev/null +++ b/Tasks_type_classic_mdx.txt @@ -0,0 +1,16 @@ +instance="tm1_d" process="zzz_sys_tm1_pause" pWait=1 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}" +wait +# also +instance="tm1_d" process="zzz_sys_tm1_pause" pWait=2 pMDXTest2*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}" +instance="tm1_d" process="zzz_sys_tm1_pause" pWait=3 +# test +instance="tm1_d" process="zzz_sys_tm1_pause" pWait=4 +wait +instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5 +wait +# comments +instance="tm1_d" process="zzz_sys_tm1_pause" pWait=6 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}" +instance="tm1_d" process="zzz_sys_tm1_pause" pWait=7 +wait +# here +instance="tm1_d" process="zzz_sys_tm1_pause" pWait=8 \ No newline at end of file diff --git a/Tasks_type_optimized_mdx.txt b/Tasks_type_optimized_mdx.txt new file mode 100644 index 0000000..1a16ce7 --- /dev/null +++ b/Tasks_type_optimized_mdx.txt @@ -0,0 +1,10 @@ +# testing +id="1" predecessors="" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}" +id="2" predecessors="" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=10 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}" pMDXTest2*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Scenario].[Scenario]) , 0)}" +# with comments +id="3" predecessors="2" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5 +id="4" predecessors="1" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5 +id="5" predecessors="3" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5 +id="6" predecessors="4,5" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=11 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}" +id="7" predecessors="4" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=6 +id="8" predecessors="6" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=6 \ No newline at end of file diff --git a/rushti.py b/rushti.py index d2502d0..3fb773b 100644 --- a/rushti.py +++ b/rushti.py @@ -13,6 +13,7 @@ from datetime import datetime, timedelta from logging.config import fileConfig from typing import List, Union, Dict +from itertools import product try: import chardet @@ -21,7 +22,7 @@ from TM1py import TM1Service -from utils import set_current_directory, Task, OptimizedTask, ExecutionMode, Wait +from utils import set_current_directory, Task, OptimizedTask, ExecutionMode, Wait, flatten_to_list APP_NAME = "RushTI" CURRENT_DIRECTORY = set_current_directory() @@ -87,6 +88,7 @@ def setup_tm1_services(max_workers: int, tasks_file_path: str, execution_mode: E tm1_instances_in_tasks = get_instances_from_tasks_file(execution_mode, max_workers, tasks_file_path) + global tm1_services tm1_services = dict() # parse .ini config = configparser.ConfigParser() @@ -159,6 +161,29 @@ def extract_task_or_wait_from_line(line: str) -> Union[Task, Wait]: parameters=line_arguments) +def expand_task(task: Union[Task, OptimizedTask]) -> List[Union[Task, OptimizedTask]]: + global tm1_services + tm1 = tm1_services[task.instance_name] + list_params = [] + result = [] + for param, value in task.parameters.items(): + if param.endswith('*'): + elements = tm1.dimensions.hierarchies.elements.execute_set_mdx(value[1:], member_properties=['Name'], + parent_properties=None, + element_properties=None) + list_params.append([(param[:-1], element[0]["Name"]) for element in elements]) + else: + list_params.append([(param, value)]) + for expanded_params in [dict(combo) for combo in product(*list_params)]: + if isinstance(task, OptimizedTask): + result.append(OptimizedTask(task.id, task.instance_name, task.process_name, parameters=expanded_params, + predecessors=task.predecessors, + require_predecessor_success=task.require_predecessor_success)) + elif isinstance(task, Task): + result.append(Task(task.instance_name, task.process_name, parameters=expanded_params)) + return result + + def extract_task_from_line_type_opt(line: str) -> OptimizedTask: """ Translate one line from txt file type 'opt' into arguments for execution :param: line: Arguments for execution. E.g. id="5" predecessors="2,3" require_predecessor_success="1" instance="tm1srv01" @@ -201,14 +226,19 @@ def extract_task_from_line_type_opt(line: str) -> OptimizedTask: parameters=line_arguments) -def extract_ordered_tasks_and_waits_from_file_type_norm(file_path: str): +def extract_ordered_tasks_and_waits_from_file_type_norm(file_path: str, expand: bool = False): with open(file_path, encoding='utf-8') as file: - return [extract_task_or_wait_from_line(line) for line in file.readlines()] + original_tasks = [extract_task_or_wait_from_line(line) for line in file.readlines() if not line.startswith('#')] + if not expand: + return original_tasks + else: + return flatten_to_list([expand_task(task) if isinstance(task, Task) else Wait() for task in original_tasks]) -def extract_ordered_tasks_and_waits_from_file_type_opt(max_workers: int, file_path: str) -> List[Task]: +def extract_ordered_tasks_and_waits_from_file_type_opt(max_workers: int, file_path: str, expand: bool = False) -> List[ + Task]: ordered_tasks_and_waits = list() - tasks = extract_tasks_from_file_type_opt(file_path) + tasks = extract_tasks_from_file_type_opt(file_path, expand) # mapping of level (int) against list of tasks tasks_by_level = deduce_levels_of_tasks(tasks) @@ -223,7 +253,7 @@ def extract_ordered_tasks_and_waits_from_file_type_opt(max_workers: int, file_pa return ordered_tasks_and_waits -def extract_tasks_from_file_type_opt(file_path: str) -> dict: +def extract_tasks_from_file_type_opt(file_path: str, expand: bool = False) -> dict: """ :param file_path: :return: tasks @@ -234,14 +264,24 @@ def extract_tasks_from_file_type_opt(file_path: str) -> dict: lines = input_file.readlines() # Build tasks dictionary for line in lines: - # skip empty lines - if not line.strip(): - continue - task = extract_task_from_line_type_opt(line) - if task.id not in tasks: - tasks[task.id] = [task] - else: - tasks[task.id].append(task) + # exclude comments + if not line.startswith('#'): + # skip empty lines + if not line.strip(): + continue + task = extract_task_from_line_type_opt(line) + if task.id not in tasks: + tasks[task.id] = [task] + else: + tasks[task.id].append(task) + + # expand tasks + if expand: + for task_id in tasks: + for task in tasks[task_id]: + for expanded_task in expand_task(task): + tasks[task.id].append(expanded_task) + tasks[task.id].remove(task) # Populate the successors attribute for task_id in tasks: @@ -351,7 +391,8 @@ def pre_process_file(file_path: str): file.write(text) -def get_ordered_tasks_and_waits(file_path: str, max_workers: int, tasks_file_type: ExecutionMode) -> List[Task]: +def get_ordered_tasks_and_waits(file_path: str, max_workers: int, tasks_file_type: ExecutionMode, + expand: bool = False) -> List[Task]: """ Extract tasks from file if necessary transform a file that respects type 'opt' specification into a scheduled and optimized list of tasks :param file_path: @@ -367,9 +408,9 @@ def get_ordered_tasks_and_waits(file_path: str, max_workers: int, tasks_file_typ logging.info(f"Function '{pre_process_file.__name__}' skipped. Optional dependency 'chardet' not installed") if tasks_file_type == ExecutionMode.NORM: - return extract_ordered_tasks_and_waits_from_file_type_norm(file_path) + return extract_ordered_tasks_and_waits_from_file_type_norm(file_path, expand) else: - return extract_ordered_tasks_and_waits_from_file_type_opt(max_workers, file_path) + return extract_ordered_tasks_and_waits_from_file_type_opt(max_workers, file_path, expand) def execute_process_with_retries(tm1: TM1Service, task: Task, retries: int): @@ -514,7 +555,7 @@ def validate_tasks(tasks: List[Task], tm1_services: Dict[str, TM1Service]) -> bo tm1 = tm1_services[task.instance_name] # avoid repeated validations - if current_task in validated_tasks: + if current_task["process"] in validated_tasks: continue # check for process existence @@ -524,7 +565,7 @@ def validate_tasks(tasks: List[Task], tm1_services: Dict[str, TM1Service]) -> bo instance=task.instance_name ) logger.error(msg) - validated_tasks.append(current_task) + validated_tasks.append(current_task["process"]) validation_ok = False continue @@ -595,7 +636,7 @@ def translate_cmd_arguments(*args): """ Translation and Validity-checks for command line arguments. - :param args: + :param args: :return: tasks_file_path, maximum_workers, execution_mode, retries, result_file """ # too few arguments @@ -732,7 +773,7 @@ def exit_rushti( try: # determine and validate tasks - tasks = get_ordered_tasks_and_waits(tasks_file_path, maximum_workers, execution_mode) + tasks = get_ordered_tasks_and_waits(tasks_file_path, maximum_workers, execution_mode, True) if not validate_tasks(tasks, tm1_service_by_instance): raise ValueError("Invalid tasks provided") @@ -765,4 +806,4 @@ def exit_rushti( successes=sum(results), start_time=start, end_time=end, - elapsed_time=duration) + elapsed_time=duration) \ No newline at end of file diff --git a/utils.py b/utils.py index 91f7c88..cb9a527 100644 --- a/utils.py +++ b/utils.py @@ -1,7 +1,7 @@ import os import sys from enum import Enum -from typing import List, Dict +from typing import List, Dict, Any def set_current_directory(): @@ -32,7 +32,7 @@ def __eq__(self, other): class Task: id = 1 - def __init__(self, instance_name, process_name, parameters): + def __init__(self, instance_name: str, process_name: str, parameters: Dict[str, Any] = None): self.id = Task.id self.instance_name = instance_name self.process_name = process_name @@ -48,7 +48,8 @@ def translate_to_line(self): class OptimizedTask(Task): - def __init__(self, task_id: str, instance_name: str, process_name: str, parameters: Dict, predecessors: List, + def __init__(self, task_id: str, instance_name: str, process_name: str, parameters: Dict[str, Any], + predecessors: List, require_predecessor_success: bool): super().__init__(instance_name, process_name, parameters) self.id = task_id @@ -64,6 +65,15 @@ def has_predecessors(self): def has_successors(self): return len(self.successors) > 0 + def translate_to_line(self): + return 'id="{id}" predecessors="{predecessors}" require_predecessor_success="{require_predecessor_success}" instance="{instance}" process="{process}" {parameters}\n'.format( + id=self.id, + predecessors=self.predecessors, + require_predecessor_success=self.require_predecessor_success, + instance=self.instance_name, + process=self.process_name, + parameters=' '.join('{}="{}"'.format(parameter, value) for parameter, value in self.parameters.items())) + class ExecutionMode(Enum): NORM = 1 @@ -76,3 +86,24 @@ def _missing_(cls, value): return member # default return cls.NORM + + +def flatten_to_list(object) -> list: + """takes an nested iterables and returns a flat list of them + + Args: + object (_type_): object containing the iterable + + Returns: + list: flat list of objects + """ + gather = [] + if not isinstance(object, str): + for item in object: + if isinstance(item, (list, tuple, set)): + gather.extend(flatten_to_list(item)) + else: + gather.append(item) + else: + gather.append(object) + return gather \ No newline at end of file