Add NIM backend support (#597)
* Abstract service endpoint backend

* Abstract generators backend

* Implement NimBackend

* Implement NimBackend for embedders

* Fix embedders backends arguments

* Fix text embedder backend arguments

* Make embedders nim backend consistent with nvcf one

* Fix tests

* Update headers, the generator endpoint, and the embedders input_type param

* Update docstrings

* Make api_key optional in generator

* Remove api_key from NIM backend

* Move usage in metadata in generator

* Update tests

* Remove OPENAI_API_KEY env var from workflow

* Fix integration tests

* Fix linting

* Fix linting again

* Address PR comments

* Fix NVCF backend

---------

Co-authored-by: shadeMe <[email protected]>
silvanocerza and shadeMe authored Mar 18, 2024
1 parent 1c31530 commit 5a8796b
Showing 17 changed files with 662 additions and 297 deletions.
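The upshot of the diff below: the NVIDIA embedders can now talk either to the hosted NVIDIA AI Foundation Endpoints (NVCF backend) or to a self-hosted NeMo Inference Microservice (NIM backend), selected by whether `api_url` is passed. A minimal sketch of the two call styles; the local URL is an illustrative assumption, not something the commit prescribes:

```python
from haystack_integrations.components.embedders.nvidia import NvidiaDocumentEmbedder

# Hosted NVIDIA AI Foundation Endpoints: expects NVIDIA_API_KEY in the environment.
nvcf_embedder = NvidiaDocumentEmbedder(model="nvolveqa_40k")

# Self-hosted NIM: pass a custom api_url instead; no API key is needed.
nim_embedder = NvidiaDocumentEmbedder(
    model="nvolveqa_40k",
    api_url="http://localhost:8080/v1",  # hypothetical local deployment
)
```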
2 changes: 1 addition & 1 deletion integrations/nvidia/pyproject.toml
@@ -22,7 +22,7 @@ classifiers = [
   "Programming Language :: Python :: Implementation :: CPython",
   "Programming Language :: Python :: Implementation :: PyPy",
 ]
-dependencies = ["haystack-ai>=2.0.0b6"]
+dependencies = ["haystack-ai", "requests"]
 
 [project.urls]
 Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/nvidia#readme"
haystack_integrations/components/embedders/nvidia/__init__.py
@@ -1,9 +1,7 @@
 from .document_embedder import NvidiaDocumentEmbedder
-from .models import NvidiaEmbeddingModel
 from .text_embedder import NvidiaTextEmbedder
 
 __all__ = [
     "NvidiaDocumentEmbedder",
-    "NvidiaEmbeddingModel",
     "NvidiaTextEmbedder",
 ]
haystack_integrations/components/embedders/nvidia/_nim_backend.py
@@ -0,0 +1,46 @@
+from typing import Any, Dict, List, Optional, Tuple
+
+import requests
+
+from .backend import EmbedderBackend
+
+REQUEST_TIMEOUT = 60
+
+
+class NimBackend(EmbedderBackend):
+    def __init__(
+        self,
+        model: str,
+        api_url: str,
+        model_kwargs: Optional[Dict[str, Any]] = None,
+    ):
+        headers = {
+            "Content-Type": "application/json",
+            "accept": "application/json",
+        }
+        self.session = requests.Session()
+        self.session.headers.update(headers)
+
+        self.model = model
+        self.api_url = api_url
+        self.model_kwargs = model_kwargs or {}
+
+    def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
+        url = f"{self.api_url}/embeddings"
+
+        res = self.session.post(
+            url,
+            json={
+                "model": self.model,
+                "input": texts,
+                **self.model_kwargs,
+            },
+            timeout=REQUEST_TIMEOUT,
+        )
+        res.raise_for_status()
+
+        data = res.json()
+        # Sort the embeddings by index; we don't know whether the service returns them in order
+        embeddings = [e["embedding"] for e in sorted(data["data"], key=lambda e: e["index"])]
+
+        return embeddings, {"usage": data["usage"]}
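For orientation, a usage sketch of `NimBackend` as defined above. The backend simply POSTs an OpenAI-style JSON payload to `<api_url>/embeddings`, so the URL below is a placeholder assumption:

```python
backend = NimBackend(model="nvolveqa_40k", api_url="http://localhost:8080/v1")  # hypothetical endpoint
embeddings, meta = backend.embed(["I love pizza!", "Pineapple on pizza is controversial."])
print(len(embeddings), meta["usage"])  # 2, plus the service's token counts
```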
haystack_integrations/components/embedders/nvidia/_nvcf_backend.py
@@ -1,10 +1,49 @@
 from dataclasses import asdict, dataclass
-from typing import Any, Dict, List, Literal, Union
+from typing import Any, Dict, List, Literal, Optional, Tuple, Union
 
+from haystack.utils.auth import Secret
+from haystack_integrations.utils.nvidia import NvidiaCloudFunctionsClient
+
+from .backend import EmbedderBackend
+
 MAX_INPUT_STRING_LENGTH = 2048
 MAX_INPUTS = 50
 
 
+class NvcfBackend(EmbedderBackend):
+    def __init__(
+        self,
+        model: str,
+        api_key: Secret,
+        model_kwargs: Optional[Dict[str, Any]] = None,
+    ):
+        if not model.startswith("playground_"):
+            model = f"playground_{model}"
+
+        super().__init__(model=model, model_kwargs=model_kwargs)
+
+        self.api_key = api_key
+        self.client = NvidiaCloudFunctionsClient(
+            api_key=api_key,
+            headers={
+                "Content-Type": "application/json",
+                "Accept": "application/json",
+            },
+        )
+        self.nvcf_id = self.client.get_model_nvcf_id(self.model_name)
+
+    def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
+        request = EmbeddingsRequest(input=texts, **self.model_kwargs).to_dict()
+        json_response = self.client.query_function(self.nvcf_id, request)
+        response = EmbeddingsResponse.from_dict(json_response)
+
+        # Sort resulting embeddings by index
+        assert all(isinstance(r.embedding, list) for r in response.data)
+        sorted_embeddings: List[List[float]] = [r.embedding for r in sorted(response.data, key=lambda e: e.index)]  # type: ignore
+        metadata = {"usage": response.usage.to_dict()}
+        return sorted_embeddings, metadata
+
+
 @dataclass
 class EmbeddingsRequest:
     input: Union[str, List[str]]
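The NVCF counterpart in use, assuming a valid `NVIDIA_API_KEY` in the environment; note that the constructor normalizes bare model names by prepending `playground_`:

```python
from haystack.utils.auth import Secret

backend = NvcfBackend(model="nvolveqa_40k", api_key=Secret.from_env_var("NVIDIA_API_KEY"))
print(backend.model_name)  # "playground_nvolveqa_40k"
embeddings, meta = backend.embed(["I love pizza!"])
```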
haystack_integrations/components/embedders/nvidia/backend.py
@@ -0,0 +1,29 @@
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, Optional, Tuple
+
+
+class EmbedderBackend(ABC):
+    def __init__(self, model: str, model_kwargs: Optional[Dict[str, Any]] = None):
+        """
+        Initialize the backend.
+
+        :param model:
+            The name of the model to use.
+        :param model_kwargs:
+            Additional keyword arguments to pass to the model.
+        """
+        self.model_name = model
+        self.model_kwargs = model_kwargs or {}
+
+    @abstractmethod
+    def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
+        """
+        Invoke the backend and embed the given texts.
+
+        :param texts:
+            Texts to embed.
+        :return:
+            Vector representation of the texts and
+            metadata returned by the service.
+        """
+        pass
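Because the contract is just `embed(texts) -> (embeddings, metadata)`, a stand-in backend for unit tests takes only a few lines. This `FakeBackend` is illustrative and not part of the commit:

```python
from typing import Any, Dict, List, Tuple


class FakeBackend(EmbedderBackend):  # EmbedderBackend as defined above
    def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
        # Deterministic zero vectors and empty usage stats, enough to drive component tests.
        return [[0.0] * 3 for _ in texts], {"usage": {"prompt_tokens": 0, "total_tokens": 0}}
```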
haystack_integrations/components/embedders/nvidia/document_embedder.py
@@ -1,27 +1,28 @@
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple
 
 from haystack import Document, component, default_from_dict, default_to_dict
 from haystack.utils import Secret, deserialize_secrets_inplace
-from haystack_integrations.utils.nvidia import NvidiaCloudFunctionsClient
 from tqdm import tqdm
 
-from ._schema import MAX_INPUTS, EmbeddingsRequest, EmbeddingsResponse, Usage
-from .models import NvidiaEmbeddingModel
+from ._nim_backend import NimBackend
+from ._nvcf_backend import NvcfBackend
+from .backend import EmbedderBackend
 
 
 @component
 class NvidiaDocumentEmbedder:
     """
     A component for embedding documents using embedding models provided by
-    [NVIDIA AI Foundation Endpoints](https://www.nvidia.com/en-us/ai-data-science/foundation-models/).
+    [NVIDIA AI Foundation Endpoints](https://www.nvidia.com/en-us/ai-data-science/foundation-models/)
+    and NVIDIA NeMo Inference Microservices.
 
     Usage example:
     ```python
     from haystack_integrations.components.embedders.nvidia import NvidiaDocumentEmbedder, NvidiaEmbeddingModel
 
     doc = Document(content="I love pizza!")
 
-    text_embedder = NvidiaDocumentEmbedder(model=NvidiaEmbeddingModel.NVOLVE_40K)
+    text_embedder = NvidiaDocumentEmbedder(model="nvolveqa_40k")
     text_embedder.warm_up()
 
     result = document_embedder.run([doc])
@@ -31,8 +32,9 @@ class NvidiaDocumentEmbedder:
 
     def __init__(
         self,
-        model: Union[str, NvidiaEmbeddingModel],
-        api_key: Secret = Secret.from_env_var("NVIDIA_API_KEY"),
+        model: str,
+        api_key: Optional[Secret] = Secret.from_env_var("NVIDIA_API_KEY"),
+        api_url: Optional[str] = None,
         prefix: str = "",
         suffix: str = "",
         batch_size: int = 32,
@@ -47,6 +49,8 @@ def __init__(
             Embedding model to use.
         :param api_key:
             API key for the NVIDIA AI Foundation Endpoints.
+        :param api_url:
+            Custom API URL for the NVIDIA NeMo Inference Microservices.
         :param prefix:
             A string to add to the beginning of each text.
         :param suffix:
@@ -62,31 +66,17 @@
             Separator used to concatenate the meta fields to the Document text.
         """
 
-        if isinstance(model, str):
-            model = NvidiaEmbeddingModel.from_str(model)
-
         # Upper-limit for the endpoint.
         if batch_size > MAX_INPUTS:
             msg = f"NVIDIA Cloud Functions currently support a maximum batch size of {MAX_INPUTS}."
             raise ValueError(msg)
 
         self.api_key = api_key
         self.model = model
+        self.api_url = api_url
         self.prefix = prefix
         self.suffix = suffix
         self.batch_size = batch_size
         self.progress_bar = progress_bar
         self.meta_fields_to_embed = meta_fields_to_embed or []
         self.embedding_separator = embedding_separator
 
-        self.client = NvidiaCloudFunctionsClient(
-            api_key=api_key,
-            headers={
-                "Content-Type": "application/json",
-                "Accept": "application/json",
-            },
-        )
-        self.nvcf_id = None
+        self.backend: Optional[EmbedderBackend] = None
         self._initialized = False
def warm_up(self):
@@ -96,7 +86,15 @@ def warm_up(self):
         if self._initialized:
             return
 
-        self.nvcf_id = self.client.get_model_nvcf_id(str(self.model))
+        if self.api_url is None:
+            if self.api_key is None:
+                msg = "API key is required for NVIDIA AI Foundation Endpoints."
+                raise ValueError(msg)
+
+            self.backend = NvcfBackend(self.model, api_key=self.api_key, model_kwargs={"model": "passage"})
+        else:
+            self.backend = NimBackend(self.model, api_url=self.api_url, model_kwargs={"input_type": "passage"})
+
         self._initialized = True
 
     def to_dict(self) -> Dict[str, Any]:
@@ -108,8 +106,9 @@ def to_dict(self) -> Dict[str, Any]:
         """
         return default_to_dict(
             self,
-            api_key=self.api_key.to_dict(),
-            model=str(self.model),
+            api_key=self.api_key.to_dict() if self.api_key else None,
+            model=self.model,
+            api_url=self.api_url,
             prefix=self.prefix,
             suffix=self.suffix,
             batch_size=self.batch_size,
@@ -128,7 +127,6 @@ def from_dict(cls, data: Dict[str, Any]) -> "NvidiaDocumentEmbedder":
         :returns:
             The deserialized component.
         """
-        data["init_parameters"]["model"] = NvidiaEmbeddingModel.from_str(data["init_parameters"]["model"])
         deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
         return default_from_dict(cls, data)

@@ -147,27 +145,23 @@ def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
 
     def _embed_batch(self, texts_to_embed: List[str], batch_size: int) -> Tuple[List[List[float]], Dict[str, Any]]:
         all_embeddings: List[List[float]] = []
-        usage = Usage(prompt_tokens=0, total_tokens=0)
-        assert self.nvcf_id is not None
+        usage_prompt_tokens = 0
+        usage_total_tokens = 0
+
+        assert self.backend is not None
 
         for i in tqdm(
             range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
         ):
             batch = texts_to_embed[i : i + batch_size]
 
-            request = EmbeddingsRequest(input=batch, model="passage").to_dict()
-            json_response = self.client.query_function(self.nvcf_id, request)
-            response = EmbeddingsResponse.from_dict(json_response)
-
-            # Sort resulting embeddings by index
-            assert all(isinstance(r.embedding, list) for r in response.data)
-            sorted_embeddings: List[List[float]] = [r.embedding for r in sorted(response.data, key=lambda e: e.index)]  # type: ignore
+            sorted_embeddings, meta = self.backend.embed(batch)
             all_embeddings.extend(sorted_embeddings)
 
-            usage.prompt_tokens += response.usage.prompt_tokens
-            usage.total_tokens += response.usage.total_tokens
+            usage_prompt_tokens += meta.get("usage", {}).get("prompt_tokens", 0)
+            usage_total_tokens += meta.get("usage", {}).get("total_tokens", 0)
 
-        return all_embeddings, {"usage": usage.to_dict()}
+        return all_embeddings, {"usage": {"prompt_tokens": usage_prompt_tokens, "total_tokens": usage_total_tokens}}
 
     @component.output_types(documents=List[Document], meta=Dict[str, Any])
     def run(self, documents: List[Document]):
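Usage accounting now happens in `_embed_batch` for both backends, so the component's output `meta` keeps the same shape as before. A sketch, assuming warm-up succeeds against a hypothetical NIM service:

```python
from haystack import Document

embedder = NvidiaDocumentEmbedder(model="nvolveqa_40k", api_url="http://localhost:8080/v1")
embedder.warm_up()

result = embedder.run(documents=[Document(content="I love pizza!")])
print(result["meta"])  # {'usage': {'prompt_tokens': ..., 'total_tokens': ...}}
```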

haystack_integrations/components/embedders/nvidia/models.py (this file was deleted; it defined NvidiaEmbeddingModel, whose import is removed in __init__.py above)

