diff --git a/deeplake/core/vectorstore/dataset_handlers/dataset_handler_base.py b/deeplake/core/vectorstore/dataset_handlers/dataset_handler_base.py index 3e4d255146..75d4f96e79 100644 --- a/deeplake/core/vectorstore/dataset_handlers/dataset_handler_base.py +++ b/deeplake/core/vectorstore/dataset_handlers/dataset_handler_base.py @@ -109,7 +109,7 @@ def __init__( self.index_params = utils.parse_index_params(index_params) kwargs["index_params"] = self.index_params self.num_workers = num_workers - self.creds = creds or {} + self.creds = creds self.embedding_function = utils.create_embedding_function(embedding_function) self.tensor_params = tensor_params self.read_only = read_only diff --git a/deeplake/core/vectorstore/dataset_handlers/managed_dataset_handler.py b/deeplake/core/vectorstore/dataset_handlers/managed_dataset_handler.py index 5de60a7c36..8778b204dc 100644 --- a/deeplake/core/vectorstore/dataset_handlers/managed_dataset_handler.py +++ b/deeplake/core/vectorstore/dataset_handlers/managed_dataset_handler.py @@ -104,6 +104,15 @@ def __init__( self.verbose = verbose + self.deep_memory = DeepMemory( + dataset=None, + path=self.path, + token=self.token, + logger=self.logger, + embedding_function=self.embedding_function, + creds=self.creds, + ) + # verifying not implemented args self.args_verifier.verify_init_args( cls=self, @@ -113,6 +122,7 @@ def __init__( creds=creds, org_id=org_id, other_kwargs=kwargs, + deep_memory=self.deep_memory, ) self.client = ManagedServiceClient(token=self.token) @@ -128,14 +138,11 @@ def __init__( log_visualizer_link(response.path) logger.info(response.summary) - self.deep_memory = DeepMemory( - dataset=None, - path=self.path, - token=self.token, - logger=self.logger, - embedding_function=self.embedding_function, - creds=self.creds, - ) + if self.deep_memory is not None and self.embedding_function is not None: + logger.warning( + "ManagedVectorStore does not support passing embedding_function for now. " + "Embedding function will be used only for deepmemory training and inference." + ) def add( self, @@ -167,6 +174,7 @@ def add( # verifying not implemented args self.args_verifier.verify_add_args( embedding_function=embedding_function, + deep_memory=self.deep_memory, embedding_data=embedding_data, embedding_tensor=embedding_tensor, rate_limiter=rate_limiter, @@ -215,6 +223,7 @@ def search( return_view: bool, deep_memory: bool, exec_option: Optional[str] = "tensor_db", + return_tql: bool = False, ) -> Union[Dict, Dataset]: feature_report_path( path=self.bugout_reporting_path, @@ -244,6 +253,7 @@ def search( exec_option=exec_option, return_view=return_view, filter=filter, + return_tql=return_tql, ) response = self.client.vectorstore_search( @@ -374,6 +384,10 @@ def delete_by_path( creds=creds, ) + @property + def exec_option(self): + return "tensor_db" + def _get_summary(self): """Returns a summary of the Managed Vector Store.""" return self.client.get_vectorstore_summary(self.path) @@ -426,11 +440,16 @@ def _verify_filter_is_dictionary(self): "Only Filter Dictionary is supported for the ManagedVectorStore." ) + def _verify_kwarg_is_non_default_and_nonsupported(self, kwarg): + if self.kwargs.get(kwarg, False) is not False: + raise NotImplementedError( + f"`{kwarg}` is not supported for ManagedVectorStore for now." + ) + class InitArgsVerfier(ArgsVerifierBase): _not_implemented_args = [ "dataset", - "embedding_function", "creds", "org_id", ] @@ -447,6 +466,14 @@ def verify(self, cls): "ManagedVectorStore can only be initialized with a Deep Lake Cloud path." ) + if ( + self.kwargs.get("deep_memory", False) is False + and self.kwargs.get("embedding_function", None) is not None + ): + raise NotImplementedError( + "ManagedVectorStore does not support passing embedding_function for now." + ) + if self.kwargs.get("other_kwargs", {}) != {}: other_kwargs = self.kwargs["other_kwargs"] other_kwargs_names = list(other_kwargs.keys()) @@ -459,8 +486,6 @@ def verify(self, cls): class AddArgsVerfier(ArgsVerifierBase): _not_implemented_args = [ - "embedding_function", - "embedding_data", "embedding_tensor", ] @@ -486,11 +511,8 @@ class SearchArgsVerfier(ArgsVerifierBase): def verify(self): super().verify() self._verify_filter_is_dictionary() - - if self.kwargs.get("return_view", False) is not False: - raise NotImplementedError( - "return_view is not supported for the ManagedVectorStore." - ) + self._verify_kwarg_is_non_default_and_nonsupported("return_view") + self._verify_kwarg_is_non_default_and_nonsupported("return_tql") class UpdateArgsVerfier(ArgsVerifierBase): @@ -516,6 +538,9 @@ def verify(self): class DeleteByPathArgsVerfier(ArgsVerifierBase): _not_implemented_args = [ - "force", "creds", ] + + def verify(self): + super().verify() + self._verify_kwarg_is_not_false("force") diff --git a/deeplake/core/vectorstore/dataset_handlers/test_managed_dh.py b/deeplake/core/vectorstore/dataset_handlers/test_managed_dh.py index e7829f5bf3..f5af2da4b0 100644 --- a/deeplake/core/vectorstore/dataset_handlers/test_managed_dh.py +++ b/deeplake/core/vectorstore/dataset_handlers/test_managed_dh.py @@ -18,16 +18,16 @@ def test_managed_vectorstore_should_not_accept_dataset_during_init( ) -def test_managed_vectorstore_should_not_accept_embedding_function_during_init( - hub_cloud_path, hub_cloud_dev_token -): - with pytest.raises(NotImplementedError): - VectorStore( - path=hub_cloud_path, - token=hub_cloud_dev_token, - runtime={"tensor_db": True}, - embedding_function=lambda x: x, - ) +# def test_managed_vectorstore_should_not_accept_embedding_function_during_init( +# hub_cloud_path, hub_cloud_dev_token +# ): +# with pytest.raises(NotImplementedError): +# VectorStore( +# path=hub_cloud_path, +# token=hub_cloud_dev_token, +# runtime={"tensor_db": True}, +# embedding_function=lambda x: x, +# ) def test_managed_vectorstore_should_not_accept_exec_option_during_init( @@ -240,10 +240,10 @@ def test_managed_vectorstore_should_not_accept_exec_option_during_update_embeddi path=hub_cloud_path, token=hub_cloud_dev_token, runtime={"tensor_db": True}, - embedding_dim=100, + embedding_dim=3, ) - embedding_dict = {"embedding": [np.zeros(100, dtype=np.float32)] * 3} + embedding_dict = {"embedding": [[0]] * 3} with pytest.raises(NotImplementedError): db.update_embedding( @@ -260,10 +260,10 @@ def test_managed_vectorstore_should_not_accept_embedding_function_during_update_ path=hub_cloud_path, token=hub_cloud_dev_token, runtime={"tensor_db": True}, - embedding_dim=100, + embedding_dim=3, ) - embedding_dict = {"embedding": [np.zeros(100, dtype=np.float32)] * 3} + embedding_dict = {"embedding": [[0, 0, 0]] * 3} with pytest.raises(NotImplementedError): db.update_embedding( @@ -272,44 +272,6 @@ def test_managed_vectorstore_should_not_accept_embedding_function_during_update_ ) -# def test_managed_vectorstore_should_not_accept_embedding_source_tensor_during_update_embedding( -# hub_cloud_path, hub_cloud_dev_token -# ): -# db = utils.create_and_populate_vs( -# path=hub_cloud_path, -# token=hub_cloud_dev_token, -# runtime={"tensor_db": True}, -# embedding_dim=100, -# ) - -# embedding_dict = {"embedding": [np.zeros(100, dtype=np.float32)] * 3} - -# with pytest.raises(NotImplementedError): -# db.update_embedding( -# embedding_dict=embedding_dict, -# embedding_source_tensor="text", -# ) - - -def test_managed_vectorstore_should_not_accept_embedding_tensor_during_update_embedding( - hub_cloud_path, hub_cloud_dev_token -): - db = utils.create_and_populate_vs( - path=hub_cloud_path, - token=hub_cloud_dev_token, - runtime={"tensor_db": True}, - embedding_dim=100, - ) - - embedding_dict = {"embedding": [np.zeros(100, dtype=np.float32)] * 3} - - with pytest.raises(NotImplementedError): - db.update_embedding( - embedding_dict=embedding_dict, - embedding_tensor="text", - ) - - def test_managed_vectorstore_should_not_accept_force_during_delete_by_path( hub_cloud_path, hub_cloud_dev_token ): @@ -320,7 +282,7 @@ def test_managed_vectorstore_should_not_accept_force_during_delete_by_path( ) with pytest.raises(NotImplementedError): - db.delete_by_path(path=hub_cloud_path, force=True) + db.delete_by_path(path=hub_cloud_path, force=True, runtime={"tensor_db": True}) def test_managed_vectorstore_should_not_accept_creds_during_delete_by_path( diff --git a/deeplake/core/vectorstore/deep_memory/deep_memory.py b/deeplake/core/vectorstore/deep_memory/deep_memory.py index 42b145a468..5ff5c41553 100644 --- a/deeplake/core/vectorstore/deep_memory/deep_memory.py +++ b/deeplake/core/vectorstore/deep_memory/deep_memory.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, ValidationError from typing import Any, Dict, Optional, List, Union, Callable, Tuple from time import time +from tqdm import tqdm import numpy as np @@ -118,7 +119,7 @@ def __init__( self.token = token self.embedding_function = embedding_function self.client = self._get_dm_client() - self.creds = creds or {} + self.creds = creds self.logger = logger @access_control @@ -194,13 +195,30 @@ def train( ) self.logger.info("Preparing training data for deepmemory:") + + embedding = [] + + # TODO: after allowing to send embedding function to managed side, remove the following lines: 203-209 + # internal batch size for embedding function + batch_size = 128 + query_batches = [ + queries[i : i + batch_size] for i in range(0, len(queries), 100) + ] + + for _, query in tqdm(enumerate(query_batches), total=len(query_batches)): + embedded_docs = embedding_function.embed_documents(query) + for idx, embedded_doc in enumerate(embedded_docs): + if isinstance(embedded_doc, np.ndarray): + embedded_docs[idx] = embedded_doc.tolist() + + embedding.extend(embedded_docs) + queries_vs.add( text=[query for query in queries], metadata=[ {"relevance": relevance_per_doc} for relevance_per_doc in relevance ], - embedding_data=[query for query in queries], - embedding_function=embedding_function, + embedding=embedding, ) # do some rest_api calls to train the model diff --git a/deeplake/core/vectorstore/test_deeplake_vectorstore.py b/deeplake/core/vectorstore/test_deeplake_vectorstore.py index d5d3af0fc2..d1d49ef495 100644 --- a/deeplake/core/vectorstore/test_deeplake_vectorstore.py +++ b/deeplake/core/vectorstore/test_deeplake_vectorstore.py @@ -1240,16 +1240,17 @@ def test_update_embedding( embedding_function=embedding_fn, ) - # case 8-9: single embedding_source_tensor, multiple embedding_tensor, single init_embedding_function - with pytest.raises(ValueError): - # case 8: error out because embedding_function is not specified during init call and update call - vector_store.update_embedding( - ids=vector_store_hash_ids, - row_ids=vector_store_row_ids, - filter=vector_store_filters, - query=vector_store_query, - embedding_source_tensor=embedding_source_tensor, - ) + if init_embedding_function is None: + # case 8-9: single embedding_source_tensor, multiple embedding_tensor, single init_embedding_function + with pytest.raises(ValueError): + # case 8: error out because embedding_function is not specified during init call and update call + vector_store.update_embedding( + ids=vector_store_hash_ids, + row_ids=vector_store_row_ids, + filter=vector_store_filters, + query=vector_store_query, + embedding_source_tensor=embedding_source_tensor, + ) # case 10: single embedding_source_tensor, multiple embedding_tensor, multiple embedding_function -> error out? with pytest.raises(ValueError): diff --git a/deeplake/tests/path_fixtures.py b/deeplake/tests/path_fixtures.py index 45b1f00cac..0e1ea4dfb8 100644 --- a/deeplake/tests/path_fixtures.py +++ b/deeplake/tests/path_fixtures.py @@ -480,7 +480,7 @@ def corpus_query_relevances_copy(request, hub_cloud_dev_token): corpus = _get_storage_path(request, HUB_CLOUD) query_vs = VectorStore( - path=f"hub://{HUB_CLOUD_DEV_USERNAME}/deepmemory_test_queries", + path=f"hub://{HUB_CLOUD_DEV_USERNAME}/deepmemory_test_queries2", runtime={"tensor_db": True}, token=hub_cloud_dev_token, )