Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve OpenSearchDocumentStore.__init__ arguments #739

Merged
merged 11 commits into from
May 27, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -27,64 +27,90 @@
# all be mapped to scores ~1.
BM25_SCALING_FACTOR = 8

DEFAULT_SETTINGS = {"index.knn": True}
DEFAULT_MAX_CHUNK_BYTES = 100 * 1024 * 1024


class OpenSearchDocumentStore:
def __init__(
self,
*,
hosts: Optional[Hosts] = None,
index: str = "default",
max_chunk_bytes: int = DEFAULT_MAX_CHUNK_BYTES,
embedding_dim: int = 768,
method: Optional[Dict[str, Any]] = None,
mappings: Optional[Dict[str, Any]] = None,
settings: Optional[Dict[str, Any]] = DEFAULT_SETTINGS,
**kwargs,
):
"""
Creates a new OpenSearchDocumentStore instance.

For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch)
The ``embeddings_dim``, ``method``, ``mappings``, and ``settings`` arguments are only used if the index does not
exists and needs to be created. If the index already exists, its current configurations will be used.

For the full list of supported kwargs, see the [official OpenSearch reference](https://opensearch-project.github.io/opensearch-py/api-ref/clients/opensearch_client.html)
For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch)

:param hosts: List of hosts running the OpenSearch client. Defaults to None
:param index: Name of index in OpenSearch, if it doesn't exist it will be created. Defaults to "default"
:param **kwargs: Optional arguments that ``OpenSearch`` takes.
:param max_chunk_bytes: Maximum size of the requests in bytes. Defaults to 100MB
:param embedding_dim: Dimension of the embeddings. Defaults to 768
:param method: The method definition of the underlying configuration of the approximate k-NN algorithm. Please
see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#method-definitions)
for more information. Defaults to None
:param mappings: The mapping of how the documents are stored and indexed. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/field-types/)
for more information. Defaults to None
:param settings: The settings of the index to be created. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#index-settings)
for more information. Defaults to {"index.knn": True}
:param **kwargs: Optional arguments that ``OpenSearch`` takes. For the full list of supported kwargs,
see the [official OpenSearch reference](https://opensearch-project.github.io/opensearch-py/api-ref/clients/opensearch_client.html)
"""
self._hosts = hosts
self._client = OpenSearch(hosts, **kwargs)
self._index = index
self._max_chunk_bytes = max_chunk_bytes
self._kwargs = kwargs

# Check client connection, this will raise if not connected
self._client.info()

# configure mapping for the embedding field
embedding_dim = kwargs.get("embedding_dim", 768)
method = kwargs.get("method", None)

mappings: Dict[str, Any] = {
"properties": {
"embedding": {"type": "knn_vector", "index": True, "dimension": embedding_dim},
"content": {"type": "text"},
},
"dynamic_templates": [
{
"strings": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword",
},
if self._client.indices.exists(index=index):
logger.debug(
"The index '%s' already exists. The `embedding_dim`, `method`, `mappings`, and "
"`settings` values will be ignored.",
index,
)
# Create the index if it doesn't exist
else:
# configure mapping for the embedding field
self._embedding_dim = embedding_dim
self._method = method

default_mappings: Dict[str, Any] = {
"properties": {
"embedding": {"type": "knn_vector", "index": True, "dimension": self._embedding_dim},
"content": {"type": "text"},
},
"dynamic_templates": [
{
"strings": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword",
},
}
}
}
],
}
if method:
mappings["properties"]["embedding"]["method"] = method
],
}
if self._method:
default_mappings["properties"]["embedding"]["method"] = self._method

mappings = kwargs.get("mappings", mappings)
settings = kwargs.get("settings", {"index.knn": True})
self._mappings = mappings or default_mappings
self._settings = settings

body = {"mappings": mappings, "settings": settings}
body = {"mappings": self._mappings, "settings": self._settings}

# Create the index if it doesn't exist
if not self._client.indices.exists(index=index):
self._client.indices.create(index=index, body=body)

def to_dict(self) -> Dict[str, Any]:
Expand All @@ -101,6 +127,7 @@ def to_dict(self) -> Dict[str, Any]:
self,
hosts=self._hosts,
index=self._index,
max_chunk_bytes=self._max_chunk_bytes,
**self._kwargs,
)
EdAbati marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -174,6 +201,7 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D
refresh="wait_for",
index=self._index,
raise_on_error=False,
max_chunk_bytes=self._max_chunk_bytes,
EdAbati marked this conversation as resolved.
Show resolved Hide resolved
)

if errors:
Expand Down
2 changes: 2 additions & 0 deletions integrations/opensearch/tests/test_bm25_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from haystack.dataclasses import Document
from haystack_integrations.components.retrievers.opensearch import OpenSearchBM25Retriever
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES


def test_init_default():
Expand All @@ -29,6 +30,7 @@ def test_to_dict(_mock_opensearch_client):
"init_parameters": {
"hosts": "some fake host",
"index": "default",
"max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES,
},
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
},
Expand Down
15 changes: 11 additions & 4 deletions integrations/opensearch/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from haystack.document_stores.types import DuplicatePolicy
from haystack.testing.document_store import DocumentStoreBaseTests
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES
from opensearchpy.exceptions import RequestError


Expand All @@ -23,6 +24,7 @@ def test_to_dict(_mock_opensearch_client):
"init_parameters": {
"hosts": "some hosts",
"index": "default",
"max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES,
},
}

Expand All @@ -31,14 +33,12 @@ def test_to_dict(_mock_opensearch_client):
def test_from_dict(_mock_opensearch_client):
data = {
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
"init_parameters": {
"hosts": "some hosts",
"index": "default",
},
"init_parameters": {"hosts": "some hosts", "index": "default", "max_chunk_bytes": 1000},
}
document_store = OpenSearchDocumentStore.from_dict(data)
assert document_store._hosts == "some hosts"
assert document_store._index == "default"
assert document_store._max_chunk_bytes == 1000


@pytest.mark.integration
Expand Down Expand Up @@ -333,3 +333,10 @@ def test_write_documents_with_badly_formatted_bulk_errors(self, mock_bulk, docum
with pytest.raises(DocumentStoreError) as e:
document_store.write_documents([Document(content="Hello world")])
e.match(f"{error}")

@patch("haystack_integrations.document_stores.opensearch.document_store.bulk")
def test_write_documents_max_chunk_bytes(self, mock_bulk, document_store):
mock_bulk.return_value = (1, [])
document_store.write_documents([Document(content="Hello world")])

assert mock_bulk.call_args.kwargs["max_chunk_bytes"] == DEFAULT_MAX_CHUNK_BYTES
2 changes: 2 additions & 0 deletions integrations/opensearch/tests/test_embedding_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from haystack.dataclasses import Document
from haystack_integrations.components.retrievers.opensearch import OpenSearchEmbeddingRetriever
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
from haystack_integrations.document_stores.opensearch.document_store import DEFAULT_MAX_CHUNK_BYTES


def test_init_default():
Expand All @@ -29,6 +30,7 @@ def test_to_dict(_mock_opensearch_client):
"init_parameters": {
"hosts": "some fake host",
"index": "default",
"max_chunk_bytes": DEFAULT_MAX_CHUNK_BYTES,
},
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
},
Expand Down