Add NIM backend support #597

Merged · 20 commits · Mar 18, 2024 · showing changes from 17 commits
2 changes: 1 addition & 1 deletion integrations/nvidia/pyproject.toml
@@ -22,7 +22,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = ["haystack-ai>=2.0.0b6"]
dependencies = ["haystack-ai", "requests"]

[project.urls]
Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/nvidia#readme"
haystack_integrations/components/embedders/nvidia/__init__.py
@@ -1,9 +1,7 @@
from .document_embedder import NvidiaDocumentEmbedder
from .models import NvidiaEmbeddingModel
from .text_embedder import NvidiaTextEmbedder

__all__ = [
"NvidiaDocumentEmbedder",
"NvidiaEmbeddingModel",
"NvidiaTextEmbedder",
]
haystack_integrations/components/embedders/nvidia/_nim_backend.py
@@ -0,0 +1,46 @@
from typing import Any, Dict, List, Optional, Tuple

import requests

from .backend import EmbedderBackend

REQUEST_TIMEOUT = 60


class NimBackend(EmbedderBackend):
def __init__(
self,
model: str,
api_url: str,
model_kwargs: Optional[Dict[str, Any]] = None,
):
headers = {
"Content-Type": "application/json",
"accept": "application/json",
}
self.session = requests.Session()
self.session.headers.update(headers)

self.model = model
self.api_url = api_url
self.model_kwargs = model_kwargs or {}

def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
url = f"{self.api_url}/embeddings"

res = self.session.post(
url,
json={
"model": self.model,
"input": texts,
**self.model_kwargs,
},
timeout=REQUEST_TIMEOUT,
)
res.raise_for_status()

data = res.json()
        # Sort the embeddings by index; the service may return them out of order.
embeddings = [e["embedding"] for e in sorted(data["data"], key=lambda e: e["index"])]

return embeddings, {"usage": data["usage"]}
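
For reviewers who want to exercise the new backend in isolation, a minimal sketch (the URL, port, and model name are placeholders for whatever NIM deployment you have running; since the backend appends `/embeddings`, `api_url` is expected to end in the OpenAI-style `/v1`):

```python
from haystack_integrations.components.embedders.nvidia._nim_backend import NimBackend

# Placeholder deployment details; point these at your own NIM instance.
backend = NimBackend(
    model="NV-Embed-QA",
    api_url="http://localhost:8080/v1",
    model_kwargs={"input_type": "passage"},
)

embeddings, meta = backend.embed(["I love pizza!", "Haystack is great."])
print(len(embeddings), len(embeddings[0]))  # number of inputs, embedding dimension
print(meta["usage"])  # token usage as reported by the service
```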
haystack_integrations/components/embedders/nvidia/_nvcf_backend.py
@@ -1,8 +1,46 @@
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Literal, Union
from typing import Any, Dict, List, Literal, Optional, Tuple, Union

from haystack.utils.auth import Secret
from haystack_integrations.utils.nvidia import NvidiaCloudFunctionsClient

from .backend import EmbedderBackend

MAX_INPUT_STRING_LENGTH = 2048
MAX_INPUTS = 50


class NvcfBackend(EmbedderBackend):
def __init__(
self,
model: str,
api_key: Secret,
model_kwargs: Optional[Dict[str, Any]] = None,
):
if not model.startswith("playground_"):
model = f"playground_{model}"

super().__init__(model=model, model_kwargs=model_kwargs)

self.api_key = api_key
self.client = NvidiaCloudFunctionsClient(
api_key=api_key,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
self.nvcf_id = self.client.get_model_nvcf_id(self.model_name)

def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
request = EmbeddingsRequest(input=texts, **self.model_kwargs).to_dict()
json_response = self.client.query_function(self.nvcf_id, request)
response = EmbeddingsResponse.from_dict(json_response)

# Sort resulting embeddings by index
assert all(isinstance(r.embedding, list) for r in response.data)
sorted_embeddings: List[List[float]] = [r.embedding for r in sorted(response.data, key=lambda e: e.index)] # type: ignore
metadata = {"usage": response.usage.to_dict()}
return sorted_embeddings, metadata


@dataclass
@@ -12,11 +12,7 @@ class EmbeddingsRequest:
encoding_format: Literal["float", "base64"] = "float"

def __post_init__(self):
if isinstance(self.input, list):
if len(self.input) > MAX_INPUTS:
msg = f"The number of inputs should not exceed {MAX_INPUTS}"
raise ValueError(msg)
else:
if not isinstance(self.input, list):
self.input = [self.input]

if len(self.input) == 0:
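
The NVCF path keeps the same embed() contract. A quick sketch of driving it directly, assuming NVIDIA_API_KEY is exported (note the constructor normalizes "nvolveqa_40k" to "playground_nvolveqa_40k" and resolves the NVCF function id eagerly, so construction already performs a network call):

```python
from haystack.utils import Secret
from haystack_integrations.components.embedders.nvidia._nvcf_backend import NvcfBackend

# Assumes the NVIDIA_API_KEY environment variable is set.
backend = NvcfBackend(
    model="nvolveqa_40k",
    api_key=Secret.from_env_var("NVIDIA_API_KEY"),
    model_kwargs={"model": "passage"},
)

embeddings, meta = backend.embed(["I love pizza!"])
print(meta["usage"])  # {"prompt_tokens": ..., "total_tokens": ...}
```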
haystack_integrations/components/embedders/nvidia/backend.py
@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple


class EmbedderBackend(ABC):
def __init__(self, model: str, model_kwargs: Optional[Dict[str, Any]] = None):
"""
Initialize the backend.

:param model:
The name of the model to use.
:param model_kwargs:
Additional keyword arguments to pass to the model.
"""
self.model_name = model
self.model_kwargs = model_kwargs or {}

@abstractmethod
def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
"""
Invoke the backend and embed the given texts.

:param texts:
Texts to embed.
:return:
Vector representation of the texts and
metadata returned by the service.
"""
pass
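
This base class is what keeps the embedder components backend-agnostic: anything that can turn a list of strings into vectors only needs to implement embed(). As an illustration of the contract (not part of this PR), a hypothetical in-memory backend could look like:

```python
from typing import Any, Dict, List, Tuple

from haystack_integrations.components.embedders.nvidia.backend import EmbedderBackend


class DummyBackend(EmbedderBackend):
    """Hypothetical test double returning fixed-size zero vectors."""

    def __init__(self, model: str = "dummy", dim: int = 8):
        super().__init__(model=model)
        self.dim = dim

    def embed(self, texts: List[str]) -> Tuple[List[List[float]], Dict[str, Any]]:
        # One zero vector per input text, plus usage metadata in the shared shape.
        vectors = [[0.0] * self.dim for _ in texts]
        return vectors, {"usage": {"prompt_tokens": 0, "total_tokens": 0}}
```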
haystack_integrations/components/embedders/nvidia/document_embedder.py
@@ -1,27 +1,28 @@
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple

from haystack import Document, component, default_from_dict, default_to_dict
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack_integrations.utils.nvidia import NvidiaCloudFunctionsClient
from tqdm import tqdm

from ._schema import MAX_INPUTS, EmbeddingsRequest, EmbeddingsResponse, Usage
from .models import NvidiaEmbeddingModel
from ._nim_backend import NimBackend
from ._nvcf_backend import NvcfBackend
from .backend import EmbedderBackend


@component
class NvidiaDocumentEmbedder:
"""
A component for embedding documents using embedding models provided by
[NVIDIA AI Foundation Endpoints](https://www.nvidia.com/en-us/ai-data-science/foundation-models/).
[NVIDIA AI Foundation Endpoints](https://www.nvidia.com/en-us/ai-data-science/foundation-models/)
and NVIDIA NeMo Inference Microservices.

Usage example:
```python
from haystack import Document
from haystack_integrations.components.embedders.nvidia import NvidiaDocumentEmbedder

doc = Document(content="I love pizza!")

text_embedder = NvidiaDocumentEmbedder(model=NvidiaEmbeddingModel.NVOLVE_40K)
text_embedder = NvidiaDocumentEmbedder(model="nvolveqa_40k")
text_embedder.warm_up()

result = document_embedder.run([doc])
@@ -31,8 +32,9 @@ class NvidiaDocumentEmbedder:

def __init__(
self,
model: Union[str, NvidiaEmbeddingModel],
api_key: Secret = Secret.from_env_var("NVIDIA_API_KEY"),
model: str,
api_key: Optional[Secret] = Secret.from_env_var("NVIDIA_API_KEY"),
api_url: Optional[str] = None,
prefix: str = "",
suffix: str = "",
batch_size: int = 32,
@@ -47,6 +49,8 @@ def __init__(
Embedding model to use.
:param api_key:
API key for the NVIDIA AI Foundation Endpoints.
:param api_url:
Custom API URL for the NVIDIA NeMo Inference Microservices.
:param prefix:
A string to add to the beginning of each text.
:param suffix:
@@ -62,31 +66,17 @@
Separator used to concatenate the meta fields to the Document text.
"""

if isinstance(model, str):
model = NvidiaEmbeddingModel.from_str(model)

# Upper-limit for the endpoint.
if batch_size > MAX_INPUTS:
msg = f"NVIDIA Cloud Functions currently support a maximum batch size of {MAX_INPUTS}."
raise ValueError(msg)

self.api_key = api_key
self.model = model
self.api_url = api_url
self.prefix = prefix
self.suffix = suffix
self.batch_size = batch_size
self.progress_bar = progress_bar
self.meta_fields_to_embed = meta_fields_to_embed or []
self.embedding_separator = embedding_separator

self.client = NvidiaCloudFunctionsClient(
api_key=api_key,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
self.nvcf_id = None
self.backend: Optional[EmbedderBackend] = None
self._initialized = False

def warm_up(self):
@@ -96,7 +86,15 @@ def warm_up(self):
if self._initialized:
return

self.nvcf_id = self.client.get_model_nvcf_id(str(self.model))
if self.api_url is None:
if self.api_key is None:
msg = "API key is required for NVIDIA AI Foundation Endpoints."
raise ValueError(msg)

self.backend = NvcfBackend(self.model, api_key=self.api_key, model_kwargs={"model": "passage"})
else:
self.backend = NimBackend(self.model, api_url=self.api_url, model_kwargs={"input_type": "passage"})

self._initialized = True
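
With this, api_url doubles as the backend switch: unset means the hosted NVIDIA AI Foundation Endpoints (API key required), set means a self-hosted NIM. A sketch of both configurations (the local URL is a placeholder):

```python
from haystack_integrations.components.embedders.nvidia import NvidiaDocumentEmbedder

# Hosted endpoints: api_url omitted, NVIDIA_API_KEY read from the environment.
cloud_embedder = NvidiaDocumentEmbedder(model="nvolveqa_40k")
cloud_embedder.warm_up()  # selects NvcfBackend

# Self-hosted NIM: api_url set, no API key required.
local_embedder = NvidiaDocumentEmbedder(model="NV-Embed-QA", api_url="http://localhost:8080/v1")
local_embedder.warm_up()  # selects NimBackend
```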

def to_dict(self) -> Dict[str, Any]:
@@ -108,8 +106,9 @@ def to_dict(self) -> Dict[str, Any]:
"""
return default_to_dict(
self,
api_key=self.api_key.to_dict(),
model=str(self.model),
api_key=self.api_key.to_dict() if self.api_key else None,
model=self.model,
api_url=self.api_url,
prefix=self.prefix,
suffix=self.suffix,
batch_size=self.batch_size,
@@ -128,7 +127,6 @@ def from_dict(cls, data: Dict[str, Any]) -> "NvidiaDocumentEmbedder":
:returns:
The deserialized component.
"""
data["init_parameters"]["model"] = NvidiaEmbeddingModel.from_str(data["init_parameters"]["model"])
deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
return default_from_dict(cls, data)
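
Since model is now a plain string and api_key may be None for NIM deployments, serialization becomes a straightforward round trip. A sketch (placeholder URL):

```python
from haystack_integrations.components.embedders.nvidia import NvidiaDocumentEmbedder

embedder = NvidiaDocumentEmbedder(model="NV-Embed-QA", api_url="http://localhost:8080/v1", api_key=None)
data = embedder.to_dict()
assert data["init_parameters"]["model"] == "NV-Embed-QA"
assert data["init_parameters"]["api_key"] is None

restored = NvidiaDocumentEmbedder.from_dict(data)
assert restored.api_url == embedder.api_url
```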

@@ -147,27 +145,23 @@ def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:

def _embed_batch(self, texts_to_embed: List[str], batch_size: int) -> Tuple[List[List[float]], Dict[str, Any]]:
all_embeddings: List[List[float]] = []
usage = Usage(prompt_tokens=0, total_tokens=0)
assert self.nvcf_id is not None
usage_prompt_tokens = 0
usage_total_tokens = 0

assert self.backend is not None

for i in tqdm(
range(0, len(texts_to_embed), batch_size), disable=not self.progress_bar, desc="Calculating embeddings"
):
batch = texts_to_embed[i : i + batch_size]

request = EmbeddingsRequest(input=batch, model="passage").to_dict()
json_response = self.client.query_function(self.nvcf_id, request)
response = EmbeddingsResponse.from_dict(json_response)

# Sort resulting embeddings by index
assert all(isinstance(r.embedding, list) for r in response.data)
sorted_embeddings: List[List[float]] = [r.embedding for r in sorted(response.data, key=lambda e: e.index)] # type: ignore
sorted_embeddings, meta = self.backend.embed(batch)
all_embeddings.extend(sorted_embeddings)

usage.prompt_tokens += response.usage.prompt_tokens
usage.total_tokens += response.usage.total_tokens
usage_prompt_tokens += meta.get("usage", {}).get("prompt_tokens", 0)
usage_total_tokens += meta.get("usage", {}).get("total_tokens", 0)

return all_embeddings, {"usage": usage.to_dict()}
return all_embeddings, {"usage": {"prompt_tokens": usage_prompt_tokens, "total_tokens": usage_total_tokens}}

@component.output_types(documents=List[Document], meta=Dict[str, Any])
def run(self, documents: List[Document]):
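
End to end, _embed_batch now just loops over batches, delegates to the active backend, and sums the per-batch token counts, so callers see the same meta shape for both backends. A sketch of the component-level behavior (assumes NVIDIA_API_KEY is set; the counts shown are illustrative):

```python
from haystack import Document
from haystack_integrations.components.embedders.nvidia import NvidiaDocumentEmbedder

embedder = NvidiaDocumentEmbedder(model="nvolveqa_40k", batch_size=32)
embedder.warm_up()

docs = [Document(content="I love pizza!"), Document(content="Haystack is great.")]
result = embedder.run(docs)
print(result["meta"])  # e.g. {"usage": {"prompt_tokens": 12, "total_tokens": 12}}
print(len(result["documents"][0].embedding))  # embedding dimension
```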

haystack_integrations/components/embedders/nvidia/models.py (this file was deleted)
