Skip to content

Commit

Permalink
fix: Fix running Pipeline with conditional branch and Component wit…
Browse files Browse the repository at this point in the history
…h default inputs (#7799)

* Fix running Pipeline with conditional branch and Component with default inputs

* Add release notes

* Change arg name of _init_to_run so it's clearer

* Enhance release note
  • Loading branch information
silvanocerza authored and vblagoje committed Jul 3, 2024
1 parent 3194785 commit 0c7bbfb
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 9 deletions.
7 changes: 6 additions & 1 deletion haystack/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ def _init_inputs_state(self, data: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[

return {**data}

def _init_to_run(self) -> List[Tuple[str, Component]]:
def _init_to_run(self, pipeline_inputs: Dict[str, Any]) -> List[Tuple[str, Component]]:
to_run: List[Tuple[str, Component]] = []
for node_name in self.graph.nodes:
component = self.graph.nodes[node_name]["instance"]
Expand All @@ -729,6 +729,11 @@ def _init_to_run(self) -> List[Tuple[str, Component]]:
to_run.append((node_name, component))
continue

if node_name in pipeline_inputs:
# This component is in the input data, if it has enough inputs it can run right away
to_run.append((node_name, component))
continue

for socket in component.__haystack_input__._sockets_dict.values():
if not socket.senders or socket.is_variadic:
# Component has at least one input not connected or is variadic, can run right away.
Expand Down
19 changes: 14 additions & 5 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,12 @@ def run(self, word: str):
# Initialize the inputs state
last_inputs: Dict[str, Dict[str, Any]] = self._init_inputs_state(data)

# Take all components that have at least 1 input not connected or is variadic,
# and all components that have no inputs at all
to_run: List[Tuple[str, Component]] = self._init_to_run()
# Take all components that:
# - have no inputs
# - receive input from the user
# - have at least one input not connected
# - have at least one input that is variadic
to_run: List[Tuple[str, Component]] = self._init_to_run(data)

# These variables are used to detect when we're stuck in a loop.
# Stuck loops can happen when one or more components are waiting for input but
Expand Down Expand Up @@ -232,8 +235,15 @@ def run(self, word: str):
if name != sender_component_name:
continue

pair = (receiver_component_name, self.graph.nodes[receiver_component_name]["instance"])
if edge_data["from_socket"].name not in res:
# This output has not been produced by the component, skip it
# The component didn't produce any output for this socket.
# We can't run the receiver, let's remove it from the list of components to run
# or we risk running it if it's in those lists.
if pair in to_run:
to_run.remove(pair)
if pair in waiting_for_input:
waiting_for_input.remove(pair)
continue

if receiver_component_name not in last_inputs:
Expand All @@ -249,7 +259,6 @@ def run(self, word: str):
else:
last_inputs[receiver_component_name][edge_data["to_socket"].name] = value

pair = (receiver_component_name, self.graph.nodes[receiver_component_name]["instance"])
is_greedy = pair[1].__haystack_is_greedy__
is_variadic = edge_data["to_socket"].is_variadic
if is_variadic and is_greedy:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
fixes:
- |
Fix some bugs running a Pipeline that has Components with conditional outputs.
Some branches that were expected not to run would run anyway, even if they received no inputs.
Some branches instead would cause the Pipeline to get stuck waiting to run that branch, even if they received no inputs.
The behaviour would depend whether the Component not receiving the input has a optional input or not.
10 changes: 7 additions & 3 deletions test/core/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,19 +694,23 @@ def test__init_to_run(self):
pipe.add_component("with_no_inputs", ComponentWithNoInputs())
pipe.add_component("with_single_input", ComponentWithSingleInput())
pipe.add_component("another_with_single_input", ComponentWithSingleInput())
pipe.add_component("yet_another_with_single_input", ComponentWithSingleInput())
pipe.add_component("with_multiple_inputs", ComponentWithMultipleInputs())

pipe.connect("yet_another_with_single_input.out", "with_variadic.in")
pipe.connect("with_no_inputs.out", "with_variadic.in")
pipe.connect("with_single_input.out", "another_with_single_input.in")
pipe.connect("another_with_single_input.out", "with_multiple_inputs.in1")
pipe.connect("with_multiple_inputs.out", "with_variadic.in")

to_run = pipe._init_to_run()
assert len(to_run) == 4
data = {"yet_another_with_single_input": {"in": 1}}
to_run = pipe._init_to_run(data)
assert len(to_run) == 5
assert to_run[0][0] == "with_variadic"
assert to_run[1][0] == "with_no_inputs"
assert to_run[2][0] == "with_single_input"
assert to_run[3][0] == "with_multiple_inputs"
assert to_run[3][0] == "yet_another_with_single_input"
assert to_run[4][0] == "with_multiple_inputs"

def test__init_inputs_state(self):
pipe = Pipeline()
Expand Down

0 comments on commit 0c7bbfb

Please sign in to comment.