Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
mhaligowski authored Aug 1, 2024
2 parents 8951d5c + fa4c6cc commit 57a63a9
Show file tree
Hide file tree
Showing 24 changed files with 739 additions and 96 deletions.
52 changes: 39 additions & 13 deletions integrations/astra/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pyenv local 3.9
Local install for the package
`pip install -e .`
To execute integration tests, add needed environment variables
`ASTRA_DB_API_ENDPOINT=<id>`
`ASTRA_DB_APPLICATION_TOKEN=<token>`
`ASTRA_DB_API_ENDPOINT="https://<id>-<region>.apps.astra.datastax.com"`,
`ASTRA_DB_APPLICATION_TOKEN="AstraCS:..."`
and execute
`python examples/example.py`

Expand All @@ -34,10 +34,10 @@ Install requirements

Export environment variables
```
export ASTRA_DB_API_ENDPOINT=
export ASTRA_DB_APPLICATION_TOKEN=
export COLLECTION_NAME=
export OPENAI_API_KEY=
export ASTRA_DB_API_ENDPOINT="https://<id>-<region>.apps.astra.datastax.com"
export ASTRA_DB_APPLICATION_TOKEN="AstraCS:..."
export COLLECTION_NAME="my_collection"
export OPENAI_API_KEY="sk-..."
```

run the python examples
Expand All @@ -59,19 +59,17 @@ from haystack.document_stores.types.policy import DuplicatePolicy

Load in environment variables:
```
api_endpoint = os.getenv("ASTRA_DB_API_ENDPOINT", "")
token = os.getenv("ASTRA_DB_APPLICATION_TOKEN", "")
collection_name = os.getenv("COLLECTION_NAME", "haystack_vector_search")
namespace = os.environ.get("ASTRA_DB_KEYSPACE")
collection_name = os.environ.get("COLLECTION_NAME", "haystack_vector_search")
```

Create the Document Store object:
Create the Document Store object (API Endpoint and Token are read off the environment):
```
document_store = AstraDocumentStore(
api_endpoint=api_endpoint,
token=token,
collection_name=collection_name,
namespace=namespace,
duplicates_policy=DuplicatePolicy.SKIP,
embedding_dim=384,
embedding_dimension=384,
)
```

Expand All @@ -92,3 +90,31 @@ Add your AstraEmbeddingRetriever into the pipeline

Add other components and connect them as desired. Then run your pipeline:
`pipeline.run(...)`

## Warnings about indexing

When creating an Astra DB document store, you may see a warning similar to the following:

> Astra DB collection '...' is detected as having indexing turned on for all fields (either created manually or by older versions of this plugin). This implies stricter limitations on the amount of text each string in a document can store. Consider indexing anew on a fresh collection to be able to store longer texts.
or,

> Astra DB collection '...' is detected as having the following indexing policy: {...}. This does not match the requested indexing policy for this object: {...}. In particular, there may be stricter limitations on the amount of text each string in a document can store. Consider indexing anew on a fresh collection to be able to store longer texts.

The reason for the warning is that the requested collection already exists on the database, and it is configured to [index all of its fields for search](https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option), possibly implicitly, by default. When the Haystack object tries to create it, it attempts to enforce, instead, an indexing policy tailored to the prospected usage: this is both to enable storing very long texts and to avoid indexing fields that will never be used in filtering a search (indexing those would also have a slight performance cost for writes).

Typically there are two reasons why you may encounter the warning:

1. you have created a collection by other means than letting this component do it for you: for example, through the Astra UI, or using AstraPy's `create_collection` method of class `Database` directly;
2. you have created the collection with an older version of the plugin.

Keep in mind that this is a warning and your application will continue running just fine, as long as you don't store very long texts.
However, should you need to add to the document store, for example, a document with a very long textual content, you will get an indexing error from the database.

### Remediation

You have several options:

- you can ignore the warning because you know your application will never need to store very long textual contents;
- if you can afford populating the collection anew, you can drop it and re-run the Haystack application: the collection will be created with the optimized indexing settings. **This is the recommended option, when possible**.
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@ def __init__(
caller_version=integration_version,
)

indexing_options = {"indexing": {"deny": NON_INDEXED_FIELDS}}
try:
# Create and connect to the newly created collection
self._astra_db_collection = self._astra_db.create_collection(
collection_name=collection_name,
dimension=embedding_dimension,
options={"indexing": {"deny": NON_INDEXED_FIELDS}},
options=indexing_options,
)
except APIRequestError:
# possibly the collection is preexisting and has legacy
Expand All @@ -98,35 +99,49 @@ def __init__(
if "indexing" not in pre_col_options:
warn(
(
f"Collection '{collection_name}' is detected as legacy"
" and has indexing turned on for all fields. This"
" implies stricter limitations on the amount of text"
" each entry can store. Consider reindexing anew on a"
" fresh collection to be able to store longer texts."
f"Astra DB collection '{collection_name}' is "
"detected as having indexing turned on for all "
"fields (either created manually or by older "
"versions of this plugin). This implies stricter "
"limitations on the amount of text each string in a "
"document can store. Consider indexing anew on a "
"fresh collection to be able to store longer texts. "
"See https://github.com/deepset-ai/haystack-core-"
"integrations/blob/main/integrations/astra/README"
".md#warnings-about-indexing for more details."
),
UserWarning,
stacklevel=2,
)
self._astra_db_collection = self._astra_db.collection(
collection_name=collection_name,
)
else:
options_json = json.dumps(pre_col_options["indexing"])
elif pre_col_options["indexing"] != indexing_options["indexing"]:
detected_options_json = json.dumps(pre_col_options["indexing"])
indexing_options_json = json.dumps(indexing_options["indexing"])
warn(
(
f"Collection '{collection_name}' has unexpected 'indexing'"
f" settings (options.indexing = {options_json})."
" This can result in odd behaviour when running "
" metadata filtering and/or unwarranted limitations"
" on storing long texts. Consider reindexing anew on a"
" fresh collection."
f"Astra DB collection '{collection_name}' is "
"detected as having the following indexing policy: "
f"{detected_options_json}. This does not match the requested "
f"indexing policy for this object: {indexing_options_json}. "
"In particular, there may be stricter "
"limitations on the amount of text each string in a "
"document can store. Consider indexing anew on a "
"fresh collection to be able to store longer texts. "
"See https://github.com/deepset-ai/haystack-core-"
"integrations/blob/main/integrations/astra/README"
".md#warnings-about-indexing for more details."
),
UserWarning,
stacklevel=2,
)
self._astra_db_collection = self._astra_db.collection(
collection_name=collection_name,
)
else:
# the collection mismatch lies elsewhere than the indexing
raise
else:
# other exception
raise
Expand Down
6 changes: 3 additions & 3 deletions integrations/langfuse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack import Pipeline
Expand All @@ -46,7 +46,7 @@ from haystack_integrations.components.connectors.langfuse import LangfuseConnect
if __name__ == "__main__":
pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector("Chat example"))
pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))

pipe.connect("prompt_builder.prompt", "llm.messages")
Expand All @@ -57,7 +57,7 @@ if __name__ == "__main__":
]

response = pipe.run(
data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "prompt_source": messages}}
data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "template": messages}}
)
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
Expand Down
8 changes: 3 additions & 5 deletions integrations/langfuse/example/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack import Pipeline
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.connectors.langfuse import LangfuseConnector
Expand All @@ -12,7 +12,7 @@

pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector("Chat example"))
pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))

pipe.connect("prompt_builder.prompt", "llm.messages")
Expand All @@ -22,8 +22,6 @@
ChatMessage.from_user("Tell me about {{location}}"),
]

response = pipe.run(
data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "prompt_source": messages}}
)
response = pipe.run(data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "template": messages}})
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class LangfuseConnector:
# ...
@app.on_event("shutdown")
async def shutdown_event():
tracer.actual_tracer.flush()
Expand All @@ -53,27 +54,35 @@ async def shutdown_event():
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
from haystack import Pipeline
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.connectors.langfuse import LangfuseConnector
from haystack_integrations.components.connectors.langfuse import (
LangfuseConnector,
)
if __name__ == "__main__":
pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector("Chat example"))
pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))
pipe.connect("prompt_builder.prompt", "llm.messages")
messages = [
ChatMessage.from_system("Always respond in German even if some input data is in other languages."),
ChatMessage.from_system(
"Always respond in German even if some input data is in other languages."
),
ChatMessage.from_user("Tell me about {{location}}"),
]
response = pipe.run(
data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "prompt_source": messages}}
data={
"prompt_builder": {
"template_variables": {"location": "Berlin"},
"template": messages,
}
}
)
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
Expand Down
8 changes: 3 additions & 5 deletions integrations/langfuse/tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import requests

from haystack import Pipeline
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from requests.auth import HTTPBasicAuth
Expand All @@ -26,7 +26,7 @@ def test_tracing_integration():

pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector(name="Chat example", public=True)) # public so anyone can verify run
pipe.add_component("prompt_builder", DynamicChatPromptBuilder())
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))

pipe.connect("prompt_builder.prompt", "llm.messages")
Expand All @@ -36,9 +36,7 @@ def test_tracing_integration():
ChatMessage.from_user("Tell me about {{location}}"),
]

response = pipe.run(
data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "prompt_source": messages}}
)
response = pipe.run(data={"prompt_builder": {"template_variables": {"location": "Berlin"}, "template": messages}})
assert "Berlin" in response["llm"]["replies"][0].content
assert response["tracer"]["trace_url"]
url = "https://cloud.langfuse.com/api/public/traces/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ def __init__(
msg = f'Invalid collection name: "{collection_name}". It can only contain letters, numbers, -, or _.'
raise ValueError(msg)

self.resolved_connection_string = mongo_connection_string.resolve_value()
self.mongo_connection_string = mongo_connection_string

self.database_name = database_name
Expand All @@ -95,7 +94,7 @@ def __init__(
def connection(self) -> MongoClient:
if self._connection is None:
self._connection = MongoClient(
self.resolved_connection_string, driver=DriverInfo(name="MongoDBAtlasHaystackIntegration")
self.mongo_connection_string.resolve_value(), driver=DriverInfo(name="MongoDBAtlasHaystackIntegration")
)

return self._connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

from haystack import Document, component, default_from_dict, default_to_dict
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack_integrations.utils.nvidia import url_validation
from tqdm import tqdm

from ._nim_backend import NimBackend
from .backend import EmbedderBackend
from .truncate import EmbeddingTruncateMode

_DEFAULT_API_URL = "https://ai.api.nvidia.com/v1/retrieval/nvidia"


@component
class NvidiaDocumentEmbedder:
Expand All @@ -33,7 +36,7 @@ def __init__(
self,
model: str = "NV-Embed-QA",
api_key: Optional[Secret] = Secret.from_env_var("NVIDIA_API_KEY"),
api_url: str = "https://ai.api.nvidia.com/v1/retrieval/nvidia",
api_url: str = _DEFAULT_API_URL,
prefix: str = "",
suffix: str = "",
batch_size: int = 32,
Expand All @@ -51,6 +54,7 @@ def __init__(
API key for the NVIDIA NIM.
:param api_url:
Custom API URL for the NVIDIA NIM.
Format for API URL is http://host:port
:param prefix:
A string to add to the beginning of each text.
:param suffix:
Expand All @@ -71,7 +75,7 @@ def __init__(

self.api_key = api_key
self.model = model
self.api_url = api_url
self.api_url = url_validation(api_url, _DEFAULT_API_URL, ["v1/embeddings"])
self.prefix = prefix
self.suffix = suffix
self.batch_size = batch_size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

from haystack import component, default_from_dict, default_to_dict
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack_integrations.utils.nvidia import url_validation

from ._nim_backend import NimBackend
from .backend import EmbedderBackend
from .truncate import EmbeddingTruncateMode

_DEFAULT_API_URL = "https://ai.api.nvidia.com/v1/retrieval/nvidia"


@component
class NvidiaTextEmbedder:
Expand Down Expand Up @@ -34,7 +37,7 @@ def __init__(
self,
model: str = "NV-Embed-QA",
api_key: Optional[Secret] = Secret.from_env_var("NVIDIA_API_KEY"),
api_url: str = "https://ai.api.nvidia.com/v1/retrieval/nvidia",
api_url: str = _DEFAULT_API_URL,
prefix: str = "",
suffix: str = "",
truncate: Optional[Union[EmbeddingTruncateMode, str]] = None,
Expand All @@ -48,6 +51,7 @@ def __init__(
API key for the NVIDIA NIM.
:param api_url:
Custom API URL for the NVIDIA NIM.
Format for API URL is http://host:port
:param prefix:
A string to add to the beginning of each text.
:param suffix:
Expand All @@ -59,7 +63,7 @@ def __init__(

self.api_key = api_key
self.model = model
self.api_url = api_url
self.api_url = url_validation(api_url, _DEFAULT_API_URL, ["v1/embeddings"])
self.prefix = prefix
self.suffix = suffix

Expand Down
Loading

0 comments on commit 57a63a9

Please sign in to comment.