diff --git a/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/__init__.py b/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/__init__.py
index e9cede428..ed6422bfe 100644
--- a/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/__init__.py
+++ b/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/__init__.py
@@ -2,6 +2,6 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from .retriever import QdrantEmbeddingRetriever, QdrantSparseEmbeddingRetriever
+from .retriever import QdrantEmbeddingRetriever, QdrantHybridRetriever, QdrantSparseEmbeddingRetriever
 
-__all__ = ("QdrantEmbeddingRetriever", "QdrantSparseEmbeddingRetriever")
+__all__ = ("QdrantEmbeddingRetriever", "QdrantSparseEmbeddingRetriever", "QdrantHybridRetriever")
diff --git a/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/retriever.py b/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/retriever.py
index 250a79550..7befa3612 100644
--- a/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/retriever.py
+++ b/integrations/qdrant/src/haystack_integrations/components/retrievers/qdrant/retriever.py
@@ -19,8 +19,10 @@ class QdrantEmbeddingRetriever:
         ":memory:",
         recreate_index=True,
         return_embedding=True,
-        wait_result_from_api=True,
     )
+
+    document_store.write_documents([Document(content="test", embedding=[0.5]*768)])
+
     retriever = QdrantEmbeddingRetriever(document_store=document_store)
 
     # using a fake vector to keep the example simple
@@ -112,7 +114,7 @@ def run(
             The retrieved documents.
 
         """
-        docs = self._document_store.query_by_embedding(
+        docs = self._document_store._query_by_embedding(
             query_embedding=query_embedding,
             filters=filters or self._filters,
             top_k=top_k or self._top_k,
@@ -136,10 +138,14 @@ class QdrantSparseEmbeddingRetriever:
 
     document_store = QdrantDocumentStore(
         ":memory:",
+        use_sparse_embeddings=True,
         recreate_index=True,
         return_embedding=True,
-        wait_result_from_api=True,
     )
+
+    doc = Document(content="test", sparse_embedding=SparseEmbedding(indices=[0, 3, 5], values=[0.1, 0.5, 0.12]))
+    document_store.write_documents([doc])
+
     retriever = QdrantSparseEmbeddingRetriever(document_store=document_store)
     sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33])
     retriever.run(query_sparse_embedding=sparse_embedding)
@@ -196,7 +202,7 @@ def to_dict(self) -> Dict[str, Any]:
         return d
 
     @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> "QdrantEmbeddingRetriever":
+    def from_dict(cls, data: Dict[str, Any]) -> "QdrantSparseEmbeddingRetriever":
         """
         Deserializes the component from a dictionary.
 
@@ -230,7 +236,7 @@ def run(
             The retrieved documents.
 
         """
-        docs = self._document_store.query_by_sparse(
+        docs = self._document_store._query_by_sparse(
             query_sparse_embedding=query_sparse_embedding,
             filters=filters or self._filters,
             top_k=top_k or self._top_k,
@@ -239,3 +245,125 @@ def run(
             scale_score=scale_score or self._scale_score,
         )
         return {"documents": docs}
+
+
+@component
+class QdrantHybridRetriever:
+    """
+    A component for retrieving documents from a QdrantDocumentStore using both dense and sparse vectors
+    and fusing the results using Reciprocal Rank Fusion.
+
+    Usage example:
+    ```python
+    from haystack import Document
+    from haystack_integrations.components.retrievers.qdrant import QdrantHybridRetriever
+    from haystack_integrations.document_stores.qdrant import QdrantDocumentStore
+    from haystack.dataclasses.sparse_embedding import SparseEmbedding
+
+    document_store = QdrantDocumentStore(
+        ":memory:",
+        use_sparse_embeddings=True,
+        recreate_index=True,
+        return_embedding=True,
+        wait_result_from_api=True,
+    )
+
+    doc = Document(content="test",
+                   embedding=[0.5]*768,
+                   sparse_embedding=SparseEmbedding(indices=[0, 3, 5], values=[0.1, 0.5, 0.12]))
+
+    document_store.write_documents([doc])
+
+    retriever = QdrantHybridRetriever(document_store=document_store)
+    embedding = [0.1]*768
+    sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33])
+    retriever.run(query_embedding=embedding, query_sparse_embedding=sparse_embedding)
+    ```
+    """
+
+    def __init__(
+        self,
+        document_store: QdrantDocumentStore,
+        filters: Optional[Dict[str, Any]] = None,
+        top_k: int = 10,
+        return_embedding: bool = False,
+    ):
+        """
+        Create a QdrantHybridRetriever component.
+
+        :param document_store: An instance of QdrantDocumentStore.
+        :param filters: A dictionary with filters to narrow down the search space.
+        :param top_k: The maximum number of documents to retrieve.
+        :param return_embedding: Whether to return the embeddings of the retrieved Documents.
+
+        :raises ValueError: If 'document_store' is not an instance of QdrantDocumentStore.
+        """
+
+        if not isinstance(document_store, QdrantDocumentStore):
+            msg = "document_store must be an instance of QdrantDocumentStore"
+            raise ValueError(msg)
+
+        self._document_store = document_store
+        self._filters = filters
+        self._top_k = top_k
+        self._return_embedding = return_embedding
+
+    def to_dict(self) -> Dict[str, Any]:
+        """
+        Serializes the component to a dictionary.
+
+        :returns:
+            Dictionary with serialized data.
+        """
+        return default_to_dict(
+            self,
+            document_store=self._document_store.to_dict(),
+            filters=self._filters,
+            top_k=self._top_k,
+            return_embedding=self._return_embedding,
+        )
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "QdrantHybridRetriever":
+        """
+        Deserializes the component from a dictionary.
+
+        :param data:
+            Dictionary to deserialize from.
+        :returns:
+            Deserialized component.
+        """
+        document_store = QdrantDocumentStore.from_dict(data["init_parameters"]["document_store"])
+        data["init_parameters"]["document_store"] = document_store
+        return default_from_dict(cls, data)
+
+    @component.output_types(documents=List[Document])
+    def run(
+        self,
+        query_embedding: List[float],
+        query_sparse_embedding: SparseEmbedding,
+        filters: Optional[Dict[str, Any]] = None,
+        top_k: Optional[int] = None,
+        return_embedding: Optional[bool] = None,
+    ):
+        """
+        Run the Hybrid Retriever on the given input data.
+
+        :param query_embedding: Dense embedding of the query.
+        :param query_sparse_embedding: Sparse embedding of the query.
+        :param filters: A dictionary with filters to narrow down the search space.
+        :param top_k: The maximum number of documents to return.
+        :param return_embedding: Whether to return the embedding of the retrieved Documents.
+        :returns:
+            The retrieved documents.
+ + """ + docs = self._document_store._query_hybrid( + query_embedding=query_embedding, + query_sparse_embedding=query_sparse_embedding, + filters=filters or self._filters, + top_k=top_k or self._top_k, + return_embedding=return_embedding or self._return_embedding, + ) + + return {"documents": docs} diff --git a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py index f75e06689..120735411 100644 --- a/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py +++ b/integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py @@ -16,6 +16,7 @@ from qdrant_client import grpc from qdrant_client.http import models as rest from qdrant_client.http.exceptions import UnexpectedResponse +from qdrant_client.hybrid.fusion import reciprocal_rank_fusion from tqdm import tqdm from .converters import ( @@ -307,7 +308,7 @@ def get_documents_by_id( ) return documents - def query_by_sparse( + def _query_by_sparse( self, query_sparse_embedding: SparseEmbedding, filters: Optional[Dict[str, Any]] = None, @@ -349,7 +350,7 @@ def query_by_sparse( document.score = score return results - def query_by_embedding( + def _query_by_embedding( self, query_embedding: List[float], filters: Optional[Dict[str, Any]] = None, @@ -383,6 +384,86 @@ def query_by_embedding( document.score = score return results + def _query_hybrid( + self, + query_embedding: List[float], + query_sparse_embedding: SparseEmbedding, + filters: Optional[Dict[str, Any]] = None, + top_k: int = 10, + return_embedding: bool = False, + ) -> List[Document]: + """ + Retrieves documents based on dense and sparse embeddings and fuses the results using Reciprocal Rank Fusion. + + This method is not part of the public interface of `QdrantDocumentStore` and shouldn't be used directly. + Use the `QdrantHybridRetriever` instead. + + :param query_embedding: Dense embedding of the query. + :param query_sparse_embedding: Sparse embedding of the query. + :param filters: Filters applied to the retrieved Documents. + :param top_k: Maximum number of Documents to return. + :param return_embedding: Whether to return the embeddings of the retrieved documents. + + :returns: List of Document that are most similar to `query_embedding` and `query_sparse_embedding`. + + :raises QdrantStoreError: + If the Document Store was initialized with `use_sparse_embeddings=False`. + """ + + # This implementation is based on the code from the Python Qdrant client: + # https://github.com/qdrant/qdrant-client/blob/8e3ea58f781e4110d11c0a6985b5e6bb66b85d33/qdrant_client/qdrant_fastembed.py#L519 + if not self.use_sparse_embeddings: + message = ( + "You are trying to query using sparse embeddings, but the Document Store " + "was initialized with `use_sparse_embeddings=False`. 
" + ) + raise QdrantStoreError(message) + + qdrant_filters = convert_filters_to_qdrant(filters) + + sparse_request = rest.SearchRequest( + vector=rest.NamedSparseVector( + name=SPARSE_VECTORS_NAME, + vector=rest.SparseVector( + indices=query_sparse_embedding.indices, + values=query_sparse_embedding.values, + ), + ), + filter=qdrant_filters, + limit=top_k, + with_payload=True, + with_vector=return_embedding, + ) + + dense_request = rest.SearchRequest( + vector=rest.NamedVector( + name=DENSE_VECTORS_NAME, + vector=query_embedding, + ), + filter=qdrant_filters, + limit=top_k, + with_payload=True, + with_vector=return_embedding, + ) + + try: + dense_request_response, sparse_request_response = self.client.search_batch( + collection_name=self.index, requests=[dense_request, sparse_request] + ) + except Exception as e: + msg = "Error during hybrid search" + raise QdrantStoreError(msg) from e + + try: + points = reciprocal_rank_fusion(responses=[dense_request_response, sparse_request_response], limit=top_k) + except Exception as e: + msg = "Error while applying Reciprocal Rank Fusion" + raise QdrantStoreError(msg) from e + + results = [convert_qdrant_point_to_haystack_document(point, use_sparse_embeddings=True) for point in points] + + return results + def _get_distance(self, similarity: str) -> rest.Distance: try: return self.SIMILARITY[similarity] diff --git a/integrations/qdrant/tests/conftest.py b/integrations/qdrant/tests/conftest.py new file mode 100644 index 000000000..3658e134a --- /dev/null +++ b/integrations/qdrant/tests/conftest.py @@ -0,0 +1,18 @@ +import numpy as np +import pytest +from haystack.dataclasses import SparseEmbedding + + +@pytest.fixture(scope="session") +def generate_sparse_embedding(): + """ + This fixture returns a function that generates a random SparseEmbedding each time it is called. 
+ """ + + def _generate_random_sparse_embedding(): + random_indice_length = np.random.randint(3, 15) + indices = list(range(random_indice_length)) + values = [np.random.random_sample() for _ in range(random_indice_length)] + return SparseEmbedding(indices=indices, values=values) + + return _generate_random_sparse_embedding diff --git a/integrations/qdrant/tests/test_document_store.py b/integrations/qdrant/tests/test_document_store.py index 8316ee565..cbd5c62d0 100644 --- a/integrations/qdrant/tests/test_document_store.py +++ b/integrations/qdrant/tests/test_document_store.py @@ -1,18 +1,21 @@ from typing import List +from unittest.mock import patch import pytest from haystack import Document +from haystack.dataclasses import SparseEmbedding from haystack.document_stores.errors import DuplicateDocumentError from haystack.document_stores.types import DuplicatePolicy from haystack.testing.document_store import ( CountDocumentsTest, DeleteDocumentsTest, WriteDocumentsTest, + _random_embeddings, ) -from haystack_integrations.document_stores.qdrant import QdrantDocumentStore +from haystack_integrations.document_stores.qdrant.document_store import QdrantDocumentStore, QdrantStoreError -class TestQdrantStoreBaseTests(CountDocumentsTest, WriteDocumentsTest, DeleteDocumentsTest): +class TestQdrantDocumentStore(CountDocumentsTest, WriteDocumentsTest, DeleteDocumentsTest): @pytest.fixture def document_store(self) -> QdrantDocumentStore: return QdrantDocumentStore( @@ -20,6 +23,7 @@ def document_store(self) -> QdrantDocumentStore: recreate_index=True, return_embedding=True, wait_result_from_api=True, + use_sparse_embeddings=False, ) def assert_documents_are_equal(self, received: List[Document], expected: List[Document]): @@ -39,3 +43,62 @@ def test_write_documents(self, document_store: QdrantDocumentStore): assert document_store.write_documents(docs) == 1 with pytest.raises(DuplicateDocumentError): document_store.write_documents(docs, DuplicatePolicy.FAIL) + + def test_query_hybrid(self, generate_sparse_embedding): + document_store = QdrantDocumentStore(location=":memory:", use_sparse_embeddings=True) + + docs = [] + for i in range(20): + docs.append( + Document( + content=f"doc {i}", sparse_embedding=generate_sparse_embedding(), embedding=_random_embeddings(768) + ) + ) + + document_store.write_documents(docs) + + sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33]) + embedding = [0.1] * 768 + + results: List[Document] = document_store._query_hybrid( + query_sparse_embedding=sparse_embedding, query_embedding=embedding, top_k=10, return_embedding=True + ) + assert len(results) == 10 + + for document in results: + assert document.sparse_embedding + assert document.embedding + + def test_query_hybrid_fail_without_sparse_embedding(self, document_store): + sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33]) + embedding = [0.1] * 768 + + with pytest.raises(QdrantStoreError): + + document_store._query_hybrid( + query_sparse_embedding=sparse_embedding, + query_embedding=embedding, + ) + + def test_query_hybrid_search_batch_failure(self): + document_store = QdrantDocumentStore(location=":memory:", use_sparse_embeddings=True) + + sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33]) + embedding = [0.1] * 768 + + with patch.object(document_store.client, "search_batch", side_effect=Exception("search_batch error")): + + with pytest.raises(QdrantStoreError): + 
+                document_store._query_hybrid(query_sparse_embedding=sparse_embedding, query_embedding=embedding)
+
+    @patch("haystack_integrations.document_stores.qdrant.document_store.reciprocal_rank_fusion")
+    def test_query_hybrid_reciprocal_rank_fusion_failure(self, mocked_fusion):
+        document_store = QdrantDocumentStore(location=":memory:", use_sparse_embeddings=True)
+
+        sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33])
+        embedding = [0.1] * 768
+
+        mocked_fusion.side_effect = Exception("reciprocal_rank_fusion error")
+
+        with pytest.raises(QdrantStoreError):
+            document_store._query_hybrid(query_sparse_embedding=sparse_embedding, query_embedding=embedding)
diff --git a/integrations/qdrant/tests/test_retriever.py b/integrations/qdrant/tests/test_retriever.py
index d03b1c940..47fec5968 100644
--- a/integrations/qdrant/tests/test_retriever.py
+++ b/integrations/qdrant/tests/test_retriever.py
@@ -1,6 +1,6 @@
 from typing import List
+from unittest.mock import Mock
 
-import numpy as np
 from haystack.dataclasses import Document, SparseEmbedding
 from haystack.testing.document_store import (
     FilterableDocsFixtureMixin,
 )
@@ -8,6 +8,7 @@
 )
 from haystack_integrations.components.retrievers.qdrant import (
     QdrantEmbeddingRetriever,
+    QdrantHybridRetriever,
     QdrantSparseEmbeddingRetriever,
 )
 from haystack_integrations.document_stores.qdrant import QdrantDocumentStore
@@ -222,23 +223,12 @@ def test_from_dict(self):
         assert retriever._scale_score is False
         assert retriever._return_embedding is True
 
-    def _generate_mocked_sparse_embedding(self, n):
-        list_of_sparse_vectors = []
-        for _ in range(n):
-            random_indice_length = np.random.randint(3, 15)
-            data = {
-                "indices": list(range(random_indice_length)),
-                "values": [np.random.random_sample() for _ in range(random_indice_length)],
-            }
-            list_of_sparse_vectors.append(data)
-        return list_of_sparse_vectors
-
-    def test_run(self, filterable_docs: List[Document]):
+    def test_run(self, filterable_docs: List[Document], generate_sparse_embedding):
         document_store = QdrantDocumentStore(location=":memory:", index="Boi", use_sparse_embeddings=True)
 
         # Add fake sparse embedding to documents
         for doc in filterable_docs:
-            doc.sparse_embedding = SparseEmbedding.from_dict(self._generate_mocked_sparse_embedding(1)[0])
+            doc.sparse_embedding = generate_sparse_embedding()
 
         document_store.write_documents(filterable_docs)
         retriever = QdrantSparseEmbeddingRetriever(document_store=document_store)
@@ -252,3 +242,112 @@ def test_run(self, filterable_docs: List[Document]):
 
         for document in results:
             assert document.sparse_embedding
+
+
+class TestQdrantHybridRetriever:
+    def test_init_default(self):
+        document_store = QdrantDocumentStore(location=":memory:", index="test", use_sparse_embeddings=True)
+        retriever = QdrantHybridRetriever(document_store=document_store)
+
+        assert retriever._document_store == document_store
+        assert retriever._filters is None
+        assert retriever._top_k == 10
+        assert retriever._return_embedding is False
+
+    def test_to_dict(self):
+        document_store = QdrantDocumentStore(location=":memory:", index="test")
+        retriever = QdrantHybridRetriever(document_store=document_store, top_k=5, return_embedding=True)
+        res = retriever.to_dict()
+        assert res == {
+            "type": "haystack_integrations.components.retrievers.qdrant.retriever.QdrantHybridRetriever",
+            "init_parameters": {
+                "document_store": {
+                    "type": "haystack_integrations.document_stores.qdrant.document_store.QdrantDocumentStore",
+                    "init_parameters": {
+                        "location": ":memory:",
+                        "url": None,
+ "port": 6333, + "grpc_port": 6334, + "prefer_grpc": False, + "https": None, + "api_key": None, + "prefix": None, + "timeout": None, + "host": None, + "path": None, + "index": "test", + "embedding_dim": 768, + "on_disk": False, + "content_field": "content", + "name_field": "name", + "embedding_field": "embedding", + "use_sparse_embeddings": False, + "similarity": "cosine", + "return_embedding": False, + "progress_bar": True, + "duplicate_documents": "overwrite", + "recreate_index": False, + "shard_number": None, + "replication_factor": None, + "write_consistency_factor": None, + "on_disk_payload": None, + "hnsw_config": None, + "optimizers_config": None, + "wal_config": None, + "quantization_config": None, + "init_from": None, + "wait_result_from_api": True, + "metadata": {}, + "write_batch_size": 100, + "scroll_size": 10000, + "payload_fields_to_index": None, + }, + }, + "filters": None, + "top_k": 5, + "return_embedding": True, + }, + } + + def test_from_dict(self): + data = { + "type": "haystack_integrations.components.retrievers.qdrant.retriever.QdrantHybridRetriever", + "init_parameters": { + "document_store": { + "init_parameters": {"location": ":memory:", "index": "test"}, + "type": "haystack_integrations.document_stores.qdrant.document_store.QdrantDocumentStore", + }, + "filters": None, + "top_k": 5, + "return_embedding": True, + }, + } + retriever = QdrantHybridRetriever.from_dict(data) + assert isinstance(retriever._document_store, QdrantDocumentStore) + assert retriever._document_store.index == "test" + assert retriever._filters is None + assert retriever._top_k == 5 + assert retriever._return_embedding + + def test_run(self): + mock_store = Mock(spec=QdrantDocumentStore) + sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33]) + mock_store._query_hybrid.return_value = [ + Document(content="Test doc", embedding=[0.1, 0.2], sparse_embedding=sparse_embedding) + ] + + retriever = QdrantHybridRetriever(document_store=mock_store) + res = retriever.run( + query_embedding=[0.5, 0.7], query_sparse_embedding=SparseEmbedding(indices=[0, 5], values=[0.1, 0.7]) + ) + + call_args = mock_store._query_hybrid.call_args + assert call_args[1]["query_embedding"] == [0.5, 0.7] + assert call_args[1]["query_sparse_embedding"].indices == [0, 5] + assert call_args[1]["query_sparse_embedding"].values == [0.1, 0.7] + assert call_args[1]["top_k"] == 10 + assert call_args[1]["return_embedding"] is False + + assert res["documents"][0].content == "Test doc" + assert res["documents"][0].embedding == [0.1, 0.2] + assert res["documents"][0].sparse_embedding == sparse_embedding