Skip to content

Commit

Permalink
feat(openai): be tolerant to exceptions (#8526)
Browse files Browse the repository at this point in the history
* feat: be tolerant to exceptions

if an error is ever raised by the OpenAI API, don't fail the entire processing run

* fix: missing import, string separator

* Enhance error handling

* Use batched from more_itertools for compatibility with older Python versions

* Fix batching and add test

---------

Co-authored-by: Silvano Cerza <[email protected]>
  • Loading branch information
nilleb and silvanocerza authored Nov 15, 2024
1 parent f085959 commit c78545d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 32 deletions.
38 changes: 24 additions & 14 deletions haystack/components/embedders/openai_document_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import os
from typing import Any, Dict, List, Optional, Tuple

from openai import OpenAI
from more_itertools import batched
from openai import APIError, OpenAI
from tqdm import tqdm

from haystack import Document, component, default_from_dict, default_to_dict
from haystack import Document, component, default_from_dict, default_to_dict, logging
from haystack.utils import Secret, deserialize_secrets_inplace

logger = logging.getLogger(__name__)


@component
class OpenAIDocumentEmbedder:
Expand All @@ -34,7 +37,7 @@ class OpenAIDocumentEmbedder:
```
"""

def __init__(
def __init__( # pylint: disable=too-many-positional-arguments
self,
api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
model: str = "text-embedding-ada-002",
Expand Down Expand Up @@ -158,11 +161,11 @@ def from_dict(cls, data: Dict[str, Any]) -> "OpenAIDocumentEmbedder":
deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
return default_from_dict(cls, data)

def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
def _prepare_texts_to_embed(self, documents: List[Document]) -> Dict[str, str]:
"""
Prepare the texts to embed by concatenating the Document text with the metadata fields to embed.
"""
texts_to_embed = []
texts_to_embed = {}
for doc in documents:
meta_values_to_embed = [
str(doc.meta[key]) for key in self.meta_fields_to_embed if key in doc.meta and doc.meta[key] is not None
Expand All @@ -174,25 +177,32 @@ def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:

# copied from OpenAI embedding_utils (https://github.com/openai/openai-python/blob/main/openai/embeddings_utils.py)
# replace newlines, which can negatively affect performance.
text_to_embed = text_to_embed.replace("\n", " ")
texts_to_embed.append(text_to_embed)
texts_to_embed[doc.id] = text_to_embed.replace("\n", " ")
return texts_to_embed

def _embed_batch(self, texts_to_embed: List[str], batch_size: int) -> Tuple[List[List[float]], Dict[str, Any]]:
def _embed_batch(self, texts_to_embed: Dict[str, str], batch_size: int) -> Tuple[List[List[float]], Dict[str, Any]]:
"""
Embed a list of texts in batches.
"""

all_embeddings = []
meta: Dict[str, Any] = {}
for i in tqdm(
range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
for batch in tqdm(
batched(texts_to_embed.items(), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
):
batch = texts_to_embed[i : i + batch_size]
args: Dict[str, Any] = {"model": self.model, "input": [b[1] for b in batch]}

if self.dimensions is not None:
response = self.client.embeddings.create(model=self.model, dimensions=self.dimensions, input=batch)
else:
response = self.client.embeddings.create(model=self.model, input=batch)
args["dimensions"] = self.dimensions

try:
response = self.client.embeddings.create(**args)
except APIError as exc:
ids = ", ".join(b[0] for b in batch)
msg = "Failed embedding of documents {ids} caused by {exc}"
logger.exception(msg, ids=ids, exc=exc)
continue

embeddings = [el.embedding for el in response.data]
all_embeddings.extend(embeddings)

Expand Down
5 changes: 5 additions & 0 deletions releasenotes/notes/patch-1-34479efe3bea0e4f.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
enhancements:
- |
Change `OpenAIDocumentEmbedder` to keep running if a batch fails embedding.
    Now, if OpenAI returns an error, we log that error and keep processing the following batches.
52 changes: 34 additions & 18 deletions test/components/embedders/test_openai_document_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
#
# SPDX-License-Identifier: Apache-2.0
import os
import random
from typing import List
from haystack.utils.auth import Secret
from unittest.mock import Mock, patch

import random
import pytest
from openai import APIError

from haystack import Document
from haystack.components.embedders.openai_document_embedder import OpenAIDocumentEmbedder
from haystack.utils.auth import Secret


def mock_openai_response(input: List[str], model: str = "text-embedding-ada-002", **kwargs) -> dict:
Expand Down Expand Up @@ -155,7 +157,8 @@ def test_to_dict_with_custom_init_parameters(self, monkeypatch):

def test_prepare_texts_to_embed_w_metadata(self):
documents = [
Document(content=f"document number {i}:\ncontent", meta={"meta_field": f"meta_value {i}"}) for i in range(5)
Document(id=f"{i}", content=f"document number {i}:\ncontent", meta={"meta_field": f"meta_value {i}"})
for i in range(5)
]

embedder = OpenAIDocumentEmbedder(
Expand All @@ -165,30 +168,30 @@ def test_prepare_texts_to_embed_w_metadata(self):
prepared_texts = embedder._prepare_texts_to_embed(documents)

# note that newline is replaced by space
assert prepared_texts == [
"meta_value 0 | document number 0: content",
"meta_value 1 | document number 1: content",
"meta_value 2 | document number 2: content",
"meta_value 3 | document number 3: content",
"meta_value 4 | document number 4: content",
]
assert prepared_texts == {
"0": "meta_value 0 | document number 0: content",
"1": "meta_value 1 | document number 1: content",
"2": "meta_value 2 | document number 2: content",
"3": "meta_value 3 | document number 3: content",
"4": "meta_value 4 | document number 4: content",
}

def test_prepare_texts_to_embed_w_suffix(self):
documents = [Document(content=f"document number {i}") for i in range(5)]
documents = [Document(id=f"{i}", content=f"document number {i}") for i in range(5)]

embedder = OpenAIDocumentEmbedder(
api_key=Secret.from_token("fake-api-key"), prefix="my_prefix ", suffix=" my_suffix"
)

prepared_texts = embedder._prepare_texts_to_embed(documents)

assert prepared_texts == [
"my_prefix document number 0 my_suffix",
"my_prefix document number 1 my_suffix",
"my_prefix document number 2 my_suffix",
"my_prefix document number 3 my_suffix",
"my_prefix document number 4 my_suffix",
]
assert prepared_texts == {
"0": "my_prefix document number 0 my_suffix",
"1": "my_prefix document number 1 my_suffix",
"2": "my_prefix document number 2 my_suffix",
"3": "my_prefix document number 3 my_suffix",
"4": "my_prefix document number 4 my_suffix",
}

def test_run_wrong_input_format(self):
embedder = OpenAIDocumentEmbedder(api_key=Secret.from_token("fake-api-key"))
Expand All @@ -212,6 +215,19 @@ def test_run_on_empty_list(self):
assert result["documents"] is not None
assert not result["documents"] # empty list

def test_embed_batch_handles_exceptions_gracefully(self, caplog):
    # Verify that _embed_batch tolerates an APIError from the OpenAI client:
    # the failed batch is logged and skipped instead of aborting the whole run.
    embedder = OpenAIDocumentEmbedder(api_key=Secret.from_token("fake_api_key"))
    # Two documents keyed by id, fitting in a single batch (batch_size=2 below).
    fake_texts_to_embed = {"1": "text1", "2": "text2"}
    # Force every embeddings.create call to raise an APIError.
    with patch.object(
        embedder.client.embeddings,
        "create",
        side_effect=APIError(message="Mocked error", request=Mock(), body=None),
    ):
        # Must not raise: the exception is caught inside _embed_batch.
        embedder._embed_batch(texts_to_embed=fake_texts_to_embed, batch_size=2)

    # One log record per failed batch; both document ids appear in the message.
    assert len(caplog.records) == 1
    assert "Failed embedding of documents 1, 2 caused by Mocked error" in caplog.records[0].msg

@pytest.mark.skipif(os.environ.get("OPENAI_API_KEY", "") == "", reason="OPENAI_API_KEY is not set")
@pytest.mark.integration
def test_run(self):
Expand Down

0 comments on commit c78545d

Please sign in to comment.