From c78545dfc0d28e36318dbdab8613dca6bedc2d78 Mon Sep 17 00:00:00 2001
From: Ivo Bellin Salarin
Date: Fri, 15 Nov 2024 10:52:44 +0100
Subject: [PATCH] feat(openai): be tolerant to exceptions (#8526)

* feat: be tolerant to exceptions

if ever an error is raised by the OpenAI API, don't fail the entire processing

* fix: missing import, string separator

* Enhance error handling

* Use batched from more_itertools for compatibility with older Python versions

* Fix batching and add test

---------

Co-authored-by: Silvano Cerza
---
 .../embedders/openai_document_embedder.py | 38 +++++++++-----
 .../notes/patch-1-34479efe3bea0e4f.yaml   |  5 ++
 .../test_openai_document_embedder.py      | 52 ++++++++++++-------
 3 files changed, 63 insertions(+), 32 deletions(-)
 create mode 100644 releasenotes/notes/patch-1-34479efe3bea0e4f.yaml

diff --git a/haystack/components/embedders/openai_document_embedder.py b/haystack/components/embedders/openai_document_embedder.py
index 61b0cb4df9..fca5e2cfd1 100644
--- a/haystack/components/embedders/openai_document_embedder.py
+++ b/haystack/components/embedders/openai_document_embedder.py
@@ -5,12 +5,15 @@
 import os
 from typing import Any, Dict, List, Optional, Tuple

-from openai import OpenAI
+from more_itertools import batched
+from openai import APIError, OpenAI
 from tqdm import tqdm

-from haystack import Document, component, default_from_dict, default_to_dict
+from haystack import Document, component, default_from_dict, default_to_dict, logging
 from haystack.utils import Secret, deserialize_secrets_inplace

+logger = logging.getLogger(__name__)
+

 @component
 class OpenAIDocumentEmbedder:
@@ -34,7 +37,7 @@ class OpenAIDocumentEmbedder:
     ```
     """

-    def __init__(
+    def __init__(  # pylint: disable=too-many-positional-arguments
         self,
         api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
         model: str = "text-embedding-ada-002",
@@ -158,11 +161,11 @@ def from_dict(cls, data: Dict[str, Any]) -> "OpenAIDocumentEmbedder":
         deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
         return default_from_dict(cls, data)

-    def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
+    def _prepare_texts_to_embed(self, documents: List[Document]) -> Dict[str, str]:
         """
         Prepare the texts to embed by concatenating the Document text with the metadata fields to embed.
         """
-        texts_to_embed = []
+        texts_to_embed = {}
         for doc in documents:
             meta_values_to_embed = [
                 str(doc.meta[key]) for key in self.meta_fields_to_embed if key in doc.meta and doc.meta[key] is not None
@@ -174,25 +177,32 @@ def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:

             # copied from OpenAI embedding_utils (https://github.com/openai/openai-python/blob/main/openai/embeddings_utils.py)
             # replace newlines, which can negatively affect performance.
-            text_to_embed = text_to_embed.replace("\n", " ")
-            texts_to_embed.append(text_to_embed)
+            texts_to_embed[doc.id] = text_to_embed.replace("\n", " ")

         return texts_to_embed

-    def _embed_batch(self, texts_to_embed: List[str], batch_size: int) -> Tuple[List[List[float]], Dict[str, Any]]:
+    def _embed_batch(self, texts_to_embed: Dict[str, str], batch_size: int) -> Tuple[List[List[float]], Dict[str, Any]]:
         """
         Embed a list of texts in batches.

         """
         all_embeddings = []
         meta: Dict[str, Any] = {}
-        for i in tqdm(
-            range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
+        for batch in tqdm(
+            batched(texts_to_embed.items(), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
         ):
-            batch = texts_to_embed[i : i + batch_size]
+            args: Dict[str, Any] = {"model": self.model, "input": [b[1] for b in batch]}
+
             if self.dimensions is not None:
-                response = self.client.embeddings.create(model=self.model, dimensions=self.dimensions, input=batch)
-            else:
-                response = self.client.embeddings.create(model=self.model, input=batch)
+                args["dimensions"] = self.dimensions
+
+            try:
+                response = self.client.embeddings.create(**args)
+            except APIError as exc:
+                ids = ", ".join(b[0] for b in batch)
+                msg = "Failed embedding of documents {ids} caused by {exc}"
+                logger.exception(msg, ids=ids, exc=exc)
+                continue
+
             embeddings = [el.embedding for el in response.data]
             all_embeddings.extend(embeddings)
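
The rewritten loop above iterates over `batched(texts_to_embed.items(), batch_size)`, so each `batch` is a tuple of `(doc_id, text)` pairs rather than a slice of a plain list: `b[0]` carries the document id reported in the error log, and `b[1]` the text sent to the API. A minimal sketch of that grouping, with made-up ids, texts, and batch size purely for illustration:

```python
from more_itertools import batched

# Stand-in for the {doc.id: text} mapping produced by _prepare_texts_to_embed;
# the ids and texts here are invented for the example.
texts_to_embed = {"doc-1": "first text", "doc-2": "second text", "doc-3": "third text"}

for batch in batched(texts_to_embed.items(), 2):
    # Each batch is a tuple of (id, text) pairs, e.g.
    # (("doc-1", "first text"), ("doc-2", "second text"))
    ids = ", ".join(b[0] for b in batch)  # ids joined for the log message
    inputs = [b[1] for b in batch]        # texts that would be passed as "input"
    print(ids, inputs)
```

Keeping the ids next to the texts is what lets the `except APIError` branch report exactly which documents were in the failed batch before moving on to the next one.
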
""" all_embeddings = [] meta: Dict[str, Any] = {} - for i in tqdm( - range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings" + for batch in tqdm( + batched(texts_to_embed.items(), batch_size), disable=not self.progress_bar, desc="Calculating embeddings" ): - batch = texts_to_embed[i : i + batch_size] + args: Dict[str, Any] = {"model": self.model, "input": [b[1] for b in batch]} + if self.dimensions is not None: - response = self.client.embeddings.create(model=self.model, dimensions=self.dimensions, input=batch) - else: - response = self.client.embeddings.create(model=self.model, input=batch) + args["dimensions"] = self.dimensions + + try: + response = self.client.embeddings.create(**args) + except APIError as exc: + ids = ", ".join(b[0] for b in batch) + msg = "Failed embedding of documents {ids} caused by {exc}" + logger.exception(msg, ids=ids, exc=exc) + continue + embeddings = [el.embedding for el in response.data] all_embeddings.extend(embeddings) diff --git a/releasenotes/notes/patch-1-34479efe3bea0e4f.yaml b/releasenotes/notes/patch-1-34479efe3bea0e4f.yaml new file mode 100644 index 0000000000..7635382a75 --- /dev/null +++ b/releasenotes/notes/patch-1-34479efe3bea0e4f.yaml @@ -0,0 +1,5 @@ +--- +enhancements: + - | + Change `OpenAIDocumentEmbedder` to keep running if a batch fails embedding. + Now OpenAI returns an error we log that error and keep processing following batches. diff --git a/test/components/embedders/test_openai_document_embedder.py b/test/components/embedders/test_openai_document_embedder.py index 89ce62a929..87ed6afbb6 100644 --- a/test/components/embedders/test_openai_document_embedder.py +++ b/test/components/embedders/test_openai_document_embedder.py @@ -2,14 +2,16 @@ # # SPDX-License-Identifier: Apache-2.0 import os +import random from typing import List -from haystack.utils.auth import Secret +from unittest.mock import Mock, patch -import random import pytest +from openai import APIError from haystack import Document from haystack.components.embedders.openai_document_embedder import OpenAIDocumentEmbedder +from haystack.utils.auth import Secret def mock_openai_response(input: List[str], model: str = "text-embedding-ada-002", **kwargs) -> dict: @@ -155,7 +157,8 @@ def test_to_dict_with_custom_init_parameters(self, monkeypatch): def test_prepare_texts_to_embed_w_metadata(self): documents = [ - Document(content=f"document number {i}:\ncontent", meta={"meta_field": f"meta_value {i}"}) for i in range(5) + Document(id=f"{i}", content=f"document number {i}:\ncontent", meta={"meta_field": f"meta_value {i}"}) + for i in range(5) ] embedder = OpenAIDocumentEmbedder( @@ -165,16 +168,16 @@ def test_prepare_texts_to_embed_w_metadata(self): prepared_texts = embedder._prepare_texts_to_embed(documents) # note that newline is replaced by space - assert prepared_texts == [ - "meta_value 0 | document number 0: content", - "meta_value 1 | document number 1: content", - "meta_value 2 | document number 2: content", - "meta_value 3 | document number 3: content", - "meta_value 4 | document number 4: content", - ] + assert prepared_texts == { + "0": "meta_value 0 | document number 0: content", + "1": "meta_value 1 | document number 1: content", + "2": "meta_value 2 | document number 2: content", + "3": "meta_value 3 | document number 3: content", + "4": "meta_value 4 | document number 4: content", + } def test_prepare_texts_to_embed_w_suffix(self): - documents = [Document(content=f"document number {i}") for i in range(5)] + documents = 
diff --git a/test/components/embedders/test_openai_document_embedder.py b/test/components/embedders/test_openai_document_embedder.py
index 89ce62a929..87ed6afbb6 100644
--- a/test/components/embedders/test_openai_document_embedder.py
+++ b/test/components/embedders/test_openai_document_embedder.py
@@ -2,14 +2,16 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 import os
+import random
 from typing import List
-from haystack.utils.auth import Secret
+from unittest.mock import Mock, patch

-import random
 import pytest
+from openai import APIError

 from haystack import Document
 from haystack.components.embedders.openai_document_embedder import OpenAIDocumentEmbedder
+from haystack.utils.auth import Secret


 def mock_openai_response(input: List[str], model: str = "text-embedding-ada-002", **kwargs) -> dict:
@@ -155,7 +157,8 @@ def test_to_dict_with_custom_init_parameters(self, monkeypatch):

     def test_prepare_texts_to_embed_w_metadata(self):
         documents = [
-            Document(content=f"document number {i}:\ncontent", meta={"meta_field": f"meta_value {i}"}) for i in range(5)
+            Document(id=f"{i}", content=f"document number {i}:\ncontent", meta={"meta_field": f"meta_value {i}"})
+            for i in range(5)
         ]

         embedder = OpenAIDocumentEmbedder(
@@ -165,16 +168,16 @@ def test_prepare_texts_to_embed_w_metadata(self):
         prepared_texts = embedder._prepare_texts_to_embed(documents)

         # note that newline is replaced by space
-        assert prepared_texts == [
-            "meta_value 0 | document number 0: content",
-            "meta_value 1 | document number 1: content",
-            "meta_value 2 | document number 2: content",
-            "meta_value 3 | document number 3: content",
-            "meta_value 4 | document number 4: content",
-        ]
+        assert prepared_texts == {
+            "0": "meta_value 0 | document number 0: content",
+            "1": "meta_value 1 | document number 1: content",
+            "2": "meta_value 2 | document number 2: content",
+            "3": "meta_value 3 | document number 3: content",
+            "4": "meta_value 4 | document number 4: content",
+        }

     def test_prepare_texts_to_embed_w_suffix(self):
-        documents = [Document(content=f"document number {i}") for i in range(5)]
+        documents = [Document(id=f"{i}", content=f"document number {i}") for i in range(5)]

         embedder = OpenAIDocumentEmbedder(
             api_key=Secret.from_token("fake-api-key"), prefix="my_prefix ", suffix=" my_suffix"
@@ -182,13 +185,13 @@ def test_prepare_texts_to_embed_w_suffix(self):

         prepared_texts = embedder._prepare_texts_to_embed(documents)

-        assert prepared_texts == [
-            "my_prefix document number 0 my_suffix",
-            "my_prefix document number 1 my_suffix",
-            "my_prefix document number 2 my_suffix",
-            "my_prefix document number 3 my_suffix",
-            "my_prefix document number 4 my_suffix",
-        ]
+        assert prepared_texts == {
+            "0": "my_prefix document number 0 my_suffix",
+            "1": "my_prefix document number 1 my_suffix",
+            "2": "my_prefix document number 2 my_suffix",
+            "3": "my_prefix document number 3 my_suffix",
+            "4": "my_prefix document number 4 my_suffix",
+        }

     def test_run_wrong_input_format(self):
         embedder = OpenAIDocumentEmbedder(api_key=Secret.from_token("fake-api-key"))
@@ -212,6 +215,19 @@ def test_run_on_empty_list(self):
         assert result["documents"] is not None
         assert not result["documents"]  # empty list

+    def test_embed_batch_handles_exceptions_gracefully(self, caplog):
+        embedder = OpenAIDocumentEmbedder(api_key=Secret.from_token("fake_api_key"))
+        fake_texts_to_embed = {"1": "text1", "2": "text2"}
+        with patch.object(
+            embedder.client.embeddings,
+            "create",
+            side_effect=APIError(message="Mocked error", request=Mock(), body=None),
+        ):
+            embedder._embed_batch(texts_to_embed=fake_texts_to_embed, batch_size=2)
+
+        assert len(caplog.records) == 1
+        assert "Failed embedding of documents 1, 2 caused by Mocked error" in caplog.records[0].msg
+
     @pytest.mark.skipif(os.environ.get("OPENAI_API_KEY", "") == "", reason="OPENAI_API_KEY is not set")
     @pytest.mark.integration
     def test_run(self):