Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add pipeline run with kwargs to Haystack 2.x #6266

Closed
wants to merge 14 commits into from
107 changes: 104 additions & 3 deletions haystack/preview/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, Dict, Optional, Union, TextIO
from collections import defaultdict
from typing import Any, Dict, Optional, Union, TextIO, Tuple
from pathlib import Path
import datetime
import logging
Expand Down Expand Up @@ -35,14 +36,76 @@ def __init__(

def run(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
"""
Runs the pipeline.
Runs the pipeline with given input data.

:param data: A dictionary of inputs for the pipeline's components. Each key is a component name
and its value is a dictionary of that component's input parameters.
:param debug: Set to True to collect and return debug information.
:return: A dictionary containing the pipeline's output.
:raises PipelineRuntimeError: If a component fails or returns unexpected output.

Example a - Using named components:
Consider a 'Hello' component that takes a 'word' input and outputs a greeting.

```python
@component
class Hello:
@component.output_types(output=str)
def run(self, word: str):
return {"output": f"Hello, {word}!"}
```

Create a pipeline with two 'Hello' components connected together:

```python
pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.word")
result = pipeline.run(data={"hello": {"word": "world"}})
```

This runs the pipeline with the specified input for 'hello', yielding
{'hello2': {'output': 'Hello, Hello, world!!'}}.

Example b - Using flat inputs:
You can also pass inputs directly without specifying component names:

```python
result = pipeline.run(data={"word": "world"})
```

The pipeline resolves inputs to the correct components, returning
{'hello2': {'output': 'Hello, Hello, world!!'}}.
"""
# check whether the data is a nested dictionary of component inputs where each key is a component name
# and each value is a dictionary of input parameters for that component
is_nested_component_input = all(isinstance(value, dict) for value in data.values())
if is_nested_component_input:
return self._run_internal(data=data, debug=debug)
else:
# flat input, a dict where keys are input names and values are the corresponding values
# we need to convert it to a nested dictionary of component inputs and then run the pipeline
# just like in the previous case
pipeline_inputs, unresolved_inputs = self._prepare_component_input_data(data)
if unresolved_inputs:
logger.warning(
"Inputs %s were not matched to any component inputs, please check your run parameters.",
list(unresolved_inputs.keys()),
)

return self._run_internal(data=pipeline_inputs, debug=debug)

def _run_internal(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
"""
Runs the pipeline by invoking the underlying run to initiate the pipeline execution.

:params data: the inputs to give to the input components of the Pipeline.
:params debug: whether to collect and return debug information.

:returns: A dictionary with the outputs of the output components of the Pipeline.

:raises PipelineRuntimeError: if the any of the components fail or return unexpected output.
:raises PipelineRuntimeError: if any of the components fail or return unexpected output.
"""
pipeline_running(self)
return super().run(data=data, debug=debug)
Expand Down Expand Up @@ -97,3 +160,41 @@ def load(cls, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipel
:returns: A `Pipeline` object.
"""
return cls.from_dict(marshaller.unmarshal(fp.read()))

def _prepare_component_input_data(self, data: Dict[str, Any]) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]:
"""
Organizes input data for pipeline components and identifies any inputs that are not matched to any
component's input slots.

This method processes a flat dictionary of input data, where each key-value pair represents an input name
and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
their input requirements. Inputs that don't match any component's input slots are classified as unresolved.

:param data: A dictionary with input names as keys and input values as values.
:type data: Dict[str, Any]
:return: A tuple containing two elements:
1. A dictionary mapping component names to their respective matched inputs.
2. A dictionary of inputs that were not matched to any component, termed as unresolved keyword arguments.
:rtype: Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]
"""
pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict)
unresolved_kwargs = {}

# Retrieve the input slots for each component in the pipeline
available_inputs: Dict[str, Dict[str, Any]] = self.inputs()

# Go through all provided to distribute them to the appropriate component inputs
for input_name, input_value in data.items():
resolved_at_least_once = False

# Check each component to see if it has a slot for the current kwarg
for component_name, component_inputs in available_inputs.items():
if input_name in component_inputs:
# If a match is found, add the kwarg to the component's input data
pipeline_input_data[component_name][input_name] = input_value
resolved_at_least_once = True

if not resolved_at_least_once:
unresolved_kwargs[input_name] = input_value

return pipeline_input_data, unresolved_kwargs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
preview:
- |
Adds option to run pipelines without specifying component inputs and their corresponding key/value pairs. Instead,
provide the input keys/values directly, and the pipeline's internal mechanisms will automatically determine the
appropriate components.
108 changes: 108 additions & 0 deletions test/preview/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,111 @@ def test_pipeline_load(test_files_path):
assert pipeline.max_loops_allowed == 99
assert isinstance(pipeline.get_component("Comp1"), TestComponent)
assert isinstance(pipeline.get_component("Comp2"), TestComponent)


@pytest.mark.unit
def test_pipeline_resolution_simple_input():
@component
class Hello:
@component.output_types(output=str)
def run(self, word: str):
"""
Takes a string in input and returns "Hello, <string>!"
in output.
"""
return {"output": f"Hello, {word}!"}

pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())

pipeline.connect("hello.output", "hello2.word")
result = pipeline.run(data={"hello": {"word": "world"}})
assert result == {"hello2": {"output": "Hello, Hello, world!!"}}

result = pipeline.run(data={"word": "world"})
assert result == {"hello2": {"output": "Hello, Hello, world!!"}}


def test_pipeline_resolution_wrong_input_name(caplog):
@component
class Hello:
@component.output_types(output=str)
def run(self, who: str):
"""
Takes a string in input and returns "Hello, <string>!"
in output.
"""
return {"output": f"Hello, {who}!"}

pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())

pipeline.connect("hello.output", "hello2.who")

# test case with nested component inputs
with pytest.raises(ValueError):
pipeline.run(data={"hello": {"non_existing_input": "world"}})

# test case with flat component inputs
with pytest.raises(ValueError):
pipeline.run(data={"non_existing_input": "world"})

# important to check that the warning is logged for UX purposes, leave it here
assert "were not matched to any component" in caplog.text


def test_pipeline_resolution_with_mixed_correct_and_incorrect_input_names(caplog):
@component
class Hello:
@component.output_types(output=str)
def run(self, who: str):
"""
Takes a string in input and returns "Hello, <string>!"
in output.
"""
return {"output": f"Hello, {who}!"}

pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())

pipeline.connect("hello.output", "hello2.who")

# test case with nested component inputs
# this will raise ValueError because hello component does not have an input named "non_existing_input"
# even though it has an input named "who"
with pytest.raises(ValueError):
pipeline.run(data={"hello": {"non_existing_input": "world", "who": "world"}})

# test case with flat component inputs
# this will not raise ValueError because the input "who" will be resolved to the correct component
# and we'll log a warning for the input "non_existing_input" which was not resolved
result = pipeline.run(data={"non_existing_input": "world", "who": "world"})
assert result == {"hello2": {"output": "Hello, Hello, world!!"}}

# important to check that the warning is logged for UX purposes, leave it here
assert "were not matched to any component" in caplog.text


def test_pipeline_resolution_duplicate_input_names_across_components():
@component
class Hello:
@component.output_types(output=str)
def run(self, who: str, what: str):
return {"output": f"Hello {who} {what}!"}

pipe = Pipeline()
pipe.add_component("hello", Hello())
pipe.add_component("hello2", Hello())

pipe.connect("hello.output", "hello2.who")

result = pipe.run(data={"what": "Haystack", "who": "world"})
assert result == {"hello2": {"output": "Hello Hello world Haystack! Haystack!"}}

resolved, _ = pipe._prepare_component_input_data(data={"what": "Haystack", "who": "world"})

# why does hello2 have only one input? Because who of hello2 is inserted from hello.output
assert resolved == {"hello": {"what": "Haystack", "who": "world"}, "hello2": {"what": "Haystack"}}