From eb0185592bceb2fc3b916813158f689e5e6f8a7f Mon Sep 17 00:00:00 2001 From: shadeMe Date: Fri, 29 Nov 2024 15:18:18 +0100 Subject: [PATCH 1/5] feat: Port evaluation harness from `haystack-experimental` --- docs/pydoc/config/evaluation_api.yml | 5 +- haystack/evaluation/__init__.py | 27 +- .../{base.py => base_eval_run_result.py} | 0 haystack/evaluation/eval_run_result.py | 2 +- haystack/evaluation/harness/__init__.py | 7 + .../evaluation/harness/evaluation_harness.py | 83 +++ haystack/evaluation/harness/rag/__init__.py | 24 + haystack/evaluation/harness/rag/_telemetry.py | 77 +++ .../harness/rag/evaluation_pipeline.py | 49 ++ haystack/evaluation/harness/rag/harness.py | 415 ++++++++++++ haystack/evaluation/harness/rag/parameters.py | 157 +++++ haystack/evaluation/util/__init__.py | 3 + haystack/evaluation/util/helpers.py | 98 +++ haystack/evaluation/util/pipeline_pair.py | 237 +++++++ .../testing/sample_components/__init__.py | 6 +- .../testing/sample_components/add_value.py | 21 +- haystack/testing/sample_components/double.py | 16 + test/evaluation/harness/__init__.py | 3 + test/evaluation/harness/rag/__init__.py | 3 + test/evaluation/harness/rag/test_harness.py | 603 ++++++++++++++++++ test/evaluation/util/__init__.py | 3 + test/evaluation/util/test_helpers.py | 51 ++ test/evaluation/util/test_pipeline_pair.py | 234 +++++++ 23 files changed, 2117 insertions(+), 7 deletions(-) rename haystack/evaluation/{base.py => base_eval_run_result.py} (100%) create mode 100644 haystack/evaluation/harness/__init__.py create mode 100644 haystack/evaluation/harness/evaluation_harness.py create mode 100644 haystack/evaluation/harness/rag/__init__.py create mode 100644 haystack/evaluation/harness/rag/_telemetry.py create mode 100644 haystack/evaluation/harness/rag/evaluation_pipeline.py create mode 100644 haystack/evaluation/harness/rag/harness.py create mode 100644 haystack/evaluation/harness/rag/parameters.py create mode 100644 haystack/evaluation/util/__init__.py create mode 100644 haystack/evaluation/util/helpers.py create mode 100644 haystack/evaluation/util/pipeline_pair.py create mode 100644 test/evaluation/harness/__init__.py create mode 100644 test/evaluation/harness/rag/__init__.py create mode 100644 test/evaluation/harness/rag/test_harness.py create mode 100644 test/evaluation/util/__init__.py create mode 100644 test/evaluation/util/test_helpers.py create mode 100644 test/evaluation/util/test_pipeline_pair.py diff --git a/docs/pydoc/config/evaluation_api.yml b/docs/pydoc/config/evaluation_api.yml index e445e9a568..c840181c03 100644 --- a/docs/pydoc/config/evaluation_api.yml +++ b/docs/pydoc/config/evaluation_api.yml @@ -3,8 +3,11 @@ loaders: search_path: [../../../haystack/evaluation] modules: [ - "base", + "base_eval_run_result", "eval_run_result", + "harness.evaluation_harness", + "harness.rag.harness", + "harness.rag.parameters", ] ignore_when_discovered: ["__init__"] processors: diff --git a/haystack/evaluation/__init__.py b/haystack/evaluation/__init__.py index 734699a03f..49e4371e9a 100644 --- a/haystack/evaluation/__init__.py +++ b/haystack/evaluation/__init__.py @@ -2,7 +2,30 @@ # # SPDX-License-Identifier: Apache-2.0 -from .base import BaseEvaluationRunResult +from .base_eval_run_result import BaseEvaluationRunResult from .eval_run_result import EvaluationRunResult +from .harness import EvaluationHarness, EvaluationRunOverrides +from .harness.rag import DefaultRAGArchitecture, RAGEvaluationHarness +from .harness.rag.parameters import ( + RAGEvaluationInput, + RAGEvaluationMetric, + 
RAGEvaluationOutput,
+    RAGEvaluationOverrides,
+    RAGExpectedComponent,
+    RAGExpectedComponentMetadata,
+)
 
-__all__ = ["BaseEvaluationRunResult", "EvaluationRunResult"]
+__all__ = [
+    "BaseEvaluationRunResult",
+    "EvaluationRunResult",
+    "EvaluationHarness",
+    "EvaluationRunOverrides",
+    "DefaultRAGArchitecture",
+    "RAGEvaluationHarness",
+    "RAGExpectedComponent",
+    "RAGExpectedComponentMetadata",
+    "RAGEvaluationMetric",
+    "RAGEvaluationOutput",
+    "RAGEvaluationOverrides",
+    "RAGEvaluationInput",
+]
diff --git a/haystack/evaluation/base.py b/haystack/evaluation/base_eval_run_result.py
similarity index 100%
rename from haystack/evaluation/base.py
rename to haystack/evaluation/base_eval_run_result.py
diff --git a/haystack/evaluation/eval_run_result.py b/haystack/evaluation/eval_run_result.py
index 1d04419ebe..87cf1583b8 100644
--- a/haystack/evaluation/eval_run_result.py
+++ b/haystack/evaluation/eval_run_result.py
@@ -9,7 +9,7 @@
 from pandas import DataFrame
 from pandas import concat as pd_concat
 
-from .base import BaseEvaluationRunResult
+from .base_eval_run_result import BaseEvaluationRunResult
 
 
 class EvaluationRunResult(BaseEvaluationRunResult):
diff --git a/haystack/evaluation/harness/__init__.py b/haystack/evaluation/harness/__init__.py
new file mode 100644
index 0000000000..9b1cb444b5
--- /dev/null
+++ b/haystack/evaluation/harness/__init__.py
@@ -0,0 +1,7 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from .evaluation_harness import EvaluationHarness, EvaluationRunOverrides
+
+__all__ = ["EvaluationHarness", "EvaluationRunOverrides"]
diff --git a/haystack/evaluation/harness/evaluation_harness.py b/haystack/evaluation/harness/evaluation_harness.py
new file mode 100644
index 0000000000..cf19377a23
--- /dev/null
+++ b/haystack/evaluation/harness/evaluation_harness.py
@@ -0,0 +1,83 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import Any, Dict, Generic, Optional, Type, TypeVar
+
+from haystack import Pipeline
+from haystack.core.serialization import DeserializationCallbacks
+
+
+@dataclass
+class EvaluationRunOverrides:
+    """
+    Overrides for an evaluation run.
+
+    Used to override the init parameters of components in either
+    (or both) the evaluated and evaluation pipelines. Each key is
+    a component name and its value a dictionary with init parameters
+    to override.
+
+    :param evaluated_pipeline_overrides:
+        Overrides for the evaluated pipeline.
+    :param evaluation_pipeline_overrides:
+        Overrides for the evaluation pipeline.
+    """
+
+    evaluated_pipeline_overrides: Optional[Dict[str, Dict[str, Any]]] = None
+    evaluation_pipeline_overrides: Optional[Dict[str, Dict[str, Any]]] = None
+
+
+EvalRunInputT = TypeVar("EvalRunInputT")
+EvalRunOutputT = TypeVar("EvalRunOutputT")
+EvalRunOverridesT = TypeVar("EvalRunOverridesT")
+
+
+class EvaluationHarness(ABC, Generic[EvalRunInputT, EvalRunOverridesT, EvalRunOutputT]):
+    """
+    Executes a pipeline with a given set of parameters and inputs, and evaluates its outputs with an evaluation pipeline.
+    """
+
+    @staticmethod
+    def _override_pipeline(pipeline: Pipeline, parameter_overrides: Optional[Dict[str, Any]]) -> Pipeline:
+        def component_pre_init_callback(name: str, cls: Type, init_params: Dict[str, Any]):  # pylint: disable=unused-argument
+            assert parameter_overrides is not None
+            overrides = parameter_overrides.get(name)
+            if overrides:
+                init_params.update(overrides)
+
+        def validate_overrides():
+            if parameter_overrides is None:
+                return
+
+            pipeline_components = pipeline.inputs(include_components_with_connected_inputs=True).keys()
+            for component_name in parameter_overrides.keys():
+                if component_name not in pipeline_components:
+                    raise ValueError(f"Cannot override non-existent component '{component_name}'")
+
+        callbacks = DeserializationCallbacks(component_pre_init_callback)
+        if parameter_overrides:
+            validate_overrides()
+            serialized_pipeline = pipeline.dumps()
+            pipeline = Pipeline.loads(serialized_pipeline, callbacks=callbacks)
+
+        return pipeline
+
+    @abstractmethod
+    def run(
+        self, inputs: EvalRunInputT, *, overrides: Optional[EvalRunOverridesT] = None, run_name: Optional[str] = None
+    ) -> EvalRunOutputT:
+        """
+        Launch an evaluation run.
+
+        :param inputs:
+            Inputs to the evaluated and evaluation pipelines.
+        :param overrides:
+            Overrides for the harness.
+        :param run_name:
+            A name for the evaluation run.
+        :returns:
+            The output of the evaluation pipeline.
+        """
diff --git a/haystack/evaluation/harness/rag/__init__.py b/haystack/evaluation/harness/rag/__init__.py
new file mode 100644
index 0000000000..fb7009f777
--- /dev/null
+++ b/haystack/evaluation/harness/rag/__init__.py
@@ -0,0 +1,24 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from .harness import DefaultRAGArchitecture, RAGEvaluationHarness
+from .parameters import (
+    RAGEvaluationInput,
+    RAGEvaluationMetric,
+    RAGEvaluationOutput,
+    RAGEvaluationOverrides,
+    RAGExpectedComponent,
+    RAGExpectedComponentMetadata,
+)
+
+__all__ = [
+    "DefaultRAGArchitecture",
+    "RAGEvaluationHarness",
+    "RAGExpectedComponent",
+    "RAGExpectedComponentMetadata",
+    "RAGEvaluationMetric",
+    "RAGEvaluationOutput",
+    "RAGEvaluationOverrides",
+    "RAGEvaluationInput",
+]
diff --git a/haystack/evaluation/harness/rag/_telemetry.py b/haystack/evaluation/harness/rag/_telemetry.py
new file mode 100644
index 0000000000..381c52d637
--- /dev/null
+++ b/haystack/evaluation/harness/rag/_telemetry.py
@@ -0,0 +1,77 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from dataclasses import asdict, dataclass, replace
+from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
+
+from haystack.telemetry._telemetry import send_telemetry
+
+from .parameters import RAGEvaluationInput, RAGEvaluationMetric, RAGEvaluationOverrides
+
+if TYPE_CHECKING:
+    from .harness import DefaultRAGArchitecture, RAGEvaluationHarness
+
+
+@dataclass
+class TelemetryPayload:  # pylint: disable=too-many-instance-attributes
+    """
+    Represents the telemetry payload for evaluating a RAG pipeline.
+
+    :param eval_metrics:
+        Active evaluation metrics and per-metric metadata.
+    :param num_queries:
+        Number of queries used for evaluation.
+    :param execution_time_sec:
+        Execution time in seconds for the evaluation.
+    :param default_architecture:
+        Default RAG architecture used for the RAG pipeline.
+    :param num_gt_answers:
+        Number of ground truth answers used in evaluation.
+    :param num_gt_contexts:
+        Number of ground truth contexts used in evaluation.
+ :param rag_pipeline_overrides: + Indicates if the RAG pipeline has any overrides. + :param eval_pipeline_overrides: + Indicates if the evaluation pipeline has any overrides. + """ + + eval_metrics: Dict[RAGEvaluationMetric, Optional[Dict[str, Any]]] + num_queries: int + execution_time_sec: float + + default_architecture: Optional["DefaultRAGArchitecture"] = None + num_gt_answers: Optional[int] = None + num_gt_contexts: Optional[int] = None + rag_pipeline_overrides: Optional[bool] = None + eval_pipeline_overrides: Optional[bool] = None + + def serialize(self) -> Dict[str, Any]: + out = asdict(self) + + out["eval_metrics"] = {k.value: v for k, v in self.eval_metrics.items()} + out["default_architecture"] = self.default_architecture.value if self.default_architecture else None + + return out + + +@send_telemetry +def harness_eval_run_complete( + harness: "RAGEvaluationHarness", + inputs: RAGEvaluationInput, + execution_time_sec: float, + overrides: Optional[RAGEvaluationOverrides] = None, +) -> Optional[Tuple[str, Dict[str, Any]]]: + payload = harness._telemetry_payload + + payload = replace( + payload, + num_queries=len(inputs.queries), + execution_time_sec=execution_time_sec, + num_gt_answers=(len(inputs.ground_truth_answers) if inputs.ground_truth_answers else None), + num_gt_contexts=(len(inputs.ground_truth_documents) if inputs.ground_truth_documents else None), + rag_pipeline_overrides=(overrides.rag_pipeline is not None if overrides else None), + eval_pipeline_overrides=(overrides.eval_pipeline is not None if overrides else None), + ) + + return "RAG evaluation harness eval run", payload.serialize() diff --git a/haystack/evaluation/harness/rag/evaluation_pipeline.py b/haystack/evaluation/harness/rag/evaluation_pipeline.py new file mode 100644 index 0000000000..80e4c7e6e9 --- /dev/null +++ b/haystack/evaluation/harness/rag/evaluation_pipeline.py @@ -0,0 +1,49 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from functools import partial +from typing import Callable, Dict, Set + +from haystack import Pipeline +from haystack.components.evaluators import ( + ContextRelevanceEvaluator, + DocumentMAPEvaluator, + DocumentMRREvaluator, + DocumentRecallEvaluator, + FaithfulnessEvaluator, + SASEvaluator, +) +from haystack.components.evaluators.document_recall import RecallMode + +from .parameters import RAGEvaluationMetric + + +def default_rag_evaluation_pipeline(metrics: Set[RAGEvaluationMetric]) -> Pipeline: + """ + Builds the default evaluation pipeline for RAG. + + :param metrics: + The set of metrics to include in the pipeline. + :returns: + The evaluation pipeline. 
+ """ + pipeline = Pipeline() + + metric_ctors: Dict[RAGEvaluationMetric, Callable] = { + RAGEvaluationMetric.DOCUMENT_MAP: DocumentMAPEvaluator, + RAGEvaluationMetric.DOCUMENT_MRR: DocumentMRREvaluator, + RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT: partial(DocumentRecallEvaluator, mode=RecallMode.SINGLE_HIT), + RAGEvaluationMetric.DOCUMENT_RECALL_MULTI_HIT: partial(DocumentRecallEvaluator, mode=RecallMode.MULTI_HIT), + RAGEvaluationMetric.SEMANTIC_ANSWER_SIMILARITY: partial( + SASEvaluator, model="sentence-transformers/all-MiniLM-L6-v2" + ), + RAGEvaluationMetric.FAITHFULNESS: partial(FaithfulnessEvaluator, raise_on_failure=False), + RAGEvaluationMetric.CONTEXT_RELEVANCE: partial(ContextRelevanceEvaluator, raise_on_failure=False), + } + + for metric in metrics: + ctor = metric_ctors[metric] + pipeline.add_component(metric.value, ctor()) + + return pipeline diff --git a/haystack/evaluation/harness/rag/harness.py b/haystack/evaluation/harness/rag/harness.py new file mode 100644 index 0000000000..28407af376 --- /dev/null +++ b/haystack/evaluation/harness/rag/harness.py @@ -0,0 +1,415 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +import time +from copy import deepcopy +from enum import Enum +from typing import Any, Dict, List, Optional, Set, Union + +from haystack import Pipeline +from haystack.evaluation.eval_run_result import EvaluationRunResult + +from ...util.helpers import aggregate_batched_pipeline_outputs, deaggregate_batched_pipeline_inputs +from ...util.pipeline_pair import PipelinePair +from ..evaluation_harness import EvaluationHarness +from ._telemetry import TelemetryPayload, harness_eval_run_complete +from .evaluation_pipeline import default_rag_evaluation_pipeline +from .parameters import ( + RAGEvaluationInput, + RAGEvaluationMetric, + RAGEvaluationOutput, + RAGEvaluationOverrides, + RAGExpectedComponent, + RAGExpectedComponentMetadata, +) + + +class DefaultRAGArchitecture(Enum): + """ + Represents default RAG pipeline architectures that can be used with the evaluation harness. + """ + + #: A RAG pipeline with: + #: - A query embedder component named 'query_embedder' with a 'text' input. + #: - A document retriever component named 'retriever' with a 'documents' output. + EMBEDDING_RETRIEVAL = "embedding_retrieval" + + #: A RAG pipeline with: + #: - A document retriever component named 'retriever' with a 'query' input and a 'documents' output. + KEYWORD_RETRIEVAL = "keyword_retrieval" + + #: A RAG pipeline with: + #: - A query embedder component named 'query_embedder' with a 'text' input. + #: - A document retriever component named 'retriever' with a 'documents' output. + #: - A response generator component named 'generator' with a 'replies' output. + GENERATION_WITH_EMBEDDING_RETRIEVAL = "generation_with_embedding_retrieval" + + #: A RAG pipeline with: + #: - A document retriever component named 'retriever' with a 'query' input and a 'documents' output. + #: - A response generator component named 'generator' with a 'replies' output. + GENERATION_WITH_KEYWORD_RETRIEVAL = "generation_with_keyword_retrieval" + + @property + def expected_components(self) -> Dict[RAGExpectedComponent, RAGExpectedComponentMetadata]: + """ + Returns the expected components for the architecture. + + :returns: + The expected components. 
+ """ + if self in ( + DefaultRAGArchitecture.EMBEDDING_RETRIEVAL, + DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL, + ): + expected = { + RAGExpectedComponent.QUERY_PROCESSOR: RAGExpectedComponentMetadata( + name="query_embedder", input_mapping={"query": "text"} + ), + RAGExpectedComponent.DOCUMENT_RETRIEVER: RAGExpectedComponentMetadata( + name="retriever", output_mapping={"retrieved_documents": "documents"} + ), + } + elif self in ( + DefaultRAGArchitecture.KEYWORD_RETRIEVAL, + DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL, + ): + expected = { + RAGExpectedComponent.QUERY_PROCESSOR: RAGExpectedComponentMetadata( + name="retriever", input_mapping={"query": "query"} + ), + RAGExpectedComponent.DOCUMENT_RETRIEVER: RAGExpectedComponentMetadata( + name="retriever", output_mapping={"retrieved_documents": "documents"} + ), + } + else: + raise NotImplementedError(f"Unexpected default RAG architecture: {self}") + + if self in ( + DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL, + DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL, + ): + expected[RAGExpectedComponent.RESPONSE_GENERATOR] = RAGExpectedComponentMetadata( + name="generator", output_mapping={"replies": "replies"} + ) + + return expected + + +class RAGEvaluationHarness(EvaluationHarness[RAGEvaluationInput, RAGEvaluationOverrides, RAGEvaluationOutput]): + """ + Evaluation harness for evaluating RAG pipelines. + """ + + def __init__( + self, + rag_pipeline: Pipeline, + rag_components: Union[DefaultRAGArchitecture, Dict[RAGExpectedComponent, RAGExpectedComponentMetadata]], + metrics: Set[RAGEvaluationMetric], + *, + progress_bar: bool = True, + ): + """ + Create an evaluation harness for evaluating basic RAG pipelines. + + :param rag_pipeline: + The RAG pipeline to evaluate. + :param rag_components: + Either a default RAG architecture or a mapping + of expected components to their metadata. + :param metrics: + The metrics to use during evaluation. + :param progress_bar: + Whether to display a progress bar during evaluation. 
+ """ + super().__init__() + + self._telemetry_payload = TelemetryPayload( + eval_metrics={m: None for m in metrics}, + num_queries=0, + execution_time_sec=0.0, + default_architecture=(rag_components if isinstance(rag_components, DefaultRAGArchitecture) else None), + ) + + if isinstance(rag_components, DefaultRAGArchitecture): + rag_components = rag_components.expected_components + + self._validate_rag_components(rag_pipeline, rag_components, metrics) + + self.rag_pipeline = rag_pipeline + self.rag_components = deepcopy(rag_components) + self.metrics = deepcopy(metrics) + self.evaluation_pipeline = default_rag_evaluation_pipeline(metrics) + self.progress_bar = progress_bar + + def run( # noqa: D102 + self, + inputs: RAGEvaluationInput, + *, + overrides: Optional[RAGEvaluationOverrides] = None, + run_name: Optional[str] = "RAG Evaluation", + ) -> RAGEvaluationOutput: + start_time = time.time() + + rag_inputs = self._prepare_rag_pipeline_inputs(inputs) + eval_inputs = self._prepare_eval_pipeline_additional_inputs(inputs) + pipeline_pair = self._generate_eval_run_pipelines(overrides) + + pipeline_outputs = pipeline_pair.run_first_as_batch(rag_inputs, eval_inputs, progress_bar=self.progress_bar) + rag_outputs, eval_outputs = (pipeline_outputs["first"], pipeline_outputs["second"]) + + result_inputs: Dict[str, List[Any]] = { + "questions": inputs.queries, + "contexts": [ + [doc.content for doc in docs] + for docs in self._lookup_component_output( + RAGExpectedComponent.DOCUMENT_RETRIEVER, rag_outputs, "retrieved_documents" + ) + ], + } + if RAGExpectedComponent.RESPONSE_GENERATOR in self.rag_components: + result_inputs["responses"] = self._lookup_component_output( + RAGExpectedComponent.RESPONSE_GENERATOR, rag_outputs, "replies" + ) + + if inputs.ground_truth_answers is not None: + result_inputs["ground_truth_answers"] = inputs.ground_truth_answers + if inputs.ground_truth_documents is not None: + result_inputs["ground_truth_documents"] = [ + [doc.content for doc in docs] for docs in inputs.ground_truth_documents + ] + + assert run_name is not None + run_results = EvaluationRunResult(run_name, inputs=result_inputs, results=eval_outputs) + + harness_eval_run_complete(self, inputs, time.time() - start_time, overrides) + + return RAGEvaluationOutput( + evaluated_pipeline=pipeline_pair.first.dumps(), + evaluation_pipeline=pipeline_pair.second.dumps(), + inputs=deepcopy(inputs), + results=run_results, + ) + + def _lookup_component_output( + self, component: RAGExpectedComponent, outputs: Dict[str, Dict[str, Any]], output_name: str + ) -> Any: + name = self.rag_components[component].name + mapping = self.rag_components[component].output_mapping + output_name = mapping[output_name] + return outputs[name][output_name] + + def _generate_eval_run_pipelines(self, overrides: Optional[RAGEvaluationOverrides]) -> PipelinePair: + if overrides is None: + rag_overrides = None + eval_overrides = None + else: + rag_overrides = overrides.rag_pipeline + eval_overrides = overrides.eval_pipeline + + if eval_overrides is not None: + for metric in eval_overrides.keys(): + if metric not in self.metrics: + raise ValueError(f"Cannot override parameters of unused evaluation metric '{metric.value}'") + + eval_overrides = {k.value: v for k, v in eval_overrides.items()} # type: ignore + + rag_pipeline = self._override_pipeline(self.rag_pipeline, rag_overrides) + eval_pipeline = self._override_pipeline(self.evaluation_pipeline, eval_overrides) # type: ignore + + included_first_outputs = 
{self.rag_components[RAGExpectedComponent.DOCUMENT_RETRIEVER].name} + if RAGExpectedComponent.RESPONSE_GENERATOR in self.rag_components: + included_first_outputs.add(self.rag_components[RAGExpectedComponent.RESPONSE_GENERATOR].name) + + return PipelinePair( + first=rag_pipeline, + second=eval_pipeline, + outputs_to_inputs=self._map_rag_eval_pipeline_io(), + map_first_outputs=lambda x: self._aggregate_rag_outputs( # pylint: disable=unnecessary-lambda + x + ), + included_first_outputs=included_first_outputs, + pre_execution_callback_first=lambda: print("Executing RAG pipeline..."), + pre_execution_callback_second=lambda: print("Executing evaluation pipeline..."), + ) + + def _aggregate_rag_outputs(self, outputs: List[Dict[str, Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]: + aggregate = aggregate_batched_pipeline_outputs(outputs) + + if RAGExpectedComponent.RESPONSE_GENERATOR in self.rag_components: + # We only care about the first response from the generator. + generator_name = self.rag_components[RAGExpectedComponent.RESPONSE_GENERATOR].name + replies_output_name = self.rag_components[RAGExpectedComponent.RESPONSE_GENERATOR].output_mapping["replies"] + aggregate[generator_name][replies_output_name] = [ + r[0] for r in aggregate[generator_name][replies_output_name] + ] + + return aggregate + + def _map_rag_eval_pipeline_io(self) -> Dict[str, List[str]]: + # We currently only have metric components in the eval pipeline. + # So, we just map those inputs to the outputs of the rag pipeline. + metric_inputs_to_component_outputs = { + RAGEvaluationMetric.DOCUMENT_MAP: { + "retrieved_documents": (RAGExpectedComponent.DOCUMENT_RETRIEVER, "retrieved_documents") + }, + RAGEvaluationMetric.DOCUMENT_MRR: { + "retrieved_documents": (RAGExpectedComponent.DOCUMENT_RETRIEVER, "retrieved_documents") + }, + RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT: { + "retrieved_documents": (RAGExpectedComponent.DOCUMENT_RETRIEVER, "retrieved_documents") + }, + RAGEvaluationMetric.DOCUMENT_RECALL_MULTI_HIT: { + "retrieved_documents": (RAGExpectedComponent.DOCUMENT_RETRIEVER, "retrieved_documents") + }, + RAGEvaluationMetric.SEMANTIC_ANSWER_SIMILARITY: { + "predicted_answers": (RAGExpectedComponent.RESPONSE_GENERATOR, "replies") + }, + RAGEvaluationMetric.FAITHFULNESS: { + "contexts": (RAGExpectedComponent.DOCUMENT_RETRIEVER, "retrieved_documents"), + "predicted_answers": (RAGExpectedComponent.RESPONSE_GENERATOR, "replies"), + }, + RAGEvaluationMetric.CONTEXT_RELEVANCE: { + "contexts": (RAGExpectedComponent.DOCUMENT_RETRIEVER, "retrieved_documents") + }, + } + + outputs_to_inputs: Dict[str, List[str]] = {} + for metric in self.metrics: + io = metric_inputs_to_component_outputs[metric] + for metric_input_name, (component, component_output_name) in io.items(): + component_out = ( + f"{self.rag_components[component].name}." 
+ f"{self.rag_components[component].output_mapping[component_output_name]}" + ) + metric_in = f"{metric.value}.{metric_input_name}" + if component_out not in outputs_to_inputs: + outputs_to_inputs[component_out] = [] + outputs_to_inputs[component_out].append(metric_in) + + return outputs_to_inputs + + def _prepare_rag_pipeline_inputs(self, inputs: RAGEvaluationInput) -> List[Dict[str, Dict[str, Any]]]: + query_embedder_name = self.rag_components[RAGExpectedComponent.QUERY_PROCESSOR].name + query_embedder_text_input = self.rag_components[RAGExpectedComponent.QUERY_PROCESSOR].input_mapping["query"] + + if inputs.rag_pipeline_inputs is not None: + # Ensure that the query embedder input is not provided as additional input. + existing = inputs.rag_pipeline_inputs.get(query_embedder_name) + if existing is not None: + existing = existing.get(query_embedder_text_input) # type: ignore + if existing is not None: + raise ValueError( + f"Query embedder input '{query_embedder_text_input}' cannot be provided as additional input." + ) + + # Add the queries as an aggregate input. + rag_inputs = deepcopy(inputs.rag_pipeline_inputs) + if query_embedder_name not in rag_inputs: + rag_inputs[query_embedder_name] = {} + rag_inputs[query_embedder_name][query_embedder_text_input] = deepcopy(inputs.queries) + else: + rag_inputs = {query_embedder_name: {query_embedder_text_input: deepcopy(inputs.queries)}} + + separate_rag_inputs = deaggregate_batched_pipeline_inputs(rag_inputs) + return separate_rag_inputs + + def _prepare_eval_pipeline_additional_inputs(self, inputs: RAGEvaluationInput) -> Dict[str, Dict[str, Any]]: + eval_inputs: Dict[str, Dict[str, List[Any]]] = {} + + for metric in self.metrics: + if metric in ( + RAGEvaluationMetric.DOCUMENT_MAP, + RAGEvaluationMetric.DOCUMENT_MRR, + RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT, + RAGEvaluationMetric.DOCUMENT_RECALL_MULTI_HIT, + ): + if inputs.ground_truth_documents is None: + raise ValueError(f"Ground truth documents required for metric '{metric.value}'.") + if len(inputs.ground_truth_documents) != len(inputs.queries): + raise ValueError("Length of ground truth documents should match the number of queries.") + + eval_inputs[metric.value] = {"ground_truth_documents": inputs.ground_truth_documents} + elif metric in (RAGEvaluationMetric.FAITHFULNESS, RAGEvaluationMetric.CONTEXT_RELEVANCE): + eval_inputs[metric.value] = {"questions": inputs.queries} + elif metric == RAGEvaluationMetric.SEMANTIC_ANSWER_SIMILARITY: + if inputs.ground_truth_answers is None: + raise ValueError(f"Ground truth answers required for metric '{metric.value}'.") + if len(inputs.ground_truth_answers) != len(inputs.queries): + raise ValueError("Length of ground truth answers should match the number of queries.") + + eval_inputs[metric.value] = {"ground_truth_answers": inputs.ground_truth_answers} + + return eval_inputs + + @staticmethod + def _validate_rag_components( + pipeline: Pipeline, + components: Dict[RAGExpectedComponent, RAGExpectedComponentMetadata], + metrics: Set[RAGEvaluationMetric], + ): + metric_specific_required_components = { + RAGEvaluationMetric.DOCUMENT_MAP: [ + RAGExpectedComponent.QUERY_PROCESSOR, + RAGExpectedComponent.DOCUMENT_RETRIEVER, + ], + RAGEvaluationMetric.DOCUMENT_MRR: [ + RAGExpectedComponent.QUERY_PROCESSOR, + RAGExpectedComponent.DOCUMENT_RETRIEVER, + ], + RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT: [ + RAGExpectedComponent.QUERY_PROCESSOR, + RAGExpectedComponent.DOCUMENT_RETRIEVER, + ], + RAGEvaluationMetric.DOCUMENT_RECALL_MULTI_HIT: [ + 
RAGExpectedComponent.QUERY_PROCESSOR, + RAGExpectedComponent.DOCUMENT_RETRIEVER, + ], + RAGEvaluationMetric.SEMANTIC_ANSWER_SIMILARITY: [ + RAGExpectedComponent.QUERY_PROCESSOR, + RAGExpectedComponent.RESPONSE_GENERATOR, + ], + RAGEvaluationMetric.FAITHFULNESS: [ + RAGExpectedComponent.QUERY_PROCESSOR, + RAGExpectedComponent.DOCUMENT_RETRIEVER, + RAGExpectedComponent.RESPONSE_GENERATOR, + ], + RAGEvaluationMetric.CONTEXT_RELEVANCE: [ + RAGExpectedComponent.QUERY_PROCESSOR, + RAGExpectedComponent.DOCUMENT_RETRIEVER, + ], + } + + for m in metrics: + required_components = metric_specific_required_components[m] + if not all(c in components for c in required_components): + raise ValueError( + f"In order to use the metric '{m}', the RAG evaluation harness requires metadata " + f"for the following components: {required_components}" + ) + + pipeline_outputs = pipeline.outputs(include_components_with_connected_outputs=True) + pipeline_inputs = pipeline.inputs(include_components_with_connected_inputs=True) + + for component, metadata in components.items(): + if metadata.name not in pipeline_outputs or metadata.name not in pipeline_inputs: + raise ValueError( + f"Expected '{component.value}' component named '{metadata.name}' not found in pipeline." + ) + + comp_inputs = pipeline_inputs[metadata.name] + comp_outputs = pipeline_outputs[metadata.name] + + for needle in metadata.input_mapping.values(): + if needle not in comp_inputs: + raise ValueError( + f"Required input '{needle}' not found in '{component.value}' " + f"component named '{metadata.name}'." + ) + + for needle in metadata.output_mapping.values(): + if needle not in comp_outputs: + raise ValueError( + f"Required output '{needle}' not found in '{component.value}' " + f"component named '{metadata.name}'." + ) diff --git a/haystack/evaluation/harness/rag/parameters.py b/haystack/evaluation/harness/rag/parameters.py new file mode 100644 index 0000000000..638e4227a5 --- /dev/null +++ b/haystack/evaluation/harness/rag/parameters.py @@ -0,0 +1,157 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, List, Optional + +from haystack import Document +from haystack.evaluation.eval_run_result import EvaluationRunResult + + +class RAGExpectedComponent(Enum): + """ + Represents the basic components in a RAG pipeline that are, by default, required to be present for evaluation. + + Each of these can be separate components in the pipeline or a single component that performs + multiple tasks. + """ + + #: The component in a RAG pipeline that accepts the user query. + #: Expected inputs: `query` - Name of input that contains the query string. + QUERY_PROCESSOR = "query_processor" + + #: The component in a RAG pipeline that retrieves documents based on the query. + #: Expected outputs: `retrieved_documents` - Name of output containing retrieved documents. + DOCUMENT_RETRIEVER = "document_retriever" + + #: The component in a RAG pipeline that generates responses based on the query and the retrieved documents. + #: Can be optional if the harness is only evaluating retrieval. + #: Expected outputs: `replies` - Name of out containing the LLM responses. Only the first response is used. + RESPONSE_GENERATOR = "response_generator" + + +@dataclass(frozen=True) +class RAGExpectedComponentMetadata: + """ + Metadata for a `RAGExpectedComponent`. + + :param name: + Name of the component in the pipeline. 
+ :param input_mapping: + Mapping of the expected inputs to + corresponding component input names. + :param output_mapping: + Mapping of the expected outputs to + corresponding component output names. + """ + + name: str + input_mapping: Dict[str, str] = field(default_factory=dict) + output_mapping: Dict[str, str] = field(default_factory=dict) + + +class RAGEvaluationMetric(Enum): + """ + Represents the metrics that can be used to evaluate a RAG pipeline. + """ + + #: Document Mean Average Precision. + #: Required RAG components: Query Processor, Document Retriever. + DOCUMENT_MAP = "metric_doc_map" + + #: Document Mean Reciprocal Rank. + #: Required RAG components: Query Processor, Document Retriever. + DOCUMENT_MRR = "metric_doc_mrr" + + #: Document Recall with a single hit. + #: Required RAG components: Query Processor, Document Retriever. + DOCUMENT_RECALL_SINGLE_HIT = "metric_doc_recall_single" + + #: Document Recall with multiple hits. + #: Required RAG components: Query Processor, Document Retriever. + DOCUMENT_RECALL_MULTI_HIT = "metric_doc_recall_multi" + + #: Semantic Answer Similarity. + #: Required RAG components: Query Processor, Response Generator. + SEMANTIC_ANSWER_SIMILARITY = "metric_sas" + + #: Faithfulness. + #: Required RAG components: Query Processor, Document Retriever, Response Generator. + FAITHFULNESS = "metric_faithfulness" + + #: Context Relevance. + #: Required RAG components: Query Processor, Document Retriever. + CONTEXT_RELEVANCE = "metric_context_relevance" + + +@dataclass(frozen=True) +class RAGEvaluationInput: + """ + Input passed to the RAG evaluation harness. + + :param queries: + The queries passed to the RAG pipeline. + :param ground_truth_documents: + The ground truth documents passed to the + evaluation pipeline. Only required for metrics + that require them. Corresponds to the queries. + :param ground_truth_answers: + The ground truth answers passed to the + evaluation pipeline. Only required for metrics + that require them. Corresponds to the queries. + :param rag_pipeline_inputs: + Additional inputs to pass to the RAG pipeline. Each + key is the name of the component and its value a dictionary + with the input name and a list of values, each corresponding + to a query. + """ + + queries: List[str] + ground_truth_documents: Optional[List[List[Document]]] = None + ground_truth_answers: Optional[List[str]] = None + rag_pipeline_inputs: Optional[Dict[str, Dict[str, List[Any]]]] = None + + +@dataclass(frozen=True) +class RAGEvaluationOverrides: + """ + Overrides for a RAG evaluation run. + + Used to override the init parameters of components in + either (or both) the evaluated and evaluation pipelines. + + :param rag_pipeline: + Overrides for the RAG pipeline. Each + key is a component name and its value a dictionary + with init parameters to override. + :param eval_pipeline: + Overrides for the evaluation pipeline. Each + key is a RAG metric and its value a dictionary + with init parameters to override. + """ + + rag_pipeline: Optional[Dict[str, Dict[str, Any]]] = None + eval_pipeline: Optional[Dict[RAGEvaluationMetric, Dict[str, Any]]] = None + + +@dataclass(frozen=True) +class RAGEvaluationOutput: + """ + Represents the output of a RAG evaluation run. + + :param evaluated_pipeline: + Serialized version of the evaluated pipeline, including overrides. + :param evaluation_pipeline: + Serialized version of the evaluation pipeline, including overrides. + :param inputs: + Input passed to the evaluation harness. 
+ :param results: + Results of the evaluation run. + """ + + evaluated_pipeline: str + evaluation_pipeline: str + inputs: RAGEvaluationInput + results: EvaluationRunResult diff --git a/haystack/evaluation/util/__init__.py b/haystack/evaluation/util/__init__.py new file mode 100644 index 0000000000..c1764a6e03 --- /dev/null +++ b/haystack/evaluation/util/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/haystack/evaluation/util/helpers.py b/haystack/evaluation/util/helpers.py new file mode 100644 index 0000000000..fe48516b48 --- /dev/null +++ b/haystack/evaluation/util/helpers.py @@ -0,0 +1,98 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from copy import deepcopy +from typing import Any, Dict, List + + +def aggregate_batched_pipeline_outputs(outputs: List[Dict[str, Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]: + """ + Combine the outputs of a pipeline that has been executed iteratively over a batch of inputs. + + Performs a transpose operation on the first and the third dimensions of the outputs. + + :param outputs: + A list of outputs from the pipeline, where each output + is a dictionary with the same keys and values with the + same keys. + :returns: + The combined outputs. + """ + # The pipeline is invoked iteratively over a batch of inputs, such + # that each element in the outputs corresponds to a single element in + # the batch input. + if len(outputs) == 0: + return {} + if len(outputs) == 1: + return outputs[0] + + # We'll use the first output as a sentinel to determine + # if the shape of the rest of the outputs are the same. + sentinel = outputs[0] + for output in outputs[1:]: + if output.keys() != sentinel.keys(): + raise ValueError(f"Expected components '{list(sentinel.keys())}' " f"but got '{list(output.keys())}'") + + for component_name, expected in sentinel.items(): + got = output[component_name] + if got.keys() != expected.keys(): + raise ValueError( + f"Expected outputs from component '{component_name}' to have " + f"keys '{list(expected.keys())}' but got '{list(got.keys())}'" + ) + + # The outputs are of the correct/same shape. Now to transpose + # the outermost list with the innermost dictionary. + transposed: Dict[str, Dict[str, Any]] = {} + for k, v in sentinel.items(): + transposed[k] = {k_h: [] for k_h in v.keys()} + + for output in outputs: + for component_name, component_outputs in output.items(): + dest = transposed[component_name] + for output_name, output_value in component_outputs.items(): + dest[output_name].append(output_value) + + return transposed + + +def deaggregate_batched_pipeline_inputs(inputs: Dict[str, Dict[str, List[Any]]]) -> List[Dict[str, Dict[str, Any]]]: + """ + Separate the inputs of a pipeline that has been batched along its innermost dimension. + + Performs a transpose operation on the first and the third dimensions of the inputs. + + :param inputs: + A dictionary of pipeline inputs that maps + component-input pairs to lists of values. + :returns: + The separated inputs. 
+ """ + if len(inputs) == 0: + return [] + + # First component's inputs + sentinel = next(iter(inputs.values())) + # First component's first input's values + sentinel = next(iter(sentinel.values())) # type: ignore + + for component_name, component_inputs in inputs.items(): + for input_name, input_values in component_inputs.items(): + if len(input_values) != len(sentinel): + raise ValueError( + f"Expected input '{component_name}.{input_name}' to have " + f"{len(sentinel)} values but got {len(input_values)}" + ) + + proto = {k: {k_h: None for k_h in v.keys()} for k, v in inputs.items()} + transposed: List[Dict[str, Dict[str, Any]]] = [] + + for i in range(len(sentinel)): + new_dict = deepcopy(proto) + for component_name, component_inputs in inputs.items(): + for input_name, input_values in component_inputs.items(): + new_dict[component_name][input_name] = input_values[i] + transposed.append(new_dict) + + return transposed diff --git a/haystack/evaluation/util/pipeline_pair.py b/haystack/evaluation/util/pipeline_pair.py new file mode 100644 index 0000000000..6aefb99096 --- /dev/null +++ b/haystack/evaluation/util/pipeline_pair.py @@ -0,0 +1,237 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional, Set, Tuple + +from tqdm import tqdm + +from haystack import Pipeline + + +@dataclass(frozen=True) +class PipelinePair: # pylint: disable=too-many-instance-attributes + """ + A pair of pipelines that are linked together and executed sequentially. + + :param first: + The first pipeline in the sequence. + :param second: + The second pipeline in the sequence. + :param outputs_to_inputs: + A mapping of the outputs of the first pipeline to the + inputs of the second pipeline in the following format: + `"name_of_component.name_of_output": "name_of_component.name_of_input`. + A single output can be mapped to multiple inputs. + :param map_first_outputs: + A function that post-processes the outputs of the first + pipeline, which it receives as its (only) argument. + :param included_first_outputs: + Names of components in the first pipeline whose outputs + should be included in the final outputs. + :param included_second_outputs: + Names of components in the second pipeline whose outputs + should be included in the final outputs. + :param pre_execution_callback_first: + A function that is called before the first pipeline is executed. + :param pre_execution_callback_second: + A function that is called before the second pipeline is executed. + """ + + first: Pipeline + second: Pipeline + outputs_to_inputs: Dict[str, List[str]] + map_first_outputs: Optional[Callable] = None + included_first_outputs: Optional[Set[str]] = None + included_second_outputs: Optional[Set[str]] = None + pre_execution_callback_first: Optional[Callable] = None + pre_execution_callback_second: Optional[Callable] = None + + def __post_init__(self): + first_outputs = self.first.outputs(include_components_with_connected_outputs=True) + second_inputs = self.second.inputs(include_components_with_connected_inputs=True) + seen_second_inputs = set() + + # Validate the mapping of outputs from the first pipeline + # to the inputs of the second pipeline. 
+        for first_out, second_ins in self.outputs_to_inputs.items():
+            first_comp_name, first_out_name = self._split_input_output_path(first_out)
+            if first_comp_name not in first_outputs:
+                raise ValueError(f"Output component '{first_comp_name}' not found in first pipeline.")
+            if first_out_name not in first_outputs[first_comp_name]:
+                raise ValueError(
+                    f"Component '{first_comp_name}' in first pipeline does not have expected output '{first_out_name}'."
+                )
+
+            for second_in in second_ins:
+                if second_in in seen_second_inputs:
+                    raise ValueError(
+                        f"Input '{second_in}' in second pipeline is connected to multiple first pipeline outputs."
+                    )
+
+                second_comp_name, second_input_name = self._split_input_output_path(second_in)
+                if second_comp_name not in second_inputs:
+                    raise ValueError(f"Input component '{second_comp_name}' not found in second pipeline.")
+                if second_input_name not in second_inputs[second_comp_name]:
+                    raise ValueError(
+                        f"Component '{second_comp_name}' in second pipeline "
+                        f"does not have expected input '{second_input_name}'."
+                    )
+                seen_second_inputs.add(second_in)
+
+    def _validate_second_inputs(self, inputs: Dict[str, Dict[str, Any]]):
+        # Check if the connected input is also provided explicitly.
+        second_connected_inputs = [
+            self._split_input_output_path(p_h) for p in self.outputs_to_inputs.values() for p_h in p
+        ]
+        for component_name, input_name in second_connected_inputs:
+            provided_input = inputs.get(component_name)
+            if provided_input is None:
+                continue
+            if input_name in provided_input:
+                raise ValueError(
+                    f"Second pipeline input '{component_name}.{input_name}' cannot "
+                    "be provided both explicitly and by the first pipeline."
+                )
+
+    @staticmethod
+    def _split_input_output_path(path: str) -> Tuple[str, str]:
+        # Split the input/output path into component name and input/output name.
+        pos = path.find(".")
+        if pos == -1:
+            raise ValueError(
+                f"Invalid pipeline i/o path specifier '{path}' - Must be "
+                "in the following format: <component_name>.<input/output_name>"
+            )
+        return path[:pos], path[pos + 1 :]
+
+    def _prepare_required_outputs_for_first_pipeline(self) -> Set[str]:
+        # To ensure that we have all the outputs from the first
+        # pipeline that are required by the second pipeline, we
+        # first collect all the keys in the first-to-second
+        # output-to-input mapping and then add the explicitly included
+        # first pipeline outputs.
+        first_components_with_outputs = {self._split_input_output_path(p)[0] for p in self.outputs_to_inputs.keys()}
+        if self.included_first_outputs is not None:
+            first_components_with_outputs = first_components_with_outputs.union(self.included_first_outputs)
+        return first_components_with_outputs
+
+    def _map_first_second_pipeline_io(
+        self, first_outputs: Dict[str, Dict[str, Any]], second_inputs: Dict[str, Dict[str, Any]]
+    ) -> Dict[str, Dict[str, Any]]:
+        # Map the first pipeline outputs to the second pipeline inputs.
+        for first_output, second_input_candidates in self.outputs_to_inputs.items():
+            first_component, first_output = self._split_input_output_path(first_output)
+
+            # Each output from the first pipeline can be mapped to multiple inputs in the second pipeline.
+            for second_input in second_input_candidates:
+                second_component, second_input_socket = self._split_input_output_path(second_input)
+
+                second_component_inputs = second_inputs.get(second_component)
+                if second_component_inputs is not None:
+                    # Pre-condition should've been validated earlier.
+ assert second_input_socket not in second_component_inputs + # The first pipeline's output should also guaranteed at this point. + second_component_inputs[second_input_socket] = first_outputs[first_component][first_output] + else: + second_inputs[second_component] = { + second_input_socket: first_outputs[first_component][first_output] + } + + return second_inputs + + def run( + self, first_inputs: Dict[str, Dict[str, Any]], second_inputs: Optional[Dict[str, Dict[str, Any]]] = None + ) -> Dict[str, Dict[str, Any]]: + """ + Execute the pipeline pair in sequence. + + Invokes the first pipeline and then the second with the outputs + of the former. This assumes that both pipelines have the same input + modality, i.e., the shapes of the first pipeline's outputs match the + shapes of the second pipeline's inputs. + + :param first_inputs: + The inputs to the first pipeline. + :param second_inputs: + The inputs to the second pipeline. + :returns: + A dictionary with the following keys: + - `first` - The outputs of the first pipeline. + - `second` - The outputs of the second pipeline. + """ + second_inputs = second_inputs or {} + self._validate_second_inputs(second_inputs) + + if self.pre_execution_callback_first is not None: + self.pre_execution_callback_first() + first_outputs = self.first.run( + first_inputs, include_outputs_from=self._prepare_required_outputs_for_first_pipeline() + ) + if self.map_first_outputs is not None: + first_outputs = self.map_first_outputs(first_outputs) + second_inputs = self._map_first_second_pipeline_io(first_outputs, second_inputs) + + if self.pre_execution_callback_second is not None: + self.pre_execution_callback_second() + second_outputs = self.second.run(second_inputs, include_outputs_from=self.included_second_outputs) + + return {"first": first_outputs, "second": second_outputs} + + def run_first_as_batch( + self, + first_inputs: List[Dict[str, Dict[str, Any]]], + second_inputs: Optional[Dict[str, Dict[str, Any]]] = None, + *, + progress_bar: bool = False, + ) -> Dict[str, Dict[str, Any]]: + """ + Execute the pipeline pair in sequence. + + Invokes the first pipeline iteratively over the list of inputs and + passing the cumulative outputs to the second pipeline. This is suitable + when the first pipeline has a single logical input-to-output mapping and the + second pipeline expects multiple logical inputs, e.g: a retrieval + pipeline that accepts a single query and returns a list of documents + and an evaluation pipeline that accepts multiple lists of documents + and multiple lists of ground truth data. + + :param first_inputs: + A batch of inputs to the first pipeline. A mapping + function must be provided to aggregate the outputs. + :param second_inputs: + The inputs to the second pipeline. + :param progress_bar: + Whether to display a progress bar for the execution + of the first pipeline. + :returns: + A dictionary with the following keys: + - `first` - The (aggregate) outputs of the first pipeline. + - `second` - The outputs of the second pipeline. 
+ """ + second_inputs = second_inputs or {} + self._validate_second_inputs(second_inputs) + + first_components_with_outputs = self._prepare_required_outputs_for_first_pipeline() + if self.map_first_outputs is None: + raise ValueError("Mapping function for first pipeline outputs must be provided for batch execution.") + + if self.pre_execution_callback_first is not None: + self.pre_execution_callback_first() + first_outputs: Dict[str, Dict[str, Any]] = self.map_first_outputs( + [ + self.first.run(i, include_outputs_from=first_components_with_outputs) + for i in tqdm(first_inputs, disable=not progress_bar) + ] + ) + if not isinstance(first_outputs, dict): + raise ValueError("Mapping function must return an aggregate dictionary of outputs.") + + second_inputs = self._map_first_second_pipeline_io(first_outputs, second_inputs) + + if self.pre_execution_callback_second is not None: + self.pre_execution_callback_second() + second_outputs = self.second.run(second_inputs, include_outputs_from=self.included_second_outputs) + + return {"first": first_outputs, "second": second_outputs} diff --git a/haystack/testing/sample_components/__init__.py b/haystack/testing/sample_components/__init__.py index 011ca2ddea..c9643d6b14 100644 --- a/haystack/testing/sample_components/__init__.py +++ b/haystack/testing/sample_components/__init__.py @@ -3,9 +3,9 @@ # SPDX-License-Identifier: Apache-2.0 from haystack.testing.sample_components.accumulate import Accumulate -from haystack.testing.sample_components.add_value import AddFixedValue +from haystack.testing.sample_components.add_value import AddFixedValue, AddFixedValueBatch from haystack.testing.sample_components.concatenate import Concatenate -from haystack.testing.sample_components.double import Double +from haystack.testing.sample_components.double import Double, DoubleBatch from haystack.testing.sample_components.fstring import FString from haystack.testing.sample_components.greet import Greet from haystack.testing.sample_components.hello import Hello @@ -26,10 +26,12 @@ "Accumulate", "Threshold", "AddFixedValue", + "AddFixedValueBatch", "Repeat", "Sum", "Greet", "Double", + "DoubleBatch", "StringJoiner", "Hello", "TextSplitter", diff --git a/haystack/testing/sample_components/add_value.py b/haystack/testing/sample_components/add_value.py index cc445a4cf6..b501567641 100644 --- a/haystack/testing/sample_components/add_value.py +++ b/haystack/testing/sample_components/add_value.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 -from typing import Optional +from typing import List, Optional from haystack.core.component import component @@ -24,3 +24,22 @@ def run(self, value: int, add: Optional[int] = None): if add is None: add = self.add return {"result": value + add} + + +@component +class AddFixedValueBatch: + """ + Adds two values together. + """ + + def __init__(self, add: int = 1): + self.add = add + + @component.output_types(result=List[int]) + def run(self, value: List[int], add: Optional[List[int]] = None): + """ + Adds two values together. 
+ """ + if add is None: + add = [self.add] * len(value) + return {"result": [v + a for v, a in zip(value, add)]} diff --git a/haystack/testing/sample_components/double.py b/haystack/testing/sample_components/double.py index 42286ce141..73460c80ef 100644 --- a/haystack/testing/sample_components/double.py +++ b/haystack/testing/sample_components/double.py @@ -2,6 +2,8 @@ # # SPDX-License-Identifier: Apache-2.0 +from typing import List + from haystack.core.component import component @@ -17,3 +19,17 @@ def run(self, value: int): Doubles the input value. """ return {"value": value * 2} + + +@component +class DoubleBatch: + """ + Doubles the input value. + """ + + @component.output_types(value=List[int]) + def run(self, value: List[int]): + """ + Doubles the input value. + """ + return {"value": [v * 2 for v in value]} diff --git a/test/evaluation/harness/__init__.py b/test/evaluation/harness/__init__.py new file mode 100644 index 0000000000..c1764a6e03 --- /dev/null +++ b/test/evaluation/harness/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/test/evaluation/harness/rag/__init__.py b/test/evaluation/harness/rag/__init__.py new file mode 100644 index 0000000000..c1764a6e03 --- /dev/null +++ b/test/evaluation/harness/rag/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/test/evaluation/harness/rag/test_harness.py b/test/evaluation/harness/rag/test_harness.py new file mode 100644 index 0000000000..3425eb514d --- /dev/null +++ b/test/evaluation/harness/rag/test_harness.py @@ -0,0 +1,603 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 +from typing import Any, Dict, List, Optional, Union +import pytest + +from haystack.evaluation.harness.rag import ( + DefaultRAGArchitecture, + RAGEvaluationHarness, + RAGExpectedComponent, + RAGExpectedComponentMetadata, + RAGEvaluationMetric, + RAGEvaluationOverrides, + RAGEvaluationInput, +) +from haystack import Pipeline, component, Document, default_to_dict, default_from_dict +from haystack.document_stores.in_memory import InMemoryDocumentStore +from haystack.components.embedders import SentenceTransformersTextEmbedder +from haystack.components.builders import PromptBuilder +from haystack.components.evaluators import ( + ContextRelevanceEvaluator, + DocumentMAPEvaluator, + DocumentMRREvaluator, + DocumentRecallEvaluator, + FaithfulnessEvaluator, + SASEvaluator, +) +from haystack.components.evaluators.document_recall import RecallMode +from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever, InMemoryBM25Retriever +from haystack.components.generators import OpenAIGenerator +from haystack.utils import Secret + + +@component +class NonConformantComponent: + def __init__(self, inputs, outputs) -> None: + component.set_input_types(self, **inputs) + component.set_output_types(self, **outputs) + + def run(self, **kwargs): + return {} + + +@component +class MockGenerator: + def __init__(self, arg: int) -> None: + self.arg = arg + + def to_dict(self): + return default_to_dict(self, arg=self.arg) + + @classmethod + def from_dict(cls, data): + return default_from_dict(cls, data) + + @component.output_types(replies=List[str]) + def run(self, prompt: str) -> Dict[str, Any]: + return {"replies": ["placeholder"]} + + +@component +class MockKeywordRetriever: + def __init__(self) -> None: + self.counter = 0 + + 
@component.output_types(documents=List[Document]) + def run(self, query: str) -> Dict[str, Any]: + samples = [ + [Document(content="France")], + [Document(content="9th century"), Document(content="10th century"), Document(content="9th")], + [Document(content="classical"), Document(content="rock music"), Document(content="dubstep")], + [Document(content="11th"), Document(content="the 11th"), Document(content="11th century")], + [Document(content="Denmark"), Document(content="Norway"), Document(content="Iceland")], + [ + Document(content="10th century"), + Document(content="the first half of the 10th century"), + Document(content="10th"), + Document(content="10th"), + ], + ] + + idx = self.counter % len(samples) + self.counter += 1 + + return {"documents": samples[idx]} + + +@component +class MockEvaluator: + def __init__(self, metric: Union[str, RAGEvaluationMetric]) -> None: + if isinstance(metric, str): + metric = RAGEvaluationMetric(metric) + self.metric = metric + + io_map = { + RAGEvaluationMetric.DOCUMENT_MAP: DocumentMAPEvaluator(), + RAGEvaluationMetric.DOCUMENT_MRR: DocumentMRREvaluator(), + RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT: DocumentRecallEvaluator(mode=RecallMode.SINGLE_HIT), + RAGEvaluationMetric.DOCUMENT_RECALL_MULTI_HIT: DocumentRecallEvaluator(mode=RecallMode.MULTI_HIT), + RAGEvaluationMetric.SEMANTIC_ANSWER_SIMILARITY: SASEvaluator("sentence-transformers/all-MiniLM-L6-v2"), + RAGEvaluationMetric.FAITHFULNESS: FaithfulnessEvaluator(api_key=Secret.from_token("test_key")), + RAGEvaluationMetric.CONTEXT_RELEVANCE: ContextRelevanceEvaluator(api_key=Secret.from_token("test_key")), + } + + self.__haystack_input__ = io_map[metric].__haystack_input__ + self.__haystack_output__ = io_map[metric].__haystack_output__ + + def to_dict(self): + return default_to_dict(self, metric=str(self.metric)) + + @staticmethod + def default_output(metric) -> Dict[str, Any]: + if metric in ( + RAGEvaluationMetric.FAITHFULNESS, + RAGEvaluationMetric.DOCUMENT_MAP, + RAGEvaluationMetric.DOCUMENT_MRR, + RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT, + RAGEvaluationMetric.DOCUMENT_RECALL_MULTI_HIT, + ): + return {"individual_scores": [1] * 6, "score": 1.0} + else: + return { + "individual_scores": [1] * 6, + "score": 1.0, + "results": [{"statements": ["placeholder"], "statement_scores": [1.0], "score": 1.0}] * 6, + } + + def run(self, **kwargs) -> Dict[str, Any]: + return self.default_output(self.metric) + + +def build_rag_pipeline_with_query_embedder( + embedder_name: str = "text_embedder", + embedder_component: Optional[Any] = None, + generator_name: str = "llm", + generator_component: Optional[Any] = None, +): + document_store = InMemoryDocumentStore() + retriever = InMemoryEmbeddingRetriever(document_store) + + if embedder_component: + text_embedder = embedder_component + else: + text_embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2") + template = """ + Given the following information, answer the question. 
+ + Context: + {% for document in documents %} + {{ document.content }} + {% endfor %} + + Question: {{question}} + Answer: + """ + + prompt_builder = PromptBuilder(template=template) + + if generator_component: + generator = generator_component + else: + generator = OpenAIGenerator(model="gpt-3.5-turbo", api_key=Secret.from_token("test_key")) + + pipeline = Pipeline() + pipeline.add_component(embedder_name, text_embedder) + pipeline.add_component("retriever", retriever) + pipeline.add_component("prompt_builder", prompt_builder) + pipeline.add_component(generator_name, generator) + pipeline.connect(f"{embedder_name}.embedding", "retriever.query_embedding") + pipeline.connect("retriever", "prompt_builder.documents") + pipeline.connect("prompt_builder", generator_name) + return pipeline + + +def build_rag_pipeline_with_keyword_retriever( + retriever_name: str = "retriever", + retriever_component: Optional[Any] = None, + retriever_output_name: str = "documents", + generator_name: str = "llm", + generator_component: Optional[Any] = None, +): + document_store = InMemoryDocumentStore() + if retriever_component: + retriever = retriever_component + else: + retriever = InMemoryBM25Retriever(document_store) + template = """ + Given the following information, answer the question. + + Context: + {% for document in documents %} + {{ document.content }} + {% endfor %} + + Question: {{question}} + Answer: + """ + + prompt_builder = PromptBuilder(template=template) + if generator_component: + generator = generator_component + else: + generator = OpenAIGenerator(model="gpt-3.5-turbo", api_key=Secret.from_token("test_key")) + + pipeline = Pipeline() + pipeline.add_component(retriever_name, retriever) + pipeline.add_component("prompt_builder", prompt_builder) + pipeline.add_component(generator_name, generator) + pipeline.connect(f"{retriever_name}.{retriever_output_name}", "prompt_builder.documents") + pipeline.connect("prompt_builder", generator_name) + return pipeline + + +@pytest.fixture +def rag_pipeline(): + return build_rag_pipeline_with_query_embedder("text_embedder") + + +@pytest.fixture +def rag_pipeline_with_query_embedder(): + return build_rag_pipeline_with_query_embedder(embedder_name="query_embedder", generator_name="generator") + + +@pytest.fixture +def rag_pipeline_with_keyword_retriever(): + return build_rag_pipeline_with_keyword_retriever(generator_name="generator") + + +class TestRAGEvaluationHarness: + def test_init(self, rag_pipeline): + harness = RAGEvaluationHarness( + rag_pipeline, + rag_components={ + RAGExpectedComponent.QUERY_PROCESSOR: RAGExpectedComponentMetadata( + name="text_embedder", input_mapping={"query": "text"} + ), + RAGExpectedComponent.DOCUMENT_RETRIEVER: RAGExpectedComponentMetadata( + name="retriever", output_mapping={"retrieved_documents": "documents"} + ), + RAGExpectedComponent.RESPONSE_GENERATOR: RAGExpectedComponentMetadata( + name="llm", output_mapping={"replies": "replies"} + ), + }, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + def test_init_invalid_expected_component(self, rag_pipeline): + with pytest.raises(ValueError, match="RAG evaluation harness requires metadata"): + _ = RAGEvaluationHarness(rag_pipeline, rag_components={}, metrics={RAGEvaluationMetric.DOCUMENT_MAP}) + + with pytest.raises(ValueError, match="RAG evaluation harness requires metadata"): + _ = RAGEvaluationHarness( + rag_pipeline, + rag_components={ + RAGExpectedComponent.QUERY_PROCESSOR: RAGExpectedComponentMetadata( + name="text_embedder", input_mapping={"query": "text"} + ) + }, 
+ metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + def test_init_invalid_missing_components(self, rag_pipeline): + with pytest.raises(ValueError, match="named 'embedder' not found in pipeline"): + _ = RAGEvaluationHarness( + rag_pipeline, + rag_components={ + RAGExpectedComponent.QUERY_PROCESSOR: RAGExpectedComponentMetadata( + name="embedder", input_mapping={"query": "text"} + ), + RAGExpectedComponent.DOCUMENT_RETRIEVER: RAGExpectedComponentMetadata( + name="retriever", output_mapping={"retrieved_documents": "documents"} + ), + RAGExpectedComponent.RESPONSE_GENERATOR: RAGExpectedComponentMetadata( + name="llm", output_mapping={"replies": "replies"} + ), + }, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + def test_init_invalid_missing_inputs(self, rag_pipeline): + with pytest.raises( + ValueError, + match="Required input 'rando_input' not found in 'query_processor' component named 'text_embedder'", + ): + _ = RAGEvaluationHarness( + rag_pipeline, + rag_components={ + RAGExpectedComponent.QUERY_PROCESSOR: RAGExpectedComponentMetadata( + name="text_embedder", input_mapping={"query": "rando_input"} + ), + RAGExpectedComponent.DOCUMENT_RETRIEVER: RAGExpectedComponentMetadata( + name="retriever", output_mapping={"retrieved_documents": "documents"} + ), + RAGExpectedComponent.RESPONSE_GENERATOR: RAGExpectedComponentMetadata( + name="llm", output_mapping={"replies": "replies"} + ), + }, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + def test_init_invalid_missing_outputs(self, rag_pipeline): + with pytest.raises( + ValueError, match="Required output 'rando_output' not found in 'response_generator' component named 'llm'" + ): + _ = RAGEvaluationHarness( + rag_pipeline, + rag_components={ + RAGExpectedComponent.QUERY_PROCESSOR: RAGExpectedComponentMetadata( + name="text_embedder", input_mapping={"query": "text"} + ), + RAGExpectedComponent.DOCUMENT_RETRIEVER: RAGExpectedComponentMetadata( + name="retriever", output_mapping={"retrieved_documents": "documents"} + ), + RAGExpectedComponent.RESPONSE_GENERATOR: RAGExpectedComponentMetadata( + name="llm", output_mapping={"replies": "rando_output"} + ), + }, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + def test_init_defaults(self, rag_pipeline_with_query_embedder, rag_pipeline_with_keyword_retriever): + _ = RAGEvaluationHarness( + rag_pipeline_with_query_embedder, + DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + _ = RAGEvaluationHarness( + rag_pipeline_with_keyword_retriever, + DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + _ = RAGEvaluationHarness( + rag_pipeline_with_query_embedder, + DefaultRAGArchitecture.EMBEDDING_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + _ = RAGEvaluationHarness( + rag_pipeline_with_keyword_retriever, + DefaultRAGArchitecture.KEYWORD_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + def test_init_defaults_invalid_missing_inputs(self): + with pytest.raises( + ValueError, match="Required input 'text' not found in 'query_processor' component named 'query_embedder'" + ): + _ = RAGEvaluationHarness( + build_rag_pipeline_with_query_embedder(embedder_name="llm", generator_name="query_embedder"), + DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + with pytest.raises( + ValueError, match="Required input 'query' not found in 'query_processor' component named 'retriever'" + ): + _ = 
RAGEvaluationHarness( + build_rag_pipeline_with_keyword_retriever(retriever_name="llm", generator_name="retriever"), + DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + def test_init_defaults_invalid_missing_outputs(self): + non_conformant_query_embedder_pipeline = build_rag_pipeline_with_query_embedder( + embedder_name="query_embedder", + generator_name="generator", + generator_component=NonConformantComponent({"prompt": str}, {"responses": List[str]}), + ) + non_conformant_keyword_retriever_pipeline = build_rag_pipeline_with_keyword_retriever( + retriever_component=NonConformantComponent({"query": str}, {"docs": List[Document]}), + retriever_output_name="docs", + ) + + with pytest.raises( + ValueError, match="Required output 'replies' not found in 'response_generator' component named 'generator'" + ): + _ = RAGEvaluationHarness( + non_conformant_query_embedder_pipeline, + DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + with pytest.raises( + ValueError, + match="Required output 'documents' not found in 'document_retriever' component named 'retriever'", + ): + _ = RAGEvaluationHarness( + non_conformant_keyword_retriever_pipeline, + DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + def test_init_invalid_component_for_metric(self, rag_pipeline_with_query_embedder): + with pytest.raises(ValueError, match="In order to use the metric .* RAG evaluation harness requires metadata"): + _ = RAGEvaluationHarness( + rag_pipeline_with_query_embedder, + DefaultRAGArchitecture.EMBEDDING_RETRIEVAL, + metrics={RAGEvaluationMetric.SEMANTIC_ANSWER_SIMILARITY}, + ) + + def test_run_invalid_ground_truths(self, rag_pipeline_with_query_embedder): + harness_map = RAGEvaluationHarness( + rag_pipeline_with_query_embedder, + DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + harness_sas = RAGEvaluationHarness( + rag_pipeline_with_query_embedder, + DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL, + metrics={RAGEvaluationMetric.SEMANTIC_ANSWER_SIMILARITY}, + ) + + input_no_gt_docs = RAGEvaluationInput(queries=["What is the capital of France?"]) + input_mismatching_gt_docs = RAGEvaluationInput( + queries=["What is the capital of France?"], ground_truth_documents=[] + ) + input_no_gt_answers = RAGEvaluationInput( + queries=["What is the capital of France?"], + ground_truth_documents=[[Document(content="Paris is the capital of France.")]], + ) + input_mismatching_gt_answers = RAGEvaluationInput( + queries=["What is the capital of France?"], + ground_truth_documents=[[Document(content="Paris is the capital of France.")]], + ground_truth_answers=[], + ) + + with pytest.raises(ValueError, match="Ground truth documents required"): + _ = harness_map.run(input_no_gt_docs) + + with pytest.raises(ValueError, match="Length of ground truth documents should match the number of queries"): + _ = harness_map.run(input_mismatching_gt_docs) + + with pytest.raises(ValueError, match="Ground truth answers required"): + _ = harness_sas.run(input_no_gt_answers) + + with pytest.raises(ValueError, match="Length of ground truth answers should match the number of queries"): + _ = harness_sas.run(input_mismatching_gt_answers) + + def test_run_invalid_additional_input(self, rag_pipeline_with_query_embedder): + harness = RAGEvaluationHarness( + rag_pipeline_with_query_embedder, + 
DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + input = RAGEvaluationInput( + queries=["What is the capital of France?"], + ground_truth_documents=[[Document(content="Paris is the capital of France.")]], + rag_pipeline_inputs={"query_embedder": {"text": ["Some other question?"]}}, + ) + + with pytest.raises(ValueError, match="Query embedder input 'text' cannot be provided as additional input"): + _ = harness.run(input) + + def test_run_invalid_override(self, rag_pipeline_with_query_embedder): + harness = RAGEvaluationHarness( + rag_pipeline_with_query_embedder, + DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL, + metrics={RAGEvaluationMetric.DOCUMENT_MAP}, + ) + + input = RAGEvaluationInput( + queries=["What is the capital of France?"], + ground_truth_documents=[[Document(content="Paris is the capital of France.")]], + ) + + with pytest.raises(ValueError, match="Cannot override non-existent component 'rando_component'"): + _ = harness.run( + input, overrides=RAGEvaluationOverrides(rag_pipeline={"rando_component": {"Some": "thing"}}) + ) + + with pytest.raises(ValueError, match="Cannot override parameters of unused evaluation metric"): + _ = harness.run( + input, + overrides=RAGEvaluationOverrides( + eval_pipeline={RAGEvaluationMetric.DOCUMENT_RECALL_MULTI_HIT: {"mode": "single_hit"}} + ), + ) + + def test_run_statistical_metrics(self): + metrics = { + RAGEvaluationMetric.DOCUMENT_MAP, + RAGEvaluationMetric.DOCUMENT_MRR, + RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT, + RAGEvaluationMetric.DOCUMENT_RECALL_MULTI_HIT, + } + harness = RAGEvaluationHarness( + build_rag_pipeline_with_keyword_retriever( + retriever_component=MockKeywordRetriever(), + generator_component=MockGenerator(arg=0), + generator_name="generator", + ), + DefaultRAGArchitecture.KEYWORD_RETRIEVAL, + metrics=metrics, + ) + + mock_eval_pipeline = Pipeline() + for m in metrics: + mock_eval_pipeline.add_component(m.value, MockEvaluator(metric=m)) + + harness.evaluation_pipeline = mock_eval_pipeline + + inputs = RAGEvaluationInput( + queries=["What is the capital of France?"] * 6, + ground_truth_documents=[ + [Document(content="France")], + [Document(content="9th century"), Document(content="9th")], + [Document(content="classical music"), Document(content="classical")], + [Document(content="11th century"), Document(content="the 11th")], + [Document(content="Denmark, Iceland and Norway")], + [Document(content="10th century"), Document(content="10th")], + ], + ) + + output = harness.run( + inputs, overrides=RAGEvaluationOverrides(rag_pipeline={"generator": {"arg": 100}}), run_name="test_run" + ) + + assert output.inputs == inputs + assert output.results.run_name == "test_run" + assert output.results.results == {m.value: MockEvaluator.default_output(m) for m in metrics} + overriden_pipeline_dict = Pipeline.loads(output.evaluated_pipeline).to_dict() + assert overriden_pipeline_dict["components"]["generator"]["init_parameters"]["arg"] == 100 + + def test_run_model_based_metrics(self, monkeypatch): + monkeypatch.setenv("OPENAI_API_KEY", "test") + + metrics = { + RAGEvaluationMetric.FAITHFULNESS, + RAGEvaluationMetric.CONTEXT_RELEVANCE, + RAGEvaluationMetric.SEMANTIC_ANSWER_SIMILARITY, + } + harness = RAGEvaluationHarness( + build_rag_pipeline_with_keyword_retriever( + retriever_component=MockKeywordRetriever(), + generator_component=MockGenerator(arg=0), + generator_name="generator", + ), + DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL, + 
metrics=metrics, + ) + + mock_eval_pipeline = Pipeline() + for m in metrics: + mock_eval_pipeline.add_component(m.value, MockEvaluator(metric=m)) + + harness.evaluation_pipeline = mock_eval_pipeline + + inputs = RAGEvaluationInput( + queries=["What is the capital of France?"] * 6, + ground_truth_documents=[ + [Document(content="France")], + [Document(content="9th century"), Document(content="9th")], + [Document(content="classical music"), Document(content="classical")], + [Document(content="11th century"), Document(content="the 11th")], + [Document(content="Denmark, Iceland and Norway")], + [Document(content="10th century"), Document(content="10th")], + ], + ground_truth_answers=[ + "Paris is the capital of France.", + "9th century", + "classical music", + "11th century", + "Denmark, Iceland and Norway", + "10th century", + ], + ) + + output = harness.run(inputs, run_name="test_run") + + assert output.inputs == inputs + assert output.results.run_name == "test_run" + assert output.results.inputs == { + "questions": ["What is the capital of France?"] * 6, + "contexts": [ + ["France"], + ["9th century", "10th century", "9th"], + ["classical", "rock music", "dubstep"], + ["11th", "the 11th", "11th century"], + ["Denmark", "Norway", "Iceland"], + ["10th century", "the first half of the 10th century", "10th", "10th"], + ], + "responses": ["placeholder", "placeholder", "placeholder", "placeholder", "placeholder", "placeholder"], + "ground_truth_documents": [ + ["France"], + ["9th century", "9th"], + ["classical music", "classical"], + ["11th century", "the 11th"], + ["Denmark, Iceland and Norway"], + ["10th century", "10th"], + ], + "ground_truth_answers": [ + "Paris is the capital of France.", + "9th century", + "classical music", + "11th century", + "Denmark, Iceland and Norway", + "10th century", + ], + } + assert output.results.results == {m.value: MockEvaluator.default_output(m) for m in metrics} diff --git a/test/evaluation/util/__init__.py b/test/evaluation/util/__init__.py new file mode 100644 index 0000000000..c1764a6e03 --- /dev/null +++ b/test/evaluation/util/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/test/evaluation/util/test_helpers.py b/test/evaluation/util/test_helpers.py new file mode 100644 index 0000000000..13ffdff1cf --- /dev/null +++ b/test/evaluation/util/test_helpers.py @@ -0,0 +1,51 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 +import pytest + +from haystack.evaluation.util.helpers import aggregate_batched_pipeline_outputs, deaggregate_batched_pipeline_inputs + + +def test_aggregate_batched_pipeline_outputs_empty(): + assert aggregate_batched_pipeline_outputs([]) == {} + + +def test_aggregate_batched_pipeline_outputs_single(): + assert aggregate_batched_pipeline_outputs([{"a": {"b": [1, 2]}}]) == {"a": {"b": [1, 2]}} + + +def test_aggregate_batched_pipeline_outputs_multiple(): + outputs = [{"a": {"b": [1, 2], "c": [10, 20]}}, {"a": {"b": [3, 4], "c": [30, 40]}}] + assert aggregate_batched_pipeline_outputs(outputs) == {"a": {"b": [[1, 2], [3, 4]], "c": [[10, 20], [30, 40]]}} + + +def test_aggregate_batched_pipeline_outputs_mismatched_components(): + outputs = [{"a": {"b": [1, 2]}}, {"c": {"b": [3, 4]}}] + with pytest.raises(ValueError, match="Expected components .* but got"): + aggregate_batched_pipeline_outputs(outputs) + + +def test_aggregate_batched_pipeline_outputs_mismatched_component_outputs(): + outputs = [{"a": {"b": [1, 
2]}}, {"a": {"b": [3, 4], "c": [5, 6]}}] + with pytest.raises(ValueError, match="Expected outputs from component .* to have keys .* but got"): + aggregate_batched_pipeline_outputs(outputs) + + +def test_deaggregate_batched_pipeline_inputs_empty(): + assert deaggregate_batched_pipeline_inputs({}) == [] + + +def test_deaggregate_batched_pipeline_inputs_single(): + inputs = {"a": {"b": [1, 2]}} + assert deaggregate_batched_pipeline_inputs(inputs) == [{"a": {"b": 1}}, {"a": {"b": 2}}] + + +def test_deaggregate_batched_pipeline_inputs_multiple(): + inputs = {"a": {"b": [1, 2], "c": [10, 20]}} + assert deaggregate_batched_pipeline_inputs(inputs) == [{"a": {"b": 1, "c": 10}}, {"a": {"b": 2, "c": 20}}] + + +def test_deaggregate_batched_pipeline_inputs_shape_mismatch(): + inputs = {"a": {"b": [1, 2]}, "c": {"b": [3]}} + with pytest.raises(ValueError, match="Expected input .* to have *. values but got"): + deaggregate_batched_pipeline_inputs(inputs) diff --git a/test/evaluation/util/test_pipeline_pair.py b/test/evaluation/util/test_pipeline_pair.py new file mode 100644 index 0000000000..fe1705908d --- /dev/null +++ b/test/evaluation/util/test_pipeline_pair.py @@ -0,0 +1,234 @@ +import pytest + +from haystack import Pipeline +from haystack.evaluation.util.pipeline_pair import PipelinePair +from haystack.evaluation.util.helpers import aggregate_batched_pipeline_outputs + +from haystack.testing.sample_components import AddFixedValue, Double, AddFixedValueBatch, DoubleBatch + + +@pytest.fixture +def first_pipeline(): + first = Pipeline() + first.add_component("first_addition", AddFixedValue(add=10)) + first.add_component("second_addition", AddFixedValue(add=100)) + first.add_component("double", Double()) + first.connect("first_addition", "double") + first.connect("double", "second_addition") + return first + + +@pytest.fixture +def second_pipeline(): + second = Pipeline() + second.add_component("first_addition", AddFixedValue(add=1)) + second.add_component("second_addition", AddFixedValue(add=2)) + second.add_component("double", Double()) + second.connect("first_addition", "double") + second.connect("double", "second_addition") + return second + + +@pytest.fixture +def second_pipeline_batched(): + second = Pipeline() + second.add_component("first_addition", AddFixedValueBatch(add=1)) + second.add_component("second_addition", AddFixedValueBatch(add=2)) + second.add_component("double", DoubleBatch()) + second.connect("first_addition", "double") + second.connect("double", "second_addition") + return second + + +def test_pipeline_pair_init(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + ) + + +def test_pipeline_pair_invalid_io_specifier(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Invalid pipeline i/o path specifier"): + _ = PipelinePair( + first=first_pipeline, second=second_pipeline, outputs_to_inputs={"nonexistent": ["nonexistent"]} + ) + + +def test_pipeline_pair_invalid_first_component(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Output component .* not found in first pipeline."): + _ = PipelinePair( + first=first_pipeline, second=second_pipeline, outputs_to_inputs={"nonexistent.out": ["nonexistent.in"]} + ) + + +def test_pipeline_pair_invalid_first_component_output(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Component .* in first pipeline does not have expected output"): + _ = 
PipelinePair( + first=first_pipeline, second=second_pipeline, outputs_to_inputs={"double.out": ["nonexistent.in"]} + ) + + +def test_pipeline_pair_invalid_second_component(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Input component .* not found in second pipeline."): + _ = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["nonexistent.in"]}, + ) + + +def test_pipeline_pair_invalid_second_component_input(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Component .* in second pipeline does not have expected input"): + _ = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["second_addition.some_input"]}, + ) + + +def test_pipeline_pair_invalid_second_multiple_inputs(first_pipeline, second_pipeline): + with pytest.raises( + ValueError, match="Input .* in second pipeline is connected to multiple first pipeline outputs." + ): + _ = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={ + "first_addition.result": ["second_addition.value"], + "second_addition.result": ["second_addition.value"], + }, + ) + + +def test_pipeline_pair_run(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"double"}, + ) + + results = pair.run({"first_addition": {"value": 1}}) + assert results == { + "first": {"first_addition": {"result": 11}, "second_addition": {"result": 122}}, + "second": {"double": {"value": 24}, "second_addition": {"result": 26}}, + } + + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value", "first_addition.add"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + ) + + results = pair.run({"first_addition": {"value": 10}}) + assert results == { + "first": {"first_addition": {"result": 20}, "second_addition": {"result": 140}}, + "second": {"first_addition": {"result": 40}, "double": {"value": 80}, "second_addition": {"result": 82}}, + } + + +def test_pipeline_pair_run_second_extra_inputs(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + ) + + results = pair.run( + first_inputs={"first_addition": {"value": 1}}, + second_inputs={"first_addition": {"add": 10}, "second_addition": {"add": 100}}, + ) + assert results == { + "first": {"first_addition": {"result": 11}, "second_addition": {"result": 122}}, + "second": {"first_addition": {"result": 21}, "double": {"value": 42}, "second_addition": {"result": 142}}, + } + + +def test_pipeline_pair_run_invalid_second_extra_inputs(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + ) + + with pytest.raises( + ValueError, match="Second pipeline input .* cannot be provided both explicitly and by the first pipeline" + ): + results = pair.run( + 
first_inputs={"first_addition": {"value": 1}}, second_inputs={"first_addition": {"value": 10}} + ) + + +def test_pipeline_pair_run_map_first_outputs(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"double"}, + map_first_outputs=lambda x: {"first_addition": {"result": 0}, "second_addition": {"result": 0}}, + ) + + results = pair.run({"first_addition": {"value": 1}}) + assert results == { + "first": {"first_addition": {"result": 0}, "second_addition": {"result": 0}}, + "second": {"double": {"value": 2}, "second_addition": {"result": 4}}, + } + + +def test_pipeline_pair_run_first_as_batch(first_pipeline, second_pipeline_batched): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline_batched, + outputs_to_inputs={"second_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + map_first_outputs=lambda x: aggregate_batched_pipeline_outputs(x), + ) + + results = pair.run_first_as_batch([{"first_addition": {"value": i}} for i in range(5)]) + assert results == { + "first": { + "first_addition": {"result": [10, 11, 12, 13, 14]}, + "second_addition": {"result": [120, 122, 124, 126, 128]}, + }, + "second": { + "first_addition": {"result": [121, 123, 125, 127, 129]}, + "double": {"value": [242, 246, 250, 254, 258]}, + "second_addition": {"result": [244, 248, 252, 256, 260]}, + }, + } + + +def test_pipeline_pair_run_first_as_batch_invalid_map_first_outputs(first_pipeline, second_pipeline_batched): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline_batched, + outputs_to_inputs={"second_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + map_first_outputs=None, + ) + + with pytest.raises(ValueError, match="Mapping function for first pipeline outputs must be provided"): + results = pair.run_first_as_batch([{"first_addition": {"value": i}} for i in range(5)]) + + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline_batched, + outputs_to_inputs={"second_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + map_first_outputs=lambda x: x, + ) + + with pytest.raises(ValueError, match="Mapping function must return an aggregate dictionary"): + results = pair.run_first_as_batch([{"first_addition": {"value": i}} for i in range(5)]) From fd2aead8daebf5a600194ed176e1d25a2c9d1fc7 Mon Sep 17 00:00:00 2001 From: Daria Fokina Date: Thu, 5 Dec 2024 18:15:15 +0100 Subject: [PATCH 2/5] docstrings evaluation_harness.py --- haystack/evaluation/harness/evaluation_harness.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/haystack/evaluation/harness/evaluation_harness.py b/haystack/evaluation/harness/evaluation_harness.py index cf19377a23..f4618fe106 100644 --- a/haystack/evaluation/harness/evaluation_harness.py +++ b/haystack/evaluation/harness/evaluation_harness.py @@ -15,9 +15,9 @@ class EvaluationRunOverrides: """ Overrides for an evaluation run. - Used to override the init parameters of components in either - (or both) the evaluated and evaluation pipelines. 
Each key is
-    a component name and its value a dictionary with init parameters
+    Use it to override the init parameters of components in either
+    or both the evaluated and evaluation pipelines. Each key is
+    a component name, and its value is a dictionary with init parameters
     to override.
 
     :param evaluated_pipeline_overrides:
@@ -37,7 +37,8 @@ class EvaluationRunOverrides:
 
 class EvaluationHarness(ABC, Generic[EvalRunInputT, EvalRunOverridesT, EvalRunOutputT]):
     """
-    Executes a pipeline with a given set of parameters, inputs and evaluates its outputs with an evaluation pipeline.
+    Executes a pipeline with a given set of parameters and inputs,
+    then evaluates its outputs with an evaluation pipeline.
     """
 
     @staticmethod
@@ -70,7 +71,7 @@ def run(
         self, inputs: EvalRunInputT, *, overrides: Optional[EvalRunOverridesT] = None, run_name: Optional[str] = None
     ) -> EvalRunOutputT:
         """
-        Launch a evaluation run.
+        Launch an evaluation run.
 
         :param inputs:
             Inputs to the evaluated and evaluation pipelines.

From e39cf8a40b90046656c7b56bd313fb93d288c681 Mon Sep 17 00:00:00 2001
From: Daria Fokina
Date: Thu, 5 Dec 2024 18:21:13 +0100
Subject: [PATCH 3/5] docstrings parameters.py

---
 haystack/evaluation/harness/rag/parameters.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/haystack/evaluation/harness/rag/parameters.py b/haystack/evaluation/harness/rag/parameters.py
index 638e4227a5..df0397c1e5 100644
--- a/haystack/evaluation/harness/rag/parameters.py
+++ b/haystack/evaluation/harness/rag/parameters.py
@@ -14,7 +14,7 @@ class RAGExpectedComponent(Enum):
     """
     Represents the basic components in a RAG pipeline that are, by default, required to be present for evaluation.
 
-    Each of these can be separate components in the pipeline or a single component that performs
+    These can be separate components in a pipeline or a single component that performs
     multiple tasks.
     """
 
@@ -96,14 +96,14 @@ class RAGEvaluationInput:
 
     :param ground_truth_documents:
         The ground truth documents passed to the evaluation pipeline. Only required for metrics
-        that require them. Corresponds to the queries.
+        that need the ground truth documents. Corresponds to the queries.
     :param ground_truth_answers:
         The ground truth answers passed to the evaluation pipeline. Only required for metrics
-        that require them. Corresponds to the queries.
+        that need the ground truth answers. Corresponds to the queries.
     :param rag_pipeline_inputs:
         Additional inputs to pass to the RAG pipeline. Each
-        key is the name of the component and its value a dictionary
+        key is the name of the component, and its value is a dictionary
         with the input name and a list of values, each corresponding
         to a query.
     """
@@ -120,15 +120,15 @@ class RAGEvaluationOverrides:
     Overrides for a RAG evaluation run.
 
     Used to override the init parameters of components in
-    either (or both) the evaluated and evaluation pipelines.
+    either or both the evaluated and evaluation pipelines.
 
     :param rag_pipeline:
         Overrides for the RAG pipeline. Each
-        key is a component name and its value a dictionary
+        key is a component name, and its value is a dictionary
         with init parameters to override.
     :param eval_pipeline:
         Overrides for the evaluation pipeline. Each
-        key is a RAG metric and its value a dictionary
+        key is a RAG metric, and its value is a dictionary
         with init parameters to override.
     """
 
@@ -142,9 +142,9 @@ class RAGEvaluationOutput:
     Represents the output of a RAG evaluation run.
 
     :param evaluated_pipeline:
-        Serialized version of the evaluated pipeline, including overrides.
+        A serialized version of the evaluated pipeline, including overrides.
     :param evaluation_pipeline:
-        Serialized version of the evaluation pipeline, including overrides.
+        A serialized version of the evaluation pipeline, including overrides.
     :param inputs:
         Input passed to the evaluation harness.
     :param results:

From 2c1e2f711f1d9b3fa4776a87456c93ca5b1fdf1f Mon Sep 17 00:00:00 2001
From: Daria Fokina
Date: Thu, 5 Dec 2024 18:49:16 +0100
Subject: [PATCH 4/5] Update evaluation_harness.py

---
 haystack/evaluation/harness/evaluation_harness.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/haystack/evaluation/harness/evaluation_harness.py b/haystack/evaluation/harness/evaluation_harness.py
index f4618fe106..572ec7fbc8 100644
--- a/haystack/evaluation/harness/evaluation_harness.py
+++ b/haystack/evaluation/harness/evaluation_harness.py
@@ -37,8 +37,7 @@ class EvaluationRunOverrides:
 
 class EvaluationHarness(ABC, Generic[EvalRunInputT, EvalRunOverridesT, EvalRunOutputT]):
     """
-    Executes a pipeline with a given set of parameters and inputs,
-    then evaluates its outputs with an evaluation pipeline.
+    Executes a pipeline with specified parameters and inputs, then evaluates its outputs using an evaluation pipeline.
     """
 
     @staticmethod

From 5f784926ce8cc116945b5779356f1d9d54c1c09c Mon Sep 17 00:00:00 2001
From: Julian Risch
Date: Fri, 6 Dec 2024 15:01:00 +0100
Subject: [PATCH 5/5] add release note

---
 releasenotes/notes/add-eval-harness-24698ae7c4470644.yaml | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 releasenotes/notes/add-eval-harness-24698ae7c4470644.yaml

diff --git a/releasenotes/notes/add-eval-harness-24698ae7c4470644.yaml b/releasenotes/notes/add-eval-harness-24698ae7c4470644.yaml
new file mode 100644
index 0000000000..9ce83fb9c7
--- /dev/null
+++ b/releasenotes/notes/add-eval-harness-24698ae7c4470644.yaml
@@ -0,0 +1,4 @@
+---
+features:
+  - |
+    Added a new RAGEvaluationHarness for evaluating RAG pipelines, together with an EvaluationHarness abstraction that executes a pipeline with a given set of parameters and inputs and evaluates its outputs with an evaluation pipeline.
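
Usage sketch (not part of the patch series): a minimal end-to-end example of the ported harness. The pipeline layout and component names mirror the keyword-retrieval test fixture above; the document contents, query, model choice, metric selection, and run name are illustrative assumptions, and OpenAIGenerator reads OPENAI_API_KEY from the environment.

from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.evaluation.harness.rag import (
    DefaultRAGArchitecture,
    RAGEvaluationHarness,
    RAGEvaluationInput,
    RAGEvaluationMetric,
)

# Build a small keyword-retrieval RAG pipeline with the component names
# expected by DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL.
document_store = InMemoryDocumentStore()
document_store.write_documents([Document(content="Paris is the capital of France.")])

template = """Given the following information, answer the question.
Context:
{% for document in documents %}
{{ document.content }}
{% endfor %}
Question: {{ question }}
Answer:"""

rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", InMemoryBM25Retriever(document_store))
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=template))
rag_pipeline.add_component("generator", OpenAIGenerator(model="gpt-3.5-turbo"))
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")

# Wrap the pipeline in the harness and pick the metrics to compute.
harness = RAGEvaluationHarness(
    rag_pipeline,
    DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL,
    metrics={RAGEvaluationMetric.DOCUMENT_MAP, RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT},
)

# One entry per query; the prompt builder's 'question' input is supplied
# as an additional per-query RAG pipeline input.
eval_input = RAGEvaluationInput(
    queries=["What is the capital of France?"],
    ground_truth_documents=[[Document(content="Paris is the capital of France.")]],
    rag_pipeline_inputs={"prompt_builder": {"question": ["What is the capital of France?"]}},
)

output = harness.run(eval_input, run_name="smoke_test")
print(output.results.results)  # per-metric scores keyed by metric name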