Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve OpenSearchDocumentStore.__init__ arguments #739

Merged
merged 11 commits into from
May 27, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -27,68 +27,93 @@
# all be mapped to scores ~1.
BM25_SCALING_FACTOR = 8

# Index settings used as the default value of the `settings` argument of
# OpenSearchDocumentStore.__init__; they are sent when the index is created.
DEFAULT_SETTINGS = {"index.knn": True}
# Default maximum size, in bytes, of a single bulk request (100 * 1024 * 1024).
DEFAULT_MAX_CHUNK_BYTES = 100 * 1024 * 1024


class OpenSearchDocumentStore:
def __init__(
    self,
    *,
    hosts: Optional[Hosts] = None,
    index: str = "default",
    max_chunk_bytes: int = DEFAULT_MAX_CHUNK_BYTES,
    embedding_dim: int = 768,
    method: Optional[Dict[str, Any]] = None,
    mappings: Optional[Dict[str, Any]] = None,
    settings: Optional[Dict[str, Any]] = DEFAULT_SETTINGS,
    **kwargs,
):
    """
    Creates a new OpenSearchDocumentStore instance.

    The ``embedding_dim``, ``method``, ``mappings``, and ``settings`` arguments are only used if the index does not
    exist and needs to be created. If the index already exists, its current configurations will be used.

    For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch)

    :param hosts: List of hosts running the OpenSearch client. Defaults to None
    :param index: Name of index in OpenSearch, if it doesn't exist it will be created. Defaults to "default"
    :param max_chunk_bytes: Maximum size of the requests in bytes. Defaults to 100MB
    :param embedding_dim: Dimension of the embeddings. Defaults to 768
    :param method: The method definition of the underlying configuration of the approximate k-NN algorithm. Please
        see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#method-definitions)
        for more information. Defaults to None
    :param mappings: The mapping of how the documents are stored and indexed. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/field-types/)
        for more information. If None, it uses the embedding_dim and method arguments to create default mappings.
        Defaults to None
    :param settings: The settings of the index to be created. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#index-settings)
        for more information. Defaults to {"index.knn": True}
    :param **kwargs: Optional arguments that ``OpenSearch`` takes. For the full list of supported kwargs,
        see the [official OpenSearch reference](https://opensearch-project.github.io/opensearch-py/api-ref/clients/opensearch_client.html)
    """
    # The client is created lazily on first access of the `client` property.
    self._client = None
    self._hosts = hosts
    self._index = index
    self._max_chunk_bytes = max_chunk_bytes
    # `_embedding_dim` and `_method` must be set before the default mappings
    # are built, because `_get_default_mappings` reads both of them.
    self._embedding_dim = embedding_dim
    self._method = method
    self._mappings = mappings or self._get_default_mappings()
    # Copy the module-level default so instances never share (and can never
    # mutate) the same dict object; caller-supplied settings are kept as-is.
    self._settings = dict(DEFAULT_SETTINGS) if settings is DEFAULT_SETTINGS else settings
    self._kwargs = kwargs

def _get_default_mappings(self) -> Dict[str, Any]:
    """
    Build the fallback index mappings from ``self._embedding_dim`` and ``self._method``.

    :returns: A mappings dict with a ``knn_vector`` field named ``embedding``, a ``text`` field named
        ``content`` and a dynamic template that maps string values to ``keyword``. The ``method`` entry
        is added to the embedding field only when ``self._method`` is truthy.
    """
    embedding_field: Dict[str, Any] = {
        "type": "knn_vector",
        "index": True,
        "dimension": self._embedding_dim,
    }
    if self._method:
        embedding_field["method"] = self._method

    keyword_for_strings = {
        "strings": {
            "match_mapping_type": "string",
            "mapping": {"type": "keyword"},
        }
    }
    return {
        "properties": {
            "embedding": embedding_field,
            "content": {"type": "text"},
        },
        "dynamic_templates": [keyword_for_strings],
    }

@property
def client(self) -> OpenSearch:
    """
    Return the OpenSearch client, creating it on first access.

    On first access the client is built from ``self._hosts`` and ``self._kwargs``, the connection is
    verified, and the index ``self._index`` is created from ``self._mappings`` and ``self._settings``
    if it does not exist yet. When the index already exists, those values are ignored and a debug
    message is logged.

    :returns: The connected ``OpenSearch`` client.
    """
    if not self._client:
        self._client = OpenSearch(self._hosts, **self._kwargs)
        # Check client connection, this will raise if not connected
        self._client.info()  # type:ignore

        if self._client.indices.exists(index=self._index):  # type:ignore
            logger.debug(
                "The index '%s' already exists. The `embedding_dim`, `method`, `mappings`, and "
                "`settings` values will be ignored.",
                self._index,
            )
        else:
            # Create the index if it doesn't exist. Mappings and settings come
            # from the explicit constructor arguments, never from `self._kwargs`,
            # and the index is created exactly once.
            body = {"mappings": self._mappings, "settings": self._settings}
            self._client.indices.create(index=self._index, body=body)  # type:ignore
    return self._client

def to_dict(self) -> Dict[str, Any]:
Expand All @@ -105,6 +130,11 @@ def to_dict(self) -> Dict[str, Any]:
self,
hosts=self._hosts,
index=self._index,
max_chunk_bytes=self._max_chunk_bytes,
embedding_dim=self._embedding_dim,
method=self._method,
mappings=self._mappings,
settings=self._settings,
**self._kwargs,
)

Expand Down Expand Up @@ -178,6 +208,7 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D
refresh="wait_for",
index=self._index,
raise_on_error=False,
max_chunk_bytes=self._max_chunk_bytes,
EdAbati marked this conversation as resolved.
Show resolved Hide resolved
)

if errors:
Expand Down Expand Up @@ -234,6 +265,7 @@ def delete_documents(self, document_ids: List[str]) -> None:
refresh="wait_for",
index=self._index,
raise_on_error=False,
max_chunk_bytes=self._max_chunk_bytes,
)

def _bm25_retrieval(
Expand Down
14 changes: 14 additions & 0 deletions integrations/opensearch/tests/test_bm25_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from haystack.dataclasses import Document
from haystack_integrations.components.retrievers.opensearch import OpenSearchBM25Retriever
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES


def test_init_default():
Expand All @@ -27,8 +28,21 @@ def test_to_dict(_mock_opensearch_client):
"init_parameters": {
"document_store": {
"init_parameters": {
"embedding_dim": 768,
"hosts": "some fake host",
"index": "default",
"mappings": {
"dynamic_templates": [
{"strings": {"mapping": {"type": "keyword"}, "match_mapping_type": "string"}}
],
"properties": {
"content": {"type": "text"},
"embedding": {"dimension": 768, "index": True, "type": "knn_vector"},
},
},
"max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES,
"method": None,
"settings": {"index.knn": True},
},
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
},
Expand Down
53 changes: 49 additions & 4 deletions integrations/opensearch/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from haystack.document_stores.types import DuplicatePolicy
from haystack.testing.document_store import DocumentStoreBaseTests
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES
from opensearchpy.exceptions import RequestError


Expand All @@ -21,8 +22,19 @@ def test_to_dict(_mock_opensearch_client):
assert res == {
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
"init_parameters": {
"embedding_dim": 768,
"hosts": "some hosts",
"index": "default",
"mappings": {
"dynamic_templates": [{"strings": {"mapping": {"type": "keyword"}, "match_mapping_type": "string"}}],
"properties": {
"content": {"type": "text"},
"embedding": {"dimension": 768, "index": True, "type": "knn_vector"},
},
},
"max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES,
"method": None,
"settings": {"index.knn": True},
},
}

Expand All @@ -31,14 +43,29 @@ def test_to_dict(_mock_opensearch_client):
def test_from_dict(_mock_opensearch_client):
    """Check that ``from_dict`` restores explicit parameters and fills in defaults for the omitted ones."""
    # "init_parameters" must appear only once: a duplicated key in a dict
    # literal is silently overwritten by the last occurrence.
    data = {
        "type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
        "init_parameters": {
            "hosts": "some hosts",
            "index": "default",
            "max_chunk_bytes": 1000,
            "embedding_dim": 1536,
        },
    }
    document_store = OpenSearchDocumentStore.from_dict(data)
    assert document_store._hosts == "some hosts"
    assert document_store._index == "default"
    assert document_store._max_chunk_bytes == 1000
    assert document_store._embedding_dim == 1536
    assert document_store._method is None
    # No mappings were supplied, so the defaults are derived from embedding_dim.
    assert document_store._mappings == {
        "properties": {
            "embedding": {"type": "knn_vector", "index": True, "dimension": 1536},
            "content": {"type": "text"},
        },
        "dynamic_templates": [
            {
                "strings": {
                    "match_mapping_type": "string",
                    "mapping": {"type": "keyword"},
                }
            }
        ],
    }
    assert document_store._settings == {"index.knn": True}


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
Expand All @@ -47,6 +74,17 @@ def test_init_is_lazy(_mock_opensearch_client):
_mock_opensearch_client.assert_not_called()


@patch("haystack_integrations.document_stores.opensearch.document_store.OpenSearch")
def test_get_default_mappings(_mock_opensearch_client):
    """Check that the default mappings pick up a custom ``embedding_dim`` and ``method``."""
    knn_method = {"name": "hnsw"}
    expected_embedding_field = {
        "type": "knn_vector",
        "index": True,
        "dimension": 1536,
        "method": {"name": "hnsw"},
    }

    document_store = OpenSearchDocumentStore(hosts="testhost", embedding_dim=1536, method=knn_method)

    assert document_store._mappings["properties"]["embedding"] == expected_embedding_field


@pytest.mark.integration
class TestDocumentStore(DocumentStoreBaseTests):
"""
Expand Down Expand Up @@ -339,3 +377,10 @@ def test_write_documents_with_badly_formatted_bulk_errors(self, mock_bulk, docum
with pytest.raises(DocumentStoreError) as e:
document_store.write_documents([Document(content="Hello world")])
e.match(f"{error}")

@patch("haystack_integrations.document_stores.opensearch.document_store.bulk")
def test_write_documents_max_chunk_bytes(self, mock_bulk, document_store):
    """Check that ``write_documents`` passes the store's ``max_chunk_bytes`` on to ``bulk``."""
    mock_bulk.return_value = (1, [])
    docs = [Document(content="Hello world")]

    document_store.write_documents(docs)

    bulk_kwargs = mock_bulk.call_args.kwargs
    assert bulk_kwargs["max_chunk_bytes"] == DEFAULT_MAX_CHUNK_BYTES
29 changes: 29 additions & 0 deletions integrations/opensearch/tests/test_embedding_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from haystack.dataclasses import Document
from haystack_integrations.components.retrievers.opensearch import OpenSearchEmbeddingRetriever
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES


def test_init_default():
Expand All @@ -27,8 +28,36 @@ def test_to_dict(_mock_opensearch_client):
"init_parameters": {
"document_store": {
"init_parameters": {
"embedding_dim": 768,
"hosts": "some fake host",
"index": "default",
"mappings": {
"dynamic_templates": [
{
"strings": {
"mapping": {
"type": "keyword",
},
"match_mapping_type": "string",
},
},
],
"properties": {
"content": {
"type": "text",
},
"embedding": {
"dimension": 768,
"index": True,
"type": "knn_vector",
},
},
},
"max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES,
"method": None,
"settings": {
"index.knn": True,
},
},
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
},
Expand Down