diff --git a/haystack/core/pipeline/base.py b/haystack/core/pipeline/base.py index e683f46c5e..e3c139b6f3 100644 --- a/haystack/core/pipeline/base.py +++ b/haystack/core/pipeline/base.py @@ -719,7 +719,7 @@ def _init_inputs_state(self, data: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[ return {**data} - def _init_to_run(self) -> List[Tuple[str, Component]]: + def _init_to_run(self, pipeline_inputs: Dict[str, Any]) -> List[Tuple[str, Component]]: to_run: List[Tuple[str, Component]] = [] for node_name in self.graph.nodes: component = self.graph.nodes[node_name]["instance"] @@ -729,6 +729,11 @@ def _init_to_run(self) -> List[Tuple[str, Component]]: to_run.append((node_name, component)) continue + if node_name in pipeline_inputs: + # This component is in the input data, if it has enough inputs it can run right away + to_run.append((node_name, component)) + continue + for socket in component.__haystack_input__._sockets_dict.values(): if not socket.senders or socket.is_variadic: # Component has at least one input not connected or is variadic, can run right away. diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py index ed60e6f06a..5e51c28426 100644 --- a/haystack/core/pipeline/pipeline.py +++ b/haystack/core/pipeline/pipeline.py @@ -112,9 +112,12 @@ def run(self, word: str): # Initialize the inputs state last_inputs: Dict[str, Dict[str, Any]] = self._init_inputs_state(data) - # Take all components that have at least 1 input not connected or is variadic, - # and all components that have no inputs at all - to_run: List[Tuple[str, Component]] = self._init_to_run() + # Take all components that: + # - have no inputs + # - receive input from the user + # - have at least one input not connected + # - have at least one input that is variadic + to_run: List[Tuple[str, Component]] = self._init_to_run(data) # These variables are used to detect when we're stuck in a loop. 
# Stuck loops can happen when one or more components are waiting for input but @@ -232,8 +235,15 @@ def run(self, word: str): if name != sender_component_name: continue + pair = (receiver_component_name, self.graph.nodes[receiver_component_name]["instance"]) if edge_data["from_socket"].name not in res: - # This output has not been produced by the component, skip it + # The component didn't produce any output for this socket. + # We can't run the receiver, let's remove it from the list of components to run + # or we risk running it if it's in those lists. + if pair in to_run: + to_run.remove(pair) + if pair in waiting_for_input: + waiting_for_input.remove(pair) continue if receiver_component_name not in last_inputs: @@ -249,7 +259,6 @@ def run(self, word: str): else: last_inputs[receiver_component_name][edge_data["to_socket"].name] = value - pair = (receiver_component_name, self.graph.nodes[receiver_component_name]["instance"]) is_greedy = pair[1].__haystack_is_greedy__ is_variadic = edge_data["to_socket"].is_variadic if is_variadic and is_greedy: diff --git a/releasenotes/notes/fix-conditional-branching-a0f0d65c7ac97f71.yaml b/releasenotes/notes/fix-conditional-branching-a0f0d65c7ac97f71.yaml new file mode 100644 index 0000000000..426ad87140 --- /dev/null +++ b/releasenotes/notes/fix-conditional-branching-a0f0d65c7ac97f71.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Fix some bugs running a Pipeline that has Components with conditional outputs. + Some branches that were expected not to run would run anyway, even if they received no inputs. + Some branches instead would cause the Pipeline to get stuck waiting to run that branch, even if they received no inputs. + The behaviour would depend on whether the Component not receiving the input has an optional input or not.
diff --git a/test/core/pipeline/test_pipeline.py b/test/core/pipeline/test_pipeline.py index 1f9408052f..42e787f578 100644 --- a/test/core/pipeline/test_pipeline.py +++ b/test/core/pipeline/test_pipeline.py @@ -694,19 +694,23 @@ def test__init_to_run(self): pipe.add_component("with_no_inputs", ComponentWithNoInputs()) pipe.add_component("with_single_input", ComponentWithSingleInput()) pipe.add_component("another_with_single_input", ComponentWithSingleInput()) + pipe.add_component("yet_another_with_single_input", ComponentWithSingleInput()) pipe.add_component("with_multiple_inputs", ComponentWithMultipleInputs()) + pipe.connect("yet_another_with_single_input.out", "with_variadic.in") pipe.connect("with_no_inputs.out", "with_variadic.in") pipe.connect("with_single_input.out", "another_with_single_input.in") pipe.connect("another_with_single_input.out", "with_multiple_inputs.in1") pipe.connect("with_multiple_inputs.out", "with_variadic.in") - to_run = pipe._init_to_run() - assert len(to_run) == 4 + data = {"yet_another_with_single_input": {"in": 1}} + to_run = pipe._init_to_run(data) + assert len(to_run) == 5 assert to_run[0][0] == "with_variadic" assert to_run[1][0] == "with_no_inputs" assert to_run[2][0] == "with_single_input" - assert to_run[3][0] == "with_multiple_inputs" + assert to_run[3][0] == "yet_another_with_single_input" + assert to_run[4][0] == "with_multiple_inputs" def test__init_inputs_state(self): pipe = Pipeline()