Skip to content

Commit

Permalink
Handle empty feeders dynamically in disable,enable
Browse files Browse the repository at this point in the history
Also handle duplicated disables, enables
  • Loading branch information
MariusWirtz committed Apr 29, 2024
1 parent 0221f3b commit ccaa004
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 22 deletions.
57 changes: 46 additions & 11 deletions TM1py/Objects/Cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,18 @@ def _construct_body(self) -> str:
body_as_dict['Rules'] = str(self.rules)
return json.dumps(body_as_dict, ensure_ascii=False)

def enable_rules(self):
def enable_rules(self, error_if_not_disabled: bool = False):
if not self.rules.text:
# If there is no rule, there is nothing to do.
return

if not self.rules.text.startswith(RULES_ENCODING_PREFIX):
if error_if_not_disabled:
raise RuntimeError(
f"The cube rules are already enabled. "
f"First line in Rules section must start with prefix: '{RULES_ENCODING_PREFIX}'")
return

rules_statements = self.rules.text.splitlines()
if not len(rules_statements) == 1:
raise RuntimeError(
Expand All @@ -136,26 +143,37 @@ def enable_rules(self):
encoded_rule = encoded_rules_statement[len(RULES_ENCODING_PREFIX):]
self._rules = Rules(base64.b64decode(encoded_rule).decode('utf-8'))

def enable_feeders(self):
def enable_feeders(self, error_if_not_disabled: bool = False):
# case no rule
if not self.rules:
# If there is no rule, there is nothing to do.
return

rules_statements = self.rules.text.splitlines()
rule_statements = self.rules.text.splitlines()
# sanitize statements to simplify identification of 'feeders;' line
sanitized_rule_statements = [
rule.strip().lower()
for rule
in rule_statements]

# case no feeders
if 'feeders;' not in sanitized_rule_statements:
return

encoded_feeders_statement = rules_statements[-1]
encoded_feeders_statement = rule_statements[-1]
if not encoded_feeders_statement.startswith(FEEDERS_ENCODING_PREFIX):
raise RuntimeError(
f"The cube feeders are not disabled correctly. "
f"First line in Feeders section must start with prefix: '{FEEDERS_ENCODING_PREFIX}'")
if error_if_not_disabled:
raise RuntimeError(
f"The cube feeders are not disabled correctly. "
f"First line in Feeders section must start with prefix: '{FEEDERS_ENCODING_PREFIX}'")
return

encoded_feeders = encoded_feeders_statement[len(FEEDERS_ENCODING_PREFIX):]
self._rules = Rules(
self.rules.text[:-len(encoded_feeders_statement)] + base64.b64decode(encoded_feeders).decode('utf-8'))

def disable_feeders(self):
def disable_feeders(self, error_if_disabled: bool = False):
# case no rule
if not self.rules:
# If there is no rule, there is nothing to do.
return

rule_statements = self.rules.text.splitlines()
Expand All @@ -166,18 +184,35 @@ def disable_feeders(self):
for rule
in rule_statements]

# case no feeders
if 'feeders;' not in sanitized_rule_statements:
return

feeders_start_index = sanitized_rule_statements.index('feeders;') + 1
feeders_statements = rule_statements[feeders_start_index:]

# don't disable twice
if len(feeders_statements) == 1 and feeders_statements[0].startswith(FEEDERS_ENCODING_PREFIX):
if error_if_disabled:
raise RuntimeError("The cube feeders are already disabled")
return

hashed_feeders = base64.b64encode('\n'.join(feeders_statements).encode('utf-8')).decode('utf-8')

rule_statements[feeders_start_index:] = [f"{FEEDERS_ENCODING_PREFIX}{hashed_feeders}"]
self._rules = Rules("\n".join(rule_statements))

def disable_rules(self):
def disable_rules(self, error_if_disabled: bool = False):
if not self.rules:
# If there is no rule, there is nothing to do.
return

# don't disable twice
if self.rules.text.startswith(RULES_ENCODING_PREFIX):
if error_if_disabled:
raise RuntimeError("The cube rules are already disabled")
return

# Encode the entire rule
self._rules = Rules(
f"{RULES_ENCODING_PREFIX}{base64.b64encode(self.rules.text.encode('utf-8')).decode('utf-8')}")
112 changes: 101 additions & 11 deletions Tests/Cube_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import base64
import unittest

from TM1py import Cube, Rules
Expand Down Expand Up @@ -50,23 +49,34 @@ def test_disable_rules_enable_rules(self):
self.cube.rules.text
)

def test_disable_feeders(self):
def test_disable_feeders_enable_feeders(self):
self.cube.rules = Rules(
"SKIPCHECK;\n"
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n"
"FEEDERS;\n"
"['d1':'e2'] => ['d1':'e1'];\n"
"['d1':'e4'] => ['d1':'e3'];\n"
"['d1':'e4'] => ['d1':'e3'];"
)
self.cube.disable_feeders()

self.cube.disable_feeders()
self.assertEqual(
"# B64 ENCODED FEEDERS=WydkMSc6J2UyJ10gPT4gWydkMSc6J2UxJ107ClsnZDEnOidlNCddID0+IFsnZDEnOidlMyddOw==",
self.cube.rules.text.splitlines()[-1]
)

def test_disable_feeders_enable_feeders(self):
self.cube.enable_feeders()
self.assertEqual(
"SKIPCHECK;\n"
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n"
"FEEDERS;\n"
"['d1':'e2'] => ['d1':'e1'];\n"
"['d1':'e4'] => ['d1':'e3'];",
self.cube.rules.text
)

def test_disable_feeders_twice_enable_feeders(self):
self.cube.rules = Rules(
"SKIPCHECK;\n"
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
Expand All @@ -76,17 +86,69 @@ def test_disable_feeders_enable_feeders(self):
"['d1':'e4'] => ['d1':'e3'];"
)

original_rules = self.cube.rules.text
self.cube.disable_feeders()
self.cube.disable_feeders()
self.assertEqual(
"# B64 ENCODED FEEDERS=WydkMSc6J2UyJ10gPT4gWydkMSc6J2UxJ107ClsnZDEnOidlNCddID0+IFsnZDEnOidlMyddOw==",
self.cube.rules.text.splitlines()[-1]
)

self.cube.enable_feeders()
self.assertEqual(
"SKIPCHECK;\n"
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n"
"FEEDERS;\n"
"['d1':'e2'] => ['d1':'e1'];\n"
"['d1':'e4'] => ['d1':'e3'];",
self.cube.rules.text
)

def test_disable_feeders_enable_feeders_no_feeders(self):
self.cube.rules = Rules(
"SKIPCHECK;\n"
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n"
"FEEDERS;\n"
)

self.cube.disable_feeders()
self.assertEqual(
"# B64 ENCODED FEEDERS=",
self.cube.rules.text.splitlines()[-1]
)

self.cube.enable_feeders()
self.assertEqual(
"SKIPCHECK;\n"
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n"
"FEEDERS;\n",
self.cube.rules.text
)

def test_disable_enable_feeders_no_feeders_statement(self):
self.cube.rules = Rules(
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n"
)

self.cube.disable_feeders()
self.assertEqual(
original_rules,
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n",
self.cube.rules.text
)

def test_enable_rules_disable_rules_with_comments(self):
self.cube.enable_feeders()
self.assertEqual(
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n",
self.cube.rules.text
)


def test_disable_rules_enable_rules_with_comments(self):
self.cube.rules = Rules(
# Not Relevant
"SKIPCHECK;\n"
Expand Down Expand Up @@ -114,7 +176,7 @@ def test_enable_rules_disable_rules_with_comments(self):
self.cube.rules.text
)

def test_enable_feeders_disable_feeders_with_comments(self):
def test_disable_feeders_enable_feeders_with_comments(self):
self.cube.rules = Rules(
# Not Relevant
"SKIPCHECK;\n"
Expand Down Expand Up @@ -142,7 +204,7 @@ def test_enable_feeders_disable_feeders_with_comments(self):
self.cube.rules.text
)

def test_enable_rules_disable_rules_with_keywords(self):
def test_disable_rules_enable_rules_with_keywords(self):
self.cube.rules = Rules(
"FEEDSTRINGS;\n"
"UNDEVFALS;\n"
Expand All @@ -166,7 +228,7 @@ def test_enable_rules_disable_rules_with_keywords(self):
self.cube.rules.text
)

def test_enable_feeders_disable_feeders_with_keywords(self):
def test_disable_feeders_enable_feeders_with_keywords(self):
self.cube.rules = Rules(
"FEEDSTRINGS;\n"
"UNDEVFALS;\n"
Expand All @@ -190,6 +252,34 @@ def test_enable_feeders_disable_feeders_with_keywords(self):
self.cube.rules.text
)

def test_disable_feeders_twice_raise_error(self):
self.cube.rules = Rules(
"SKIPCHECK;\n"
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n"
"FEEDERS;\n"
"['d1':'e2'] => ['d1':'e1'];\n"
"['d1':'e4'] => ['d1':'e3'];"
)

self.cube.disable_feeders()
with self.assertRaises(RuntimeError):
self.cube.disable_feeders(error_if_disabled=True)

def test_disable_rules_twice_raise_error(self):
self.cube.rules = Rules(
"SKIPCHECK;\n"
"['d1':'e1'] = N: ['d1':'e2'] * 2;\n"
"['d1':'e3'] = N: ['d1':'e4'] * 2;\n"
"FEEDERS;\n"
"['d1':'e2'] => ['d1':'e1'];\n"
"['d1':'e4'] => ['d1':'e3'];"
)

self.cube.disable_rules()
with self.assertRaises(RuntimeError):
self.cube.disable_rules(error_if_disabled=True)


if __name__ == '__main__':
unittest.main()

0 comments on commit ccaa004

Please sign in to comment.