Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add NvidiaTextEmbedder, NvidiaDocumentEmbedder and co. #537

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion integrations/nvidia/pydoc/config.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
loaders:
- type: haystack_pydoc_tools.loaders.CustomPythonLoader
search_path: [../src]
modules: []
modules:
[
"haystack_integrations.components.embedders.nvidia.document_embedder",
"haystack_integrations.components.embedders.nvidia.text_embedder",
"haystack_integrations.components.embedders.nvidia.models",
]
ignore_when_discovered: ["__init__"]
processors:
- type: filter
Expand Down
1 change: 1 addition & 0 deletions integrations/nvidia/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ unfixable = [
# Don't touch unused imports
"F401",
]
extend-exclude = ["tests", "example"]

[tool.ruff.isort]
known-first-party = ["src"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .document_embedder import NvidiaDocumentEmbedder
from .models import NvidiaEmbeddingModel
from .text_embedder import NvidiaTextEmbedder

# Public names of this package; mirrors the three classes imported above.
__all__ = [
    "NvidiaDocumentEmbedder",
    "NvidiaEmbeddingModel",
    "NvidiaTextEmbedder",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Literal, Union

from haystack_integrations.utils.nvidia import NvidiaCloudFunctionsClient

from .models import NvidiaEmbeddingModel

# Maximum number of characters allowed in a single input string of an embeddings request.
MAX_INPUT_STRING_LENGTH = 2048
# Maximum number of input strings allowed in a single embeddings request.
MAX_INPUTS = 50


def get_model_nvcf_id(model: NvidiaEmbeddingModel, client: NvidiaCloudFunctionsClient) -> str:
    """
    Look up the Nvidia Cloud Functions identifier of an embedding model.

    :param model:
        Embedding model; its string form is the key used for the lookup.
    :param client:
        Client used to list the functions available on the backend.
    :returns:
        The `id` of the function registered under the model's name.
    :raises ValueError:
        If no function is registered under the model's name, or if the
        matching function's status is not "ACTIVE".
    """
    nvcf_function = client.available_functions().get(str(model))

    # Guard clauses: reject unknown models first, then models that are not active.
    if nvcf_function is None:
        msg = f"Model '{model}' was not found on the Nvidia Cloud Functions backend"
        raise ValueError(msg)

    if nvcf_function.status != "ACTIVE":
        msg = f"Model '{model}' is not currently active/usable on the Nvidia Cloud Functions backend"
        raise ValueError(msg)

    return nvcf_function.id


@dataclass
class EmbeddingsRequest:
    """
    Payload of an embeddings request.

    All fields are validated on construction, and a non-list `input` is
    wrapped into a one-element list.
    """

    input: Union[str, List[str]]
    model: Literal["query", "passage"]
    encoding_format: Literal["float", "base64"] = "float"

    def __post_init__(self):
        """
        Normalize `input` to a list and validate every field.

        :raises ValueError:
            If there are no inputs or more than `MAX_INPUTS` of them, if any
            input is longer than `MAX_INPUT_STRING_LENGTH` characters, or if
            `encoding_format` or `model` holds an unsupported value.
        """
        if not isinstance(self.input, list):
            # A lone value becomes a single-element batch.
            self.input = [self.input]
        elif len(self.input) > MAX_INPUTS:
            msg = f"The number of inputs should not exceed {MAX_INPUTS}"
            raise ValueError(msg)

        if not self.input:
            msg = "The number of inputs should not be 0"
            raise ValueError(msg)

        for text in self.input:
            if len(text) > MAX_INPUT_STRING_LENGTH:
                msg = f"The length of each input should not exceed {MAX_INPUT_STRING_LENGTH} characters"
                raise ValueError(msg)

        supported_formats = ("float", "base64")
        if self.encoding_format not in supported_formats:
            msg = "encoding_format should be either 'float' or 'base64'"
            raise ValueError(msg)

        supported_models = ("query", "passage")
        if self.model not in supported_models:
            msg = "model should be either 'query' or 'passage'"
            raise ValueError(msg)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the request into a plain dictionary.

        :returns:
            Dictionary keyed by field name.
        """
        payload: Dict[str, Any] = asdict(self)
        return payload


@dataclass
class Usage:
    """
    Token counters attached to an embeddings response.
    """

    prompt_tokens: int
    total_tokens: int

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the counters into a plain dictionary.

        :returns:
            Dictionary keyed by field name.
        """
        counters: Dict[str, Any] = asdict(self)
        return counters


@dataclass
class Embeddings:
    """
    A single embedding entry of an embeddings response.
    """

    # Position of the entry; consumers sort the response entries by this value.
    index: int
    # Embedding payload, either a list of floats or a string.
    # NOTE(review): the string form presumably corresponds to the "base64" encoding format - confirm.
    embedding: Union[List[float], str]


@dataclass
class EmbeddingsResponse:
    """
    Parsed form of an embeddings response: the embedding entries plus usage counters.
    """

    data: List[Embeddings]
    usage: Usage

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "EmbeddingsResponse":
        """
        Build a response object from its dictionary representation.

        :param data:
            Dictionary with a `data` entry (iterable of keyword mappings for
            `Embeddings`) and a `usage` entry (keyword mapping for `Usage`).
        :returns:
            The parsed response.
        :raises ValueError:
            If a key is missing or a value has an unexpected shape.
        """
        try:
            entries = []
            for raw_entry in data["data"]:
                entries.append(Embeddings(**raw_entry))
            token_usage = Usage(**data["usage"])
            return cls(data=entries, usage=token_usage)
        except (KeyError, TypeError) as parse_error:
            # Surface any structural problem as one error type, keeping the cause attached.
            msg = f"Failed to parse EmbeddingsResponse from data: {data}"
            raise ValueError(msg) from parse_error
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
from typing import Any, Dict, List, Optional, Tuple, Union

from haystack import Document, component, default_from_dict, default_to_dict
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack_integrations.utils.nvidia import NvidiaCloudFunctionsClient
from tqdm import tqdm

from ._schema import MAX_INPUTS, EmbeddingsRequest, EmbeddingsResponse, Usage, get_model_nvcf_id
from .models import NvidiaEmbeddingModel


@component
class NvidiaDocumentEmbedder:
    """
    A component for embedding documents using embedding models provided by
    [NVIDIA AI Foundation Endpoints](https://www.nvidia.com/en-us/ai-data-science/foundation-models/).

    Usage example:
    ```python
    from haystack import Document
    from haystack_integrations.components.embedders.nvidia import NvidiaDocumentEmbedder, NvidiaEmbeddingModel

    doc = Document(content="I love pizza!")

    document_embedder = NvidiaDocumentEmbedder(model=NvidiaEmbeddingModel.NVOLVE_40K)
    document_embedder.warm_up()

    result = document_embedder.run([doc])
    print(result["documents"][0].embedding)
    ```
    """

    def __init__(
        self,
        model: Union[str, NvidiaEmbeddingModel],
        api_key: Secret = Secret.from_env_var("NVIDIA_API_KEY"),
        prefix: str = "",
        suffix: str = "",
        batch_size: int = 32,
        progress_bar: bool = True,
        meta_fields_to_embed: Optional[List[str]] = None,
        embedding_separator: str = "\n",
    ):
        """
        Create a NvidiaDocumentEmbedder component.

        :param model:
            Embedding model to use.
        :param api_key:
            API key for the NVIDIA AI Foundation Endpoints.
        :param prefix:
            A string to add to the beginning of each text.
        :param suffix:
            A string to add to the end of each text.
        :param batch_size:
            Number of Documents to encode at once.
            Must be at least 1 and cannot be greater than 50.
        :param progress_bar:
            Whether to show a progress bar or not.
        :param meta_fields_to_embed:
            List of meta fields that should be embedded along with the Document text.
        :param embedding_separator:
            Separator used to concatenate the meta fields to the Document text.
        :raises ValueError:
            If `model` is a string that does not name a known embedding model,
            or if `batch_size` is smaller than 1 or greater than the endpoint limit.
        """

        if isinstance(model, str):
            model = NvidiaEmbeddingModel.from_str(model)

        resolved_api_key = api_key.resolve_value()
        assert resolved_api_key is not None

        # A zero batch size would make `range()` fail at run time and a negative one
        # would silently produce no batches, so reject both up front.
        if batch_size < 1:
            msg = "batch_size must be a positive integer."
            raise ValueError(msg)

        # Upper-limit for the endpoint.
        if batch_size > MAX_INPUTS:
            msg = f"NVIDIA Cloud Functions currently support a maximum batch size of {MAX_INPUTS}."
            raise ValueError(msg)

        self.api_key = api_key
        self.model = model
        self.prefix = prefix
        self.suffix = suffix
        self.batch_size = batch_size
        self.progress_bar = progress_bar
        self.meta_fields_to_embed = meta_fields_to_embed or []
        self.embedding_separator = embedding_separator

        self.client = NvidiaCloudFunctionsClient(
            api_key=resolved_api_key,
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
        )
        # Resolved lazily in `warm_up()`.
        self.nvcf_id = None
        self._initialized = False

    def warm_up(self):
        """
        Initializes the component.

        Resolves the Nvidia Cloud Functions id of the configured model.
        Calling this method more than once has no further effect.
        """
        if self._initialized:
            return

        self.nvcf_id = get_model_nvcf_id(self.model, self.client)
        self._initialized = True

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            api_key=self.api_key.to_dict(),
            model=str(self.model),
            prefix=self.prefix,
            suffix=self.suffix,
            batch_size=self.batch_size,
            progress_bar=self.progress_bar,
            meta_fields_to_embed=self.meta_fields_to_embed,
            embedding_separator=self.embedding_separator,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "NvidiaDocumentEmbedder":
        """
        Deserializes the component from a dictionary.

        Note that `data["init_parameters"]` is updated in place.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.
        """
        data["init_parameters"]["model"] = NvidiaEmbeddingModel.from_str(data["init_parameters"]["model"])
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
        return default_from_dict(cls, data)

    def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
        """
        Build the text to embed for each Document.

        The selected meta values (skipping missing or `None` entries) and the Document
        content are joined with the embedding separator, then wrapped in prefix and suffix.
        """
        texts_to_embed = []
        for doc in documents:
            meta_values_to_embed = [
                str(doc.meta[key]) for key in self.meta_fields_to_embed if key in doc.meta and doc.meta[key] is not None
            ]
            text_to_embed = (
                self.prefix + self.embedding_separator.join([*meta_values_to_embed, doc.content or ""]) + self.suffix
            )
            texts_to_embed.append(text_to_embed)

        return texts_to_embed

    def _embed_batch(self, texts_to_embed: List[str], batch_size: int) -> Tuple[List[List[float]], Dict[str, Any]]:
        """
        Embed the texts in batches of `batch_size` and accumulate the usage counters.

        :returns:
            The embeddings, in the order of `texts_to_embed`, and a metadata
            dictionary holding the summed usage under the `usage` key.
        """
        all_embeddings: List[List[float]] = []
        usage = Usage(prompt_tokens=0, total_tokens=0)
        assert self.nvcf_id is not None

        for i in tqdm(
            range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
        ):
            batch = texts_to_embed[i : i + batch_size]

            request = EmbeddingsRequest(input=batch, model="passage").to_dict()
            json_response = self.client.query_function(self.nvcf_id, request)
            response = EmbeddingsResponse.from_dict(json_response)

            # Sort resulting embeddings by index
            assert all(isinstance(r.embedding, list) for r in response.data)
            sorted_embeddings: List[List[float]] = [r.embedding for r in sorted(response.data, key=lambda e: e.index)]  # type: ignore
            all_embeddings.extend(sorted_embeddings)

            usage.prompt_tokens += response.usage.prompt_tokens
            usage.total_tokens += response.usage.total_tokens

        return all_embeddings, {"usage": usage.to_dict()}

    @component.output_types(documents=List[Document], meta=Dict[str, Any])
    def run(self, documents: List[Document]):
        """
        Embed a list of Documents.

        The embedding of each Document is stored in the `embedding` field of the Document.
        The Documents passed in are modified in place and returned.

        :param documents:
            A list of Documents to embed.
        :returns:
            A dictionary with the following keys and values:
            - `documents` - List of processed Documents with embeddings.
            - `meta` - Metadata on usage statistics, etc.
        :raises RuntimeError:
            If the component was not initialized.
        :raises TypeError:
            If the input is not a list of Documents.
        """
        if not self._initialized:
            msg = "The embedding model has not been loaded. Please call warm_up() before running."
            raise RuntimeError(msg)
        # Only the first element is inspected; an empty list is accepted.
        if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)):
            msg = (
                "NvidiaDocumentEmbedder expects a list of Documents as input. "
                "In case you want to embed a string, please use the NvidiaTextEmbedder."
            )
            raise TypeError(msg)

        texts_to_embed = self._prepare_texts_to_embed(documents)
        embeddings, metadata = self._embed_batch(texts_to_embed, self.batch_size)
        for doc, emb in zip(documents, embeddings):
            doc.embedding = emb

        return {"documents": documents, "meta": metadata}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from enum import Enum


class NvidiaEmbeddingModel(Enum):
    """
    [NVIDIA AI Foundation models](https://catalog.ngc.nvidia.com/ai-foundation-models)
    used for generating embeddings.
    """

    #: [Retrieval QA Embedding Model](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/models/nvolve-40k).
    NVOLVE_40K = "playground_nvolveqa_40k"

    def __str__(self):
        """
        Return the raw model name (the enum value) rather than the default `Class.MEMBER` form.
        """
        return self.value

    @classmethod
    def from_str(cls, string: str) -> "NvidiaEmbeddingModel":
        """
        Create an embedding model from a string.

        :param string:
            String to convert; it must equal the value of one of the enum members.
        :returns:
            Embedding model.
        :raises ValueError:
            If the string does not match any known embedding model.
        """
        # Iterate over `cls` rather than the hard-coded class name.
        enum_map = {e.value: e for e in cls}
        emb_model = enum_map.get(string)
        if emb_model is None:
            msg = f"Unknown embedding model '{string}'. Supported models are: {list(enum_map)}"
            raise ValueError(msg)
        return emb_model
Loading