From 59941abd53da1adf4827bbe88bb61f88973d6263 Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Thu, 11 Apr 2024 15:06:47 -0500 Subject: [PATCH 1/8] added feature to enable or disable rules --- TM1py/Services/CubeService.py | 26 +++++++++++++++++++++++++- Tests/CubeService_test.py | 18 +++++++++++++++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/TM1py/Services/CubeService.py b/TM1py/Services/CubeService.py index fcf31325..4843037f 100644 --- a/TM1py/Services/CubeService.py +++ b/TM1py/Services/CubeService.py @@ -231,7 +231,7 @@ def search_for_dimension(self, dimension_name: str, skip_control_cubes: bool = F def search_for_dimension_substring(self, substring: str, skip_control_cubes: bool = False, **kwargs) -> Dict[str, List[str]]: - """ Ask TM1 Server for a dictinary of cube names with the dimension whose name contains the substring + """ Ask TM1 Server for a dictionary of cube names with the dimension whose name contains the substring :param substring: string to search for in dim name :param skip_control_cubes: bool, True will exclude control cubes from result @@ -249,6 +249,30 @@ def search_for_dimension_substring(self, substring: str, skip_control_cubes: boo cube_dict = {entry['Name']: [dim['Name'] for dim in entry['Dimensions']] for entry in response.json()['value']} return cube_dict + def toggle_cube_rule(self, cube: Cube, enabled: bool): + """ Enable or disable a cube rule (entirely) + + :param cube: an instance of a Cube + :param enabled: True to enable the rule, False to disable it + """ + if not cube.rules.text: + # If there is no rule, there is nothing to do + return + else: + rule_lines = cube.rules.text.split('\n') + if enabled: + # If enabling the rule, remove the first '#' from the beginning of each line if present + modified_lines = [line[1:] if line.startswith('#') else line for line in rule_lines] + else: + # If disabling the rule, add '#' to the beginning of each line + modified_lines = ['#' + line for line in rule_lines] + + # Join the modified lines back into a single string + cube.rules = '\n'.join(modified_lines) + + self.update(cube) + + def search_for_rule_substring(self, substring: str, skip_control_cubes: bool = False, case_insensitive=True, space_insensitive=True, **kwargs) -> List[Cube]: """ get all cubes from TM1 Server as TM1py.Cube instances where rules for given cube contain specified substring diff --git a/Tests/CubeService_test.py b/Tests/CubeService_test.py index 0175f1b7..6aacd5f6 100644 --- a/Tests/CubeService_test.py +++ b/Tests/CubeService_test.py @@ -27,7 +27,7 @@ def setUp(self): # Connection to TM1 self.config = configparser.ConfigParser() self.config.read(Path(__file__).parent.joinpath('config.ini')) - self.tm1 = TM1Service(**self.config['tm1srv01']) + self.tm1 = TM1Service(**self.config['tm1srv04']) for dimension_name in self.dimension_names: elements = [Element('Element {}'.format(str(j)), 'Numeric') for j in range(1, 1001)] @@ -301,6 +301,22 @@ def test_get_measure_dimension(self): self.assertEqual(self.dimension_names[-1], measure_dimension) + def test_toggle_cube_rule(self): + uncommented = "SKIPCHECK;\n[]=N:2;\n#find_me_comment\nFEEDERS;\n" + commented = "#SKIPCHECK;\n#[]=N:2;\n##find_me_comment\n#FEEDERS;\n#" + c = self.tm1.cubes.get(self.cube_name) + c.rules = uncommented + self.tm1.cubes.update(c) + + # test disabling + self.tm1.cubes.toggle_cube_rule(c, enabled=False) + self.assertEqual(c.rules.text, commented) + + # test re-enable + self.tm1.cubes.toggle_cube_rule(c, enabled=True) + self.assertEqual(c.rules.text, uncommented) + + def tearDown(self): self.tm1.cubes.delete(self.cube_name) if self.tm1.cubes.exists(self.cube_name_to_delete): From e0b964b84ecfcfcf96bde2fe6a65b02dc8098cfb Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Fri, 12 Apr 2024 09:46:42 -0500 Subject: [PATCH 2/8] switch feature to use base64 encode/decode --- TM1py/Services/CubeService.py | 46 +++++++++++++++++++++-------------- Tests/CubeService_test.py | 9 ++++--- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/TM1py/Services/CubeService.py b/TM1py/Services/CubeService.py index 4843037f..f467e602 100644 --- a/TM1py/Services/CubeService.py +++ b/TM1py/Services/CubeService.py @@ -4,6 +4,7 @@ from typing import List, Iterable, Dict from requests import Response +import base64 from TM1py.Objects.Cube import Cube from TM1py.Services.CellService import CellService @@ -249,29 +250,38 @@ def search_for_dimension_substring(self, substring: str, skip_control_cubes: boo cube_dict = {entry['Name']: [dim['Name'] for dim in entry['Dimensions']] for entry in response.json()['value']} return cube_dict - def toggle_cube_rule(self, cube: Cube, enabled: bool): - """ Enable or disable a cube rule (entirely) + def enable_cube_rule(self, cube: Cube) -> None: + """ + Enable a cube rule from its base64-encoded hash if it exists. - :param cube: an instance of a Cube - :param enabled: True to enable the rule, False to disable it + :param cube: An instance of a Cube. """ - if not cube.rules.text: - # If there is no rule, there is nothing to do + current_rule = cube.rules.text + if not current_rule: + # If there is no rule, there is nothing to do. return - else: - rule_lines = cube.rules.text.split('\n') - if enabled: - # If enabling the rule, remove the first '#' from the beginning of each line if present - modified_lines = [line[1:] if line.startswith('#') else line for line in rule_lines] - else: - # If disabling the rule, add '#' to the beginning of each line - modified_lines = ['#' + line for line in rule_lines] + try: + current_rule = current_rule[1:] if current_rule.startswith('#') else current_rule + cube.rules = base64.b64decode(current_rule).decode('utf-8') + except Exception: + raise ValueError(f"Current rule is not decodable by b64decode standards") - # Join the modified lines back into a single string - cube.rules = '\n'.join(modified_lines) + self.update(cube) - self.update(cube) + def disable_cube_rule(self, cube: Cube) -> None: + """ + Disable a cube rule by saving its base64-encoded hash and commenting each line. + :param cube: An instance of a Cube. + """ + current_rule = cube.rules.text + if not current_rule: + # If there is no rule, there is nothing to do. + return + + # Save the current rule as a base64-encoded hash + cube.rules = f"#{base64.b64encode(current_rule.encode('utf-8')).decode('utf-8')}" + self.update(cube) def search_for_rule_substring(self, substring: str, skip_control_cubes: bool = False, case_insensitive=True, space_insensitive=True, **kwargs) -> List[Cube]: @@ -429,4 +439,4 @@ def get_vmt(self, cube_name: str): def set_vmt(self, cube_name: str, vmt: int): url = format_url("/Cubes('{}')", cube_name) payload = {"ViewStorageMinTime": vmt} - response = self._rest.PATCH(url=url, data=json.dumps(payload)) \ No newline at end of file + response = self._rest.PATCH(url=url, data=json.dumps(payload)) diff --git a/Tests/CubeService_test.py b/Tests/CubeService_test.py index 6aacd5f6..43d547d8 100644 --- a/Tests/CubeService_test.py +++ b/Tests/CubeService_test.py @@ -1,3 +1,4 @@ +import base64 import configparser import unittest import uuid @@ -303,17 +304,17 @@ def test_get_measure_dimension(self): def test_toggle_cube_rule(self): uncommented = "SKIPCHECK;\n[]=N:2;\n#find_me_comment\nFEEDERS;\n" - commented = "#SKIPCHECK;\n#[]=N:2;\n##find_me_comment\n#FEEDERS;\n#" c = self.tm1.cubes.get(self.cube_name) c.rules = uncommented self.tm1.cubes.update(c) # test disabling - self.tm1.cubes.toggle_cube_rule(c, enabled=False) - self.assertEqual(c.rules.text, commented) + self.tm1.cubes.disable_cube_rule(c) + self.assertEqual(c.has_rules, False) + self.assertEqual(c.rules.text.startswith('#'), True) # test re-enable - self.tm1.cubes.toggle_cube_rule(c, enabled=True) + self.tm1.cubes.enable_cube_rule(c) self.assertEqual(c.rules.text, uncommented) From f6ee7209d4b830bc4a0e8b733508bc27bc9f4490 Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Mon, 15 Apr 2024 10:42:05 -0500 Subject: [PATCH 3/8] - change ValueError for Runtime error - added prefix to disabled rule --- TM1py/Services/CubeService.py | 9 ++++++--- Tests/CubeService_test.py | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/TM1py/Services/CubeService.py b/TM1py/Services/CubeService.py index f467e602..1714627a 100644 --- a/TM1py/Services/CubeService.py +++ b/TM1py/Services/CubeService.py @@ -261,10 +261,11 @@ def enable_cube_rule(self, cube: Cube) -> None: # If there is no rule, there is nothing to do. return try: - current_rule = current_rule[1:] if current_rule.startswith('#') else current_rule + prefix = "# b64 encoded rule=" + current_rule = current_rule[len(prefix):] if current_rule.startswith(prefix) else current_rule cube.rules = base64.b64decode(current_rule).decode('utf-8') except Exception: - raise ValueError(f"Current rule is not decodable by b64decode standards") + raise RuntimeError(f"Current rule is not decodable by b64decode standards") self.update(cube) @@ -280,7 +281,9 @@ def disable_cube_rule(self, cube: Cube) -> None: return # Save the current rule as a base64-encoded hash - cube.rules = f"#{base64.b64encode(current_rule.encode('utf-8')).decode('utf-8')}" + prefix = "# b64 encoded rule=" + cube.rules = f"{prefix}{base64.b64encode(current_rule.encode('utf-8')).decode('utf-8')}" + self.update(cube) def search_for_rule_substring(self, substring: str, skip_control_cubes: bool = False, case_insensitive=True, diff --git a/Tests/CubeService_test.py b/Tests/CubeService_test.py index 43d547d8..027f3357 100644 --- a/Tests/CubeService_test.py +++ b/Tests/CubeService_test.py @@ -311,7 +311,7 @@ def test_toggle_cube_rule(self): # test disabling self.tm1.cubes.disable_cube_rule(c) self.assertEqual(c.has_rules, False) - self.assertEqual(c.rules.text.startswith('#'), True) + self.assertEqual(c.rules.text.startswith('# b64 encoded rule='), True) # test re-enable self.tm1.cubes.enable_cube_rule(c) From 740aada01fbd075715ee18585ada7b6ff3c94069 Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Tue, 16 Apr 2024 15:45:55 -0500 Subject: [PATCH 4/8] added enable/disable rule per subsection --- TM1py/Objects/Rules.py | 2 +- TM1py/Services/CubeService.py | 68 ++++++++++++++++++++++++++++------- Tests/CubeService_test.py | 26 ++++++++++---- 3 files changed, 76 insertions(+), 20 deletions(-) diff --git a/TM1py/Objects/Rules.py b/TM1py/Objects/Rules.py index 19f61386..ec4b18e8 100644 --- a/TM1py/Objects/Rules.py +++ b/TM1py/Objects/Rules.py @@ -3,6 +3,7 @@ from TM1py.Objects.TM1Object import TM1Object +KEYWORDS = ['SKIPCHECK', 'FEEDSTRINGS', 'UNDEFVALS', 'FEEDERS'] class Rules(TM1Object): """ @@ -13,7 +14,6 @@ class Rules(TM1Object): comments are not included. """ - KEYWORDS = ['SKIPCHECK', 'FEEDSTRINGS', 'UNDEFVALS', 'FEEDERS'] def __init__(self, rules: str): self._text = rules diff --git a/TM1py/Services/CubeService.py b/TM1py/Services/CubeService.py index 1714627a..de751ea3 100644 --- a/TM1py/Services/CubeService.py +++ b/TM1py/Services/CubeService.py @@ -7,6 +7,7 @@ import base64 from TM1py.Objects.Cube import Cube +from TM1py.Objects.Rules import KEYWORDS from TM1py.Services.CellService import CellService from TM1py.Services.ObjectService import ObjectService from TM1py.Services.RestService import RestService @@ -250,39 +251,82 @@ def search_for_dimension_substring(self, substring: str, skip_control_cubes: boo cube_dict = {entry['Name']: [dim['Name'] for dim in entry['Dimensions']] for entry in response.json()['value']} return cube_dict - def enable_cube_rule(self, cube: Cube) -> None: + def enable_cube_rule(self, cube: Cube, sections: List[str] = None) -> None: """ Enable a cube rule from its base64-encoded hash if it exists. :param cube: An instance of a Cube. + :param sections: a list of valid Rule sections (KEYWORDS) """ current_rule = cube.rules.text if not current_rule: # If there is no rule, there is nothing to do. return - try: - prefix = "# b64 encoded rule=" - current_rule = current_rule[len(prefix):] if current_rule.startswith(prefix) else current_rule - cube.rules = base64.b64decode(current_rule).decode('utf-8') - except Exception: - raise RuntimeError(f"Current rule is not decodable by b64decode standards") + + prefix = "# B64 ENCODED " + + if not sections: + # Decode the entire rule + rule_prefix = f"{prefix}RULE=" + encoded_rule = current_rule[len(rule_prefix):] if current_rule.startswith(rule_prefix) else current_rule + cube.rules = base64.b64decode(encoded_rule).decode('utf-8') + else: + for section in [section.upper() for section in sections]: + if section not in KEYWORDS: + raise ValueError(f"{section} is not a valid value, only {KEYWORDS} are accepted.") + else: + new_rule = cube.rules.text.splitlines() + for i, line in enumerate(new_rule): + section_prefix = f"{prefix}{section}=".upper() + if line.upper().startswith(section_prefix): + encoded_section = line[len(section_prefix):] + new_rule[i] = base64.b64decode(encoded_section).decode('utf-8') + + cube.rules = "\n".join(new_rule) self.update(cube) - def disable_cube_rule(self, cube: Cube) -> None: + def disable_cube_rule(self, cube: Cube, sections: List[str] = None) -> None: """ Disable a cube rule by saving its base64-encoded hash and commenting each line. - :param cube: An instance of a Cube. + :param sections: a list of valid Rule sections (KEYWORDS) """ current_rule = cube.rules.text if not current_rule: # If there is no rule, there is nothing to do. return - # Save the current rule as a base64-encoded hash - prefix = "# b64 encoded rule=" - cube.rules = f"{prefix}{base64.b64encode(current_rule.encode('utf-8')).decode('utf-8')}" + prefix = "# B64 ENCODED " + if not sections: + # Encode the entire rule + cube.rules = f"{prefix}RULE={base64.b64encode(current_rule.encode('utf-8')).decode('utf-8')}" + + else: + for section in [section.upper() for section in sections]: + + if section not in KEYWORDS: + raise ValueError(f"{section} is not a valid value, only {KEYWORDS} are accepted.") + + else: + + if section in ['FEEDSTRINGS', 'UNDEFVALS']: + rule = cube.rules.text.splitlines() + for i, line in enumerate(rule): + section_prefix = f"{prefix}{section}=" + if line.upper().startswith(section): + rule[i] = f"{section_prefix}{base64.b64encode(line.encode('utf-8')).decode('utf-8')}" + cube.rules = "\n".join(rule) + + else: + section_str = 'SKIPCHECK;' if section == 'SKIPCHECK' else 'FEEDERS;' + rule = cube.rules.text.splitlines() + section_starts = rule.index(section_str) + section_ends = rule.index('FEEDERS;') if 'FEEDERS;' in rule and section == 'SKIPCHECK' else len(rule) + section_body = "\n".join(rule[section_starts:section_ends]) + encoded_section = f"{prefix}{section}={base64.b64encode(section_body.encode('utf-8')).decode('utf-8')}" + rule[section_starts:section_ends] = [encoded_section] + cube.rules = "\n".join(rule) self.update(cube) diff --git a/Tests/CubeService_test.py b/Tests/CubeService_test.py index 027f3357..471bad16 100644 --- a/Tests/CubeService_test.py +++ b/Tests/CubeService_test.py @@ -28,7 +28,7 @@ def setUp(self): # Connection to TM1 self.config = configparser.ConfigParser() self.config.read(Path(__file__).parent.joinpath('config.ini')) - self.tm1 = TM1Service(**self.config['tm1srv04']) + self.tm1 = TM1Service(**self.config['tm1srv01']) for dimension_name in self.dimension_names: elements = [Element('Element {}'.format(str(j)), 'Numeric') for j in range(1, 1001)] @@ -303,20 +303,32 @@ def test_get_measure_dimension(self): self.assertEqual(self.dimension_names[-1], measure_dimension) def test_toggle_cube_rule(self): - uncommented = "SKIPCHECK;\n[]=N:2;\n#find_me_comment\nFEEDERS;\n" + uncommented = Rules("#comment1\nFEEDSTRINGS;\nUNDEFVALS;\n#comment2\nSKIPCHECK;\n#comment3\n#comment4\n[" \ + "]=N:2;\n#find_me_comment\nFEEDERS;\n#comment5\n[]=>DB(some_cube);\n#comment6") c = self.tm1.cubes.get(self.cube_name) c.rules = uncommented self.tm1.cubes.update(c) + self.assertEqual(self.tm1.cubes.get(c.name).has_rules, True) + # test disabling + self.tm1.cubes.disable_cube_rule(c, sections=['FEEDSTRINGS']) + self.tm1.cubes.enable_cube_rule(c, sections=['FEEDSTRINGS']) + self.assertEqual(c.rules.text, uncommented.text) + + self.tm1.cubes.disable_cube_rule(c, sections=['UNDEFVALS']) + self.tm1.cubes.enable_cube_rule(c, sections=['UNDEFVALS']) + self.assertEqual(c.rules.text, uncommented.text) + + self.tm1.cubes.disable_cube_rule(c, sections=['SKIPCHECK', 'FEEDERS']) + self.tm1.cubes.enable_cube_rule(c, sections=['SKIPCHECK', 'FEEDERS']) + self.assertEqual(c.rules.text, uncommented.text) + self.tm1.cubes.disable_cube_rule(c) - self.assertEqual(c.has_rules, False) - self.assertEqual(c.rules.text.startswith('# b64 encoded rule='), True) + self.assertEqual(c.rules.text.startswith('# B64 ENCODED RULE='), True) - # test re-enable self.tm1.cubes.enable_cube_rule(c) - self.assertEqual(c.rules.text, uncommented) - + self.assertEqual(c.rules.text, uncommented.text) def tearDown(self): self.tm1.cubes.delete(self.cube_name) From 295d9135872376738a40abce7823c93595a62a99 Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Tue, 16 Apr 2024 15:45:55 -0500 Subject: [PATCH 5/8] added enable/disable rule per subsection --- TM1py/Objects/Cube.py | 2 +- TM1py/Objects/Rules.py | 2 +- TM1py/Services/CubeService.py | 68 ++++++++++++++++++++++++++++------- Tests/CubeService_test.py | 29 +++++++++++---- 4 files changed, 80 insertions(+), 21 deletions(-) diff --git a/TM1py/Objects/Cube.py b/TM1py/Objects/Cube.py index 52632464..f4119776 100644 --- a/TM1py/Objects/Cube.py +++ b/TM1py/Objects/Cube.py @@ -112,6 +112,6 @@ def _construct_body(self) -> str: body_as_dict['Dimensions@odata.bind'] = [format_url("Dimensions('{}')", dimension) for dimension in self.dimensions] - if self.has_rules: + if self.rules.text: body_as_dict['Rules'] = str(self.rules) return json.dumps(body_as_dict, ensure_ascii=False) diff --git a/TM1py/Objects/Rules.py b/TM1py/Objects/Rules.py index 19f61386..ec4b18e8 100644 --- a/TM1py/Objects/Rules.py +++ b/TM1py/Objects/Rules.py @@ -3,6 +3,7 @@ from TM1py.Objects.TM1Object import TM1Object +KEYWORDS = ['SKIPCHECK', 'FEEDSTRINGS', 'UNDEFVALS', 'FEEDERS'] class Rules(TM1Object): """ @@ -13,7 +14,6 @@ class Rules(TM1Object): comments are not included. """ - KEYWORDS = ['SKIPCHECK', 'FEEDSTRINGS', 'UNDEFVALS', 'FEEDERS'] def __init__(self, rules: str): self._text = rules diff --git a/TM1py/Services/CubeService.py b/TM1py/Services/CubeService.py index 1714627a..de751ea3 100644 --- a/TM1py/Services/CubeService.py +++ b/TM1py/Services/CubeService.py @@ -7,6 +7,7 @@ import base64 from TM1py.Objects.Cube import Cube +from TM1py.Objects.Rules import KEYWORDS from TM1py.Services.CellService import CellService from TM1py.Services.ObjectService import ObjectService from TM1py.Services.RestService import RestService @@ -250,39 +251,82 @@ def search_for_dimension_substring(self, substring: str, skip_control_cubes: boo cube_dict = {entry['Name']: [dim['Name'] for dim in entry['Dimensions']] for entry in response.json()['value']} return cube_dict - def enable_cube_rule(self, cube: Cube) -> None: + def enable_cube_rule(self, cube: Cube, sections: List[str] = None) -> None: """ Enable a cube rule from its base64-encoded hash if it exists. :param cube: An instance of a Cube. + :param sections: a list of valid Rule sections (KEYWORDS) """ current_rule = cube.rules.text if not current_rule: # If there is no rule, there is nothing to do. return - try: - prefix = "# b64 encoded rule=" - current_rule = current_rule[len(prefix):] if current_rule.startswith(prefix) else current_rule - cube.rules = base64.b64decode(current_rule).decode('utf-8') - except Exception: - raise RuntimeError(f"Current rule is not decodable by b64decode standards") + + prefix = "# B64 ENCODED " + + if not sections: + # Decode the entire rule + rule_prefix = f"{prefix}RULE=" + encoded_rule = current_rule[len(rule_prefix):] if current_rule.startswith(rule_prefix) else current_rule + cube.rules = base64.b64decode(encoded_rule).decode('utf-8') + else: + for section in [section.upper() for section in sections]: + if section not in KEYWORDS: + raise ValueError(f"{section} is not a valid value, only {KEYWORDS} are accepted.") + else: + new_rule = cube.rules.text.splitlines() + for i, line in enumerate(new_rule): + section_prefix = f"{prefix}{section}=".upper() + if line.upper().startswith(section_prefix): + encoded_section = line[len(section_prefix):] + new_rule[i] = base64.b64decode(encoded_section).decode('utf-8') + + cube.rules = "\n".join(new_rule) self.update(cube) - def disable_cube_rule(self, cube: Cube) -> None: + def disable_cube_rule(self, cube: Cube, sections: List[str] = None) -> None: """ Disable a cube rule by saving its base64-encoded hash and commenting each line. - :param cube: An instance of a Cube. + :param sections: a list of valid Rule sections (KEYWORDS) """ current_rule = cube.rules.text if not current_rule: # If there is no rule, there is nothing to do. return - # Save the current rule as a base64-encoded hash - prefix = "# b64 encoded rule=" - cube.rules = f"{prefix}{base64.b64encode(current_rule.encode('utf-8')).decode('utf-8')}" + prefix = "# B64 ENCODED " + if not sections: + # Encode the entire rule + cube.rules = f"{prefix}RULE={base64.b64encode(current_rule.encode('utf-8')).decode('utf-8')}" + + else: + for section in [section.upper() for section in sections]: + + if section not in KEYWORDS: + raise ValueError(f"{section} is not a valid value, only {KEYWORDS} are accepted.") + + else: + + if section in ['FEEDSTRINGS', 'UNDEFVALS']: + rule = cube.rules.text.splitlines() + for i, line in enumerate(rule): + section_prefix = f"{prefix}{section}=" + if line.upper().startswith(section): + rule[i] = f"{section_prefix}{base64.b64encode(line.encode('utf-8')).decode('utf-8')}" + cube.rules = "\n".join(rule) + + else: + section_str = 'SKIPCHECK;' if section == 'SKIPCHECK' else 'FEEDERS;' + rule = cube.rules.text.splitlines() + section_starts = rule.index(section_str) + section_ends = rule.index('FEEDERS;') if 'FEEDERS;' in rule and section == 'SKIPCHECK' else len(rule) + section_body = "\n".join(rule[section_starts:section_ends]) + encoded_section = f"{prefix}{section}={base64.b64encode(section_body.encode('utf-8')).decode('utf-8')}" + rule[section_starts:section_ends] = [encoded_section] + cube.rules = "\n".join(rule) self.update(cube) diff --git a/Tests/CubeService_test.py b/Tests/CubeService_test.py index 027f3357..ac373c1c 100644 --- a/Tests/CubeService_test.py +++ b/Tests/CubeService_test.py @@ -28,7 +28,7 @@ def setUp(self): # Connection to TM1 self.config = configparser.ConfigParser() self.config.read(Path(__file__).parent.joinpath('config.ini')) - self.tm1 = TM1Service(**self.config['tm1srv04']) + self.tm1 = TM1Service(**self.config['tm1srv01']) for dimension_name in self.dimension_names: elements = [Element('Element {}'.format(str(j)), 'Numeric') for j in range(1, 1001)] @@ -303,20 +303,35 @@ def test_get_measure_dimension(self): self.assertEqual(self.dimension_names[-1], measure_dimension) def test_toggle_cube_rule(self): - uncommented = "SKIPCHECK;\n[]=N:2;\n#find_me_comment\nFEEDERS;\n" + uncommented = Rules("#comment1\nFEEDSTRINGS;\nUNDEFVALS;\n#comment2\nSKIPCHECK;\n#comment3\n#comment4\n[" \ + "]=N:2;\n#find_me_comment\nFEEDERS;\n#comment5\n[]=>DB(some_cube);\n#comment6") c = self.tm1.cubes.get(self.cube_name) c.rules = uncommented self.tm1.cubes.update(c) + self.assertEqual(self.tm1.cubes.get(c.name).has_rules, True) + # test disabling + self.tm1.cubes.disable_cube_rule(c, sections=['FEEDSTRINGS']) + self.tm1.cubes.enable_cube_rule(c, sections=['FEEDSTRINGS']) + self.assertEqual(c.rules.text, uncommented.text) + + self.tm1.cubes.disable_cube_rule(c, sections=['UNDEFVALS']) + self.tm1.cubes.enable_cube_rule(c, sections=['UNDEFVALS']) + self.assertEqual(c.rules.text, uncommented.text) + + self.tm1.cubes.disable_cube_rule(c, sections=['SKIPCHECK', 'FEEDERS']) + self.tm1.cubes.enable_cube_rule(c, sections=['SKIPCHECK', 'FEEDERS']) + self.assertEqual(c.rules.text, uncommented.text) + self.tm1.cubes.disable_cube_rule(c) - self.assertEqual(c.has_rules, False) - self.assertEqual(c.rules.text.startswith('# b64 encoded rule='), True) + self.assertEqual(c.rules.text.startswith('# B64 ENCODED RULE='), True) - # test re-enable - self.tm1.cubes.enable_cube_rule(c) - self.assertEqual(c.rules.text, uncommented) + cells = {('Element 1', 'Element 1', 'Element 1'): 1} + self.tm1.cells.write_values(self.cube_name, cells) + self.tm1.cubes.enable_cube_rule(c) + self.assertEqual(c.rules.text, uncommented.text) def tearDown(self): self.tm1.cubes.delete(self.cube_name) From 7a167035788fa8770e54725319a31aad631b857b Mon Sep 17 00:00:00 2001 From: MariusWirtz Date: Wed, 17 Apr 2024 17:56:44 +1000 Subject: [PATCH 6/8] Enable, disable rules or feeders enable, disable functions in Cube class enable, disable service functions in CubeService enable and disable all rules or only feeders --- TM1py/Objects/Cube.py | 72 +++++++++++++- TM1py/Objects/Rules.py | 5 +- TM1py/Services/CubeService.py | 98 ++++++------------ Tests/CubeService_test.py | 46 ++++----- Tests/Cube_test.py | 180 ++++++++++++++++++++++++++++++++-- 5 files changed, 293 insertions(+), 108 deletions(-) diff --git a/TM1py/Objects/Cube.py b/TM1py/Objects/Cube.py index f4119776..ed5335b5 100644 --- a/TM1py/Objects/Cube.py +++ b/TM1py/Objects/Cube.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- - +import base64 import collections import json from typing import Iterable, List, Dict, Optional, Union -from TM1py.Objects.Rules import Rules +from TM1py.Objects.Rules import Rules, RULES_ENCODING_PREFIX, FEEDERS_ENCODING_PREFIX from TM1py.Objects.TM1Object import TM1Object from TM1py.Utils import format_url @@ -112,6 +112,72 @@ def _construct_body(self) -> str: body_as_dict['Dimensions@odata.bind'] = [format_url("Dimensions('{}')", dimension) for dimension in self.dimensions] - if self.rules.text: + if str(self.rules): body_as_dict['Rules'] = str(self.rules) return json.dumps(body_as_dict, ensure_ascii=False) + + def enable_rules(self): + if not self.rules.text: + # If there is no rule, there is nothing to do. + return + + rules_statements = self.rules.text.splitlines() + if not len(rules_statements) == 1: + raise RuntimeError( + "The cube rules are not disabled correctly. " + "Must be 1 line of base64-encoded hash.") + + encoded_rules_statement = rules_statements[0] + if not encoded_rules_statement.startswith(RULES_ENCODING_PREFIX): + raise RuntimeError( + f"The cube rules are not disabled correctly. " + f"Must start with prefix: '{RULES_ENCODING_PREFIX}'") + + encoded_rule = encoded_rules_statement[len(RULES_ENCODING_PREFIX):] + self._rules = Rules(base64.b64decode(encoded_rule).decode('utf-8')) + + def enable_feeders(self): + if not self.rules: + # If there is no rule, there is nothing to do. + return + + rules_statements = self.rules.text.splitlines() + + encoded_feeders_statement = rules_statements[-1] + if not encoded_feeders_statement.startswith(FEEDERS_ENCODING_PREFIX): + raise RuntimeError( + f"The cube feeders are not disabled correctly. " + f"First line in Feeders section must start with prefix: '{FEEDERS_ENCODING_PREFIX}'") + + encoded_feeders = encoded_feeders_statement[len(FEEDERS_ENCODING_PREFIX):] + self._rules = Rules( + self.rules.text[:-len(encoded_feeders_statement)] + base64.b64decode(encoded_feeders).decode('utf-8')) + + def disable_feeders(self): + if not self.rules: + # If there is no rule, there is nothing to do. + return + + rule_statements = self.rules.text.splitlines() + + # sanitize statements to simplify identification of 'feeders;' line + sanitized_rule_statements = [ + rule.strip().lower() + for rule + in rule_statements] + + feeders_start_index = sanitized_rule_statements.index('feeders;') + 1 + feeders_statements = rule_statements[feeders_start_index:] + hashed_feeders = base64.b64encode('\n'.join(feeders_statements).encode('utf-8')).decode('utf-8') + + rule_statements[feeders_start_index:] = [f"{FEEDERS_ENCODING_PREFIX}{hashed_feeders}"] + self._rules = Rules("\n".join(rule_statements)) + + def disable_rules(self): + if not self.rules: + # If there is no rule, there is nothing to do. + return + + # Encode the entire rule + self._rules = Rules( + f"{RULES_ENCODING_PREFIX}{base64.b64encode(self.rules.text.encode('utf-8')).decode('utf-8')}") diff --git a/TM1py/Objects/Rules.py b/TM1py/Objects/Rules.py index ec4b18e8..662bd523 100644 --- a/TM1py/Objects/Rules.py +++ b/TM1py/Objects/Rules.py @@ -3,7 +3,10 @@ from TM1py.Objects.TM1Object import TM1Object -KEYWORDS = ['SKIPCHECK', 'FEEDSTRINGS', 'UNDEFVALS', 'FEEDERS'] + +RULES_ENCODING_PREFIX = "# B64 ENCODED RULES=" +FEEDERS_ENCODING_PREFIX = "# B64 ENCODED FEEDERS=" + class Rules(TM1Object): """ diff --git a/TM1py/Services/CubeService.py b/TM1py/Services/CubeService.py index de751ea3..cd14c41f 100644 --- a/TM1py/Services/CubeService.py +++ b/TM1py/Services/CubeService.py @@ -1,13 +1,12 @@ # -*- coding: utf-8 -*- +import base64 import json import random from typing import List, Iterable, Dict from requests import Response -import base64 from TM1py.Objects.Cube import Cube -from TM1py.Objects.Rules import KEYWORDS from TM1py.Services.CellService import CellService from TM1py.Services.ObjectService import ObjectService from TM1py.Services.RestService import RestService @@ -15,6 +14,8 @@ from TM1py.Utils import format_url, require_version, require_data_admin, case_and_space_insensitive_equals + + class CubeService(ObjectService): """ Service to handle Object Updates for TM1 Cubes @@ -251,83 +252,42 @@ def search_for_dimension_substring(self, substring: str, skip_control_cubes: boo cube_dict = {entry['Name']: [dim['Name'] for dim in entry['Dimensions']] for entry in response.json()['value']} return cube_dict - def enable_cube_rule(self, cube: Cube, sections: List[str] = None) -> None: + def disable_rules(self, cube_name: str) -> None: """ - Enable a cube rule from its base64-encoded hash if it exists. + Disable the entire cube rule by substituting it with its base64-encoded hash - :param cube: An instance of a Cube. - :param sections: a list of valid Rule sections (KEYWORDS) + :param cube_name: name of the cube """ - current_rule = cube.rules.text - if not current_rule: - # If there is no rule, there is nothing to do. - return - - prefix = "# B64 ENCODED " - - if not sections: - # Decode the entire rule - rule_prefix = f"{prefix}RULE=" - encoded_rule = current_rule[len(rule_prefix):] if current_rule.startswith(rule_prefix) else current_rule - cube.rules = base64.b64decode(encoded_rule).decode('utf-8') - else: - for section in [section.upper() for section in sections]: - if section not in KEYWORDS: - raise ValueError(f"{section} is not a valid value, only {KEYWORDS} are accepted.") - else: - new_rule = cube.rules.text.splitlines() - for i, line in enumerate(new_rule): - section_prefix = f"{prefix}{section}=".upper() - if line.upper().startswith(section_prefix): - encoded_section = line[len(section_prefix):] - new_rule[i] = base64.b64decode(encoded_section).decode('utf-8') - - cube.rules = "\n".join(new_rule) - + cube = self.get(cube_name) + cube.disable_rules() self.update(cube) - def disable_cube_rule(self, cube: Cube, sections: List[str] = None) -> None: + def disable_feeders(self, cube_name: str) -> None: """ - Disable a cube rule by saving its base64-encoded hash and commenting each line. - :param cube: An instance of a Cube. - :param sections: a list of valid Rule sections (KEYWORDS) + Disable the feeders by substituting it with its base64-encoded hash + + :param cube_name: name of the cube """ - current_rule = cube.rules.text - if not current_rule: - # If there is no rule, there is nothing to do. - return + cube = self.get(cube_name) + cube.disable_feeders() + self.update(cube) - prefix = "# B64 ENCODED " - if not sections: - # Encode the entire rule - cube.rules = f"{prefix}RULE={base64.b64encode(current_rule.encode('utf-8')).decode('utf-8')}" + def enable_rules(self, cube_name: str) -> None: + """ Enable the disabled cube rules by decoding the base64-encoded hash - else: - for section in [section.upper() for section in sections]: - - if section not in KEYWORDS: - raise ValueError(f"{section} is not a valid value, only {KEYWORDS} are accepted.") - - else: - - if section in ['FEEDSTRINGS', 'UNDEFVALS']: - rule = cube.rules.text.splitlines() - for i, line in enumerate(rule): - section_prefix = f"{prefix}{section}=" - if line.upper().startswith(section): - rule[i] = f"{section_prefix}{base64.b64encode(line.encode('utf-8')).decode('utf-8')}" - cube.rules = "\n".join(rule) - - else: - section_str = 'SKIPCHECK;' if section == 'SKIPCHECK' else 'FEEDERS;' - rule = cube.rules.text.splitlines() - section_starts = rule.index(section_str) - section_ends = rule.index('FEEDERS;') if 'FEEDERS;' in rule and section == 'SKIPCHECK' else len(rule) - section_body = "\n".join(rule[section_starts:section_ends]) - encoded_section = f"{prefix}{section}={base64.b64encode(section_body.encode('utf-8')).decode('utf-8')}" - rule[section_starts:section_ends] = [encoded_section] - cube.rules = "\n".join(rule) + :param cube_name: name of the cube + """ + cube = self.get(cube_name) + cube.enable_rules() + self.update(cube) + + def enable_feeders(self, cube_name: str) -> None: + """ Enable the disabled cube rules by decoding the base64-encoded hash + :param cube_name: name of the cube + """ + cube = self.get(cube_name) + cube.enable_feeders() self.update(cube) def search_for_rule_substring(self, substring: str, skip_control_cubes: bool = False, case_insensitive=True, diff --git a/Tests/CubeService_test.py b/Tests/CubeService_test.py index 7ad9437a..a8c1e62b 100644 --- a/Tests/CubeService_test.py +++ b/Tests/CubeService_test.py @@ -1,4 +1,3 @@ -import base64 import configparser import unittest import uuid @@ -28,7 +27,7 @@ def setUp(self): # Connection to TM1 self.config = configparser.ConfigParser() self.config.read(Path(__file__).parent.joinpath('config.ini')) - self.tm1 = TM1Service(**self.config['tm1srv04']) + self.tm1 = TM1Service(**self.config['tm1srv01']) for dimension_name in self.dimension_names: elements = [Element('Element {}'.format(str(j)), 'Numeric') for j in range(1, 1001)] @@ -302,36 +301,31 @@ def test_get_measure_dimension(self): self.assertEqual(self.dimension_names[-1], measure_dimension) - def test_toggle_cube_rule(self): - uncommented = Rules("#comment1\nFEEDSTRINGS;\nUNDEFVALS;\n#comment2\nSKIPCHECK;\n#comment3\n#comment4\n[" \ - "]=N:2;\n#find_me_comment\nFEEDERS;\n#comment5\n[]=>DB(some_cube);\n#comment6") + def test_disable_rules_enable_rules(self): + original = Rules( + "#comment1\n" + "FEEDSTRINGS;\n" + "UNDEFVALS;\n" + "#comment2\n" + "SKIPCHECK;\n" + "#comment3\n" + "#comment4\n" + "[]=N:2;\n" + "#find_me_comment\n" + "FEEDERS;\n" + "#comment5\n" + "#comment6" + ) c = self.tm1.cubes.get(self.cube_name) - c.rules = uncommented + c.rules = original self.tm1.cubes.update(c) self.assertEqual(self.tm1.cubes.get(c.name).has_rules, True) - # test disabling - self.tm1.cubes.disable_cube_rule(c, sections=['FEEDSTRINGS']) - self.tm1.cubes.enable_cube_rule(c, sections=['FEEDSTRINGS']) - self.assertEqual(c.rules.text, uncommented.text) + self.tm1.cubes.disable_rules(c.name) + self.tm1.cubes.enable_rules(c.name) - self.tm1.cubes.disable_cube_rule(c, sections=['UNDEFVALS']) - self.tm1.cubes.enable_cube_rule(c, sections=['UNDEFVALS']) - self.assertEqual(c.rules.text, uncommented.text) - - self.tm1.cubes.disable_cube_rule(c, sections=['SKIPCHECK', 'FEEDERS']) - self.tm1.cubes.enable_cube_rule(c, sections=['SKIPCHECK', 'FEEDERS']) - self.assertEqual(c.rules.text, uncommented.text) - - self.tm1.cubes.disable_cube_rule(c) - self.assertEqual(c.rules.text.startswith('# B64 ENCODED RULE='), True) - - cells = {('Element 1', 'Element 1', 'Element 1'): 1} - self.tm1.cells.write_values(self.cube_name, cells) - - self.tm1.cubes.enable_cube_rule(c) - self.assertEqual(c.rules.text, uncommented.text) + self.assertEqual(c.rules.text, original.text) def tearDown(self): self.tm1.cubes.delete(self.cube_name) diff --git a/Tests/Cube_test.py b/Tests/Cube_test.py index b199ec3b..55a5f8a5 100644 --- a/Tests/Cube_test.py +++ b/Tests/Cube_test.py @@ -1,18 +1,21 @@ +import base64 import unittest from TM1py import Cube, Rules class TestCube(unittest.TestCase): - rules = """ -['d1':'e1'] = N: 1; -['d1':'e2'] = N: 2; -['d1':'e3'] = N: 3; -""" - cube = Cube( - name="c1", - dimensions=["d1", "d2"], - rules=rules) + + def setUp(self) -> None: + self.rules = ( + "['d1':'e1'] = N: 1;\n" + "['d1':'e2'] = N: 2;\n" + "['d1':'e3'] = N: 3;\n" + ) + self.cube = Cube( + name="c1", + dimensions=["d1", "d2"], + rules=self.rules) def test_update_rule_with_str(self): self.cube.rules = "['d1':'e1'] = N: 1;" @@ -28,6 +31,165 @@ def test_update_rule_with_rules_obj(self): self.cube.rules, Rules("['d1':'e1'] = N: 2;")) + def test_disable_rules(self): + self.cube.disable_rules() + + self.assertEqual( + "# B64 ENCODED RULES=WydkMSc6J2UxJ10gPSBOOiAxOwpbJ2QxJzonZTInXSA9IE46IDI7ClsnZDEnOidlMyddID0gTjogMzsK", + self.cube.rules.text + ) + + def test_disable_rules_enable_rules(self): + original_value = self.cube.rules.text + + self.cube.disable_rules() + self.cube.enable_rules() + + self.assertEqual( + original_value, + self.cube.rules.text + ) + + def test_disable_feeders(self): + self.cube.rules = Rules( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];\n" + ) + self.cube.disable_feeders() + + self.assertEqual( + "# B64 ENCODED FEEDERS=WydkMSc6J2UyJ10gPT4gWydkMSc6J2UxJ107ClsnZDEnOidlNCddID0+IFsnZDEnOidlMyddOw==", + self.cube.rules.text.splitlines()[-1] + ) + + def test_disable_feeders_enable_feeders(self): + self.cube.rules = Rules( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];" + ) + + original_rules = self.cube.rules.text + + self.cube.disable_feeders() + self.cube.enable_feeders() + + self.assertEqual( + original_rules, + self.cube.rules.text + ) + + def test_enable_rules_disable_rules_with_comments(self): + self.cube.rules = Rules( + # Not Relevant + "SKIPCHECK;\n" + # Not Relevant + # Not Relevant + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + # Not Relevant + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + # Not Relevant + "FEEDERS;\n" + # Not Relevant + # Not Relevant + "['d1':'e2'] => ['d1':'e1'];\n" + # Not Relevant + "['d1':'e4'] => ['d1':'e3'];" + ) + + original_rules = self.cube.rules.text + + self.cube.disable_rules() + self.cube.enable_rules() + + self.assertEqual( + original_rules, + self.cube.rules.text + ) + + def test_enable_feeders_disable_feeders_with_comments(self): + self.cube.rules = Rules( + # Not Relevant + "SKIPCHECK;\n" + # Not Relevant + # Not Relevant + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + # Not Relevant + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + # Not Relevant + "FEEDERS;\n" + # Not Relevant + # Not Relevant + "['d1':'e2'] => ['d1':'e1'];\n" + # Not Relevant + "['d1':'e4'] => ['d1':'e3'];" + ) + + original_rules = self.cube.rules.text + + self.cube.disable_feeders() + self.cube.enable_feeders() + + self.assertEqual( + original_rules, + self.cube.rules.text + ) + + def test_enable_rules_disable_rules_with_keywords(self): + self.cube.rules = Rules( + "FEEDSTRINGS;\n" + "UNDEVFALS;\n" + "SKIPCHECK;\n" + # Not Relevant + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + # Not Relevant + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];" + ) + + original_rules = self.cube.rules.text + + self.cube.disable_rules() + self.cube.enable_rules() + + self.assertEqual( + original_rules, + self.cube.rules.text + ) + + def test_enable_feeders_disable_feeders_with_keywords(self): + self.cube.rules = Rules( + "FEEDSTRINGS;\n" + "UNDEVFALS;\n" + "SKIPCHECK;\n" + # Not Relevant + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + # Not Relevant + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];" + ) + + original_rules = self.cube.rules.text + + self.cube.disable_feeders() + self.cube.enable_feeders() + + self.assertEqual( + original_rules, + self.cube.rules.text + ) + if __name__ == '__main__': unittest.main() From 0221f3beadb4c092f57ef88b4eff7e31f9b34504 Mon Sep 17 00:00:00 2001 From: VVM Date: Sat, 20 Apr 2024 11:00:04 +0100 Subject: [PATCH 7/8] Override boolean operator in Rules to return True when all lines in a rule are commented --- TM1py/Objects/Cube.py | 2 +- TM1py/Objects/Rules.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/TM1py/Objects/Cube.py b/TM1py/Objects/Cube.py index ed5335b5..33ffe60f 100644 --- a/TM1py/Objects/Cube.py +++ b/TM1py/Objects/Cube.py @@ -112,7 +112,7 @@ def _construct_body(self) -> str: body_as_dict['Dimensions@odata.bind'] = [format_url("Dimensions('{}')", dimension) for dimension in self.dimensions] - if str(self.rules): + if self.has_rules: body_as_dict['Rules'] = str(self.rules) return json.dumps(body_as_dict, ensure_ascii=False) diff --git a/TM1py/Objects/Rules.py b/TM1py/Objects/Rules.py index 662bd523..7a34ab15 100644 --- a/TM1py/Objects/Rules.py +++ b/TM1py/Objects/Rules.py @@ -92,3 +92,8 @@ def __iter__(self): def __str__(self): return self.text + + def __bool__(self): + if len(self.text): + return True + return False From ccaa0041d70e0c6cf4a748efb2f03ad893e014a8 Mon Sep 17 00:00:00 2001 From: MariusWirtz Date: Mon, 29 Apr 2024 22:45:33 +0200 Subject: [PATCH 8/8] Handle empty feeders dynamically in disable,enable Also handle duplicated disables, enables --- TM1py/Objects/Cube.py | 57 ++++++++++++++++----- Tests/Cube_test.py | 112 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 147 insertions(+), 22 deletions(-) diff --git a/TM1py/Objects/Cube.py b/TM1py/Objects/Cube.py index 33ffe60f..aa092201 100644 --- a/TM1py/Objects/Cube.py +++ b/TM1py/Objects/Cube.py @@ -116,11 +116,18 @@ def _construct_body(self) -> str: body_as_dict['Rules'] = str(self.rules) return json.dumps(body_as_dict, ensure_ascii=False) - def enable_rules(self): + def enable_rules(self, error_if_not_disabled: bool = False): if not self.rules.text: # If there is no rule, there is nothing to do. return + if not self.rules.text.startswith(RULES_ENCODING_PREFIX): + if error_if_not_disabled: + raise RuntimeError( + f"The cube rules are already enabled. " + f"First line in Rules section must start with prefix: '{RULES_ENCODING_PREFIX}'") + return + rules_statements = self.rules.text.splitlines() if not len(rules_statements) == 1: raise RuntimeError( @@ -136,26 +143,37 @@ def enable_rules(self): encoded_rule = encoded_rules_statement[len(RULES_ENCODING_PREFIX):] self._rules = Rules(base64.b64decode(encoded_rule).decode('utf-8')) - def enable_feeders(self): + def enable_feeders(self, error_if_not_disabled: bool = False): + # case no rule if not self.rules: - # If there is no rule, there is nothing to do. return - rules_statements = self.rules.text.splitlines() + rule_statements = self.rules.text.splitlines() + # sanitize statements to simplify identification of 'feeders;' line + sanitized_rule_statements = [ + rule.strip().lower() + for rule + in rule_statements] + + # case no feeders + if 'feeders;' not in sanitized_rule_statements: + return - encoded_feeders_statement = rules_statements[-1] + encoded_feeders_statement = rule_statements[-1] if not encoded_feeders_statement.startswith(FEEDERS_ENCODING_PREFIX): - raise RuntimeError( - f"The cube feeders are not disabled correctly. " - f"First line in Feeders section must start with prefix: '{FEEDERS_ENCODING_PREFIX}'") + if error_if_not_disabled: + raise RuntimeError( + f"The cube feeders are not disabled correctly. " + f"First line in Feeders section must start with prefix: '{FEEDERS_ENCODING_PREFIX}'") + return encoded_feeders = encoded_feeders_statement[len(FEEDERS_ENCODING_PREFIX):] self._rules = Rules( self.rules.text[:-len(encoded_feeders_statement)] + base64.b64decode(encoded_feeders).decode('utf-8')) - def disable_feeders(self): + def disable_feeders(self, error_if_disabled: bool = False): + # case no rule if not self.rules: - # If there is no rule, there is nothing to do. return rule_statements = self.rules.text.splitlines() @@ -166,18 +184,35 @@ def disable_feeders(self): for rule in rule_statements] + # case no feeders + if 'feeders;' not in sanitized_rule_statements: + return + feeders_start_index = sanitized_rule_statements.index('feeders;') + 1 feeders_statements = rule_statements[feeders_start_index:] + + # don't disable twice + if len(feeders_statements) == 1 and feeders_statements[0].startswith(FEEDERS_ENCODING_PREFIX): + if error_if_disabled: + raise RuntimeError("The cube feeders are already disabled") + return + hashed_feeders = base64.b64encode('\n'.join(feeders_statements).encode('utf-8')).decode('utf-8') rule_statements[feeders_start_index:] = [f"{FEEDERS_ENCODING_PREFIX}{hashed_feeders}"] self._rules = Rules("\n".join(rule_statements)) - def disable_rules(self): + def disable_rules(self, error_if_disabled: bool = False): if not self.rules: # If there is no rule, there is nothing to do. return + # don't disable twice + if self.rules.text.startswith(RULES_ENCODING_PREFIX): + if error_if_disabled: + raise RuntimeError("The cube rules are already disabled") + return + # Encode the entire rule self._rules = Rules( f"{RULES_ENCODING_PREFIX}{base64.b64encode(self.rules.text.encode('utf-8')).decode('utf-8')}") diff --git a/Tests/Cube_test.py b/Tests/Cube_test.py index 55a5f8a5..8b790e19 100644 --- a/Tests/Cube_test.py +++ b/Tests/Cube_test.py @@ -1,4 +1,3 @@ -import base64 import unittest from TM1py import Cube, Rules @@ -50,23 +49,34 @@ def test_disable_rules_enable_rules(self): self.cube.rules.text ) - def test_disable_feeders(self): + def test_disable_feeders_enable_feeders(self): self.cube.rules = Rules( "SKIPCHECK;\n" "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" "FEEDERS;\n" "['d1':'e2'] => ['d1':'e1'];\n" - "['d1':'e4'] => ['d1':'e3'];\n" + "['d1':'e4'] => ['d1':'e3'];" ) - self.cube.disable_feeders() + self.cube.disable_feeders() self.assertEqual( "# B64 ENCODED FEEDERS=WydkMSc6J2UyJ10gPT4gWydkMSc6J2UxJ107ClsnZDEnOidlNCddID0+IFsnZDEnOidlMyddOw==", self.cube.rules.text.splitlines()[-1] ) - def test_disable_feeders_enable_feeders(self): + self.cube.enable_feeders() + self.assertEqual( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];", + self.cube.rules.text + ) + + def test_disable_feeders_twice_enable_feeders(self): self.cube.rules = Rules( "SKIPCHECK;\n" "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" @@ -76,17 +86,69 @@ def test_disable_feeders_enable_feeders(self): "['d1':'e4'] => ['d1':'e3'];" ) - original_rules = self.cube.rules.text + self.cube.disable_feeders() + self.cube.disable_feeders() + self.assertEqual( + "# B64 ENCODED FEEDERS=WydkMSc6J2UyJ10gPT4gWydkMSc6J2UxJ107ClsnZDEnOidlNCddID0+IFsnZDEnOidlMyddOw==", + self.cube.rules.text.splitlines()[-1] + ) + + self.cube.enable_feeders() + self.assertEqual( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];", + self.cube.rules.text + ) + + def test_disable_feeders_enable_feeders_no_feeders(self): + self.cube.rules = Rules( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + ) self.cube.disable_feeders() + self.assertEqual( + "# B64 ENCODED FEEDERS=", + self.cube.rules.text.splitlines()[-1] + ) + self.cube.enable_feeders() + self.assertEqual( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n", + self.cube.rules.text + ) + + def test_disable_enable_feeders_no_feeders_statement(self): + self.cube.rules = Rules( + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + ) + self.cube.disable_feeders() self.assertEqual( - original_rules, + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n", self.cube.rules.text ) - def test_enable_rules_disable_rules_with_comments(self): + self.cube.enable_feeders() + self.assertEqual( + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n", + self.cube.rules.text + ) + + + def test_disable_rules_enable_rules_with_comments(self): self.cube.rules = Rules( # Not Relevant "SKIPCHECK;\n" @@ -114,7 +176,7 @@ def test_enable_rules_disable_rules_with_comments(self): self.cube.rules.text ) - def test_enable_feeders_disable_feeders_with_comments(self): + def test_disable_feeders_enable_feeders_with_comments(self): self.cube.rules = Rules( # Not Relevant "SKIPCHECK;\n" @@ -142,7 +204,7 @@ def test_enable_feeders_disable_feeders_with_comments(self): self.cube.rules.text ) - def test_enable_rules_disable_rules_with_keywords(self): + def test_disable_rules_enable_rules_with_keywords(self): self.cube.rules = Rules( "FEEDSTRINGS;\n" "UNDEVFALS;\n" @@ -166,7 +228,7 @@ def test_enable_rules_disable_rules_with_keywords(self): self.cube.rules.text ) - def test_enable_feeders_disable_feeders_with_keywords(self): + def test_disable_feeders_enable_feeders_with_keywords(self): self.cube.rules = Rules( "FEEDSTRINGS;\n" "UNDEVFALS;\n" @@ -190,6 +252,34 @@ def test_enable_feeders_disable_feeders_with_keywords(self): self.cube.rules.text ) + def test_disable_feeders_twice_raise_error(self): + self.cube.rules = Rules( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];" + ) + + self.cube.disable_feeders() + with self.assertRaises(RuntimeError): + self.cube.disable_feeders(error_if_disabled=True) + + def test_disable_rules_twice_raise_error(self): + self.cube.rules = Rules( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];" + ) + + self.cube.disable_rules() + with self.assertRaises(RuntimeError): + self.cube.disable_rules(error_if_disabled=True) + if __name__ == '__main__': unittest.main()