Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qdrant - add hybrid retriever #675

Merged
merged 7 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
#
# SPDX-License-Identifier: Apache-2.0

from .retriever import QdrantEmbeddingRetriever, QdrantSparseEmbeddingRetriever
from .retriever import QdrantEmbeddingRetriever, QdrantHybridRetriever, QdrantSparseEmbeddingRetriever

__all__ = ("QdrantEmbeddingRetriever", "QdrantSparseEmbeddingRetriever")
__all__ = ("QdrantEmbeddingRetriever", "QdrantSparseEmbeddingRetriever", "QdrantHybridRetriever")
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ class QdrantEmbeddingRetriever:
":memory:",
recreate_index=True,
return_embedding=True,
wait_result_from_api=True,
)

document_store.write_documents([Document(content="test", embedding=[0.5]*768)])

retriever = QdrantEmbeddingRetriever(document_store=document_store)

# using a fake vector to keep the example simple
Expand Down Expand Up @@ -112,7 +114,7 @@ def run(
The retrieved documents.

"""
docs = self._document_store.query_by_embedding(
docs = self._document_store._query_by_embedding(
query_embedding=query_embedding,
filters=filters or self._filters,
top_k=top_k or self._top_k,
Expand All @@ -136,10 +138,14 @@ class QdrantSparseEmbeddingRetriever:

document_store = QdrantDocumentStore(
":memory:",
use_sparse_embeddings=True,
recreate_index=True,
return_embedding=True,
wait_result_from_api=True,
)

doc = Document(content="test", sparse_embedding=SparseEmbedding(indices=[0, 3, 5], values=[0.1, 0.5, 0.12]))
document_store.write_documents([doc])

retriever = QdrantSparseEmbeddingRetriever(document_store=document_store)
sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33])
retriever.run(query_sparse_embedding=sparse_embedding)
Expand Down Expand Up @@ -196,7 +202,7 @@ def to_dict(self) -> Dict[str, Any]:
return d

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "QdrantEmbeddingRetriever":
def from_dict(cls, data: Dict[str, Any]) -> "QdrantSparseEmbeddingRetriever":
"""
Deserializes the component from a dictionary.

Expand Down Expand Up @@ -230,7 +236,7 @@ def run(
The retrieved documents.

"""
docs = self._document_store.query_by_sparse(
docs = self._document_store._query_by_sparse(
query_sparse_embedding=query_sparse_embedding,
filters=filters or self._filters,
top_k=top_k or self._top_k,
Expand All @@ -239,3 +245,124 @@ def run(
)

return {"documents": docs}


@component
class QdrantHybridRetriever:
    """
    A component for retrieving documents from a QdrantDocumentStore using both dense and sparse vectors
    and fusing the results using Reciprocal Rank Fusion.

    Usage example:
    ```python
    from haystack_integrations.components.retrievers.qdrant import QdrantHybridRetriever
    from haystack_integrations.document_stores.qdrant import QdrantDocumentStore
    from haystack.dataclasses.sparse_embedding import SparseEmbedding

    document_store = QdrantDocumentStore(
        ":memory:",
        use_sparse_embeddings=True,
        recreate_index=True,
        return_embedding=True,
        wait_result_from_api=True,
    )

    doc = Document(content="test",
                   embedding=[0.5]*768,
                   sparse_embedding=SparseEmbedding(indices=[0, 3, 5], values=[0.1, 0.5, 0.12]))

    document_store.write_documents([doc])

    retriever = QdrantHybridRetriever(document_store=document_store)
    embedding = [0.1]*768
    sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33])
    retriever.run(query_embedding=embedding, query_sparse_embedding=sparse_embedding)
    ```
    """

    def __init__(
        self,
        document_store: QdrantDocumentStore,
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 10,
        return_embedding: bool = False,
    ):
        """
        Create a QdrantHybridRetriever component.

        :param document_store: An instance of QdrantDocumentStore.
        :param filters: A dictionary with filters to narrow down the search space. Default is None.
        :param top_k: The maximum number of documents to retrieve. Default is 10.
        :param return_embedding: Whether to return the embeddings of the retrieved Documents. Default is False.

        :raises ValueError: If 'document_store' is not an instance of QdrantDocumentStore.
        """

        if not isinstance(document_store, QdrantDocumentStore):
            msg = "document_store must be an instance of QdrantDocumentStore"
            raise ValueError(msg)

        self._document_store = document_store
        self._filters = filters
        self._top_k = top_k
        self._return_embedding = return_embedding

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            document_store=self._document_store.to_dict(),
            filters=self._filters,
            top_k=self._top_k,
            return_embedding=self._return_embedding,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "QdrantHybridRetriever":
        """
        Deserializes the component from a dictionary.

        :param data:
            Dictionary to deserialize from.
        :returns:
            Deserialized component.
        """
        document_store = QdrantDocumentStore.from_dict(data["init_parameters"]["document_store"])
        data["init_parameters"]["document_store"] = document_store
        return default_from_dict(cls, data)

    @component.output_types(documents=List[Document])
    def run(
        self,
        query_embedding: List[float],
        query_sparse_embedding: SparseEmbedding,
        filters: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = None,
        return_embedding: Optional[bool] = None,
    ):
        """
        Run the Hybrid Retriever on the given input data.

        :param query_embedding: Dense embedding of the query.
        :param query_sparse_embedding: Sparse embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The maximum number of documents to return.
        :param return_embedding: Whether to return the embedding of the retrieved Documents.
        :returns:
            The retrieved documents.

        """
        docs = self._document_store._query_hybrid(
            query_embedding=query_embedding,
            query_sparse_embedding=query_sparse_embedding,
            filters=filters or self._filters,
            top_k=top_k or self._top_k,
            # `or` would silently discard an explicit False, so fall back only on None
            return_embedding=return_embedding if return_embedding is not None else self._return_embedding,
        )

        return {"documents": docs}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from qdrant_client import grpc
from qdrant_client.http import models as rest
from qdrant_client.http.exceptions import UnexpectedResponse
from qdrant_client.hybrid.fusion import reciprocal_rank_fusion
from tqdm import tqdm

from .converters import (
Expand Down Expand Up @@ -307,7 +308,7 @@ def get_documents_by_id(
)
return documents

def query_by_sparse(
def _query_by_sparse(
self,
query_sparse_embedding: SparseEmbedding,
filters: Optional[Dict[str, Any]] = None,
Expand Down Expand Up @@ -349,7 +350,7 @@ def query_by_sparse(
document.score = score
return results

def query_by_embedding(
def _query_by_embedding(
self,
query_embedding: List[float],
filters: Optional[Dict[str, Any]] = None,
Expand Down Expand Up @@ -383,6 +384,77 @@ def query_by_embedding(
document.score = score
return results

def _query_hybrid(
    self,
    query_embedding: List[float],
    query_sparse_embedding: SparseEmbedding,
    filters: Optional[Dict[str, Any]] = None,
    top_k: int = 10,
    return_embedding: bool = False,
) -> List[Document]:
    """
    Retrieves documents based on both dense and sparse embeddings
    and fuses the results using Reciprocal Rank Fusion.

    This method is not meant to be part of the public interface of
    `QdrantDocumentStore` nor called directly.
    `QdrantHybridRetriever` uses this method directly and is the public interface for it.

    :param query_embedding: Dense embedding of the query.
    :param query_sparse_embedding: Sparse embedding of the query.
    :param filters: Filters applied to the retrieved Documents.
    :param top_k: Maximum number of Documents to return.
    :param return_embedding: Whether to return the embeddings of the retrieved documents.

    :raises QdrantStoreError: If the Document Store was initialized with `use_sparse_embeddings=False`.
    :returns: List of Document that are most similar to `query_embedding` and `query_sparse_embedding`.
    """

    # this implementation is based on that of Python Qdrant client
    # https://github.com/qdrant/qdrant-client/blob/8e3ea58f781e4110d11c0a6985b5e6bb66b85d33/qdrant_client/qdrant_fastembed.py#L519
    if not self.use_sparse_embeddings:
        message = (
            "You are trying to query using sparse embeddings, but the Document Store "
            "was initialized with `use_sparse_embeddings=False`. "
        )
        raise QdrantStoreError(message)

    qdrant_filters = convert_filters_to_qdrant(filters)

    sparse_request = rest.SearchRequest(
        vector=rest.NamedSparseVector(
            name=SPARSE_VECTORS_NAME,
            vector=rest.SparseVector(
                indices=query_sparse_embedding.indices,
                values=query_sparse_embedding.values,
            ),
        ),
        filter=qdrant_filters,
        limit=top_k,
        with_payload=True,
        with_vector=return_embedding,
    )

    dense_request = rest.SearchRequest(
        vector=rest.NamedVector(
            name=DENSE_VECTORS_NAME,
            vector=query_embedding,
        ),
        filter=qdrant_filters,
        limit=top_k,
        with_payload=True,
        with_vector=return_embedding,
    )

    # one round-trip for both searches; response order matches request order
    dense_request_response, sparse_request_response = self.client.search_batch(
        collection_name=self.index, requests=[dense_request, sparse_request]
    )

    points = reciprocal_rank_fusion(responses=[dense_request_response, sparse_request_response], limit=top_k)

    results = [convert_qdrant_point_to_haystack_document(point, use_sparse_embeddings=True) for point in points]

    return results

def _get_distance(self, similarity: str) -> rest.Distance:
try:
return self.SIMILARITY[similarity]
Expand Down
55 changes: 53 additions & 2 deletions integrations/qdrant/tests/test_document_store.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
from typing import List

import numpy as np
import pytest
from haystack import Document
from haystack.dataclasses import SparseEmbedding
from haystack.document_stores.errors import DuplicateDocumentError
from haystack.document_stores.types import DuplicatePolicy
from haystack.testing.document_store import (
CountDocumentsTest,
DeleteDocumentsTest,
WriteDocumentsTest,
_random_embeddings,
)
from haystack_integrations.document_stores.qdrant import QdrantDocumentStore
from haystack_integrations.document_stores.qdrant.document_store import QdrantDocumentStore, QdrantStoreError


class TestQdrantStoreBaseTests(CountDocumentsTest, WriteDocumentsTest, DeleteDocumentsTest):
def _generate_mocked_sparse_embedding(n):
anakin87 marked this conversation as resolved.
Show resolved Hide resolved
list_of_sparse_vectors = []
for _ in range(n):
random_indice_length = np.random.randint(3, 15)
data = {
"indices": list(range(random_indice_length)),
"values": [np.random.random_sample() for _ in range(random_indice_length)],
}
list_of_sparse_vectors.append(data)
return list_of_sparse_vectors


class TestQdrantDocumentStore(CountDocumentsTest, WriteDocumentsTest, DeleteDocumentsTest):
@pytest.fixture
def document_store(self) -> QdrantDocumentStore:
    # Fresh in-memory store per test; dense-only (use_sparse_embeddings=False)
    # so the shared base test suites run against the default configuration.
    return QdrantDocumentStore(
        ":memory:",
        recreate_index=True,
        return_embedding=True,
        wait_result_from_api=True,
        use_sparse_embeddings=False,
    )

def assert_documents_are_equal(self, received: List[Document], expected: List[Document]):
Expand All @@ -39,3 +55,38 @@ def test_write_documents(self, document_store: QdrantDocumentStore):
assert document_store.write_documents(docs) == 1
with pytest.raises(DuplicateDocumentError):
document_store.write_documents(docs, DuplicatePolicy.FAIL)

def test_query_hybrid(self):
    """Hybrid query over dense + sparse vectors returns top_k fused results with embeddings."""
    document_store = QdrantDocumentStore(location=":memory:", use_sparse_embeddings=True)

    documents = []
    for idx in range(20):
        mocked = _generate_mocked_sparse_embedding(1)[0]
        documents.append(
            Document(
                content=f"doc {idx}",
                sparse_embedding=SparseEmbedding.from_dict(mocked),
                embedding=_random_embeddings(768),
            )
        )

    document_store.write_documents(documents)

    query_sparse = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33])
    query_dense = [0.1] * 768

    results: List[Document] = document_store._query_hybrid(
        query_sparse_embedding=query_sparse, query_embedding=query_dense, top_k=10, return_embedding=True
    )
    assert len(results) == 10

    for retrieved in results:
        assert retrieved.sparse_embedding
        assert retrieved.embedding

def test_query_hybrid_fail_without_sparse_embedding(self, document_store):
    """A hybrid query must raise when the store was created without sparse-embedding support."""
    query_sparse = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33])
    query_dense = [0.1] * 768

    with pytest.raises(QdrantStoreError):
        document_store._query_hybrid(
            query_sparse_embedding=query_sparse,
            query_embedding=query_dense,
        )
Loading