Skip to content

Commit

Permalink
Fix merge function cycle and multiple entry points issues (#44)
Browse files Browse the repository at this point in the history
* Fix merge function cycle and multiple entry points issues

* Fix cycle detection in WorkflowBuilder

* Add test for cycle detection in workflow builder

* Refactor build_workflow method in workflow_builder.py
  • Loading branch information
vGsteiger authored Jan 10, 2024
1 parent cfdf4cd commit 98c24f5
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 1 deletion.
24 changes: 24 additions & 0 deletions multi_x_serverless/deployment/client/deploy/workflow_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def build_workflow(self, config: Config) -> Workflow: # pylint: disable=too-man
index_in_dag += 1
function_instances[predecessor_instance.name] = predecessor_instance

self._cycle_check(entry_point, config)

for successor_of_current_index, successor in enumerate(config.workflow_app.get_successors(entry_point)):
functions_to_visit.put((successor.handler, predecessor_instance.name, successor_of_current_index))

Expand All @@ -99,6 +101,7 @@ def build_workflow(self, config: Config) -> Workflow: # pylint: disable=too-man
if not multi_x_serverless_function.is_waiting_for_predecessors()
else f"{multi_x_serverless_function.name}:merge:{index_in_dag}"
)

index_in_dag += 1
# If the function is waiting for its predecessors, there can only be one instance of the function
# Otherwise, we create a new instance of the function for every predecessor
Expand All @@ -121,6 +124,27 @@ def build_workflow(self, config: Config) -> Workflow: # pylint: disable=too-man
functions: list[FunctionInstance] = list(function_instances.values())
return Workflow(resources=resources, functions=functions, edges=edges, name=config.workflow_name, config=config)

def _cycle_check(self, function: MultiXServerlessFunction, config: Config) -> None:
    """Check that no cycle is reachable from ``function``.

    Starts a depth-first traversal at ``function`` with two fresh, empty
    bookkeeping sets (functions on the current path, and functions already
    fully explored). ``_dfs`` raises ``RuntimeError`` when it meets a
    successor that is still on the current path.
    """
    # First set: functions currently being visited; second set: functions already finished.
    self._dfs(function, set(), set(), config)

def _dfs(
    self,
    node: MultiXServerlessFunction,
    visiting: set[MultiXServerlessFunction],
    visited: set[MultiXServerlessFunction],
    config: Config,
) -> None:
    """Depth-first walk from ``node`` that raises when it finds a back edge.

    ``visiting`` holds the functions on the current traversal path and
    ``visited`` the functions whose successors have all been explored.
    Both sets are mutated in place.

    Raises:
        RuntimeError: if a successor of ``node`` is already in ``visiting``.
    """
    visiting.add(node)
    for next_function in config.workflow_app.get_successors(node):
        if next_function in visiting:
            raise RuntimeError(f"Cycle detected: {next_function.name} is being visited again")
        if next_function in visited:
            # Already fully explored through another path; nothing new to find there.
            continue
        self._dfs(next_function, visiting, visited, config)
    # Every successor has been explored: move the node from the active path to the finished set.
    visiting.remove(node)
    visited.add(node)

def _verify_providers(self, providers: list[dict]) -> None:
for provider in providers:
Provider(**provider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,207 @@ def test_build_workflow_multiple_entry_points(self):
function2.handler = "function1"
function2.regions_and_providers = {"providers": []}
self.config.workflow_app.functions = {"function1": function1, "function2": function2}
with self.assertRaises(RuntimeError):
with self.assertRaisesRegex(RuntimeError, "Multiple entry points defined"):
self.builder.build_workflow(self.config)

def test_build_workflow_merge_case_self_cycle(self):
    """A function that is reported as its own successor must be rejected as a cycle."""
    # Entry point of the workflow; it does not wait for predecessors.
    entry_function = Mock(spec=MultiXServerlessFunction)
    entry_function.entry_point = True
    entry_function.name = "function1"
    entry_function.handler = "function1"
    entry_function.regions_and_providers = {}
    entry_function.is_waiting_for_predecessors = Mock(return_value=False)

    # Merge function: it waits for its predecessors.
    merge_function = Mock(spec=MultiXServerlessFunction)
    merge_function.entry_point = False
    merge_function.name = "function2"
    merge_function.handler = "function2"
    merge_function.regions_and_providers = {"providers": []}
    merge_function.is_waiting_for_predecessors = Mock(return_value=True)

    # Every lookup returns the merge function, so the merge function is also its own successor.
    self.config.workflow_app.get_successors = Mock(return_value=[merge_function])

    # Register both functions with the workflow app.
    self.config.workflow_app.functions = {"function1": entry_function, "function2": merge_function}

    # Building the workflow has to fail with the cycle message naming the merge function.
    with self.assertRaisesRegex(RuntimeError, "Cycle detected: function2 is being visited again"):
        self.builder.build_workflow(self.config)

def test_build_workflow_merge_case_multiple_incoming(self):
    """Three calls from the entry point into one merge function yield three edges."""
    # Entry point of the workflow; it does not wait for predecessors.
    entry_function = Mock(spec=MultiXServerlessFunction)
    entry_function.entry_point = True
    entry_function.name = "function1"
    entry_function.handler = "function1"
    entry_function.regions_and_providers = {}
    entry_function.is_waiting_for_predecessors = Mock(return_value=False)

    # Merge function: it waits for its predecessors.
    merge_function = Mock(spec=MultiXServerlessFunction)
    merge_function.entry_point = False
    merge_function.name = "function2"
    merge_function.handler = "function2"
    merge_function.regions_and_providers = {"providers": []}
    merge_function.is_waiting_for_predecessors = Mock(return_value=True)

    # Successor lookups are answered in call order: the first two feed the
    # cycle check (dfs), the following ones feed the actual workflow build.
    successor_answers = [
        [merge_function, merge_function, merge_function],
        [],
        [merge_function, merge_function, merge_function],
        [],
        [],
        [],
    ]
    self.config.workflow_app.get_successors = Mock(side_effect=successor_answers)

    # Register both functions with the workflow app.
    self.config.workflow_app.functions = {"function1": entry_function, "function2": merge_function}

    built_workflow = self.builder.build_workflow(self.config)

    self.assertEqual(len(built_workflow._edges), 3)

def test_cycle_detection(self):
    """``_cycle_check`` raises with the expected message for a two-function cycle."""
    # Create mock functions
    function1 = Mock(spec=MultiXServerlessFunction)
    function1.name = "function1"

    function2 = Mock(spec=MultiXServerlessFunction)
    function2.name = "function2"

    # Create a mock config whose successor lookups describe function1 -> function2 -> function1
    config = Mock(spec=Config)
    config.workflow_app.get_successors = Mock(side_effect=[[function2], [function1]])

    # This should raise a RuntimeError because there is a cycle
    with self.assertRaises(RuntimeError) as context:
        self.builder._cycle_check(function1, config)

    # assertEqual (instead of assertTrue on an == comparison) reports both strings when the check fails.
    self.assertEqual(str(context.exception), "Cycle detected: function1 is being visited again")

def test_cycle_detection_no_cycle(self):
    """``_cycle_check`` completes without raising on a linear chain of three functions."""
    # Three mock functions forming the chain first -> second -> third.
    first = Mock(spec=MultiXServerlessFunction)
    first.name = "function1"

    second = Mock(spec=MultiXServerlessFunction)
    second.name = "function2"

    third = Mock(spec=MultiXServerlessFunction)
    third.name = "function3"

    # Successor lookups are answered in call order; the last function has no successors.
    chain_answers = [[second], [third], []]
    config = Mock(spec=Config)
    config.workflow_app.get_successors = Mock(side_effect=chain_answers)

    # No cycle exists, so the call must simply return.
    self.builder._cycle_check(first, config)

def test_build_workflow_self_call(self):
    """An entry point that calls itself must be rejected as a cycle."""
    # The only function in the workflow; it is the entry point.
    entry_function = Mock(spec=MultiXServerlessFunction)
    entry_function.entry_point = True
    entry_function.name = "function1"
    entry_function.handler = "function1"
    entry_function.regions_and_providers = {}
    entry_function.is_waiting_for_predecessors = Mock(return_value=False)

    # The first successor lookup returns the entry point itself.
    self.config.workflow_app.get_successors = Mock(side_effect=[[entry_function], [], []])

    # Register the function with the workflow app.
    self.config.workflow_app.functions = {"function1": entry_function}

    # Building the workflow has to fail with the cycle message naming the entry point.
    with self.assertRaisesRegex(RuntimeError, "Cycle detected: function1 is being visited again"):
        self.builder.build_workflow(self.config)

def test_build_workflow_merge_working(self):
    """An entry point followed by a single merge function builds a workflow with one edge."""
    # Entry point of the workflow; it does not wait for predecessors.
    entry_function = Mock(spec=MultiXServerlessFunction)
    entry_function.entry_point = True
    entry_function.name = "function1"
    entry_function.handler = "function1"
    entry_function.regions_and_providers = {}
    entry_function.is_waiting_for_predecessors = Mock(return_value=False)

    # Merge function: it waits for its predecessors.
    merge_function = Mock(spec=MultiXServerlessFunction)
    merge_function.entry_point = False
    merge_function.name = "function2"
    merge_function.handler = "function2"
    merge_function.regions_and_providers = {"providers": []}
    merge_function.is_waiting_for_predecessors = Mock(return_value=True)

    # Successor lookups are answered in call order: entry -> merge, merge -> nothing, twice over.
    successor_answers = [[merge_function], [], [merge_function], []]
    self.config.workflow_app.get_successors = Mock(side_effect=successor_answers)

    # Register both functions with the workflow app.
    self.config.workflow_app.functions = {"function1": entry_function, "function2": merge_function}

    built_workflow = self.builder.build_workflow(self.config)

    self.assertEqual(len(built_workflow._edges), 1)

def test_build_workflow_cycle_in_function_calls(self):
    """Two functions that call each other must be rejected as a cycle."""
    # Entry point of the workflow; it does not wait for predecessors.
    entry_function = Mock(spec=MultiXServerlessFunction)
    entry_function.entry_point = True
    entry_function.name = "function1"
    entry_function.handler = "function1"
    entry_function.regions_and_providers = {}
    entry_function.is_waiting_for_predecessors = Mock(return_value=False)

    # Second function; it does not wait for predecessors either.
    called_function = Mock(spec=MultiXServerlessFunction)
    called_function.entry_point = False
    called_function.name = "function2"
    called_function.handler = "function2"
    called_function.regions_and_providers = {"providers": []}
    called_function.is_waiting_for_predecessors = Mock(return_value=False)

    # Successor lookups are answered in call order: entry -> called, called -> entry, and so on.
    successor_answers = [[called_function], [entry_function], [called_function], [entry_function], []]
    self.config.workflow_app.get_successors = Mock(side_effect=successor_answers)

    # Register both functions with the workflow app.
    self.config.workflow_app.functions = {"function1": entry_function, "function2": called_function}

    # Building the workflow has to fail with the cycle message naming the entry point.
    with self.assertRaisesRegex(RuntimeError, "Cycle detected: function1 is being visited again"):
        self.builder.build_workflow(self.config)

def test_build_workflow_merge_cycle(self):
    """A merge function that succeeds itself must be rejected with the cycle error."""
    # Create mock functions
    function1 = Mock(spec=MultiXServerlessFunction)
    function1.entry_point = True
    function1.name = "function1"
    function1.handler = "function1"
    function1.regions_and_providers = {}
    function1.is_waiting_for_predecessors = Mock(return_value=False)

    function2 = Mock(spec=MultiXServerlessFunction)
    function2.entry_point = False
    function2.name = "function2"
    function2.handler = "function2"
    function2.regions_and_providers = {"providers": []}
    function2.is_waiting_for_predecessors = Mock(return_value=True)  # This is a merge function

    # Successor lookups in call order: function1 -> function2, then function2 -> function2
    self.config.workflow_app.get_successors = Mock(side_effect=[[function2], [function2], []])

    self.config.workflow_app.functions = {"function1": function1, "function2": function2}

    # Match the message as well as the type, so that an unrelated RuntimeError
    # cannot make this test pass by accident.
    with self.assertRaisesRegex(
        RuntimeError,
        "Cycle detected: function2 is being visited again",
    ):
        self.builder.build_workflow(self.config)

@patch("os.path.join")
def test_get_function_role_with_policy_file(self, mock_join):
mock_join.return_value = "/path/to/policy"
Expand Down

0 comments on commit 98c24f5

Please sign in to comment.