Skip to content

Commit

Permalink
Added expandable task support
Browse files Browse the repository at this point in the history
Added support for commented lines
Fixed repeated task validations
  • Loading branch information
nicolasbisurgi authored and MariusWirtz committed Aug 11, 2023
1 parent dbd0068 commit 1c2ce5f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 25 deletions.
16 changes: 16 additions & 0 deletions Tasks_type_classic_mdx.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
instance="tm1_d" process="zzz_sys_tm1_pause" pWait=1 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}"
wait
# also
instance="tm1_d" process="zzz_sys_tm1_pause" pWait=2 pMDXTest2*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}"
instance="tm1_d" process="zzz_sys_tm1_pause" pWait=3
# test
instance="tm1_d" process="zzz_sys_tm1_pause" pWait=4
wait
instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5
wait
# comments
instance="tm1_d" process="zzz_sys_tm1_pause" pWait=6 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}"
instance="tm1_d" process="zzz_sys_tm1_pause" pWait=7
wait
# here
instance="tm1_d" process="zzz_sys_tm1_pause" pWait=8
10 changes: 10 additions & 0 deletions Tasks_type_optimized_mdx.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# testing
id="1" predecessors="" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}"
id="2" predecessors="" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=10 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}" pMDXTest2*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Scenario].[Scenario]) , 0)}"
# with comments
id="3" predecessors="2" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5
id="4" predecessors="1" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5
id="5" predecessors="3" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=5
id="6" predecessors="4,5" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=11 pMDXTest1*=*"{TM1FILTERBYLEVEL(TM1SUBSETALL([Season].[Season]) , 0)}"
id="7" predecessors="4" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=6
id="8" predecessors="6" require_predecessor_success="1" instance="tm1_d" process="zzz_sys_tm1_pause" pWait=6
85 changes: 63 additions & 22 deletions rushti.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from datetime import datetime, timedelta
from logging.config import fileConfig
from typing import List, Union, Dict
from itertools import product

try:
import chardet
Expand All @@ -21,7 +22,7 @@

from TM1py import TM1Service

from utils import set_current_directory, Task, OptimizedTask, ExecutionMode, Wait
from utils import set_current_directory, Task, OptimizedTask, ExecutionMode, Wait, flatten_to_list

APP_NAME = "RushTI"
CURRENT_DIRECTORY = set_current_directory()
Expand Down Expand Up @@ -87,6 +88,7 @@ def setup_tm1_services(max_workers: int, tasks_file_path: str, execution_mode: E

tm1_instances_in_tasks = get_instances_from_tasks_file(execution_mode, max_workers, tasks_file_path)

global tm1_services
tm1_services = dict()
# parse .ini
config = configparser.ConfigParser()
Expand Down Expand Up @@ -159,6 +161,29 @@ def extract_task_or_wait_from_line(line: str) -> Union[Task, Wait]:
parameters=line_arguments)


def expand_task(task: Union[Task, OptimizedTask]) -> List[Union[Task, OptimizedTask]]:
global tm1_services
tm1 = tm1_services[task.instance_name]
list_params = []
result = []
for param, value in task.parameters.items():
if param.endswith('*'):
elements = tm1.dimensions.hierarchies.elements.execute_set_mdx(value[1:], member_properties=['Name'],
parent_properties=None,
element_properties=None)
list_params.append([(param[:-1], element[0]["Name"]) for element in elements])
else:
list_params.append([(param, value)])
for expanded_params in [dict(combo) for combo in product(*list_params)]:
if isinstance(task, OptimizedTask):
result.append(OptimizedTask(task.id, task.instance_name, task.process_name, parameters=expanded_params,
predecessors=task.predecessors,
require_predecessor_success=task.require_predecessor_success))
elif isinstance(task, Task):
result.append(Task(task.instance_name, task.process_name, parameters=expanded_params))
return result


def extract_task_from_line_type_opt(line: str) -> OptimizedTask:
""" Translate one line from txt file type 'opt' into arguments for execution
:param: line: Arguments for execution. E.g. id="5" predecessors="2,3" require_predecessor_success="1" instance="tm1srv01"
Expand Down Expand Up @@ -201,14 +226,19 @@ def extract_task_from_line_type_opt(line: str) -> OptimizedTask:
parameters=line_arguments)


def extract_ordered_tasks_and_waits_from_file_type_norm(file_path: str):
def extract_ordered_tasks_and_waits_from_file_type_norm(file_path: str, expand: bool = False):
with open(file_path, encoding='utf-8') as file:
return [extract_task_or_wait_from_line(line) for line in file.readlines()]
original_tasks = [extract_task_or_wait_from_line(line) for line in file.readlines() if not line.startswith('#')]
if not expand:
return original_tasks
else:
return flatten_to_list([expand_task(task) if isinstance(task, Task) else Wait() for task in original_tasks])


def extract_ordered_tasks_and_waits_from_file_type_opt(max_workers: int, file_path: str) -> List[Task]:
def extract_ordered_tasks_and_waits_from_file_type_opt(max_workers: int, file_path: str, expand: bool = False) -> List[
Task]:
ordered_tasks_and_waits = list()
tasks = extract_tasks_from_file_type_opt(file_path)
tasks = extract_tasks_from_file_type_opt(file_path, expand)

# mapping of level (int) against list of tasks
tasks_by_level = deduce_levels_of_tasks(tasks)
Expand All @@ -223,7 +253,7 @@ def extract_ordered_tasks_and_waits_from_file_type_opt(max_workers: int, file_pa
return ordered_tasks_and_waits


def extract_tasks_from_file_type_opt(file_path: str) -> dict:
def extract_tasks_from_file_type_opt(file_path: str, expand: bool = False) -> dict:
"""
:param file_path:
:return: tasks
Expand All @@ -234,14 +264,24 @@ def extract_tasks_from_file_type_opt(file_path: str) -> dict:
lines = input_file.readlines()
# Build tasks dictionary
for line in lines:
# skip empty lines
if not line.strip():
continue
task = extract_task_from_line_type_opt(line)
if task.id not in tasks:
tasks[task.id] = [task]
else:
tasks[task.id].append(task)
# exclude comments
if not line.startswith('#'):
# skip empty lines
if not line.strip():
continue
task = extract_task_from_line_type_opt(line)
if task.id not in tasks:
tasks[task.id] = [task]
else:
tasks[task.id].append(task)

# expand tasks
if expand:
for task_id in tasks:
for task in tasks[task_id]:
for expanded_task in expand_task(task):
tasks[task.id].append(expanded_task)
tasks[task.id].remove(task)

# Populate the successors attribute
for task_id in tasks:
Expand Down Expand Up @@ -351,7 +391,8 @@ def pre_process_file(file_path: str):
file.write(text)


def get_ordered_tasks_and_waits(file_path: str, max_workers: int, tasks_file_type: ExecutionMode) -> List[Task]:
def get_ordered_tasks_and_waits(file_path: str, max_workers: int, tasks_file_type: ExecutionMode,
expand: bool = False) -> List[Task]:
""" Extract tasks from file
if necessary transform a file that respects type 'opt' specification into a scheduled and optimized list of tasks
:param file_path:
Expand All @@ -367,9 +408,9 @@ def get_ordered_tasks_and_waits(file_path: str, max_workers: int, tasks_file_typ
logging.info(f"Function '{pre_process_file.__name__}' skipped. Optional dependency 'chardet' not installed")

if tasks_file_type == ExecutionMode.NORM:
return extract_ordered_tasks_and_waits_from_file_type_norm(file_path)
return extract_ordered_tasks_and_waits_from_file_type_norm(file_path, expand)
else:
return extract_ordered_tasks_and_waits_from_file_type_opt(max_workers, file_path)
return extract_ordered_tasks_and_waits_from_file_type_opt(max_workers, file_path, expand)


def execute_process_with_retries(tm1: TM1Service, task: Task, retries: int):
Expand Down Expand Up @@ -514,7 +555,7 @@ def validate_tasks(tasks: List[Task], tm1_services: Dict[str, TM1Service]) -> bo
tm1 = tm1_services[task.instance_name]

# avoid repeated validations
if current_task in validated_tasks:
if current_task["process"] in validated_tasks:
continue

# check for process existence
Expand All @@ -524,7 +565,7 @@ def validate_tasks(tasks: List[Task], tm1_services: Dict[str, TM1Service]) -> bo
instance=task.instance_name
)
logger.error(msg)
validated_tasks.append(current_task)
validated_tasks.append(current_task["process"])
validation_ok = False
continue

Expand Down Expand Up @@ -595,7 +636,7 @@ def translate_cmd_arguments(*args):
""" Translation and Validity-checks for command line arguments.
:param args:
:param args:
:return: tasks_file_path, maximum_workers, execution_mode, retries, result_file
"""
# too few arguments
Expand Down Expand Up @@ -732,7 +773,7 @@ def exit_rushti(

try:
# determine and validate tasks
tasks = get_ordered_tasks_and_waits(tasks_file_path, maximum_workers, execution_mode)
tasks = get_ordered_tasks_and_waits(tasks_file_path, maximum_workers, execution_mode, True)
if not validate_tasks(tasks, tm1_service_by_instance):
raise ValueError("Invalid tasks provided")

Expand Down Expand Up @@ -765,4 +806,4 @@ def exit_rushti(
successes=sum(results),
start_time=start,
end_time=end,
elapsed_time=duration)
elapsed_time=duration)
37 changes: 34 additions & 3 deletions utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import sys
from enum import Enum
from typing import List, Dict
from typing import List, Dict, Any


def set_current_directory():
Expand Down Expand Up @@ -32,7 +32,7 @@ def __eq__(self, other):
class Task:
id = 1

def __init__(self, instance_name, process_name, parameters):
def __init__(self, instance_name: str, process_name: str, parameters: Dict[str, Any] = None):
self.id = Task.id
self.instance_name = instance_name
self.process_name = process_name
Expand All @@ -48,7 +48,8 @@ def translate_to_line(self):


class OptimizedTask(Task):
def __init__(self, task_id: str, instance_name: str, process_name: str, parameters: Dict, predecessors: List,
def __init__(self, task_id: str, instance_name: str, process_name: str, parameters: Dict[str, Any],
predecessors: List,
require_predecessor_success: bool):
super().__init__(instance_name, process_name, parameters)
self.id = task_id
Expand All @@ -64,6 +65,15 @@ def has_predecessors(self):
def has_successors(self):
return len(self.successors) > 0

def translate_to_line(self):
return 'id="{id}" predecessors="{predecessors}" require_predecessor_success="{require_predecessor_success}" instance="{instance}" process="{process}" {parameters}\n'.format(
id=self.id,
predecessors=self.predecessors,
require_predecessor_success=self.require_predecessor_success,
instance=self.instance_name,
process=self.process_name,
parameters=' '.join('{}="{}"'.format(parameter, value) for parameter, value in self.parameters.items()))


class ExecutionMode(Enum):
NORM = 1
Expand All @@ -76,3 +86,24 @@ def _missing_(cls, value):
return member
# default
return cls.NORM


def flatten_to_list(object) -> list:
"""takes an nested iterables and returns a flat list of them
Args:
object (_type_): object containing the iterable
Returns:
list: flat list of objects
"""
gather = []
if not isinstance(object, str):
for item in object:
if isinstance(item, (list, tuple, set)):
gather.extend(flatten_to_list(item))
else:
gather.append(item)
else:
gather.append(object)
return gather

0 comments on commit 1c2ce5f

Please sign in to comment.