Skip to content

Commit

Permalink
feat: Add NvidiaTextEmbedder, NvidiaDocumentEmbedder and co.
Browse files Browse the repository at this point in the history
  • Loading branch information
shadeMe committed Mar 5, 2024
1 parent 52a8354 commit e747dc9
Show file tree
Hide file tree
Showing 12 changed files with 960 additions and 3 deletions.
7 changes: 6 additions & 1 deletion integrations/nvidia/pydoc/config.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
loaders:
- type: haystack_pydoc_tools.loaders.CustomPythonLoader
search_path: [../src]
modules: []
modules:
[
"haystack_integrations.components.embedders.nvidia.document_embedder",
"haystack_integrations.components.embedders.nvidia.text_embedder",
"haystack_integrations.components.embedders.nvidia.models",
]
ignore_when_discovered: ["__init__"]
processors:
- type: filter
Expand Down
1 change: 1 addition & 0 deletions integrations/nvidia/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ unfixable = [
# Don't touch unused imports
"F401",
]
extend-exclude = ["tests", "example"]

[tool.ruff.isort]
known-first-party = ["src"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .document_embedder import NvidiaDocumentEmbedder
from .models import NvidiaEmbeddingModel
from .text_embedder import NvidiaTextEmbedder

# Names re-exported from this package's submodules (see the relative imports above).
__all__ = [
"NvidiaDocumentEmbedder",
"NvidiaEmbeddingModel",
"NvidiaTextEmbedder",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Literal, Union

from haystack_integrations.utils.nvidia import NvidiaCloudFunctionsClient

from .models import NvidiaEmbeddingModel

# Maximum number of characters allowed in a single input string of an embeddings request.
MAX_INPUT_STRING_LENGTH = 2048
# Maximum number of input strings allowed in a single embeddings request.
MAX_INPUTS = 50


def get_model_nvcf_id(model: NvidiaEmbeddingModel, client: NvidiaCloudFunctionsClient) -> str:
    """
    Look up the Nvidia Cloud Functions identifier of an embedding model.

    :param model:
        The embedding model to resolve. Its string form is used as the lookup key.
    :param client:
        Client used to list the functions available on the backend.
    :returns:
        The `id` of the function registered for the model.
    :raises ValueError:
        If no function is registered for the model, or if the registered
        function's status is not `"ACTIVE"`.
    """
    function = client.available_functions().get(str(model))

    # Guard clauses: bail out on the two unusable states before returning the id.
    if function is None:
        msg = f"Model '{model}' was not found on the Nvidia Cloud Functions backend"
        raise ValueError(msg)

    if function.status != "ACTIVE":
        msg = f"Model '{model}' is not currently active/usable on the Nvidia Cloud Functions backend"
        raise ValueError(msg)

    return function.id


@dataclass
class EmbeddingsRequest:
    """
    Request payload for an embeddings call.

    A single string passed as `input` is wrapped into a one-element list during
    validation, so `input` is always a list once the instance is constructed.
    """

    input: Union[str, List[str]]
    model: Literal["query", "passage"]
    encoding_format: Literal["float", "base64"] = "float"

    def __post_init__(self):
        """
        Normalize `input` to a list and validate every field.

        :raises ValueError:
            If there are more than `MAX_INPUTS` inputs, no inputs at all, an input
            longer than `MAX_INPUT_STRING_LENGTH` characters, or an unsupported
            `encoding_format` / `model` value.
        """
        if not isinstance(self.input, list):
            # A lone value becomes a one-element batch.
            self.input = [self.input]
        elif len(self.input) > MAX_INPUTS:
            msg = f"The number of inputs should not exceed {MAX_INPUTS}"
            raise ValueError(msg)

        if not self.input:
            msg = "The number of inputs should not be 0"
            raise ValueError(msg)

        for text in self.input:
            if len(text) > MAX_INPUT_STRING_LENGTH:
                msg = f"The length of each input should not exceed {MAX_INPUT_STRING_LENGTH} characters"
                raise ValueError(msg)

        # Type hints are not enforced at runtime, so the literals are re-checked here.
        if self.encoding_format not in ("float", "base64"):
            msg = "encoding_format should be either 'float' or 'base64'"
            raise ValueError(msg)

        if self.model not in ("query", "passage"):
            msg = "model should be either 'query' or 'passage'"
            raise ValueError(msg)

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the request to a dictionary.

        :returns:
            Dictionary with one entry per dataclass field.
        """
        payload: Dict[str, Any] = asdict(self)
        return payload


@dataclass
class Usage:
    """
    Token usage counters: prompt tokens and total tokens.
    """

    prompt_tokens: int
    total_tokens: int

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the counters to a dictionary.

        :returns:
            Dictionary with the `prompt_tokens` and `total_tokens` entries.
        """
        counters: Dict[str, Any] = asdict(self)
        return counters


@dataclass
class Embeddings:
    """
    A single embedding entry of an embeddings response.
    """

    # Position of the entry within the response; consumers can sort on it.
    index: int
    # The embedding itself: a list of floats, or a string
    # (presumably when a non-float encoding was requested - TODO confirm).
    embedding: Union[List[float], str]


@dataclass
class EmbeddingsResponse:
    """
    Parsed form of an embeddings response: the embedding entries plus usage counters.
    """

    data: List[Embeddings]
    usage: Usage

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "EmbeddingsResponse":
        """
        Build a response object from its dictionary representation.

        :param data:
            Dictionary holding a `data` list of embedding entries and a `usage` mapping.
        :returns:
            The parsed response.
        :raises ValueError:
            If a required key is missing or an entry does not match the expected fields.
        """
        try:
            return cls(
                data=[Embeddings(**entry) for entry in data["data"]],
                usage=Usage(**data["usage"]),
            )
        except (KeyError, TypeError) as error:
            # Surface any structural problem as one error type, keeping the cause chained.
            msg = f"Failed to parse EmbeddingsResponse from data: {data}"
            raise ValueError(msg) from error
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
from typing import Any, Dict, List, Optional, Tuple, Union

from haystack import Document, component, default_from_dict, default_to_dict
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack_integrations.utils.nvidia import NvidiaCloudFunctionsClient
from tqdm import tqdm

from ._schema import MAX_INPUTS, EmbeddingsRequest, EmbeddingsResponse, Usage, get_model_nvcf_id
from .models import NvidiaEmbeddingModel


@component
class NvidiaDocumentEmbedder:
    """
    A component for embedding documents using embedding models provided by
    [NVIDIA AI Foundation Endpoints](https://www.nvidia.com/en-us/ai-data-science/foundation-models/).

    Usage example:
    ```python
    from haystack import Document
    from haystack_integrations.components.embedders.nvidia import NvidiaDocumentEmbedder, NvidiaEmbeddingModel

    doc = Document(content="I love pizza!")

    document_embedder = NvidiaDocumentEmbedder(model=NvidiaEmbeddingModel.NVOLVE_40K)
    document_embedder.warm_up()

    result = document_embedder.run([doc])
    print(result["documents"][0].embedding)
    ```
    """

    def __init__(
        self,
        model: Union[str, NvidiaEmbeddingModel],
        api_key: Secret = Secret.from_env_var("NVIDIA_API_KEY"),
        prefix: str = "",
        suffix: str = "",
        batch_size: int = 32,
        progress_bar: bool = True,
        meta_fields_to_embed: Optional[List[str]] = None,
        embedding_separator: str = "\n",
    ):
        """
        Create a NvidiaDocumentEmbedder component.

        :param model:
            Embedding model to use.
        :param api_key:
            API key for the NVIDIA AI Foundation Endpoints.
        :param prefix:
            A string to add to the beginning of each text.
        :param suffix:
            A string to add to the end of each text.
        :param batch_size:
            Number of Documents to encode at once.
            Must be positive and cannot be greater than 50.
        :param progress_bar:
            Whether to show a progress bar or not.
        :param meta_fields_to_embed:
            List of meta fields that should be embedded along with the Document text.
        :param embedding_separator:
            Separator used to concatenate the meta fields to the Document text.
        :raises ValueError:
            If `model` is a string that does not name a known embedding model, or if
            `batch_size` is not positive or exceeds the endpoint limit.
        """

        if isinstance(model, str):
            model = NvidiaEmbeddingModel.from_str(model)

        resolved_api_key = api_key.resolve_value()
        assert resolved_api_key is not None

        # A batch size of 0 would make `range()` fail at run time and a negative one
        # would silently skip every document, so reject both up front.
        if batch_size <= 0:
            msg = f"batch_size must be a positive integer, got {batch_size}."
            raise ValueError(msg)

        # Upper-limit for the endpoint.
        if batch_size > MAX_INPUTS:
            msg = f"NVIDIA Cloud Functions currently support a maximum batch size of {MAX_INPUTS}."
            raise ValueError(msg)

        self.api_key = api_key
        self.model = model
        self.prefix = prefix
        self.suffix = suffix
        self.batch_size = batch_size
        self.progress_bar = progress_bar
        self.meta_fields_to_embed = meta_fields_to_embed or []
        self.embedding_separator = embedding_separator

        self.client = NvidiaCloudFunctionsClient(
            api_key=resolved_api_key,
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
        )
        # Resolved lazily in `warm_up()`.
        self.nvcf_id = None
        self._initialized = False

    def warm_up(self):
        """
        Initializes the component.

        Resolves the Nvidia Cloud Functions id of the configured model.
        Calling it more than once is a no-op.
        """
        if self._initialized:
            return

        self.nvcf_id = get_model_nvcf_id(self.model, self.client)
        self._initialized = True

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            api_key=self.api_key.to_dict(),
            model=str(self.model),
            prefix=self.prefix,
            suffix=self.suffix,
            batch_size=self.batch_size,
            progress_bar=self.progress_bar,
            meta_fields_to_embed=self.meta_fields_to_embed,
            embedding_separator=self.embedding_separator,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "NvidiaDocumentEmbedder":
        """
        Deserializes the component from a dictionary.

        :param data:
            The dictionary to deserialize from. Its `init_parameters` entry is updated in place.
        :returns:
            The deserialized component.
        """
        data["init_parameters"]["model"] = NvidiaEmbeddingModel.from_str(data["init_parameters"]["model"])
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
        return default_from_dict(cls, data)

    def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
        """
        Build the text to embed for each Document.

        The selected meta values (skipping missing and `None` ones) and the Document
        content are joined with the embedding separator, then wrapped in prefix and suffix.
        """
        texts_to_embed = []
        for doc in documents:
            meta_values_to_embed = [
                str(doc.meta[key]) for key in self.meta_fields_to_embed if key in doc.meta and doc.meta[key] is not None
            ]
            text_to_embed = (
                self.prefix + self.embedding_separator.join([*meta_values_to_embed, doc.content or ""]) + self.suffix
            )
            texts_to_embed.append(text_to_embed)

        return texts_to_embed

    def _embed_batch(self, texts_to_embed: List[str], batch_size: int) -> Tuple[List[List[float]], Dict[str, Any]]:
        """
        Embed the texts in batches of `batch_size`.

        :returns:
            The embeddings in input order, and a metadata dictionary holding the
            token usage accumulated over all batches under the `usage` key.
        """
        all_embeddings: List[List[float]] = []
        usage = Usage(prompt_tokens=0, total_tokens=0)
        assert self.nvcf_id is not None

        for i in tqdm(
            range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
        ):
            batch = texts_to_embed[i : i + batch_size]

            request = EmbeddingsRequest(input=batch, model="passage").to_dict()
            json_response = self.client.query_function(self.nvcf_id, request)
            response = EmbeddingsResponse.from_dict(json_response)

            # Sort resulting embeddings by index
            assert all(isinstance(r.embedding, list) for r in response.data)
            sorted_embeddings: List[List[float]] = [r.embedding for r in sorted(response.data, key=lambda e: e.index)]  # type: ignore
            all_embeddings.extend(sorted_embeddings)

            usage.prompt_tokens += response.usage.prompt_tokens
            usage.total_tokens += response.usage.total_tokens

        return all_embeddings, {"usage": usage.to_dict()}

    @component.output_types(documents=List[Document], meta=Dict[str, Any])
    def run(self, documents: List[Document]):
        """
        Embed a list of Documents.

        The embedding of each Document is stored in the `embedding` field of the Document.

        :param documents:
            A list of Documents to embed.
        :returns:
            A dictionary with the following keys and values:
            - `documents` - List of processed Documents with embeddings.
            - `meta` - Metadata on usage statistics, etc.
        :raises RuntimeError:
            If the component was not initialized.
        :raises TypeError:
            If the input is not a list of Documents.
        """
        if not self._initialized:
            msg = "The embedding model has not been loaded. Please call warm_up() before running."
            raise RuntimeError(msg)
        # Only the first element is inspected; an empty list is accepted.
        if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)):
            msg = (
                "NvidiaDocumentEmbedder expects a list of Documents as input. "
                "In case you want to embed a string, please use the NvidiaTextEmbedder."
            )
            raise TypeError(msg)

        texts_to_embed = self._prepare_texts_to_embed(documents)
        embeddings, metadata = self._embed_batch(texts_to_embed, self.batch_size)
        for doc, emb in zip(documents, embeddings):
            doc.embedding = emb

        return {"documents": documents, "meta": metadata}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from enum import Enum


class NvidiaEmbeddingModel(Enum):
    """
    [NVIDIA AI Foundation models](https://catalog.ngc.nvidia.com/ai-foundation-models)
    used for generating embeddings.
    """

    #: [Retrieval QA Embedding Model](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/models/nvolve-40k).
    NVOLVE_40K = "playground_nvolveqa_40k"

    def __str__(self):
        """Return the model's string value, which is the form used when serializing it."""
        return self.value

    @classmethod
    def from_str(cls, string: str) -> "NvidiaEmbeddingModel":
        """
        Create an embedding model from a string.

        :param string:
            String to convert. Must equal the value of one of the enum members.
        :returns:
            Embedding model.
        :raises ValueError:
            If the string does not correspond to a known embedding model.
        """
        enum_map = {e.value: e for e in cls}
        emb_model = enum_map.get(string)
        if emb_model is None:
            msg = f"Unknown embedding model '{string}'. Supported models are: {list(enum_map.keys())}"
            raise ValueError(msg)
        return emb_model
Loading

0 comments on commit e747dc9

Please sign in to comment.