From ccaa0041d70e0c6cf4a748efb2f03ad893e014a8 Mon Sep 17 00:00:00 2001 From: MariusWirtz Date: Mon, 29 Apr 2024 22:45:33 +0200 Subject: [PATCH] Handle empty feeders dynamically in disable,enable Also handle duplicated disables, enables --- TM1py/Objects/Cube.py | 57 ++++++++++++++++----- Tests/Cube_test.py | 112 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 147 insertions(+), 22 deletions(-) diff --git a/TM1py/Objects/Cube.py b/TM1py/Objects/Cube.py index 33ffe60f..aa092201 100644 --- a/TM1py/Objects/Cube.py +++ b/TM1py/Objects/Cube.py @@ -116,11 +116,18 @@ def _construct_body(self) -> str: body_as_dict['Rules'] = str(self.rules) return json.dumps(body_as_dict, ensure_ascii=False) - def enable_rules(self): + def enable_rules(self, error_if_not_disabled: bool = False): if not self.rules.text: # If there is no rule, there is nothing to do. return + if not self.rules.text.startswith(RULES_ENCODING_PREFIX): + if error_if_not_disabled: + raise RuntimeError( + f"The cube rules are already enabled. " + f"First line in Rules section must start with prefix: '{RULES_ENCODING_PREFIX}'") + return + rules_statements = self.rules.text.splitlines() if not len(rules_statements) == 1: raise RuntimeError( @@ -136,26 +143,37 @@ def enable_rules(self): encoded_rule = encoded_rules_statement[len(RULES_ENCODING_PREFIX):] self._rules = Rules(base64.b64decode(encoded_rule).decode('utf-8')) - def enable_feeders(self): + def enable_feeders(self, error_if_not_disabled: bool = False): + # case no rule if not self.rules: - # If there is no rule, there is nothing to do. return - rules_statements = self.rules.text.splitlines() + rule_statements = self.rules.text.splitlines() + # sanitize statements to simplify identification of 'feeders;' line + sanitized_rule_statements = [ + rule.strip().lower() + for rule + in rule_statements] + + # case no feeders + if 'feeders;' not in sanitized_rule_statements: + return - encoded_feeders_statement = rules_statements[-1] + encoded_feeders_statement = rule_statements[-1] if not encoded_feeders_statement.startswith(FEEDERS_ENCODING_PREFIX): - raise RuntimeError( - f"The cube feeders are not disabled correctly. " - f"First line in Feeders section must start with prefix: '{FEEDERS_ENCODING_PREFIX}'") + if error_if_not_disabled: + raise RuntimeError( + f"The cube feeders are not disabled correctly. " + f"First line in Feeders section must start with prefix: '{FEEDERS_ENCODING_PREFIX}'") + return encoded_feeders = encoded_feeders_statement[len(FEEDERS_ENCODING_PREFIX):] self._rules = Rules( self.rules.text[:-len(encoded_feeders_statement)] + base64.b64decode(encoded_feeders).decode('utf-8')) - def disable_feeders(self): + def disable_feeders(self, error_if_disabled: bool = False): + # case no rule if not self.rules: - # If there is no rule, there is nothing to do. return rule_statements = self.rules.text.splitlines() @@ -166,18 +184,35 @@ def disable_feeders(self): for rule in rule_statements] + # case no feeders + if 'feeders;' not in sanitized_rule_statements: + return + feeders_start_index = sanitized_rule_statements.index('feeders;') + 1 feeders_statements = rule_statements[feeders_start_index:] + + # don't disable twice + if len(feeders_statements) == 1 and feeders_statements[0].startswith(FEEDERS_ENCODING_PREFIX): + if error_if_disabled: + raise RuntimeError("The cube feeders are already disabled") + return + hashed_feeders = base64.b64encode('\n'.join(feeders_statements).encode('utf-8')).decode('utf-8') rule_statements[feeders_start_index:] = [f"{FEEDERS_ENCODING_PREFIX}{hashed_feeders}"] self._rules = Rules("\n".join(rule_statements)) - def disable_rules(self): + def disable_rules(self, error_if_disabled: bool = False): if not self.rules: # If there is no rule, there is nothing to do. return + # don't disable twice + if self.rules.text.startswith(RULES_ENCODING_PREFIX): + if error_if_disabled: + raise RuntimeError("The cube rules are already disabled") + return + # Encode the entire rule self._rules = Rules( f"{RULES_ENCODING_PREFIX}{base64.b64encode(self.rules.text.encode('utf-8')).decode('utf-8')}") diff --git a/Tests/Cube_test.py b/Tests/Cube_test.py index 55a5f8a5..8b790e19 100644 --- a/Tests/Cube_test.py +++ b/Tests/Cube_test.py @@ -1,4 +1,3 @@ -import base64 import unittest from TM1py import Cube, Rules @@ -50,23 +49,34 @@ def test_disable_rules_enable_rules(self): self.cube.rules.text ) - def test_disable_feeders(self): + def test_disable_feeders_enable_feeders(self): self.cube.rules = Rules( "SKIPCHECK;\n" "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" "FEEDERS;\n" "['d1':'e2'] => ['d1':'e1'];\n" - "['d1':'e4'] => ['d1':'e3'];\n" + "['d1':'e4'] => ['d1':'e3'];" ) - self.cube.disable_feeders() + self.cube.disable_feeders() self.assertEqual( "# B64 ENCODED FEEDERS=WydkMSc6J2UyJ10gPT4gWydkMSc6J2UxJ107ClsnZDEnOidlNCddID0+IFsnZDEnOidlMyddOw==", self.cube.rules.text.splitlines()[-1] ) - def test_disable_feeders_enable_feeders(self): + self.cube.enable_feeders() + self.assertEqual( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];", + self.cube.rules.text + ) + + def test_disable_feeders_twice_enable_feeders(self): self.cube.rules = Rules( "SKIPCHECK;\n" "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" @@ -76,17 +86,69 @@ def test_disable_feeders_enable_feeders(self): "['d1':'e4'] => ['d1':'e3'];" ) - original_rules = self.cube.rules.text + self.cube.disable_feeders() + self.cube.disable_feeders() + self.assertEqual( + "# B64 ENCODED FEEDERS=WydkMSc6J2UyJ10gPT4gWydkMSc6J2UxJ107ClsnZDEnOidlNCddID0+IFsnZDEnOidlMyddOw==", + self.cube.rules.text.splitlines()[-1] + ) + + self.cube.enable_feeders() + self.assertEqual( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];", + self.cube.rules.text + ) + + def test_disable_feeders_enable_feeders_no_feeders(self): + self.cube.rules = Rules( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + ) self.cube.disable_feeders() + self.assertEqual( + "# B64 ENCODED FEEDERS=", + self.cube.rules.text.splitlines()[-1] + ) + self.cube.enable_feeders() + self.assertEqual( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n", + self.cube.rules.text + ) + + def test_disable_enable_feeders_no_feeders_statement(self): + self.cube.rules = Rules( + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + ) + self.cube.disable_feeders() self.assertEqual( - original_rules, + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n", self.cube.rules.text ) - def test_enable_rules_disable_rules_with_comments(self): + self.cube.enable_feeders() + self.assertEqual( + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n", + self.cube.rules.text + ) + + + def test_disable_rules_enable_rules_with_comments(self): self.cube.rules = Rules( # Not Relevant "SKIPCHECK;\n" @@ -114,7 +176,7 @@ def test_enable_rules_disable_rules_with_comments(self): self.cube.rules.text ) - def test_enable_feeders_disable_feeders_with_comments(self): + def test_disable_feeders_enable_feeders_with_comments(self): self.cube.rules = Rules( # Not Relevant "SKIPCHECK;\n" @@ -142,7 +204,7 @@ def test_enable_feeders_disable_feeders_with_comments(self): self.cube.rules.text ) - def test_enable_rules_disable_rules_with_keywords(self): + def test_disable_rules_enable_rules_with_keywords(self): self.cube.rules = Rules( "FEEDSTRINGS;\n" "UNDEVFALS;\n" @@ -166,7 +228,7 @@ def test_enable_rules_disable_rules_with_keywords(self): self.cube.rules.text ) - def test_enable_feeders_disable_feeders_with_keywords(self): + def test_disable_feeders_enable_feeders_with_keywords(self): self.cube.rules = Rules( "FEEDSTRINGS;\n" "UNDEVFALS;\n" @@ -190,6 +252,34 @@ def test_enable_feeders_disable_feeders_with_keywords(self): self.cube.rules.text ) + def test_disable_feeders_twice_raise_error(self): + self.cube.rules = Rules( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];" + ) + + self.cube.disable_feeders() + with self.assertRaises(RuntimeError): + self.cube.disable_feeders(error_if_disabled=True) + + def test_disable_rules_twice_raise_error(self): + self.cube.rules = Rules( + "SKIPCHECK;\n" + "['d1':'e1'] = N: ['d1':'e2'] * 2;\n" + "['d1':'e3'] = N: ['d1':'e4'] * 2;\n" + "FEEDERS;\n" + "['d1':'e2'] => ['d1':'e1'];\n" + "['d1':'e4'] => ['d1':'e3'];" + ) + + self.cube.disable_rules() + with self.assertRaises(RuntimeError): + self.cube.disable_rules(error_if_disabled=True) + if __name__ == '__main__': unittest.main()