diff --git a/haystack_experimental/components/tools/openai/pipeline_caller.py b/haystack_experimental/components/tools/openai/pipeline_caller.py
new file mode 100644
index 00000000..b17c43a7
--- /dev/null
+++ b/haystack_experimental/components/tools/openai/pipeline_caller.py
@@ -0,0 +1,310 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+### This module generates the OpenAI schema used for function/tool calling from a given Haystack pipeline.
+### The main entry point is `extract_pipeline_parameters`, which returns an OpenAI-compatible schema used in the Tool class.
+
+from dataclasses import MISSING, fields, is_dataclass
+from inspect import getdoc
+from typing import Any, Callable, Dict, List, Set, Tuple, Union, get_args, get_origin
+
+from docstring_parser import parse
+from haystack import Pipeline, logging
+from haystack.utils import deserialize_type
+
+from haystack_experimental.util.utils import is_pydantic_v2_model
+
+logger = logging.getLogger(__name__)
+
+
+def extract_pipeline_parameters(pipeline: Pipeline) -> Dict[str, Any]:
+    """
+    Extracts parameters from pipeline inputs and converts them to OpenAI tools JSON format.
+
+    :param pipeline: The pipeline to extract parameters from.
+    :returns: A dictionary representing the pipeline's input parameters schema.
+    """
+    properties = {}
+    required = []
+
+    pipeline_inputs = pipeline.inputs()
+
+    for component_name, component_inputs in pipeline_inputs.items():
+        component = pipeline.get_component(component_name)
+        param_descriptions = get_param_descriptions(component.run)
+
+        for input_name, input_info in component_inputs.items():
+            # Avoid name clashes by prefixing parameter names with the component name
+            prefixed_input_name = f"{component_name}.{input_name}"
+
+            input_type = input_info.get("type") or Any
+
+            description = param_descriptions.get(input_name, f"Input '{input_name}' for component '{component_name}'.")
+
+            try:
+                property_schema = create_property_schema(input_type, description)
+            except ValueError as e:
+                raise ValueError(f"Error processing input '{prefixed_input_name}': {e}") from e
+
+            properties[prefixed_input_name] = property_schema
+
+            if input_info.get("is_mandatory", False):
+                required.append(prefixed_input_name)
+
+    parameters_schema = {"type": "object", "properties": properties}
+
+    if required:
+        parameters_schema["required"] = required
+
+    return parameters_schema
+
+
+def get_param_descriptions(method: Callable) -> Dict[str, str]:
+    """
+    Extracts parameter descriptions from the method's docstring using docstring_parser.
+
+    :param method: The method to extract parameter descriptions from.
+    :returns: A dictionary mapping parameter names to their descriptions.
+    """
+    docstring = getdoc(method)
+    if not docstring:
+        return {}
+
+    parsed_doc = parse(docstring)
+    return {param.arg_name: param.description.strip() for param in parsed_doc.params}
+
+
+def create_property_schema(python_type: Any, description: str, default: Any = None) -> Dict[str, Any]:  # noqa: PLR0912, PLR0915
+    """
+    Creates a property schema for a given Python type, recursively if necessary.
+
+    :param python_type: The Python type to create a property schema for.
+    :param description: The description of the property.
+    :param default: The default value of the property.
+    :returns: A dictionary representing the property schema.
+ """ + nullable = is_nullable_type(python_type) + if nullable: + # Remove NoneType from the Union to get the actual types + non_none_types = [t for t in get_args(python_type) if t is not type(None)] + python_type = select_preferred_type(non_none_types) + else: + python_type = resolve_forward_ref(python_type) + + if not is_supported_type(python_type): + # Assume it is a string type + property_schema = {"type": "string", "description": description} + if default is not None: + property_schema["default"] = default + return property_schema + + openai_type = get_openai_type(python_type) + property_schema = {"type": openai_type, "description": description} + + if default is not None: + property_schema["default"] = default + + if openai_type == "array": + item_type = get_args(python_type)[0] if get_args(python_type) else Any + item_type = resolve_forward_ref(item_type) + if not is_supported_type(item_type): + # Assume item type is string + items_schema = {"type": "string"} + else: + # Create items schema without 'description' + items_schema = create_property_schema(item_type, "") + items_schema.pop("description", None) + property_schema["items"] = items_schema + + elif openai_type == "object": + # Support both dataclasses and Pydantic v2 + if is_dataclass(python_type) or is_pydantic_v2_model(python_type): + # Handle dataclasses and Pydantic models by their fields + property_schema["properties"] = {} + required_fields = [] + + if is_dataclass(python_type): + model_fields = fields(python_type) + for field in model_fields: + field_description = f"Field '{field.name}' of '{python_type.__name__}'." + field_schema = create_property_schema(field.type, field_description) + property_schema["properties"][field.name] = field_schema + + # Add to required fields if the field has no default value + if field.default is MISSING and field.default_factory is MISSING: + required_fields.append(field.name) + else: # Pydantic v2 model + model_fields = python_type.model_fields + for name, field in model_fields.items(): + field_description = f"Field '{name}' of '{python_type.__name__}'." + field_schema = create_property_schema(field.annotation, field_description) + property_schema["properties"][name] = field_schema + + if field.is_required(): + required_fields.append(name) + + if required_fields: + property_schema["required"] = required_fields + + elif get_origin(python_type) is dict: + # For dicts, specify the value type using 'additionalProperties' + args = get_args(python_type) + # Check for key and value type args since Dict[K, V] has 2 type parameters + if args and len(args) == 2: # noqa: PLR2004 + _, value_type = args + value_type = resolve_forward_ref(value_type) + if is_any_type(value_type): + # Allow any type of value + property_schema["additionalProperties"] = {} + elif not is_supported_type(value_type): + # Assume value type is string + property_schema["additionalProperties"] = {"type": "string"} + else: + property_schema["additionalProperties"] = create_property_schema(value_type, description) + else: + property_schema["additionalProperties"] = {"type": "string"} + else: + # Assume object is a string type + openai_type = "string" + property_schema = {"type": openai_type, "description": description} + if default is not None: + property_schema["default"] = default + + return property_schema + + +def is_nullable_type(python_type: Any) -> bool: + """ + Checks if the type is a Union with NoneType (i.e., Optional). + + :param python_type: The Python type to check. 
+    :returns: True if the type is a Union with NoneType, False otherwise.
+    """
+    origin = get_origin(python_type)
+    if origin is Union:
+        return type(None) in get_args(python_type)
+    return False
+
+
+def is_basic_python_type(python_type: Any) -> bool:
+    """
+    Checks if the type is a basic Python type.
+
+    :param python_type: The Python type to check.
+    :returns: True if the type is a basic Python type, False otherwise.
+    """
+    return isinstance(python_type, type) and issubclass(python_type, (str, int, float, bool, list, dict))
+
+
+def is_supported_type(python_type: Any) -> bool:
+    """
+    Checks if the type is a basic type, a dataclass, a Pydantic v2 model, or a supported generic type.
+
+    :param python_type: The Python type to check.
+    :returns: True if the type is a basic type, a dataclass,
+        a Pydantic v2 model, or a supported generic type, False otherwise.
+    """
+    return (
+        is_basic_python_type(python_type)
+        or is_dataclass(python_type)
+        or is_pydantic_v2_model(python_type)
+        or is_supported_generic(python_type)
+        or is_any_type(python_type)
+    )
+
+
+def is_supported_generic(python_type: Any) -> bool:
+    """
+    Checks if the type is a supported generic type like List or Dict.
+
+    :param python_type: The Python type to check.
+    :returns: True if the type is a supported generic type, False otherwise.
+    """
+    origin = get_origin(python_type)
+    return origin in (list, List, dict, Dict)
+
+
+def resolve_forward_ref(python_type: Any) -> Any:
+    """
+    Resolves forward references to actual types.
+
+    :param python_type: The Python type to resolve.
+    :returns: The resolved Python type.
+    """
+    if isinstance(python_type, str):
+        python_type = deserialize_type(python_type)
+    return python_type
+
+
+def select_preferred_type(types: List[Any]) -> Any:
+    """
+    Selects the preferred type from a list of types.
+
+    :param types: The list of types to select from.
+    :returns: The preferred type.
+    """
+    # Resolve forward references
+    types_resolved = [resolve_forward_ref(t) for t in types]
+
+    # Prefer basic types
+    for t in types_resolved:
+        if is_basic_python_type(t):
+            return t
+
+    # Then prefer dataclasses
+    for t in types_resolved:
+        if is_dataclass(t):
+            return t
+
+    # If none matched, return the first resolved type
+    if types_resolved:
+        return types_resolved[0]
+
+    raise ValueError(f"No supported types found in Union: {types}")
+
+
+def get_openai_type(python_type: Any) -> str:  # noqa: PLR0911
+    """
+    Converts Python types to OpenAI schema types.
+
+    :param python_type: The Python type to convert.
+    :returns: The OpenAI schema type.
+    """
+    python_type = resolve_forward_ref(python_type)
+
+    if is_any_type(python_type):
+        return "object"  # Allow any JSON structure
+
+    if is_basic_python_type(python_type):
+        # Check bool before int: bool is a subclass of int, so the int branch would otherwise shadow it
+        if issubclass(python_type, bool):
+            return "boolean"
+        elif issubclass(python_type, str):
+            return "string"
+        elif issubclass(python_type, int):
+            return "integer"
+        elif issubclass(python_type, float):
+            return "number"
+        elif issubclass(python_type, (list,)):
+            return "array"
+        elif issubclass(python_type, (dict,)):
+            return "object"
+    elif is_dataclass(python_type) or is_pydantic_v2_model(python_type):
+        return "object"
+    elif get_origin(python_type) in (list, List, tuple, Tuple, set, Set):
+        return "array"
+    elif get_origin(python_type) in (dict, Dict):
+        return "object"
+
+    # If none of the above conditions are met, raise an error
+    raise ValueError(f"Unsupported type: {python_type}")
+
+
+def is_any_type(python_type: Any) -> bool:
+    """
+    Checks if the type is typing.Any.
+
+    :param python_type: The Python type to check.
+    :returns: True if the type is typing.Any, False otherwise.
+    """
+    return python_type is Any or str(python_type) == "typing.Any"
diff --git a/haystack_experimental/dataclasses/tool.py b/haystack_experimental/dataclasses/tool.py
index 33719524..2198ae90 100644
--- a/haystack_experimental/dataclasses/tool.py
+++ b/haystack_experimental/dataclasses/tool.py
@@ -3,18 +3,25 @@
 # SPDX-License-Identifier: Apache-2.0
 
 import inspect
-from dataclasses import asdict, dataclass
-from typing import Any, Callable, Dict, Optional
+from collections import defaultdict
+from dataclasses import asdict, dataclass, is_dataclass
+from typing import Any, Callable, Dict, List, Optional, Union, get_args, get_origin, get_type_hints
 
+from haystack import Pipeline, logging
 from haystack.lazy_imports import LazyImport
 from haystack.utils import deserialize_callable, serialize_callable
 from pydantic import create_model
 
+from haystack_experimental.util.utils import is_pydantic_v2_model
+
 with LazyImport(message="Run 'pip install jsonschema'") as jsonschema_import:
     from jsonschema import Draft202012Validator
     from jsonschema.exceptions import SchemaError
 
+logger = logging.getLogger(__name__)
+
+
 class ToolInvocationError(Exception):
     """
     Exception raised when a Tool invocation fails.
@@ -198,6 +205,120 @@ def get_weather(
 
         return Tool(name=name or function.__name__, description=tool_description, parameters=schema, function=function)
 
+    @classmethod
+    def from_pipeline(cls, pipeline: Pipeline, name: str, description: str) -> "Tool":
+        """
+        Create a Tool instance from a Pipeline.
+
+        :param pipeline:
+            The Pipeline to be converted into a Tool.
+        :param name:
+            Name for the tool.
+        :param description:
+            Description of the tool.
+        :returns:
+            The Tool created from the Pipeline.
+        :raises ValueError:
+            If the pipeline is invalid or schema generation fails.
+        """
+        from haystack_experimental.components.tools.openai.pipeline_caller import extract_pipeline_parameters
+
+        # Extract the parameters schema from the pipeline components
+        parameters = extract_pipeline_parameters(pipeline)
+
+        def _convert_to_dataclass(data: Any, data_type: Any) -> Any:
+            """
+            Recursively convert dictionaries into dataclass instances based on the provided data type.
+
+            This function handles nested dataclasses by recursively converting each field.
+
+            :param data:
+                The input data to convert.
+            :param data_type:
+                The target data type, expected to be a dataclass type.
+            :returns:
+                An instance of the dataclass with data populated from the input dictionary.
+            """
+            if data is None or not isinstance(data, dict):
+                return data
+
+            # Check if the target type is a dataclass
+            if is_dataclass(data_type):
+                # Get field types for the dataclass (field name -> field type)
+                field_types = get_type_hints(data_type)
+                converted_data = {}
+                # Recursively convert each field in the dataclass
+                for field_name, field_type in field_types.items():
+                    if field_name in data:
+                        # Recursive step: convert nested dictionaries into dataclass instances
+                        converted_data[field_name] = _convert_to_dataclass(data[field_name], field_type)
+                # Instantiate the dataclass with the converted data
+                return data_type(**converted_data)
+            # If data_type is not a dataclass, return the data unchanged
+            return data
+
+        def pipeline_invoker(**kwargs):
+            """
+            Invokes the pipeline with keyword arguments generated by the LLM's function/tool call.
+
+            It remaps the LLM's function-call payload to the input format expected by the pipeline's `run` method.
+
+            :param kwargs:
+                The keyword arguments to invoke the pipeline with.
+            :returns:
+                The result of the pipeline invocation.
+            """
+            pipeline_kwargs = defaultdict(dict)
+            for component_param, component_input in kwargs.items():
+                if "." in component_param:
+                    # Split parameter into component name and parameter name
+                    component_name, param_name = component_param.split(".", 1)
+                    # Retrieve the component from the pipeline
+                    component = pipeline.get_component(component_name)
+                    # Get the parameter from the signature, checking if it exists
+                    param = inspect.signature(component.run).parameters.get(param_name)
+                    # Use the parameter annotation if it exists, otherwise assume a string type
+                    param_type: Any = param.annotation if param else str
+
+                    # Determine the origin type (e.g., list) and target_type (inner type)
+                    origin: Any = get_origin(param_type) or param_type
+                    target_type: Any
+                    values_to_convert: Union[Any, List[Any]]
+
+                    if origin is list:
+                        # Parameter is a list; get the element type
+                        target_type = get_args(param_type)[0]
+                        values_to_convert = component_input
+                    else:
+                        # Parameter is a single value
+                        target_type = param_type
+                        values_to_convert = [component_input]
+
+                    # Convert dictionary inputs into dataclass or Pydantic model instances if necessary
+                    if is_dataclass(target_type) or is_pydantic_v2_model(target_type):
+                        converted = [
+                            target_type.model_validate(item)
+                            if is_pydantic_v2_model(target_type)
+                            else _convert_to_dataclass(item, target_type)
+                            for item in values_to_convert
+                            if isinstance(item, dict)
+                        ]
+                        # Update the component input with the converted data
+                        component_input = converted if origin is list else converted[0]
+
+                    # Map the parameter to the component in the pipeline kwargs
+                    pipeline_kwargs[component_name][param_name] = component_input
+                else:
+                    # Handle global parameters not associated with a specific component
+                    pipeline_kwargs[component_param] = component_input
+
+            logger.debug(f"Invoking pipeline (as tool) with kwargs: {pipeline_kwargs}")
+            # Invoke the pipeline with the prepared arguments
+            return pipeline.run(data=pipeline_kwargs)
+
+        # Return a new Tool instance with the pipeline invoker as the function to be called
+        return Tool(name=name, description=description, parameters=parameters, function=pipeline_invoker)
+
 
 def _remove_title_from_schema(schema: Dict[str, Any]):
     """
diff --git a/haystack_experimental/util/utils.py b/haystack_experimental/util/utils.py
index 59beeacd..568f3930 100644
--- a/haystack_experimental/util/utils.py
+++ b/haystack_experimental/util/utils.py
@@ -2,7 +2,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from typing import List, Union
+from typing import Any, List, Union
 
 
 def expand_page_range(page_range: List[Union[str, int]]) -> List[int]:
@@ -41,3 +41,13 @@ def expand_page_range(page_range: List[Union[str, int]]) -> List[int]:
         raise ValueError("No valid page numbers or ranges found in the input list")
 
     return expanded_page_range
+
+
+def is_pydantic_v2_model(instance: Any) -> bool:
+    """
+    Checks if the instance is a Pydantic v2 model.
+
+    :param instance: The instance to check.
+    :returns: True if the instance is a Pydantic v2 model, False otherwise.
+ """ + return hasattr(instance, "model_validate") diff --git a/test/components/tools/test_tool_pipeline.py b/test/components/tools/test_tool_pipeline.py new file mode 100644 index 00000000..3570e563 --- /dev/null +++ b/test/components/tools/test_tool_pipeline.py @@ -0,0 +1,1360 @@ +import os +from typing import Dict, List, Optional, Any +from dataclasses import dataclass +from haystack import Pipeline, component +from pydantic import BaseModel +import pytest +from haystack_experimental.components.generators.chat.openai import OpenAIChatGenerator +from haystack_experimental.components.tools.tool_invoker import ToolInvoker +from haystack_experimental.dataclasses.chat_message import ChatMessage +from haystack_experimental.dataclasses.tool import Tool +import json + + +### Component and Model Definitions, helper classes used in the tests + +### Some classes used in the tests have comments and some don't. +### This is intentional to test that the tool schema generation works for both commented and uncommented dataclasses. +@component +class SimpleComponent: + """A simple component that generates text.""" + + @component.output_types(reply=str) + def run(self, text: str) -> Dict[str, str]: + """ + A simple component that generates text. + + :param text: The text to generate. + :return: A dictionary with the generated text. + """ + return {"reply": f"Hello, {text}!"} + +class Product(BaseModel): + """A product model.""" + name: str + price: float + +@dataclass +class User: + """A simple user dataclass.""" + name: str = "Anonymous" + age: int = 0 + +@component +class UserGreeter: + """A simple component that processes a User.""" + + @component.output_types(message=str) + def run(self, user: User) -> Dict[str, str]: + """ + A simple component that processes a User. + + :param user: The User object to process. + :return: A dictionary with a message about the user. + """ + return {"message": f"User {user.name} is {user.age} years old"} + +@component +class ListProcessor: + """A component that processes a list of strings.""" + + @component.output_types(concatenated=str) + def run(self, texts: List[str]) -> Dict[str, str]: + """ + Concatenates a list of strings into a single string. + + :param texts: The list of strings to concatenate. + :return: A dictionary with the concatenated string. + """ + return {"concatenated": ' '.join(texts)} + +@component +class UsersProcessor: + """A component that processes a list of Users.""" + + @component.output_types(summary=str) + def run(self, users: List[User]) -> Dict[str, str]: + """ + Processes a list of users and returns a summary. + + :param users: The list of User objects to process. + :return: A dictionary with a summary of the users. + """ + names = [user.name for user in users] + return {"summary": f"Processing users: {', '.join(names)}"} + +@component +class MixedInputComponent: + """A component that processes both a string and a list of Users.""" + + @component.output_types(result=str) + def run(self, greeting: str, users: List[User]) -> Dict[str, str]: + """ + Greets a list of users with the provided greeting. + + :param greeting: The greeting to use. + :param users: The list of User objects to greet. + :return: A dictionary with the greeting result. 
+ """ + names = [user.name for user in users] + return {"result": f"{greeting}, {', '.join(names)}!"} + +@component +class MultiTypeInputComponent: + """A component that processes a string, a User, and a list of strings.""" + + @component.output_types(summary=str) + def run(self, message: str, user: User, tags: List[str]) -> Dict[str, str]: + """ + Creates a summary using the provided message, user, and tags. + + :param message: A message string. + :param user: A User object. + :param tags: A list of tags. + :return: A dictionary with the summary. + """ + tags_str = ', '.join(tags) + return {"summary": f"{message} by {user.name} (age {user.age}) with tags: {tags_str}"} + +@component +class DictInputComponent: + """A component that processes a dictionary of string keys to integer values.""" + + @component.output_types(total=int) + def run(self, data: Dict[str, int]) -> Dict[str, int]: + """ + Sums the values in the dictionary. + + :param data: A dictionary of integer values. + :return: A dictionary with the total sum. + """ + total = sum(data.values()) + return {"total": total} + +@component +class ProductProcessor: + """A component that processes a Product.""" + + @component.output_types(description=str) + def run(self, product: Product) -> Dict[str, str]: + """ + Creates a description for the product. + + :param product: The Product to process. + :return: A dictionary with the product description. + """ + return { + "description": f"The product {product.name} costs ${product.price:.2f}." + } + +@dataclass +class Address: + """A dataclass representing a physical address. + + Attributes: + street (str): The street address including house/building number. + city (str): The name of the city. + """ + street: str + city: str + +@dataclass +class Person: + name: str + address: Address + +@component +class PersonProcessor: + """A component that processes a Person with nested Address.""" + + @component.output_types(info=str) + def run(self, person: Person) -> Dict[str, str]: + """ + Creates information about the person. + + :param person: The Person to process. + :return: A dictionary with the person's information. + """ + return { + "info": f"{person.name} lives at {person.address.street}, {person.address.city}." + } + +@dataclass +class Profile: + username: str + bio: Optional[str] = None + +@component +class ProfileProcessor: + """A component that processes a Profile with an optional bio.""" + + @component.output_types(output=str) + def run(self, profile: Profile) -> Dict[str, str]: + """ + Creates a profile output. + + :param profile: The Profile to process. + :return: A dictionary with the profile output. + """ + bio = profile.bio or "No bio provided" + return { + "output": f"User {profile.username}: {bio}" + } + +@component +class OptionalDictComponent: + """A component that processes an optional dictionary with Any type values.""" + + @component.output_types(output=str) + def run(self, data: Optional[Dict[str, Any]] = None) -> Dict[str, str]: + """ + Processes an optional dictionary. + + :param data: An optional dictionary with values of any type. + :return: A dictionary with a message about the input data. 
+ """ + if data is None: + return {"output": "No data provided"} + else: + keys = ', '.join(data.keys()) + return {"output": f"Received data with keys: {keys}"} + + +class TestToolPipeline: + ### Basic Pipeline Tests without LLM involved + + def test_from_pipeline_basic(self): + pipeline = Pipeline() + pipeline.add_component("simple", SimpleComponent()) + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=pipeline, + name="hello_replying_tool", + description="A hello replying tool" + ) + + # Test that the tool was created correctly + assert tool.name == "hello_replying_tool" + assert tool.description == "A hello replying tool" + assert tool.parameters == { + "type": "object", + "properties": { + "simple.text": { + "type": "string", + "description": "The text to generate." + } + }, + "required": ["simple.text"] + } + + # Test that the tool can be invoked + llm_prepared_input = {"simple.text": "world"} + result = tool.invoke(**llm_prepared_input) + + assert isinstance(result, dict) + assert "simple" in result + assert "reply" in result["simple"] + assert result["simple"]["reply"] == "Hello, world!" + + def test_from_pipeline_basic_with_two_connected_components(self): + pipeline = Pipeline() + + pipeline.add_component("simple", SimpleComponent()) + pipeline.add_component("simple2", SimpleComponent()) + pipeline.connect("simple.reply", "simple2.text") + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=pipeline, + name="hello_replying_tool", + description="A hello replying tool" + ) + + # Only simple.text is pipeline input because simple2.text is connected to simple.reply + assert tool.name == "hello_replying_tool" + assert tool.description == "A hello replying tool" + assert tool.parameters == { + "type": "object", + "properties": { + "simple.text": { + "type": "string", + "description": "The text to generate." + } + }, + "required": ["simple.text"] + } + + # Test that the tool can be invoked + llm_prepared_input = {"simple.text": "world"} + result = tool.invoke(**llm_prepared_input) + + assert isinstance(result, dict) + assert "simple2" in result + assert "reply" in result["simple2"] + assert result["simple2"]["reply"] == "Hello, Hello, world!!" + + def test_from_pipeline_with_dataclass_input(self): + pipeline = Pipeline() + pipeline.add_component("user_greeter", UserGreeter()) + + tool = Tool.from_pipeline( + pipeline=pipeline, + name="user_info_tool", + description="A tool that returns user information" + ) + + # Update the assertion to match the actual schema generated by Tool.from_pipeline + assert tool.parameters == { + "type": "object", + "properties": { + "user_greeter.user": { + "type": "object", + "description": "The User object to process.", + "properties": { + "name": { + "type": "string", + "description": "Field 'name' of 'User'." + }, + "age": { + "type": "integer", + "description": "Field 'age' of 'User'." 
+                        }
+                    }
+                }
+            },
+            "required": ["user_greeter.user"]
+        }
+
+        # Test that the tool can be invoked
+        llm_prepared_input = {"user_greeter.user": {"name": "Alice", "age": 30}}
+        result = tool.invoke(**llm_prepared_input)
+
+        assert isinstance(result, dict)
+        assert "user_greeter" in result
+        assert "message" in result["user_greeter"]
+        assert result["user_greeter"]["message"] == "User Alice is 30 years old"
+
+    def test_from_pipeline_with_list_input(self):
+        pipeline = Pipeline()
+        pipeline.add_component("list_processor", ListProcessor())
+
+        # Create a tool from the pipeline
+        tool = Tool.from_pipeline(
+            pipeline=pipeline,
+            name="list_processing_tool",
+            description="A tool that concatenates a list of strings"
+        )
+
+        # Test that the tool was created correctly
+        assert tool.name == "list_processing_tool"
+        assert tool.description == "A tool that concatenates a list of strings"
+        assert tool.parameters == {
+            "type": "object",
+            "properties": {
+                "list_processor.texts": {
+                    "type": "array",
+                    "items": {
+                        "type": "string"
+                    },
+                    "description": "The list of strings to concatenate."
+                }
+            },
+            "required": ["list_processor.texts"]
+        }
+
+        # Test that the tool can be invoked
+        llm_prepared_input = {"list_processor.texts": ["hello", "world"]}
+        result = tool.invoke(**llm_prepared_input)
+
+        assert isinstance(result, dict)
+        assert "list_processor" in result
+        assert "concatenated" in result["list_processor"]
+        assert result["list_processor"]["concatenated"] == "hello world"
+
+    def test_from_pipeline_with_list_of_dataclasses(self):
+        pipeline = Pipeline()
+        pipeline.add_component("users_processor", UsersProcessor())
+
+        # Create a tool from the pipeline
+        tool = Tool.from_pipeline(
+            pipeline=pipeline,
+            name="users_processing_tool",
+            description="A tool that processes multiple users"
+        )
+
+        # Test that the tool was created correctly
+        assert tool.name == "users_processing_tool"
+        assert tool.description == "A tool that processes multiple users"
+        assert tool.parameters == {
+            "type": "object",
+            "properties": {
+                "users_processor.users": {
+                    "type": "array",
+                    "description": "The list of User objects to process.",
+                    "items": {
+                        "type": "object",
+                        "properties": {
+                            "name": {
+                                "type": "string",
+                                "description": "Field 'name' of 'User'."
+                            },
+                            "age": {
+                                "type": "integer",
+                                "description": "Field 'age' of 'User'."
+                            }
+                        }
+                    }
+                }
+            },
+            "required": ["users_processor.users"]
+        }
+
+        # Test that the tool can be invoked
+        users_data = [
+            {"name": "Alice", "age": 30},
+            {"name": "Bob", "age": 25}
+        ]
+        llm_prepared_input = {"users_processor.users": users_data}
+        result = tool.invoke(**llm_prepared_input)
+
+        assert isinstance(result, dict)
+        assert "users_processor" in result
+        assert "summary" in result["users_processor"]
+        assert result["users_processor"]["summary"] == "Processing users: Alice, Bob"
+
+    def test_from_pipeline_with_mixed_inputs(self):
+        pipeline = Pipeline()
+        pipeline.add_component("mixed_input", MixedInputComponent())
+
+        tool = Tool.from_pipeline(
+            pipeline=pipeline,
+            name="greeting_tool",
+            description="A tool that greets users with a greeting message"
+        )
+
+        assert tool.parameters == {
+            "type": "object",
+            "properties": {
+                "mixed_input.greeting": {
+                    "type": "string",
+                    "description": "The greeting to use."
+                },
+                "mixed_input.users": {
+                    "type": "array",
+                    "description": "The list of User objects to greet.",
+                    "items": {
+                        "type": "object",
+                        "properties": {
+                            "name": {
+                                "type": "string",
+                                "description": "Field 'name' of 'User'."
+ }, + "age": { + "type": "integer", + "description": "Field 'age' of 'User'." + } + } + } + } + }, + "required": ["mixed_input.greeting", "mixed_input.users"] + } + + # Test that the tool can be invoked + llm_prepared_input = { + "mixed_input.greeting": "Hello", + "mixed_input.users": [ + {"name": "Alice", "age": 30}, + {"name": "Bob", "age": 25} + ] + } + result = tool.invoke(**llm_prepared_input) + + assert isinstance(result, dict) + assert "mixed_input" in result + assert "result" in result["mixed_input"] + assert result["mixed_input"]["result"] == "Hello, Alice, Bob!" + + def test_from_pipeline_with_multiple_input_types(self): + pipeline = Pipeline() + pipeline.add_component("multi_type", MultiTypeInputComponent()) + + tool = Tool.from_pipeline( + pipeline=pipeline, + name="summary_tool", + description="A tool that summarizes inputs" + ) + + assert tool.parameters == { + "type": "object", + "properties": { + "multi_type.message": { + "type": "string", + "description": "A message string." + }, + "multi_type.user": { + "type": "object", + "description": "A User object.", + "properties": { + "name": { + "type": "string", + "description": "Field 'name' of 'User'." + }, + "age": { + "type": "integer", + "description": "Field 'age' of 'User'." + } + } + }, + "multi_type.tags": { + "type": "array", + "description": "A list of tags.", + "items": { + "type": "string" + } + } + }, + "required": ["multi_type.message", "multi_type.user", "multi_type.tags"] + } + + # Test that the tool can be invoked + llm_prepared_input = { + "multi_type.message": "This is a test", + "multi_type.user": {"name": "Charlie", "age": 28}, + "multi_type.tags": ["example", "test", "pipeline"] + } + result = tool.invoke(**llm_prepared_input) + + assert isinstance(result, dict) + assert "multi_type" in result + assert "summary" in result["multi_type"] + expected_summary = "This is a test by Charlie (age 28) with tags: example, test, pipeline" + assert result["multi_type"]["summary"] == expected_summary + + def test_from_pipeline_with_dict_input(self): + pipeline = Pipeline() + pipeline.add_component("dict_input", DictInputComponent()) + + tool = Tool.from_pipeline( + pipeline=pipeline, + name="sum_tool", + description="A tool that sums integer values in a dictionary" + ) + + assert tool.parameters == { + "type": "object", + "properties": { + "dict_input.data": { + "type": "object", + "description": "A dictionary of integer values.", + "additionalProperties": { + "description": "A dictionary of integer values.", + "type": "integer" + } + } + }, + "required": ["dict_input.data"] + } + + # Test that the tool can be invoked + llm_prepared_input = { + "dict_input.data": {"a": 1, "b": 2, "c": 3} + } + result = tool.invoke(**llm_prepared_input) + + assert isinstance(result, dict) + assert "dict_input" in result + assert "total" in result["dict_input"] + assert result["dict_input"]["total"] == 6 + + def test_from_pipeline_with_pydantic_model(self): + pipeline = Pipeline() + pipeline.add_component("product_processor", ProductProcessor()) + + tool = Tool.from_pipeline( + pipeline=pipeline, + name="product_description_tool", + description="A tool that generates product descriptions" + ) + + assert tool.parameters == { + "type": "object", + "properties": { + "product_processor.product": { + "type": "object", + "description": "The Product to process.", + "properties": { + "name": { + "type": "string", + "description": "Field 'name' of 'Product'." + }, + "price": { + "type": "number", + "description": "Field 'price' of 'Product'." 
+ } + }, + "required": ["name", "price"] + } + }, + "required": ["product_processor.product"] + } + + # Test that the tool can be invoked + llm_prepared_input = { + "product_processor.product": {"name": "Widget", "price": 19.99} + } + result = tool.invoke(**llm_prepared_input) + + assert isinstance(result, dict) + assert "product_processor" in result + assert "description" in result["product_processor"] + assert result["product_processor"]["description"] == "The product Widget costs $19.99." + + def test_from_pipeline_with_nested_dataclass(self): + + pipeline = Pipeline() + pipeline.add_component("person_processor", PersonProcessor()) + + tool = Tool.from_pipeline( + pipeline=pipeline, + name="person_info_tool", + description="A tool that provides information about a person" + ) + + assert tool.parameters == { + "type": "object", + "properties": { + "person_processor.person": { + "type": "object", + "description": "The Person to process.", + "properties": { + "name": { + "type": "string", + "description": "Field 'name' of 'Person'." + }, + "address": { + "type": "object", + "description": "Field 'address' of 'Person'.", + "properties": { + "street": { + "type": "string", + "description": "Field 'street' of 'Address'." + }, + "city": { + "type": "string", + "description": "Field 'city' of 'Address'." + } + }, + "required": ["street", "city"] + } + }, + "required": ["name", "address"] + } + }, + "required": ["person_processor.person"] + } + + # Test that the tool can be invoked + llm_prepared_input = { + "person_processor.person": { + "name": "Diana", + "address": { + "street": "123 Elm Street", + "city": "Metropolis" + } + } + } + result = tool.invoke(**llm_prepared_input) + + assert isinstance(result, dict) + assert "person_processor" in result + assert "info" in result["person_processor"] + assert result["person_processor"]["info"] == "Diana lives at 123 Elm Street, Metropolis." + + def test_from_pipeline_with_optional_fields(self): + pipeline = Pipeline() + pipeline.add_component("profile_processor", ProfileProcessor()) + + tool = Tool.from_pipeline( + pipeline=pipeline, + name="profile_tool", + description="A tool that processes user profiles" + ) + + # The 'bio' field is optional, so it should not be required + assert tool.parameters == { + "type": "object", + "properties": { + "profile_processor.profile": { + "type": "object", + "description": "The Profile to process.", + "properties": { + "username": { + "type": "string", + "description": "Field 'username' of 'Profile'." + }, + "bio": { + "type": "string", + "description": "Field 'bio' of 'Profile'.", + } + }, + "required": ["username"] + } + }, + "required": ["profile_processor.profile"] + } + + # Test that the tool can be invoked without the optional bio + llm_prepared_input = { + "profile_processor.profile": { + "username": "johndoe" + } + } + result = tool.invoke(**llm_prepared_input) + + assert isinstance(result, dict) + assert "profile_processor" in result + assert "output" in result["profile_processor"] + assert result["profile_processor"]["output"] == "User johndoe: No bio provided" + + # Now invoke with the bio provided + llm_prepared_input["profile_processor.profile"]["bio"] = "Just another developer" + result = tool.invoke(**llm_prepared_input) + + assert result["profile_processor"]["output"] == "User johndoe: Just another developer" + + def test_from_pipeline_with_optional_dict_any_input(self): + """ + Test pipeline with a component that accepts Optional[Dict[str, Any]]. 
+ """ + pipeline = Pipeline() + pipeline.add_component("optional_dict", OptionalDictComponent()) + + tool = Tool.from_pipeline( + pipeline=pipeline, + name="optional_dict_tool", + description="A tool that processes optional dictionary input with Any type values" + ) + + # Assert that the tool's parameters are correctly generated + assert tool.parameters == { + "type": "object", + "properties": { + "optional_dict.data": { + "type": "object", + "description": "An optional dictionary with values of any type.", + "additionalProperties": {} + } + } + # Note: 'required' is not included since the 'data' parameter is optional + } + + # Test invocation without providing 'data' (should use default None) + result = tool.invoke() + assert isinstance(result, dict) + assert "optional_dict" in result + assert result["optional_dict"]["output"] == "No data provided" + + # Test invocation with 'data' provided + llm_prepared_input = { + "optional_dict.data": {"key1": 1, "key2": "value2", "key3": [1, 2, 3]} + } + result = tool.invoke(**llm_prepared_input) + assert isinstance(result, dict) + assert "optional_dict" in result + assert "Received data with keys: key1, key2, key3" == result["optional_dict"]["output"] + + + + + ### Real integration tests with LLMs (requires OPENAI_API_KEY) + ### We'll test the end user experience of using pipelines as tools + + @pytest.mark.integration + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY", None), + reason="Export an env var called OPENAI_API_KEY containing the OpenAI API key to run this test.", + ) + def test_from_pipeline_basic_with_LLM(self): + tool_pipeline = Pipeline() + tool_pipeline.add_component("simple", SimpleComponent()) + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=tool_pipeline, + name="hello_replying_tool", + description="A hello replying tool" + ) + + # now the main pipeline that uses the the pipeline above as a tool + main_pipeline = Pipeline() + main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool])) + main_pipeline.add_component("tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True)) + main_pipeline.connect("llm.replies", "tool_invoker.messages") + + # Rather than forcing particular tool usage, we slightly hint at it + messages = [ChatMessage.from_user("Say hello to the world using the tool")] + result = main_pipeline.run(data={"llm": {"messages": messages}}) + + # Check that the tool was used + assert "tool_invoker" in result + assert "tool_messages" in result["tool_invoker"] + assert len(result["tool_invoker"]["tool_messages"]) == 1 + + tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0] + assert tool_message.role == "tool" + assert tool_message.tool_call_result is not None + + # Check particularities of the tool result coming from the tool pipeline + parsed_result = json.loads(tool_message.tool_call_result.result) + assert "simple" in parsed_result + assert "reply" in parsed_result["simple"] + assert "hello" in parsed_result["simple"]["reply"].lower() + assert "world" in parsed_result["simple"]["reply"].lower() + + + @pytest.mark.integration + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY"), + reason="Set the OPENAI_API_KEY environment variable to run this test.", + ) + def test_from_pipeline_basic_with_two_connected_components_with_LLM(self): + # Create the tool pipeline with two connected components + tool_pipeline = Pipeline() + tool_pipeline.add_component("simple", SimpleComponent()) + 
tool_pipeline.add_component("simple2", SimpleComponent()) + tool_pipeline.connect("simple.reply", "simple2.text") + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=tool_pipeline, + name="hello_replying_tool", + description="A hello replying tool that uses two connected components" + ) + + # Create the main pipeline that uses the tool + main_pipeline = Pipeline() + main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool])) + main_pipeline.add_component("tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True)) + main_pipeline.connect("llm.replies", "tool_invoker.messages") + + # Provide a message to the LLM that hints at using the tool + messages = [ChatMessage.from_user("Say hello to the world twice using the tool")] + result = main_pipeline.run(data={"llm": {"messages": messages}}) + + # Check that the tool was used and produced the expected result + assert "tool_invoker" in result + assert "tool_messages" in result["tool_invoker"] + assert len(result["tool_invoker"]["tool_messages"]) >= 1 + + tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0] + assert tool_message.role == "tool" + assert tool_message.tool_call_result is not None + + parsed_result = json.loads(tool_message.tool_call_result.result) + assert "simple2" in parsed_result + assert "reply" in parsed_result["simple2"] + reply = parsed_result["simple2"]["reply"].lower() + assert "hello" in reply + assert "world" in reply + + @pytest.mark.integration + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY"), + reason="Set the OPENAI_API_KEY environment variable to run this test.", + ) + def test_from_pipeline_with_dataclass_input_with_LLM(self): + # Create the tool pipeline with UserGreeter component + tool_pipeline = Pipeline() + tool_pipeline.add_component("user_greeter", UserGreeter()) + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=tool_pipeline, + name="user_info_tool", + description="A tool that returns user information" + ) + + # Create the main pipeline that uses the tool + main_pipeline = Pipeline() + main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool])) + main_pipeline.add_component( + "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True) + ) + main_pipeline.connect("llm.replies", "tool_invoker.messages") + + # Provide a message that hints at using the tool + messages = [ + ChatMessage.from_user( + "Use the tool to provide information about a user named Alice who is 30 years old" + ) + ] + result = main_pipeline.run(data={"llm": {"messages": messages}}) + + # Check that the tool was used and produced the expected result + assert "tool_invoker" in result + assert "tool_messages" in result["tool_invoker"] + assert len(result["tool_invoker"]["tool_messages"]) >= 1 + + tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0] + assert tool_message.role == "tool" + assert tool_message.tool_call_result is not None + + parsed_result = json.loads(tool_message.tool_call_result.result) + assert "user_greeter" in parsed_result + assert "message" in parsed_result["user_greeter"] + message = parsed_result["user_greeter"]["message"] + assert "alice" in message.lower() + assert "30" in message + + @pytest.mark.integration + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY"), + reason="Set the OPENAI_API_KEY environment variable to run this test.", + ) + def test_from_pipeline_with_list_input_with_LLM(self): + # Create the tool 
+        tool_pipeline = Pipeline()
+        tool_pipeline.add_component("list_processor", ListProcessor())
+
+        # Create a tool from the pipeline
+        tool = Tool.from_pipeline(
+            pipeline=tool_pipeline,
+            name="list_processing_tool",
+            description="A tool that concatenates a list of strings"
+        )
+
+        # Create the main pipeline that uses the tool
+        main_pipeline = Pipeline()
+        main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool]))
+        main_pipeline.add_component(
+            "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True)
+        )
+        main_pipeline.connect("llm.replies", "tool_invoker.messages")
+
+        # Provide a message that hints at using the tool
+        messages = [
+            ChatMessage.from_user(
+                "Use the tool to concatenate the words 'hello' and 'world'"
+            )
+        ]
+        result = main_pipeline.run(data={"llm": {"messages": messages}})
+
+        # Check that the tool was used and produced the expected result
+        assert "tool_invoker" in result
+        assert "tool_messages" in result["tool_invoker"]
+        assert len(result["tool_invoker"]["tool_messages"]) >= 1
+
+        tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0]
+        assert tool_message.role == "tool"
+        assert tool_message.tool_call_result is not None
+
+        parsed_result = json.loads(tool_message.tool_call_result.result)
+        assert "list_processor" in parsed_result
+        assert "concatenated" in parsed_result["list_processor"]
+        concatenated = parsed_result["list_processor"]["concatenated"].lower()
+        assert concatenated == "hello world"
+
+    @pytest.mark.integration
+    @pytest.mark.skipif(
+        not os.environ.get("OPENAI_API_KEY"),
+        reason="Set the OPENAI_API_KEY environment variable to run this test.",
+    )
+    def test_from_pipeline_with_list_of_dataclasses_with_LLM(self):
+        # Create the tool pipeline with UsersProcessor component
+        tool_pipeline = Pipeline()
+        tool_pipeline.add_component("users_processor", UsersProcessor())
+
+        # Create a tool from the pipeline
+        tool = Tool.from_pipeline(
+            pipeline=tool_pipeline,
+            name="users_processing_tool",
+            description="A tool that processes multiple users"
+        )
+
+        # Create the main pipeline that uses the tool
+        main_pipeline = Pipeline()
+        main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool]))
+        main_pipeline.add_component(
+            "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True)
+        )
+        main_pipeline.connect("llm.replies", "tool_invoker.messages")
+
+        # Provide a message that hints at using the tool
+        messages = [
+            ChatMessage.from_user(
+                "Use the tool to process users Alice aged 30 and Bob aged 25"
+            )
+        ]
+        result = main_pipeline.run(data={"llm": {"messages": messages}})
+
+        # Check that the tool was used and produced the expected result
+        assert "tool_invoker" in result
+        assert "tool_messages" in result["tool_invoker"]
+        assert len(result["tool_invoker"]["tool_messages"]) >= 1
+
+        tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0]
+        assert tool_message.role == "tool"
+        assert tool_message.tool_call_result is not None
+
+        parsed_result = json.loads(tool_message.tool_call_result.result)
+        assert "users_processor" in parsed_result
+        assert "summary" in parsed_result["users_processor"]
+        summary = parsed_result["users_processor"]["summary"]
+        assert "alice" in summary.lower()
+        assert "bob" in summary.lower()
+        assert "processing users" in summary.lower()
+
+    @pytest.mark.integration
+    @pytest.mark.skipif(
+        not os.environ.get("OPENAI_API_KEY"),
+        reason="Set the OPENAI_API_KEY environment variable to run this test.",
+    )
environment variable to run this test.", + ) + def test_from_pipeline_with_mixed_inputs_with_LLM(self): + # Create the tool pipeline with MixedInputComponent + tool_pipeline = Pipeline() + tool_pipeline.add_component("mixed_input", MixedInputComponent()) + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=tool_pipeline, + name="greeting_tool", + description="A tool that greets users with a greeting message" + ) + + # Create the main pipeline that uses the tool + main_pipeline = Pipeline() + main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool])) + main_pipeline.add_component( + "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True) + ) + main_pipeline.connect("llm.replies", "tool_invoker.messages") + + # Provide a message that hints at using the tool + messages = [ + ChatMessage.from_user( + "Use the tool to greet users Alice and Bob with 'Hello'" + ) + ] + result = main_pipeline.run(data={"llm": {"messages": messages}}) + + # Check that the tool was used and produced the expected result + assert "tool_invoker" in result + assert "tool_messages" in result["tool_invoker"] + assert len(result["tool_invoker"]["tool_messages"]) >= 1 + + tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0] + assert tool_message.role == "tool" + assert tool_message.tool_call_result is not None + + parsed_result = json.loads(tool_message.tool_call_result.result) + assert "mixed_input" in parsed_result + assert "result" in parsed_result["mixed_input"] + result_text = parsed_result["mixed_input"]["result"] + assert "hello, alice, bob" in result_text.lower() + + @pytest.mark.integration + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY"), + reason="Set the OPENAI_API_KEY environment variable to run this test.", + ) + def test_from_pipeline_with_multiple_input_types_with_LLM(self): + # Create the tool pipeline with MultiTypeInputComponent + tool_pipeline = Pipeline() + tool_pipeline.add_component("multi_type", MultiTypeInputComponent()) + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=tool_pipeline, + name="summary_tool", + description="A tool that summarizes inputs" + ) + + # Create the main pipeline that uses the tool + main_pipeline = Pipeline() + main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool])) + main_pipeline.add_component( + "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True) + ) + main_pipeline.connect("llm.replies", "tool_invoker.messages") + + # Provide a message that hints at using the tool + messages = [ + ChatMessage.from_user( + "Use the tool to summarize 'This is a test' by user Charlie aged 28 with tags: example, test, pipeline" + ) + ] + result = main_pipeline.run(data={"llm": {"messages": messages}}) + + # Check that the tool was used and produced the expected result + assert "tool_invoker" in result + assert "tool_messages" in result["tool_invoker"] + assert len(result["tool_invoker"]["tool_messages"]) >=1 + + tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0] + assert tool_message.role == "tool" + assert tool_message.tool_call_result is not None + + parsed_result = json.loads(tool_message.tool_call_result.result) + assert "multi_type" in parsed_result + assert "summary" in parsed_result["multi_type"] + summary = parsed_result["multi_type"]["summary"] + assert "this is a test" in summary.lower() + assert "charlie" in summary.lower() + assert "28" in summary + assert "example, test, 
pipeline" in summary.lower() + + @pytest.mark.integration + @pytest.mark.skip("There is no support for additionalProperties OpenAI schema option") + def test_from_pipeline_with_dict_input_with_LLM(self): + # Create the tool pipeline with DictInputComponent + tool_pipeline = Pipeline() + tool_pipeline.add_component("dict_input", DictInputComponent()) + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=tool_pipeline, + name="sum_tool", + description="A tool that sums integer values in a dictionary" + ) + + # Create the main pipeline that uses the tool + main_pipeline = Pipeline() + main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool])) + main_pipeline.add_component( + "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True) + ) + main_pipeline.connect("llm.replies", "tool_invoker.messages") + + # Provide a message that hints at using the tool + messages = [ + ChatMessage.from_user( + "Use the tool to sum the numbers with dictionary keys 'a':1, 'b':2, 'c':3" + ) + ] + result = main_pipeline.run(data={"llm": {"messages": messages}}) + + # Check that the tool was used and produced the expected result + assert "tool_invoker" in result + assert "tool_messages" in result["tool_invoker"] + assert len(result["tool_invoker"]["tool_messages"]) >=1 + + tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0] + assert tool_message.role == "tool" + assert tool_message.tool_call_result is not None + + parsed_result = json.loads(tool_message.tool_call_result.result) + assert "dict_input" in parsed_result + assert "total" in parsed_result["dict_input"] + assert parsed_result["dict_input"]["total"] == 6 + + @pytest.mark.integration + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY"), + reason="Set the OPENAI_API_KEY environment variable to run this test.", + ) + def test_from_pipeline_with_pydantic_model_with_LLM(self): + # Create the tool pipeline with ProductProcessor component + tool_pipeline = Pipeline() + tool_pipeline.add_component("product_processor", ProductProcessor()) + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=tool_pipeline, + name="product_description_tool", + description="A tool that generates product descriptions" + ) + + # Create the main pipeline that uses the tool + main_pipeline = Pipeline() + main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool])) + main_pipeline.add_component( + "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True) + ) + main_pipeline.connect("llm.replies", "tool_invoker.messages") + + # Provide a message that hints at using the tool + messages = [ + ChatMessage.from_user( + "Use the tool to generate a description for a product named 'Widget' priced at 19.99" + ) + ] + result = main_pipeline.run(data={"llm": {"messages": messages}}) + + # Check that the tool was used and produced the expected result + assert "tool_invoker" in result + assert "tool_messages" in result["tool_invoker"] + assert len(result["tool_invoker"]["tool_messages"]) >=1 + + tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0] + assert tool_message.role == "tool" + assert tool_message.tool_call_result is not None + + parsed_result = json.loads(tool_message.tool_call_result.result) + assert "product_processor" in parsed_result + assert "description" in parsed_result["product_processor"] + description = parsed_result["product_processor"]["description"] + assert "widget" in description.lower() + assert 
"$19.99" in description + + @pytest.mark.integration + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY"), + reason="Set the OPENAI_API_KEY environment variable to run this test.", + ) + def test_from_pipeline_with_nested_dataclass_with_LLM(self): + # Create the tool pipeline with PersonProcessor component + tool_pipeline = Pipeline() + tool_pipeline.add_component("person_processor", PersonProcessor()) + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=tool_pipeline, + name="person_info_tool", + description="A tool that provides information about a person" + ) + + # Create the main pipeline that uses the tool + main_pipeline = Pipeline() + main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool])) + main_pipeline.add_component( + "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True) + ) + main_pipeline.connect("llm.replies", "tool_invoker.messages") + + # Provide a message that hints at using the tool + messages = [ + ChatMessage.from_user( + "Use the tool to provide information about Diana who lives at 123 Elm Street, Metropolis" + ) + ] + result = main_pipeline.run(data={"llm": {"messages": messages}}) + + # Check that the tool was used and produced the expected result + assert "tool_invoker" in result + assert "tool_messages" in result["tool_invoker"] + assert len(result["tool_invoker"]["tool_messages"]) >=1 + + tool_message: ChatMessage = result["tool_invoker"]["tool_messages"][0] + assert tool_message.role == "tool" + assert tool_message.tool_call_result is not None + + parsed_result = json.loads(tool_message.tool_call_result.result) + assert "person_processor" in parsed_result + assert "info" in parsed_result["person_processor"] + info = parsed_result["person_processor"]["info"] + assert "diana" in info.lower() + assert "123 elm street" in info.lower() + assert "metropolis" in info.lower() + + @pytest.mark.integration + @pytest.mark.skipif( + not os.environ.get("OPENAI_API_KEY"), + reason="Set the OPENAI_API_KEY environment variable to run this test.", + ) + def test_from_pipeline_with_optional_fields_with_LLM(self): + # Create the tool pipeline with ProfileProcessor component + tool_pipeline = Pipeline() + tool_pipeline.add_component("profile_processor", ProfileProcessor()) + + # Create a tool from the pipeline + tool = Tool.from_pipeline( + pipeline=tool_pipeline, + name="profile_tool", + description="A tool that processes user profiles" + ) + + # Create the main pipeline that uses the tool + main_pipeline = Pipeline() + main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool])) + main_pipeline.add_component( + "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True) + ) + main_pipeline.connect("llm.replies", "tool_invoker.messages") + + # Test without optional bio + messages = [ + ChatMessage.from_user( + "Use the tool to process a profile for user 'johndoe' without a bio" + ) + ] + result = main_pipeline.run(data={"llm": {"messages": messages}}) + assert "tool_invoker" in result + assert "tool_messages" in result["tool_invoker"] + + tool_message = result["tool_invoker"]["tool_messages"][0] + parsed_result = json.loads(tool_message.tool_call_result.result) + output = parsed_result["profile_processor"]["output"] + assert "johndoe" in output + assert "no bio provided" in output.lower() + + # Test with optional bio + messages = [ + ChatMessage.from_user( + "Use the tool to process a profile for user 'johndoe' with bio 'Just another developer'" + ) + ] 
+        result = main_pipeline.run(data={"llm": {"messages": messages}})
+        tool_message = result["tool_invoker"]["tool_messages"][0]
+        parsed_result = json.loads(tool_message.tool_call_result.result)
+        output = parsed_result["profile_processor"]["output"]
+        assert "johndoe" in output.lower()
+        assert "just another developer" in output.lower()
+
+    @pytest.mark.integration
+    @pytest.mark.skipif(
+        not os.environ.get("OPENAI_API_KEY"),
+        reason="Set the OPENAI_API_KEY environment variable to run this test.",
+    )
+    def test_from_pipeline_with_optional_dict_any_input_with_LLM(self):
+        """
+        Integration test for a pipeline with a component that accepts Optional[Dict[str, Any]],
+        using an LLM to generate the tool calls.
+        """
+        # Create the tool pipeline
+        tool_pipeline = Pipeline()
+        tool_pipeline.add_component("optional_dict", OptionalDictComponent())
+
+        # Create a tool from the pipeline
+        tool = Tool.from_pipeline(
+            pipeline=tool_pipeline,
+            name="optional_dict_tool",
+            description="A tool that processes optional dictionary input with Any type values"
+        )
+
+        # Create the main pipeline that uses the tool
+        main_pipeline = Pipeline()
+        main_pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o", tools=[tool]))
+        main_pipeline.add_component(
+            "tool_invoker", ToolInvoker(tools=[tool], convert_result_to_json_string=True)
+        )
+        main_pipeline.connect("llm.replies", "tool_invoker.messages")
+
+        # Test without providing data
+        messages = [
+            ChatMessage.from_user(
+                "Use the tool without providing any data dictionary"
+            )
+        ]
+        result = main_pipeline.run(data={"llm": {"messages": messages}})
+        assert "tool_invoker" in result
+        assert "tool_messages" in result["tool_invoker"]
+
+        tool_message = result["tool_invoker"]["tool_messages"][0]
+        parsed_result = json.loads(tool_message.tool_call_result.result)
+        assert "optional_dict" in parsed_result
+        assert parsed_result["optional_dict"]["output"] == "No data provided"
+
+        # Test with providing data
+        messages = [
+            ChatMessage.from_user(
+                "Use the tool with a dictionary data field containing the following key/value pairs: name: 'Alice', age: 30, and scores: [85, 92, 78]"
+            )
+        ]
+        result = main_pipeline.run(data={"llm": {"messages": messages}})
+        tool_message = result["tool_invoker"]["tool_messages"][0]
+        parsed_result = json.loads(tool_message.tool_call_result.result)
+        output = parsed_result["optional_dict"]["output"]
+        # TODO: This doesn't work yet, because the LLM returns an empty dictionary as parameter values
+        # Leaving it here as a reminder to explore this further
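
Reviewer's note: for quick orientation, below is a minimal usage sketch of the API this PR introduces, assembled from `Tool.from_pipeline` and `test_from_pipeline_basic` above. It reuses the `SimpleComponent` defined in the test file; no LLM or API key is required, and the outputs in the comments mirror the test assertions.

    from typing import Dict

    from haystack import Pipeline, component

    from haystack_experimental.dataclasses.tool import Tool


    @component
    class SimpleComponent:
        """A simple component that generates text."""

        @component.output_types(reply=str)
        def run(self, text: str) -> Dict[str, str]:
            """
            :param text: The text to generate.
            """
            return {"reply": f"Hello, {text}!"}


    # Wrap a one-component pipeline as an LLM-callable tool
    pipeline = Pipeline()
    pipeline.add_component("simple", SimpleComponent())
    tool = Tool.from_pipeline(pipeline=pipeline, name="hello_replying_tool", description="A hello replying tool")

    # extract_pipeline_parameters prefixes each input with its component name to avoid
    # clashes, so the schema exposes "simple.text" rather than a bare "text", and the
    # ":param text:" docstring line above becomes the property description.
    print(tool.parameters["properties"])  # {"simple.text": {"type": "string", "description": "The text to generate."}}

    # pipeline_invoker remaps "simple.text" back into Pipeline.run(data={"simple": {"text": ...}})
    print(tool.invoke(**{"simple.text": "world"}))  # {"simple": {"reply": "Hello, world!"}}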