From f0a579bf18f7d57f02536725b8a7b8e7f62719c4 Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Wed, 24 Apr 2024 13:18:42 -0400 Subject: [PATCH] core[minor],langchain[patch]: Move base indexing interface and logic to core (#20667) This PR moves the interface and the logic to core. The following changes to namespaces: `indexes` -> `indexing` `indexes._api` -> `indexing.api` Testing code is intentionally duplicated for now since it's testing different implementations of the record manager (in-memory vs. SQL). Common logic will need to be pulled out into the test client. A follow up PR will move the SQL based implementation outside of LangChain. --- libs/core/langchain_core/indexing/__init__.py | 15 + libs/core/langchain_core/indexing/api.py | 606 +++++++ .../langchain_core/indexing}/base.py | 27 +- .../tests/unit_tests/indexing/__init__.py | 0 .../tests/unit_tests/indexing/in_memory.py | 105 ++ .../indexing/test_hashed_document.py | 50 + .../indexing/test_in_memory_record_manager.py | 223 +++ .../unit_tests/indexing/test_indexing.py | 1398 +++++++++++++++++ .../unit_tests/indexing/test_public_api.py | 12 + libs/langchain/langchain/indexes/__init__.py | 3 +- libs/langchain/langchain/indexes/_api.py | 603 +------ .../langchain/indexes/_sql_record_manager.py | 3 +- 12 files changed, 2436 insertions(+), 609 deletions(-) create mode 100644 libs/core/langchain_core/indexing/__init__.py create mode 100644 libs/core/langchain_core/indexing/api.py rename libs/{langchain/langchain/indexes => core/langchain_core/indexing}/base.py (82%) create mode 100644 libs/core/tests/unit_tests/indexing/__init__.py create mode 100644 libs/core/tests/unit_tests/indexing/in_memory.py create mode 100644 libs/core/tests/unit_tests/indexing/test_hashed_document.py create mode 100644 libs/core/tests/unit_tests/indexing/test_in_memory_record_manager.py create mode 100644 libs/core/tests/unit_tests/indexing/test_indexing.py create mode 100644 libs/core/tests/unit_tests/indexing/test_public_api.py diff --git a/libs/core/langchain_core/indexing/__init__.py b/libs/core/langchain_core/indexing/__init__.py new file mode 100644 index 0000000000000..d67ec5eec0c98 --- /dev/null +++ b/libs/core/langchain_core/indexing/__init__.py @@ -0,0 +1,15 @@ +"""Code to help indexing data into a vectorstore. + +This package contains helper logic to help deal with indexing data into +a vectorstore while avoiding duplicated content and over-writing content +if it's unchanged. 
+""" +from langchain_core.indexing.api import IndexingResult, aindex, index +from langchain_core.indexing.base import RecordManager + +__all__ = [ + "aindex", + "index", + "IndexingResult", + "RecordManager", +] diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py new file mode 100644 index 0000000000000..01bc9bffb2b31 --- /dev/null +++ b/libs/core/langchain_core/indexing/api.py @@ -0,0 +1,606 @@ +"""Module contains logic for indexing documents into vector stores.""" +from __future__ import annotations + +import hashlib +import json +import uuid +from itertools import islice +from typing import ( + Any, + AsyncIterable, + AsyncIterator, + Callable, + Dict, + Iterable, + Iterator, + List, + Literal, + Optional, + Sequence, + Set, + TypedDict, + TypeVar, + Union, + cast, +) + +from langchain_core.document_loaders.base import BaseLoader +from langchain_core.documents import Document +from langchain_core.indexing.base import RecordManager +from langchain_core.pydantic_v1 import root_validator +from langchain_core.vectorstores import VectorStore + +# Magic UUID to use as a namespace for hashing. +# Used to try and generate a unique UUID for each document +# from hashing the document content and metadata. +NAMESPACE_UUID = uuid.UUID(int=1984) + + +T = TypeVar("T") + + +def _hash_string_to_uuid(input_string: str) -> uuid.UUID: + """Hashes a string and returns the corresponding UUID.""" + hash_value = hashlib.sha1(input_string.encode("utf-8")).hexdigest() + return uuid.uuid5(NAMESPACE_UUID, hash_value) + + +def _hash_nested_dict_to_uuid(data: dict[Any, Any]) -> uuid.UUID: + """Hashes a nested dictionary and returns the corresponding UUID.""" + serialized_data = json.dumps(data, sort_keys=True) + hash_value = hashlib.sha1(serialized_data.encode("utf-8")).hexdigest() + return uuid.uuid5(NAMESPACE_UUID, hash_value) + + +class _HashedDocument(Document): + """A hashed document with a unique ID.""" + + uid: str + hash_: str + """The hash of the document including content and metadata.""" + content_hash: str + """The hash of the document content.""" + metadata_hash: str + """The hash of the document metadata.""" + + @classmethod + def is_lc_serializable(cls) -> bool: + return False + + @root_validator(pre=True) + def calculate_hashes(cls, values: Dict[str, Any]) -> Dict[str, Any]: + """Root validator to calculate content and metadata hash.""" + content = values.get("page_content", "") + metadata = values.get("metadata", {}) + + forbidden_keys = ("hash_", "content_hash", "metadata_hash") + + for key in forbidden_keys: + if key in metadata: + raise ValueError( + f"Metadata cannot contain key {key} as it " + f"is reserved for internal use." + ) + + content_hash = str(_hash_string_to_uuid(content)) + + try: + metadata_hash = str(_hash_nested_dict_to_uuid(metadata)) + except Exception as e: + raise ValueError( + f"Failed to hash metadata: {e}. " + f"Please use a dict that can be serialized using json." 
+ ) + + values["content_hash"] = content_hash + values["metadata_hash"] = metadata_hash + values["hash_"] = str(_hash_string_to_uuid(content_hash + metadata_hash)) + + _uid = values.get("uid", None) + + if _uid is None: + values["uid"] = values["hash_"] + return values + + def to_document(self) -> Document: + """Return a Document object.""" + return Document( + page_content=self.page_content, + metadata=self.metadata, + ) + + @classmethod + def from_document( + cls, document: Document, *, uid: Optional[str] = None + ) -> _HashedDocument: + """Create a HashedDocument from a Document.""" + return cls( # type: ignore[call-arg] + uid=uid, # type: ignore[arg-type] + page_content=document.page_content, + metadata=document.metadata, + ) + + +def _batch(size: int, iterable: Iterable[T]) -> Iterator[List[T]]: + """Utility batching function.""" + it = iter(iterable) + while True: + chunk = list(islice(it, size)) + if not chunk: + return + yield chunk + + +async def _abatch(size: int, iterable: AsyncIterable[T]) -> AsyncIterator[List[T]]: + """Utility batching function.""" + batch: List[T] = [] + async for element in iterable: + if len(batch) < size: + batch.append(element) + + if len(batch) >= size: + yield batch + batch = [] + + if batch: + yield batch + + +def _get_source_id_assigner( + source_id_key: Union[str, Callable[[Document], str], None], +) -> Callable[[Document], Union[str, None]]: + """Get the source id from the document.""" + if source_id_key is None: + return lambda doc: None + elif isinstance(source_id_key, str): + return lambda doc: doc.metadata[source_id_key] + elif callable(source_id_key): + return source_id_key + else: + raise ValueError( + f"source_id_key should be either None, a string or a callable. " + f"Got {source_id_key} of type {type(source_id_key)}." + ) + + +def _deduplicate_in_order( + hashed_documents: Iterable[_HashedDocument], +) -> Iterator[_HashedDocument]: + """Deduplicate a list of hashed documents while preserving order.""" + seen: Set[str] = set() + + for hashed_doc in hashed_documents: + if hashed_doc.hash_ not in seen: + seen.add(hashed_doc.hash_) + yield hashed_doc + + +# PUBLIC API + + +class IndexingResult(TypedDict): + """Return a detailed a breakdown of the result of the indexing operation.""" + + num_added: int + """Number of added documents.""" + num_updated: int + """Number of updated documents because they were not up to date.""" + num_deleted: int + """Number of deleted documents.""" + num_skipped: int + """Number of skipped documents because they were already up to date.""" + + +def index( + docs_source: Union[BaseLoader, Iterable[Document]], + record_manager: RecordManager, + vector_store: VectorStore, + *, + batch_size: int = 100, + cleanup: Literal["incremental", "full", None] = None, + source_id_key: Union[str, Callable[[Document], str], None] = None, + cleanup_batch_size: int = 1_000, + force_update: bool = False, +) -> IndexingResult: + """Index data from the loader into the vector store. + + Indexing functionality uses a manager to keep track of which documents + are in the vector store. + + This allows us to keep track of which documents were updated, and which + documents were deleted, which documents should be skipped. + + For the time being, documents are indexed using their hashes, and users + are not able to specify the uid of the document. + + IMPORTANT: + if auto_cleanup is set to True, the loader should be returning + the entire dataset, and not just a subset of the dataset. 
+ Otherwise, the auto_cleanup will remove documents that it is not + supposed to. + + Args: + docs_source: Data loader or iterable of documents to index. + record_manager: Timestamped set to keep track of which documents were + updated. + vector_store: Vector store to index the documents into. + batch_size: Batch size to use when indexing. + cleanup: How to handle clean up of documents. + - Incremental: Cleans up all documents that haven't been updated AND + that are associated with source ids that were seen + during indexing. + Clean up is done continuously during indexing helping + to minimize the probability of users seeing duplicated + content. + - Full: Delete all documents that have not been returned by the loader + during this run of indexing. + Clean up runs after all documents have been indexed. + This means that users may see duplicated content during indexing. + - None: Do not delete any documents. + source_id_key: Optional key that helps identify the original source + of the document. + cleanup_batch_size: Batch size to use when cleaning up documents. + force_update: Force update documents even if they are present in the + record manager. Useful if you are re-indexing with updated embeddings. + + Returns: + Indexing result which contains information about how many documents + were added, updated, deleted, or skipped. + """ + if cleanup not in {"incremental", "full", None}: + raise ValueError( + f"cleanup should be one of 'incremental', 'full' or None. " + f"Got {cleanup}." + ) + + if cleanup == "incremental" and source_id_key is None: + raise ValueError("Source id key is required when cleanup mode is incremental.") + + # Check that the Vectorstore has required methods implemented + methods = ["delete", "add_documents"] + + for method in methods: + if not hasattr(vector_store, method): + raise ValueError( + f"Vectorstore {vector_store} does not have required method {method}" + ) + + if type(vector_store).delete == VectorStore.delete: + # Checking if the vectorstore has overridden the default delete method + # implementation which just raises a NotImplementedError + raise ValueError("Vectorstore has not implemented the delete method") + + if isinstance(docs_source, BaseLoader): + try: + doc_iterator = docs_source.lazy_load() + except NotImplementedError: + doc_iterator = iter(docs_source.load()) + else: + doc_iterator = iter(docs_source) + + source_id_assigner = _get_source_id_assigner(source_id_key) + + # Mark when the update started. + index_start_dt = record_manager.get_time() + num_added = 0 + num_skipped = 0 + num_updated = 0 + num_deleted = 0 + + for doc_batch in _batch(batch_size, doc_iterator): + hashed_docs = list( + _deduplicate_in_order( + [_HashedDocument.from_document(doc) for doc in doc_batch] + ) + ) + + source_ids: Sequence[Optional[str]] = [ + source_id_assigner(doc) for doc in hashed_docs + ] + + if cleanup == "incremental": + # If the cleanup mode is incremental, source ids are required. + for source_id, hashed_doc in zip(source_ids, hashed_docs): + if source_id is None: + raise ValueError( + "Source ids are required when cleanup mode is incremental. " + f"Document that starts with " + f"content: {hashed_doc.page_content[:100]} was not assigned " + f"as source id." + ) + # source ids cannot be None after for loop above. + source_ids = cast(Sequence[str], source_ids) # type: ignore[assignment] + + exists_batch = record_manager.exists([doc.uid for doc in hashed_docs]) + + # Filter out documents that already exist in the record store. 
+ uids = [] + docs_to_index = [] + uids_to_refresh = [] + seen_docs: Set[str] = set() + for hashed_doc, doc_exists in zip(hashed_docs, exists_batch): + if doc_exists: + if force_update: + seen_docs.add(hashed_doc.uid) + else: + uids_to_refresh.append(hashed_doc.uid) + continue + uids.append(hashed_doc.uid) + docs_to_index.append(hashed_doc.to_document()) + + # Update refresh timestamp + if uids_to_refresh: + record_manager.update(uids_to_refresh, time_at_least=index_start_dt) + num_skipped += len(uids_to_refresh) + + # Be pessimistic and assume that all vector store write will fail. + # First write to vector store + if docs_to_index: + vector_store.add_documents(docs_to_index, ids=uids, batch_size=batch_size) + num_added += len(docs_to_index) - len(seen_docs) + num_updated += len(seen_docs) + + # And only then update the record store. + # Update ALL records, even if they already exist since we want to refresh + # their timestamp. + record_manager.update( + [doc.uid for doc in hashed_docs], + group_ids=source_ids, + time_at_least=index_start_dt, + ) + + # If source IDs are provided, we can do the deletion incrementally! + if cleanup == "incremental": + # Get the uids of the documents that were not returned by the loader. + + # mypy isn't good enough to determine that source ids cannot be None + # here due to a check that's happening above, so we check again. + for source_id in source_ids: + if source_id is None: + raise AssertionError("Source ids cannot be None here.") + + _source_ids = cast(Sequence[str], source_ids) + + uids_to_delete = record_manager.list_keys( + group_ids=_source_ids, before=index_start_dt + ) + if uids_to_delete: + # Then delete from vector store. + vector_store.delete(uids_to_delete) + # First delete from record store. + record_manager.delete_keys(uids_to_delete) + num_deleted += len(uids_to_delete) + + if cleanup == "full": + while uids_to_delete := record_manager.list_keys( + before=index_start_dt, limit=cleanup_batch_size + ): + # First delete from record store. + vector_store.delete(uids_to_delete) + # Then delete from record manager. + record_manager.delete_keys(uids_to_delete) + num_deleted += len(uids_to_delete) + + return { + "num_added": num_added, + "num_updated": num_updated, + "num_skipped": num_skipped, + "num_deleted": num_deleted, + } + + +# Define an asynchronous generator function +async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]: + """Convert an iterable to an async iterator.""" + for item in iterator: + yield item + + +async def aindex( + docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]], + record_manager: RecordManager, + vector_store: VectorStore, + *, + batch_size: int = 100, + cleanup: Literal["incremental", "full", None] = None, + source_id_key: Union[str, Callable[[Document], str], None] = None, + cleanup_batch_size: int = 1_000, + force_update: bool = False, +) -> IndexingResult: + """Index data from the loader into the vector store. + + Indexing functionality uses a manager to keep track of which documents + are in the vector store. + + This allows us to keep track of which documents were updated, and which + documents were deleted, which documents should be skipped. + + For the time being, documents are indexed using their hashes, and users + are not able to specify the uid of the document. + + IMPORTANT: + if auto_cleanup is set to True, the loader should be returning + the entire dataset, and not just a subset of the dataset. 
+ Otherwise, the auto_cleanup will remove documents that it is not + supposed to. + + Args: + docs_source: Data loader or iterable of documents to index. + record_manager: Timestamped set to keep track of which documents were + updated. + vector_store: Vector store to index the documents into. + batch_size: Batch size to use when indexing. + cleanup: How to handle clean up of documents. + - Incremental: Cleans up all documents that haven't been updated AND + that are associated with source ids that were seen + during indexing. + Clean up is done continuously during indexing helping + to minimize the probability of users seeing duplicated + content. + - Full: Delete all documents that haven to been returned by the loader. + Clean up runs after all documents have been indexed. + This means that users may see duplicated content during indexing. + - None: Do not delete any documents. + source_id_key: Optional key that helps identify the original source + of the document. + cleanup_batch_size: Batch size to use when cleaning up documents. + force_update: Force update documents even if they are present in the + record manager. Useful if you are re-indexing with updated embeddings. + + Returns: + Indexing result which contains information about how many documents + were added, updated, deleted, or skipped. + """ + + if cleanup not in {"incremental", "full", None}: + raise ValueError( + f"cleanup should be one of 'incremental', 'full' or None. " + f"Got {cleanup}." + ) + + if cleanup == "incremental" and source_id_key is None: + raise ValueError("Source id key is required when cleanup mode is incremental.") + + # Check that the Vectorstore has required methods implemented + methods = ["adelete", "aadd_documents"] + + for method in methods: + if not hasattr(vector_store, method): + raise ValueError( + f"Vectorstore {vector_store} does not have required method {method}" + ) + + if type(vector_store).adelete == VectorStore.adelete: + # Checking if the vectorstore has overridden the default delete method + # implementation which just raises a NotImplementedError + raise ValueError("Vectorstore has not implemented the delete method") + + async_doc_iterator: AsyncIterator[Document] + if isinstance(docs_source, BaseLoader): + try: + async_doc_iterator = docs_source.alazy_load() + except NotImplementedError: + # Exception triggered when neither lazy_load nor alazy_load are implemented. + # * The default implementation of alazy_load uses lazy_load. + # * The default implementation of lazy_load raises NotImplementedError. + # In such a case, we use the load method and convert it to an async + # iterator. + async_doc_iterator = _to_async_iterator(docs_source.load()) + else: + if hasattr(docs_source, "__aiter__"): + async_doc_iterator = docs_source # type: ignore[assignment] + else: + async_doc_iterator = _to_async_iterator(docs_source) + + source_id_assigner = _get_source_id_assigner(source_id_key) + + # Mark when the update started. + index_start_dt = await record_manager.aget_time() + num_added = 0 + num_skipped = 0 + num_updated = 0 + num_deleted = 0 + + async for doc_batch in _abatch(batch_size, async_doc_iterator): + hashed_docs = list( + _deduplicate_in_order( + [_HashedDocument.from_document(doc) for doc in doc_batch] + ) + ) + + source_ids: Sequence[Optional[str]] = [ + source_id_assigner(doc) for doc in hashed_docs + ] + + if cleanup == "incremental": + # If the cleanup mode is incremental, source ids are required. 
+ for source_id, hashed_doc in zip(source_ids, hashed_docs): + if source_id is None: + raise ValueError( + "Source ids are required when cleanup mode is incremental. " + f"Document that starts with " + f"content: {hashed_doc.page_content[:100]} was not assigned " + f"as source id." + ) + # source ids cannot be None after for loop above. + source_ids = cast(Sequence[str], source_ids) + + exists_batch = await record_manager.aexists([doc.uid for doc in hashed_docs]) + + # Filter out documents that already exist in the record store. + uids: list[str] = [] + docs_to_index: list[Document] = [] + uids_to_refresh = [] + seen_docs: Set[str] = set() + for hashed_doc, doc_exists in zip(hashed_docs, exists_batch): + if doc_exists: + if force_update: + seen_docs.add(hashed_doc.uid) + else: + uids_to_refresh.append(hashed_doc.uid) + continue + uids.append(hashed_doc.uid) + docs_to_index.append(hashed_doc.to_document()) + + if uids_to_refresh: + # Must be updated to refresh timestamp. + await record_manager.aupdate(uids_to_refresh, time_at_least=index_start_dt) + num_skipped += len(uids_to_refresh) + + # Be pessimistic and assume that all vector store write will fail. + # First write to vector store + if docs_to_index: + await vector_store.aadd_documents( + docs_to_index, ids=uids, batch_size=batch_size + ) + num_added += len(docs_to_index) - len(seen_docs) + num_updated += len(seen_docs) + + # And only then update the record store. + # Update ALL records, even if they already exist since we want to refresh + # their timestamp. + await record_manager.aupdate( + [doc.uid for doc in hashed_docs], + group_ids=source_ids, + time_at_least=index_start_dt, + ) + + # If source IDs are provided, we can do the deletion incrementally! + + if cleanup == "incremental": + # Get the uids of the documents that were not returned by the loader. + + # mypy isn't good enough to determine that source ids cannot be None + # here due to a check that's happening above, so we check again. + for source_id in source_ids: + if source_id is None: + raise AssertionError("Source ids cannot be None here.") + + _source_ids = cast(Sequence[str], source_ids) + + uids_to_delete = await record_manager.alist_keys( + group_ids=_source_ids, before=index_start_dt + ) + if uids_to_delete: + # Then delete from vector store. + await vector_store.adelete(uids_to_delete) + # First delete from record store. + await record_manager.adelete_keys(uids_to_delete) + num_deleted += len(uids_to_delete) + + if cleanup == "full": + while uids_to_delete := await record_manager.alist_keys( + before=index_start_dt, limit=cleanup_batch_size + ): + # First delete from record store. + await vector_store.adelete(uids_to_delete) + # Then delete from record manager. 
+ await record_manager.adelete_keys(uids_to_delete) + num_deleted += len(uids_to_delete) + + return { + "num_added": num_added, + "num_updated": num_updated, + "num_skipped": num_skipped, + "num_deleted": num_deleted, + } diff --git a/libs/langchain/langchain/indexes/base.py b/libs/core/langchain_core/indexing/base.py similarity index 82% rename from libs/langchain/langchain/indexes/base.py rename to libs/core/langchain_core/indexing/base.py index 46ef5bf2efab2..c91ffa78fd351 100644 --- a/libs/langchain/langchain/indexes/base.py +++ b/libs/core/langchain_core/indexing/base.py @@ -1,11 +1,8 @@ from __future__ import annotations -import uuid from abc import ABC, abstractmethod from typing import List, Optional, Sequence -NAMESPACE_UUID = uuid.UUID(int=1984) - class RecordManager(ABC): """An abstract base class representing the interface for a record manager.""" @@ -64,8 +61,16 @@ def update( Args: keys: A list of record keys to upsert. group_ids: A list of group IDs corresponding to the keys. - time_at_least: if provided, updates should only happen if the - updated_at field is at least this time. + time_at_least: Optional timestamp. Implementation can use this + to optionally verify that the timestamp IS at least this time + in the system that stores the data. + + e.g., use to validate that the time in the postgres database + is equal to or larger than the given timestamp, if not + raise an error. + + This is meant to help prevent time-drift issues since + time may not be monotonically increasing! Raises: ValueError: If the length of keys doesn't match the length of group_ids. @@ -84,8 +89,16 @@ async def aupdate( Args: keys: A list of record keys to upsert. group_ids: A list of group IDs corresponding to the keys. - time_at_least: if provided, updates should only happen if the - updated_at field is at least this time. + time_at_least: Optional timestamp. Implementation can use this + to optionally verify that the timestamp IS at least this time + in the system that stores the data. + + e.g., use to validate that the time in the postgres database + is equal to or larger than the given timestamp, if not + raise an error. + + This is meant to help prevent time-drift issues since + time may not be monotonically increasing! Raises: ValueError: If the length of keys doesn't match the length of group_ids. 
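
For orientation, a minimal usage sketch of the relocated API (not itself part of the diff): it mirrors the test_indexing_same_content test added below and assumes the in-memory helpers that this patch adds under libs/core/tests/unit_tests/indexing/ are importable from the test tree.

from langchain_core.documents import Document
from langchain_core.indexing import index

# Assumed imports: these helpers are test fixtures added by this patch, so the
# import paths only resolve inside the libs/core test environment.
from tests.unit_tests.indexing.in_memory import InMemoryRecordManager
from tests.unit_tests.indexing.test_indexing import InMemoryVectorStore

record_manager = InMemoryRecordManager(namespace="demo")
record_manager.create_schema()
vector_store = InMemoryVectorStore()

docs = [
    Document(page_content="This is a test document."),
    Document(page_content="This is another document."),
]

# First pass: both documents are hashed, written to the vector store, and
# recorded in the record manager.
assert index(docs, record_manager, vector_store) == {
    "num_added": 2,
    "num_deleted": 0,
    "num_skipped": 0,
    "num_updated": 0,
}

# Identical second pass: the record manager recognizes the hashes, so both
# documents are skipped rather than re-added.
assert index(docs, record_manager, vector_store) == {
    "num_added": 0,
    "num_deleted": 0,
    "num_skipped": 2,
    "num_updated": 0,
}

aindex is the async counterpart; it additionally accepts an AsyncIterator[Document] and requires the vector store to implement adelete and aadd_documents rather than delete and add_documents.
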
diff --git a/libs/core/tests/unit_tests/indexing/__init__.py b/libs/core/tests/unit_tests/indexing/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/libs/core/tests/unit_tests/indexing/in_memory.py b/libs/core/tests/unit_tests/indexing/in_memory.py new file mode 100644 index 0000000000000..c9d55f51a79a7 --- /dev/null +++ b/libs/core/tests/unit_tests/indexing/in_memory.py @@ -0,0 +1,105 @@ +import time +from typing import Dict, List, Optional, Sequence, TypedDict + +from langchain_core.indexing.base import RecordManager + + +class _Record(TypedDict): + group_id: Optional[str] + updated_at: float + + +class InMemoryRecordManager(RecordManager): + """An in-memory record manager for testing purposes.""" + + def __init__(self, namespace: str) -> None: + super().__init__(namespace) + # Each key points to a dictionary + # of {'group_id': group_id, 'updated_at': timestamp} + self.records: Dict[str, _Record] = {} + self.namespace = namespace + + def create_schema(self) -> None: + """In-memory schema creation is simply ensuring the structure is initialized.""" + + async def acreate_schema(self) -> None: + """In-memory schema creation is simply ensuring the structure is initialized.""" + + def get_time(self) -> float: + """Get the current server time as a high resolution timestamp!""" + return time.time() + + async def aget_time(self) -> float: + """Get the current server time as a high resolution timestamp!""" + return self.get_time() + + def update( + self, + keys: Sequence[str], + *, + group_ids: Optional[Sequence[Optional[str]]] = None, + time_at_least: Optional[float] = None, + ) -> None: + if group_ids and len(keys) != len(group_ids): + raise ValueError("Length of keys must match length of group_ids") + for index, key in enumerate(keys): + group_id = group_ids[index] if group_ids else None + if time_at_least and time_at_least > self.get_time(): + raise ValueError("time_at_least must be in the past") + self.records[key] = {"group_id": group_id, "updated_at": self.get_time()} + + async def aupdate( + self, + keys: Sequence[str], + *, + group_ids: Optional[Sequence[Optional[str]]] = None, + time_at_least: Optional[float] = None, + ) -> None: + self.update(keys, group_ids=group_ids, time_at_least=time_at_least) + + def exists(self, keys: Sequence[str]) -> List[bool]: + return [key in self.records for key in keys] + + async def aexists(self, keys: Sequence[str]) -> List[bool]: + return self.exists(keys) + + def list_keys( + self, + *, + before: Optional[float] = None, + after: Optional[float] = None, + group_ids: Optional[Sequence[str]] = None, + limit: Optional[int] = None, + ) -> List[str]: + result = [] + for key, data in self.records.items(): + if before and data["updated_at"] >= before: + continue + if after and data["updated_at"] <= after: + continue + if group_ids and data["group_id"] not in group_ids: + continue + result.append(key) + if limit: + return result[:limit] + return result + + async def alist_keys( + self, + *, + before: Optional[float] = None, + after: Optional[float] = None, + group_ids: Optional[Sequence[str]] = None, + limit: Optional[int] = None, + ) -> List[str]: + return self.list_keys( + before=before, after=after, group_ids=group_ids, limit=limit + ) + + def delete_keys(self, keys: Sequence[str]) -> None: + for key in keys: + if key in self.records: + del self.records[key] + + async def adelete_keys(self, keys: Sequence[str]) -> None: + self.delete_keys(keys) diff --git a/libs/core/tests/unit_tests/indexing/test_hashed_document.py 
b/libs/core/tests/unit_tests/indexing/test_hashed_document.py new file mode 100644 index 0000000000000..343787328526f --- /dev/null +++ b/libs/core/tests/unit_tests/indexing/test_hashed_document.py @@ -0,0 +1,50 @@ +import pytest + +from langchain_core.documents import Document +from langchain_core.indexing.api import _HashedDocument + + +def test_hashed_document_hashing() -> None: + hashed_document = _HashedDocument( # type: ignore[call-arg] + uid="123", page_content="Lorem ipsum dolor sit amet", metadata={"key": "value"} + ) + assert isinstance(hashed_document.hash_, str) + + +def test_hashing_with_missing_content() -> None: + """Check that ValueError is raised if page_content is missing.""" + with pytest.raises(TypeError): + _HashedDocument( + metadata={"key": "value"}, + ) # type: ignore + + +def test_uid_auto_assigned_to_hash() -> None: + """Test uid is auto-assigned to the hashed_document hash.""" + hashed_document = _HashedDocument( # type: ignore[call-arg] + page_content="Lorem ipsum dolor sit amet", metadata={"key": "value"} + ) + assert hashed_document.uid == hashed_document.hash_ + + +def test_to_document() -> None: + """Test to_document method.""" + hashed_document = _HashedDocument( # type: ignore[call-arg] + page_content="Lorem ipsum dolor sit amet", metadata={"key": "value"} + ) + doc = hashed_document.to_document() + assert isinstance(doc, Document) + assert doc.page_content == "Lorem ipsum dolor sit amet" + assert doc.metadata == {"key": "value"} + + +def test_from_document() -> None: + """Test from document class method.""" + document = Document( + page_content="Lorem ipsum dolor sit amet", metadata={"key": "value"} + ) + + hashed_document = _HashedDocument.from_document(document) + # hash should be deterministic + assert hashed_document.hash_ == "fd1dc827-051b-537d-a1fe-1fa043e8b276" + assert hashed_document.uid == hashed_document.hash_ diff --git a/libs/core/tests/unit_tests/indexing/test_in_memory_record_manager.py b/libs/core/tests/unit_tests/indexing/test_in_memory_record_manager.py new file mode 100644 index 0000000000000..ea88724513fb1 --- /dev/null +++ b/libs/core/tests/unit_tests/indexing/test_in_memory_record_manager.py @@ -0,0 +1,223 @@ +from datetime import datetime +from unittest.mock import patch + +import pytest +import pytest_asyncio + +from tests.unit_tests.indexing.in_memory import InMemoryRecordManager + + +@pytest.fixture() +def manager() -> InMemoryRecordManager: + """Initialize the test database and yield the TimestampedSet instance.""" + # Initialize and yield the TimestampedSet instance + record_manager = InMemoryRecordManager(namespace="kittens") + record_manager.create_schema() + return record_manager + + +@pytest_asyncio.fixture() +async def amanager() -> InMemoryRecordManager: + """Initialize the test database and yield the TimestampedSet instance.""" + # Initialize and yield the TimestampedSet instance + record_manager = InMemoryRecordManager(namespace="kittens") + await record_manager.acreate_schema() + return record_manager + + +def test_update(manager: InMemoryRecordManager) -> None: + """Test updating records in the database.""" + # no keys should be present in the set + read_keys = manager.list_keys() + assert read_keys == [] + # Insert records + keys = ["key1", "key2", "key3"] + manager.update(keys) + # Retrieve the records + read_keys = manager.list_keys() + assert read_keys == ["key1", "key2", "key3"] + + +async def test_aupdate(amanager: InMemoryRecordManager) -> None: + """Test updating records in the database.""" + # no keys should 
be present in the set + read_keys = await amanager.alist_keys() + assert read_keys == [] + # Insert records + keys = ["key1", "key2", "key3"] + await amanager.aupdate(keys) + # Retrieve the records + read_keys = await amanager.alist_keys() + assert read_keys == ["key1", "key2", "key3"] + + +def test_update_timestamp(manager: InMemoryRecordManager) -> None: + """Test updating records in the database.""" + # no keys should be present in the set + with patch.object( + manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + manager.update(["key1"]) + + assert manager.list_keys() == ["key1"] + assert manager.list_keys(before=datetime(2021, 1, 1).timestamp()) == [] + assert manager.list_keys(after=datetime(2021, 1, 1).timestamp()) == ["key1"] + assert manager.list_keys(after=datetime(2021, 1, 3).timestamp()) == [] + + # Update the timestamp + with patch.object( + manager, "get_time", return_value=datetime(2023, 1, 5).timestamp() + ): + manager.update(["key1"]) + + assert manager.list_keys() == ["key1"] + assert manager.list_keys(before=datetime(2023, 1, 1).timestamp()) == [] + assert manager.list_keys(after=datetime(2023, 1, 1).timestamp()) == ["key1"] + assert manager.list_keys(after=datetime(2023, 1, 3).timestamp()) == ["key1"] + + +async def test_aupdate_timestamp(manager: InMemoryRecordManager) -> None: + """Test updating records in the database.""" + # no keys should be present in the set + with patch.object( + manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + await manager.aupdate(["key1"]) + + assert await manager.alist_keys() == ["key1"] + assert await manager.alist_keys(before=datetime(2021, 1, 1).timestamp()) == [] + assert await manager.alist_keys(after=datetime(2021, 1, 1).timestamp()) == ["key1"] + assert await manager.alist_keys(after=datetime(2021, 1, 3).timestamp()) == [] + + # Update the timestamp + with patch.object( + manager, "get_time", return_value=datetime(2023, 1, 5).timestamp() + ): + await manager.aupdate(["key1"]) + + assert await manager.alist_keys() == ["key1"] + assert await manager.alist_keys(before=datetime(2023, 1, 1).timestamp()) == [] + assert await manager.alist_keys(after=datetime(2023, 1, 1).timestamp()) == ["key1"] + assert await manager.alist_keys(after=datetime(2023, 1, 3).timestamp()) == ["key1"] + + +def test_exists(manager: InMemoryRecordManager) -> None: + """Test checking if keys exist in the database.""" + # Insert records + keys = ["key1", "key2", "key3"] + manager.update(keys) + # Check if the keys exist in the database + exists = manager.exists(keys) + assert len(exists) == len(keys) + assert exists == [True, True, True] + + exists = manager.exists(["key1", "key4"]) + assert len(exists) == 2 + assert exists == [True, False] + + +async def test_aexists(amanager: InMemoryRecordManager) -> None: + """Test checking if keys exist in the database.""" + # Insert records + keys = ["key1", "key2", "key3"] + await amanager.aupdate(keys) + # Check if the keys exist in the database + exists = await amanager.aexists(keys) + assert len(exists) == len(keys) + assert exists == [True, True, True] + + exists = await amanager.aexists(["key1", "key4"]) + assert len(exists) == 2 + assert exists == [True, False] + + +async def test_list_keys(manager: InMemoryRecordManager) -> None: + """Test listing keys based on the provided date range.""" + # Insert records + assert manager.list_keys() == [] + assert await manager.alist_keys() == [] + + with patch.object( + manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + 
): + manager.update(["key1", "key2"]) + manager.update(["key3"], group_ids=["group1"]) + manager.update(["key4"], group_ids=["group2"]) + + with patch.object( + manager, "get_time", return_value=datetime(2021, 1, 10).timestamp() + ): + manager.update(["key5"]) + + assert sorted(manager.list_keys()) == ["key1", "key2", "key3", "key4", "key5"] + assert sorted(await manager.alist_keys()) == [ + "key1", + "key2", + "key3", + "key4", + "key5", + ] + + # By group + assert manager.list_keys(group_ids=["group1"]) == ["key3"] + assert await manager.alist_keys(group_ids=["group1"]) == ["key3"] + + # Before + assert sorted(manager.list_keys(before=datetime(2021, 1, 3).timestamp())) == [ + "key1", + "key2", + "key3", + "key4", + ] + assert sorted( + await manager.alist_keys(before=datetime(2021, 1, 3).timestamp()) + ) == [ + "key1", + "key2", + "key3", + "key4", + ] + + # After + assert sorted(manager.list_keys(after=datetime(2021, 1, 3).timestamp())) == ["key5"] + assert sorted(await manager.alist_keys(after=datetime(2021, 1, 3).timestamp())) == [ + "key5" + ] + + results = manager.list_keys(limit=1) + assert len(results) == 1 + assert results[0] in ["key1", "key2", "key3", "key4", "key5"] + + results = await manager.alist_keys(limit=1) + assert len(results) == 1 + assert results[0] in ["key1", "key2", "key3", "key4", "key5"] + + +def test_delete_keys(manager: InMemoryRecordManager) -> None: + """Test deleting keys from the database.""" + # Insert records + keys = ["key1", "key2", "key3"] + manager.update(keys) + + # Delete some keys + keys_to_delete = ["key1", "key2"] + manager.delete_keys(keys_to_delete) + + # Check if the deleted keys are no longer in the database + remaining_keys = manager.list_keys() + assert remaining_keys == ["key3"] + + +async def test_adelete_keys(amanager: InMemoryRecordManager) -> None: + """Test deleting keys from the database.""" + # Insert records + keys = ["key1", "key2", "key3"] + await amanager.aupdate(keys) + + # Delete some keys + keys_to_delete = ["key1", "key2"] + await amanager.adelete_keys(keys_to_delete) + + # Check if the deleted keys are no longer in the database + remaining_keys = await amanager.alist_keys() + assert remaining_keys == ["key3"] diff --git a/libs/core/tests/unit_tests/indexing/test_indexing.py b/libs/core/tests/unit_tests/indexing/test_indexing.py new file mode 100644 index 0000000000000..701204363abb0 --- /dev/null +++ b/libs/core/tests/unit_tests/indexing/test_indexing.py @@ -0,0 +1,1398 @@ +from datetime import datetime +from typing import ( + Any, + AsyncIterator, + Dict, + Iterable, + Iterator, + List, + Optional, + Sequence, + Type, +) +from unittest.mock import patch + +import pytest +import pytest_asyncio + +from langchain_core.document_loaders.base import BaseLoader +from langchain_core.documents import Document +from langchain_core.embeddings import Embeddings +from langchain_core.indexing import aindex, index +from langchain_core.indexing.api import _abatch, _HashedDocument +from langchain_core.vectorstores import VST, VectorStore +from tests.unit_tests.indexing.in_memory import InMemoryRecordManager + + +class ToyLoader(BaseLoader): + """Toy loader that always returns the same documents.""" + + def __init__(self, documents: Sequence[Document]) -> None: + """Initialize with the documents to return.""" + self.documents = documents + + def lazy_load( + self, + ) -> Iterator[Document]: + yield from self.documents + + async def alazy_load( + self, + ) -> AsyncIterator[Document]: + for document in self.documents: + yield document + + 
+class InMemoryVectorStore(VectorStore): + """In-memory implementation of VectorStore using a dictionary.""" + + def __init__(self, permit_upserts: bool = False) -> None: + """Vector store interface for testing things in memory.""" + self.store: Dict[str, Document] = {} + self.permit_upserts = permit_upserts + + def delete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None: + """Delete the given documents from the store using their IDs.""" + if ids: + for _id in ids: + self.store.pop(_id, None) + + async def adelete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None: + """Delete the given documents from the store using their IDs.""" + if ids: + for _id in ids: + self.store.pop(_id, None) + + def add_documents( # type: ignore + self, + documents: Sequence[Document], + *, + ids: Optional[Sequence[str]] = None, + **kwargs: Any, + ) -> List[str]: + """Add the given documents to the store (insert behavior).""" + if ids and len(ids) != len(documents): + raise ValueError( + f"Expected {len(ids)} ids, got {len(documents)} documents." + ) + + if not ids: + raise NotImplementedError("This is not implemented yet.") + + for _id, document in zip(ids, documents): + if _id in self.store and not self.permit_upserts: + raise ValueError( + f"Document with uid {_id} already exists in the store." + ) + self.store[_id] = document + + return list(ids) + + async def aadd_documents( + self, + documents: Sequence[Document], + *, + ids: Optional[Sequence[str]] = None, + **kwargs: Any, + ) -> List[str]: + if ids and len(ids) != len(documents): + raise ValueError( + f"Expected {len(ids)} ids, got {len(documents)} documents." + ) + + if not ids: + raise NotImplementedError("This is not implemented yet.") + + for _id, document in zip(ids, documents): + if _id in self.store and not self.permit_upserts: + raise ValueError( + f"Document with uid {_id} already exists in the store." 
+ ) + self.store[_id] = document + return list(ids) + + def add_texts( + self, + texts: Iterable[str], + metadatas: Optional[List[Dict[Any, Any]]] = None, + **kwargs: Any, + ) -> List[str]: + """Add the given texts to the store (insert behavior).""" + raise NotImplementedError() + + @classmethod + def from_texts( + cls: Type[VST], + texts: List[str], + embedding: Embeddings, + metadatas: Optional[List[Dict[Any, Any]]] = None, + **kwargs: Any, + ) -> VST: + """Create a vector store from a list of texts.""" + raise NotImplementedError() + + def similarity_search( + self, query: str, k: int = 4, **kwargs: Any + ) -> List[Document]: + """Find the most similar documents to the given query.""" + raise NotImplementedError() + + +@pytest.fixture +def record_manager() -> InMemoryRecordManager: + """Timestamped set fixture.""" + record_manager = InMemoryRecordManager(namespace="hello") + record_manager.create_schema() + return record_manager + + +@pytest_asyncio.fixture # type: ignore +async def arecord_manager() -> InMemoryRecordManager: + """Timestamped set fixture.""" + record_manager = InMemoryRecordManager(namespace="hello") + await record_manager.acreate_schema() + return record_manager + + +@pytest.fixture +def vector_store() -> InMemoryVectorStore: + """Vector store fixture.""" + return InMemoryVectorStore() + + +@pytest.fixture +def upserting_vector_store() -> InMemoryVectorStore: + """Vector store fixture.""" + return InMemoryVectorStore(permit_upserts=True) + + +def test_indexing_same_content( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Indexing some content to confirm it gets added only once.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + ), + Document( + page_content="This is another document.", + ), + ] + ) + + assert index(loader, record_manager, vector_store) == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + assert len(list(vector_store.store)) == 2 + + for _ in range(2): + # Run the indexing again + assert index(loader, record_manager, vector_store) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + +async def test_aindexing_same_content( + arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Indexing some content to confirm it gets added only once.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + ), + Document( + page_content="This is another document.", + ), + ] + ) + + assert await aindex(loader, arecord_manager, vector_store) == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + assert len(list(vector_store.store)) == 2 + + for _ in range(2): + # Run the indexing again + assert await aindex(loader, arecord_manager, vector_store) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + +def test_index_simple_delete_full( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Indexing some content to confirm it gets added only once.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + ), + Document( + page_content="This is another document.", + ), + ] + ) + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() + ): + assert index(loader, record_manager, vector_store, cleanup="full") == { + "num_added": 2, + "num_deleted": 0, + 
"num_skipped": 0, + "num_updated": 0, + } + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() + ): + assert index(loader, record_manager, vector_store, cleanup="full") == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + loader = ToyLoader( + documents=[ + Document( + page_content="mutated document 1", + ), + Document( + page_content="This is another document.", # <-- Same as original + ), + ] + ) + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + indexing_result = index(loader, record_manager, vector_store, cleanup="full") + + doc_texts = set( + # Ignoring type since doc should be in the store and not a None + vector_store.store.get(uid).page_content # type: ignore + for uid in vector_store.store + ) + assert doc_texts == {"mutated document 1", "This is another document."} + + assert indexing_result == { + "num_added": 1, + "num_deleted": 1, + "num_skipped": 1, + "num_updated": 0, + } + + # Attempt to index again verify that nothing changes + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index(loader, record_manager, vector_store, cleanup="full") == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + +async def test_aindex_simple_delete_full( + arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Indexing some content to confirm it gets added only once.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + ), + Document( + page_content="This is another document.", + ), + ] + ) + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() + ): + assert await aindex(loader, arecord_manager, vector_store, cleanup="full") == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp() + ): + assert await aindex(loader, arecord_manager, vector_store, cleanup="full") == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + loader = ToyLoader( + documents=[ + Document( + page_content="mutated document 1", + ), + Document( + page_content="This is another document.", # <-- Same as original + ), + ] + ) + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert await aindex(loader, arecord_manager, vector_store, cleanup="full") == { + "num_added": 1, + "num_deleted": 1, + "num_skipped": 1, + "num_updated": 0, + } + + doc_texts = set( + # Ignoring type since doc should be in the store and not a None + vector_store.store.get(uid).page_content # type: ignore + for uid in vector_store.store + ) + assert doc_texts == {"mutated document 1", "This is another document."} + + # Attempt to index again verify that nothing changes + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert await aindex(loader, arecord_manager, vector_store, cleanup="full") == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + +def test_incremental_fails_with_bad_source_ids( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing with incremental deletion strategy.""" + loader = ToyLoader( + documents=[ + Document( + 
page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + Document( + page_content="This is yet another document.", + metadata={"source": None}, + ), + ] + ) + + with pytest.raises(ValueError): + # Should raise an error because no source id function was specified + index(loader, record_manager, vector_store, cleanup="incremental") + + with pytest.raises(ValueError): + # Should raise an error because no source id function was specified + index( + loader, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + ) + + +async def test_aincremental_fails_with_bad_source_ids( + arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing with incremental deletion strategy.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + Document( + page_content="This is yet another document.", + metadata={"source": None}, + ), + ] + ) + + with pytest.raises(ValueError): + # Should raise an error because no source id function was specified + await aindex( + loader, + arecord_manager, + vector_store, + cleanup="incremental", + ) + + with pytest.raises(ValueError): + # Should raise an error because no source id function was specified + await aindex( + loader, + arecord_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + ) + + +def test_no_delete( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing without a deletion strategy.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + ] + ) + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup=None, + source_id_key="source", + ) == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + # If we add the same content twice it should be skipped + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup=None, + source_id_key="source", + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + loader = ToyLoader( + documents=[ + Document( + page_content="mutated content", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + ] + ) + + # Should result in no updates or deletions! 
+ with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup=None, + source_id_key="source", + ) == { + "num_added": 1, + "num_deleted": 0, + "num_skipped": 1, + "num_updated": 0, + } + + +async def test_ano_delete( + arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing without a deletion strategy.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + ] + ) + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert await aindex( + loader, + arecord_manager, + vector_store, + cleanup=None, + source_id_key="source", + ) == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + # If we add the same content twice it should be skipped + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert await aindex( + loader, + arecord_manager, + vector_store, + cleanup=None, + source_id_key="source", + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + loader = ToyLoader( + documents=[ + Document( + page_content="mutated content", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + ] + ) + + # Should result in no updates or deletions! + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert await aindex( + loader, + arecord_manager, + vector_store, + cleanup=None, + source_id_key="source", + ) == { + "num_added": 1, + "num_deleted": 0, + "num_skipped": 1, + "num_updated": 0, + } + + +def test_incremental_delete( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing with incremental deletion strategy.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + ] + ) + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + ) == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + doc_texts = set( + # Ignoring type since doc should be in the store and not a None + vector_store.store.get(uid).page_content # type: ignore + for uid in vector_store.store + ) + assert doc_texts == {"This is another document.", "This is a test document."} + + # Attempt to index again verify that nothing changes + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + # Create 2 documents from the same source all with mutated content + loader = ToyLoader( + documents=[ + Document( + page_content="mutated document 1", + metadata={"source": "1"}, + ), + Document( + page_content="mutated document 2", + metadata={"source": "1"}, + ), + Document( + page_content="This 
is another document.", # <-- Same as original + metadata={"source": "2"}, + ), + ] + ) + + # Attempt to index again verify that nothing changes + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + ) == { + "num_added": 2, + "num_deleted": 1, + "num_skipped": 1, + "num_updated": 0, + } + + doc_texts = set( + # Ignoring type since doc should be in the store and not a None + vector_store.store.get(uid).page_content # type: ignore + for uid in vector_store.store + ) + assert doc_texts == { + "mutated document 1", + "mutated document 2", + "This is another document.", + } + + +def test_incremental_indexing_with_batch_size( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing with incremental indexing""" + loader = ToyLoader( + documents=[ + Document( + page_content="1", + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "1"}, + ), + Document( + page_content="3", + metadata={"source": "1"}, + ), + Document( + page_content="4", + metadata={"source": "1"}, + ), + ] + ) + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + ) == { + "num_added": 4, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + assert index( + loader, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 4, + "num_updated": 0, + } + + doc_texts = set( + # Ignoring type since doc should be in the store and not a None + vector_store.store.get(uid).page_content # type: ignore + for uid in vector_store.store + ) + assert doc_texts == {"1", "2", "3", "4"} + + +def test_incremental_delete_with_batch_size( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing with incremental deletion strategy and batch size.""" + loader = ToyLoader( + documents=[ + Document( + page_content="1", + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "2"}, + ), + Document( + page_content="3", + metadata={"source": "3"}, + ), + Document( + page_content="4", + metadata={"source": "4"}, + ), + ] + ) + + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=3, + ) == { + "num_added": 4, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + doc_texts = set( + # Ignoring type since doc should be in the store and not a None + vector_store.store.get(uid).page_content # type: ignore + for uid in vector_store.store + ) + assert doc_texts == {"1", "2", "3", "4"} + + # Attempt to index again verify that nothing changes + with patch.object( + record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert index( + loader, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=3, + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 4, + "num_updated": 0, + } + + # Attempt to index again verify that nothing changes + with patch.object( + record_manager, "get_time", 
return_value=datetime(2022, 1, 3).timestamp() + ): + # Docs with same content + docs = [ + Document( + page_content="1", + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "2"}, + ), + ] + assert index( + docs, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=1, + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + # Attempt to index again verify that nothing changes + with patch.object( + record_manager, "get_time", return_value=datetime(2023, 1, 3).timestamp() + ): + # Docs with same content + docs = [ + Document( + page_content="1", + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "2"}, + ), + ] + assert index( + docs, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=1, + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + # Try to index with changed docs now + with patch.object( + record_manager, "get_time", return_value=datetime(2024, 1, 3).timestamp() + ): + # Docs with same content + docs = [ + Document( + page_content="changed 1", + metadata={"source": "1"}, + ), + Document( + page_content="changed 2", + metadata={"source": "2"}, + ), + ] + assert index( + docs, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + ) == { + "num_added": 2, + "num_deleted": 2, + "num_skipped": 0, + "num_updated": 0, + } + + +async def test_aincremental_delete( + arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing with incremental deletion strategy.""" + loader = ToyLoader( + documents=[ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + ] + ) + + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert await aindex( + loader.lazy_load(), + arecord_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + ) == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + doc_texts = set( + # Ignoring type since doc should be in the store and not a None + vector_store.store.get(uid).page_content # type: ignore + for uid in vector_store.store + ) + assert doc_texts == {"This is another document.", "This is a test document."} + + # Attempt to index again verify that nothing changes + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp() + ): + assert await aindex( + loader.lazy_load(), + arecord_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + # Create 2 documents from the same source all with mutated content + loader = ToyLoader( + documents=[ + Document( + page_content="mutated document 1", + metadata={"source": "1"}, + ), + Document( + page_content="mutated document 2", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", # <-- Same as original + metadata={"source": "2"}, + ), + ] + ) + + # Attempt to index again verify that nothing changes + with patch.object( + arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp() + ): + assert await aindex( + loader.lazy_load(), + arecord_manager, + vector_store, + cleanup="incremental", + 
source_id_key="source", + ) == { + "num_added": 2, + "num_deleted": 1, + "num_skipped": 1, + "num_updated": 0, + } + + doc_texts = set( + # Ignoring type since doc should be in the store and not a None + vector_store.store.get(uid).page_content # type: ignore + for uid in vector_store.store + ) + assert doc_texts == { + "mutated document 1", + "mutated document 2", + "This is another document.", + } + + +def test_indexing_with_no_docs( + record_manager: InMemoryRecordManager, vector_store: VectorStore +) -> None: + """Check edge case when loader returns no new docs.""" + loader = ToyLoader(documents=[]) + + assert index(loader, record_manager, vector_store, cleanup="full") == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + +async def test_aindexing_with_no_docs( + arecord_manager: InMemoryRecordManager, vector_store: VectorStore +) -> None: + """Check edge case when loader returns no new docs.""" + loader = ToyLoader(documents=[]) + + assert await aindex(loader, arecord_manager, vector_store, cleanup="full") == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + +def test_deduplication( + record_manager: InMemoryRecordManager, vector_store: VectorStore +) -> None: + """Check edge case when loader returns no new docs.""" + docs = [ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + ] + + # Should result in only a single document being added + assert index(docs, record_manager, vector_store, cleanup="full") == { + "num_added": 1, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + +async def test_adeduplication( + arecord_manager: InMemoryRecordManager, vector_store: VectorStore +) -> None: + """Check edge case when loader returns no new docs.""" + docs = [ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + ] + + # Should result in only a single document being added + assert await aindex(docs, arecord_manager, vector_store, cleanup="full") == { + "num_added": 1, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + +def test_cleanup_with_different_batchsize( + record_manager: InMemoryRecordManager, vector_store: VectorStore +) -> None: + """Check that we can clean up with different batch size.""" + docs = [ + Document( + page_content="This is a test document.", + metadata={"source": str(d)}, + ) + for d in range(1000) + ] + + assert index(docs, record_manager, vector_store, cleanup="full") == { + "num_added": 1000, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + docs = [ + Document( + page_content="Different doc", + metadata={"source": str(d)}, + ) + for d in range(1001) + ] + + assert index( + docs, record_manager, vector_store, cleanup="full", cleanup_batch_size=17 + ) == { + "num_added": 1001, + "num_deleted": 1000, + "num_skipped": 0, + "num_updated": 0, + } + + +async def test_async_cleanup_with_different_batchsize( + arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Check that we can clean up with different batch size.""" + docs = [ + Document( + page_content="This is a test document.", + metadata={"source": str(d)}, + ) + for d in range(1000) + ] + + assert await aindex(docs, arecord_manager, vector_store, cleanup="full") == { + "num_added": 1000, + "num_deleted": 0, + 
"num_skipped": 0, + "num_updated": 0, + } + + docs = [ + Document( + page_content="Different doc", + metadata={"source": str(d)}, + ) + for d in range(1001) + ] + + assert await aindex( + docs, arecord_manager, vector_store, cleanup="full", cleanup_batch_size=17 + ) == { + "num_added": 1001, + "num_deleted": 1000, + "num_skipped": 0, + "num_updated": 0, + } + + +def test_deduplication_v2( + record_manager: InMemoryRecordManager, vector_store: VectorStore +) -> None: + """Check edge case when loader returns no new docs.""" + docs = [ + Document( + page_content="1", + metadata={"source": "1"}, + ), + Document( + page_content="1", + metadata={"source": "1"}, + ), + Document( + page_content="2", + metadata={"source": "2"}, + ), + Document( + page_content="3", + metadata={"source": "3"}, + ), + ] + + assert index(docs, record_manager, vector_store, cleanup="full") == { + "num_added": 3, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + # using in memory implementation here + assert isinstance(vector_store, InMemoryVectorStore) + contents = sorted( + [document.page_content for document in vector_store.store.values()] + ) + assert contents == ["1", "2", "3"] + + +async def _to_async_iter(it: Iterable[Any]) -> AsyncIterator[Any]: + """Convert an iterable to an async iterator.""" + for i in it: + yield i + + +async def test_abatch() -> None: + """Test the abatch function.""" + batches = _abatch(5, _to_async_iter(range(12))) + assert isinstance(batches, AsyncIterator) + assert [batch async for batch in batches] == [ + [0, 1, 2, 3, 4], + [5, 6, 7, 8, 9], + [10, 11], + ] + + batches = _abatch(1, _to_async_iter(range(3))) + assert isinstance(batches, AsyncIterator) + assert [batch async for batch in batches] == [[0], [1], [2]] + + batches = _abatch(2, _to_async_iter(range(5))) + assert isinstance(batches, AsyncIterator) + assert [batch async for batch in batches] == [[0, 1], [2, 3], [4]] + + +def test_indexing_force_update( + record_manager: InMemoryRecordManager, upserting_vector_store: VectorStore +) -> None: + """Test indexing with force update.""" + docs = [ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + ] + + assert index(docs, record_manager, upserting_vector_store, cleanup="full") == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + assert index(docs, record_manager, upserting_vector_store, cleanup="full") == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + assert index( + docs, record_manager, upserting_vector_store, cleanup="full", force_update=True + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 2, + } + + +async def test_aindexing_force_update( + arecord_manager: InMemoryRecordManager, upserting_vector_store: VectorStore +) -> None: + """Test indexing with force update.""" + docs = [ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + Document( + page_content="This is another document.", + metadata={"source": "2"}, + ), + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + ] + + assert await aindex( + docs, arecord_manager, upserting_vector_store, cleanup="full" + ) == { + "num_added": 2, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + assert await 
aindex( + docs, arecord_manager, upserting_vector_store, cleanup="full" + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 2, + "num_updated": 0, + } + + assert await aindex( + docs, + arecord_manager, + upserting_vector_store, + cleanup="full", + force_update=True, + ) == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 2, + } + + +def test_indexing_custom_batch_size( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing with a custom batch size.""" + docs = [ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + ] + ids = [_HashedDocument.from_document(doc).uid for doc in docs] + + batch_size = 1 + with patch.object(vector_store, "add_documents") as mock_add_documents: + index(docs, record_manager, vector_store, batch_size=batch_size) + args, kwargs = mock_add_documents.call_args + assert args == (docs,) + assert kwargs == {"ids": ids, "batch_size": batch_size} + + +async def test_aindexing_custom_batch_size( + arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Test indexing with a custom batch size.""" + docs = [ + Document( + page_content="This is a test document.", + metadata={"source": "1"}, + ), + ] + ids = [_HashedDocument.from_document(doc).uid for doc in docs] + + batch_size = 1 + with patch.object(vector_store, "aadd_documents") as mock_add_documents: + await aindex(docs, arecord_manager, vector_store, batch_size=batch_size) + args, kwargs = mock_add_documents.call_args + assert args == (docs,) + assert kwargs == {"ids": ids, "batch_size": batch_size} diff --git a/libs/core/tests/unit_tests/indexing/test_public_api.py b/libs/core/tests/unit_tests/indexing/test_public_api.py new file mode 100644 index 0000000000000..8c0b367dd8537 --- /dev/null +++ b/libs/core/tests/unit_tests/indexing/test_public_api.py @@ -0,0 +1,12 @@ +from langchain_core.indexing import __all__ + + +def test_all() -> None: + """Use to catch obvious breaking changes.""" + assert __all__ == sorted(__all__, key=str.lower) + assert __all__ == [ + "aindex", + "index", + "IndexingResult", + "RecordManager", + ] diff --git a/libs/langchain/langchain/indexes/__init__.py b/libs/langchain/langchain/indexes/__init__.py index 02e766b02b39b..f6c9c5d535ade 100644 --- a/libs/langchain/langchain/indexes/__init__.py +++ b/libs/langchain/langchain/indexes/__init__.py @@ -11,7 +11,8 @@ via a set of transformations from some source content (e.g., indexing children documents that were derived from parent documents by chunking.) 
""" -from langchain.indexes._api import IndexingResult, aindex, index +from langchain_core.indexing.api import IndexingResult, aindex, index + from langchain.indexes._sql_record_manager import SQLRecordManager from langchain.indexes.graph import GraphIndexCreator from langchain.indexes.vectorstore import VectorstoreIndexCreator diff --git a/libs/langchain/langchain/indexes/_api.py b/libs/langchain/langchain/indexes/_api.py index f41a221795abf..d5919af972b38 100644 --- a/libs/langchain/langchain/indexes/_api.py +++ b/libs/langchain/langchain/indexes/_api.py @@ -1,600 +1,5 @@ -"""Module contains logic for indexing documents into vector stores.""" -from __future__ import annotations +from langchain_core.indexing.api import _abatch, _batch, _HashedDocument -import hashlib -import json -import uuid -from itertools import islice -from typing import ( - Any, - AsyncIterable, - AsyncIterator, - Callable, - Dict, - Iterable, - Iterator, - List, - Literal, - Optional, - Sequence, - Set, - TypedDict, - TypeVar, - Union, - cast, -) - -from langchain_community.document_loaders.base import BaseLoader -from langchain_core.documents import Document -from langchain_core.pydantic_v1 import root_validator -from langchain_core.vectorstores import VectorStore - -from langchain.indexes.base import NAMESPACE_UUID, RecordManager - -T = TypeVar("T") - - -def _hash_string_to_uuid(input_string: str) -> uuid.UUID: - """Hashes a string and returns the corresponding UUID.""" - hash_value = hashlib.sha1(input_string.encode("utf-8")).hexdigest() - return uuid.uuid5(NAMESPACE_UUID, hash_value) - - -def _hash_nested_dict_to_uuid(data: dict[Any, Any]) -> uuid.UUID: - """Hashes a nested dictionary and returns the corresponding UUID.""" - serialized_data = json.dumps(data, sort_keys=True) - hash_value = hashlib.sha1(serialized_data.encode("utf-8")).hexdigest() - return uuid.uuid5(NAMESPACE_UUID, hash_value) - - -class _HashedDocument(Document): - """A hashed document with a unique ID.""" - - uid: str - hash_: str - """The hash of the document including content and metadata.""" - content_hash: str - """The hash of the document content.""" - metadata_hash: str - """The hash of the document metadata.""" - - @classmethod - def is_lc_serializable(cls) -> bool: - return False - - @root_validator(pre=True) - def calculate_hashes(cls, values: Dict[str, Any]) -> Dict[str, Any]: - """Root validator to calculate content and metadata hash.""" - content = values.get("page_content", "") - metadata = values.get("metadata", {}) - - forbidden_keys = ("hash_", "content_hash", "metadata_hash") - - for key in forbidden_keys: - if key in metadata: - raise ValueError( - f"Metadata cannot contain key {key} as it " - f"is reserved for internal use." - ) - - content_hash = str(_hash_string_to_uuid(content)) - - try: - metadata_hash = str(_hash_nested_dict_to_uuid(metadata)) - except Exception as e: - raise ValueError( - f"Failed to hash metadata: {e}. " - f"Please use a dict that can be serialized using json." 
- ) - - values["content_hash"] = content_hash - values["metadata_hash"] = metadata_hash - values["hash_"] = str(_hash_string_to_uuid(content_hash + metadata_hash)) - - _uid = values.get("uid", None) - - if _uid is None: - values["uid"] = values["hash_"] - return values - - def to_document(self) -> Document: - """Return a Document object.""" - return Document( - page_content=self.page_content, - metadata=self.metadata, - ) - - @classmethod - def from_document( - cls, document: Document, *, uid: Optional[str] = None - ) -> _HashedDocument: - """Create a HashedDocument from a Document.""" - return cls( # type: ignore[call-arg] - uid=uid, # type: ignore[arg-type] - page_content=document.page_content, - metadata=document.metadata, - ) - - -def _batch(size: int, iterable: Iterable[T]) -> Iterator[List[T]]: - """Utility batching function.""" - it = iter(iterable) - while True: - chunk = list(islice(it, size)) - if not chunk: - return - yield chunk - - -async def _abatch(size: int, iterable: AsyncIterable[T]) -> AsyncIterator[List[T]]: - """Utility batching function.""" - batch: List[T] = [] - async for element in iterable: - if len(batch) < size: - batch.append(element) - - if len(batch) >= size: - yield batch - batch = [] - - if batch: - yield batch - - -def _get_source_id_assigner( - source_id_key: Union[str, Callable[[Document], str], None], -) -> Callable[[Document], Union[str, None]]: - """Get the source id from the document.""" - if source_id_key is None: - return lambda doc: None - elif isinstance(source_id_key, str): - return lambda doc: doc.metadata[source_id_key] - elif callable(source_id_key): - return source_id_key - else: - raise ValueError( - f"source_id_key should be either None, a string or a callable. " - f"Got {source_id_key} of type {type(source_id_key)}." - ) - - -def _deduplicate_in_order( - hashed_documents: Iterable[_HashedDocument], -) -> Iterator[_HashedDocument]: - """Deduplicate a list of hashed documents while preserving order.""" - seen: Set[str] = set() - - for hashed_doc in hashed_documents: - if hashed_doc.hash_ not in seen: - seen.add(hashed_doc.hash_) - yield hashed_doc - - -# PUBLIC API - - -class IndexingResult(TypedDict): - """Return a detailed a breakdown of the result of the indexing operation.""" - - num_added: int - """Number of added documents.""" - num_updated: int - """Number of updated documents because they were not up to date.""" - num_deleted: int - """Number of deleted documents.""" - num_skipped: int - """Number of skipped documents because they were already up to date.""" - - -def index( - docs_source: Union[BaseLoader, Iterable[Document]], - record_manager: RecordManager, - vector_store: VectorStore, - *, - batch_size: int = 100, - cleanup: Literal["incremental", "full", None] = None, - source_id_key: Union[str, Callable[[Document], str], None] = None, - cleanup_batch_size: int = 1_000, - force_update: bool = False, -) -> IndexingResult: - """Index data from the loader into the vector store. - - Indexing functionality uses a manager to keep track of which documents - are in the vector store. - - This allows us to keep track of which documents were updated, and which - documents were deleted, which documents should be skipped. - - For the time being, documents are indexed using their hashes, and users - are not able to specify the uid of the document. - - IMPORTANT: - if auto_cleanup is set to True, the loader should be returning - the entire dataset, and not just a subset of the dataset. 
- Otherwise, the auto_cleanup will remove documents that it is not - supposed to. - - Args: - docs_source: Data loader or iterable of documents to index. - record_manager: Timestamped set to keep track of which documents were - updated. - vector_store: Vector store to index the documents into. - batch_size: Batch size to use when indexing. - cleanup: How to handle clean up of documents. - - Incremental: Cleans up all documents that haven't been updated AND - that are associated with source ids that were seen - during indexing. - Clean up is done continuously during indexing helping - to minimize the probability of users seeing duplicated - content. - - Full: Delete all documents that haven to been returned by the loader. - Clean up runs after all documents have been indexed. - This means that users may see duplicated content during indexing. - - None: Do not delete any documents. - source_id_key: Optional key that helps identify the original source - of the document. - cleanup_batch_size: Batch size to use when cleaning up documents. - force_update: Force update documents even if they are present in the - record manager. Useful if you are re-indexing with updated embeddings. - - Returns: - Indexing result which contains information about how many documents - were added, updated, deleted, or skipped. - """ - if cleanup not in {"incremental", "full", None}: - raise ValueError( - f"cleanup should be one of 'incremental', 'full' or None. " - f"Got {cleanup}." - ) - - if cleanup == "incremental" and source_id_key is None: - raise ValueError("Source id key is required when cleanup mode is incremental.") - - # Check that the Vectorstore has required methods implemented - methods = ["delete", "add_documents"] - - for method in methods: - if not hasattr(vector_store, method): - raise ValueError( - f"Vectorstore {vector_store} does not have required method {method}" - ) - - if type(vector_store).delete == VectorStore.delete: - # Checking if the vectorstore has overridden the default delete method - # implementation which just raises a NotImplementedError - raise ValueError("Vectorstore has not implemented the delete method") - - if isinstance(docs_source, BaseLoader): - try: - doc_iterator = docs_source.lazy_load() - except NotImplementedError: - doc_iterator = iter(docs_source.load()) - else: - doc_iterator = iter(docs_source) - - source_id_assigner = _get_source_id_assigner(source_id_key) - - # Mark when the update started. - index_start_dt = record_manager.get_time() - num_added = 0 - num_skipped = 0 - num_updated = 0 - num_deleted = 0 - - for doc_batch in _batch(batch_size, doc_iterator): - hashed_docs = list( - _deduplicate_in_order( - [_HashedDocument.from_document(doc) for doc in doc_batch] - ) - ) - - source_ids: Sequence[Optional[str]] = [ - source_id_assigner(doc) for doc in hashed_docs - ] - - if cleanup == "incremental": - # If the cleanup mode is incremental, source ids are required. - for source_id, hashed_doc in zip(source_ids, hashed_docs): - if source_id is None: - raise ValueError( - "Source ids are required when cleanup mode is incremental. " - f"Document that starts with " - f"content: {hashed_doc.page_content[:100]} was not assigned " - f"as source id." - ) - # source ids cannot be None after for loop above. - source_ids = cast(Sequence[str], source_ids) # type: ignore[assignment] - - exists_batch = record_manager.exists([doc.uid for doc in hashed_docs]) - - # Filter out documents that already exist in the record store. 
- uids = [] - docs_to_index = [] - uids_to_refresh = [] - seen_docs: Set[str] = set() - for hashed_doc, doc_exists in zip(hashed_docs, exists_batch): - if doc_exists: - if force_update: - seen_docs.add(hashed_doc.uid) - else: - uids_to_refresh.append(hashed_doc.uid) - continue - uids.append(hashed_doc.uid) - docs_to_index.append(hashed_doc.to_document()) - - # Update refresh timestamp - if uids_to_refresh: - record_manager.update(uids_to_refresh, time_at_least=index_start_dt) - num_skipped += len(uids_to_refresh) - - # Be pessimistic and assume that all vector store write will fail. - # First write to vector store - if docs_to_index: - vector_store.add_documents(docs_to_index, ids=uids, batch_size=batch_size) - num_added += len(docs_to_index) - len(seen_docs) - num_updated += len(seen_docs) - - # And only then update the record store. - # Update ALL records, even if they already exist since we want to refresh - # their timestamp. - record_manager.update( - [doc.uid for doc in hashed_docs], - group_ids=source_ids, - time_at_least=index_start_dt, - ) - - # If source IDs are provided, we can do the deletion incrementally! - if cleanup == "incremental": - # Get the uids of the documents that were not returned by the loader. - - # mypy isn't good enough to determine that source ids cannot be None - # here due to a check that's happening above, so we check again. - for source_id in source_ids: - if source_id is None: - raise AssertionError("Source ids cannot be None here.") - - _source_ids = cast(Sequence[str], source_ids) - - uids_to_delete = record_manager.list_keys( - group_ids=_source_ids, before=index_start_dt - ) - if uids_to_delete: - # Then delete from vector store. - vector_store.delete(uids_to_delete) - # First delete from record store. - record_manager.delete_keys(uids_to_delete) - num_deleted += len(uids_to_delete) - - if cleanup == "full": - while uids_to_delete := record_manager.list_keys( - before=index_start_dt, limit=cleanup_batch_size - ): - # First delete from record store. - vector_store.delete(uids_to_delete) - # Then delete from record manager. - record_manager.delete_keys(uids_to_delete) - num_deleted += len(uids_to_delete) - - return { - "num_added": num_added, - "num_updated": num_updated, - "num_skipped": num_skipped, - "num_deleted": num_deleted, - } - - -# Define an asynchronous generator function -async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]: - """Convert an iterable to an async iterator.""" - for item in iterator: - yield item - - -async def aindex( - docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]], - record_manager: RecordManager, - vector_store: VectorStore, - *, - batch_size: int = 100, - cleanup: Literal["incremental", "full", None] = None, - source_id_key: Union[str, Callable[[Document], str], None] = None, - cleanup_batch_size: int = 1_000, - force_update: bool = False, -) -> IndexingResult: - """Index data from the loader into the vector store. - - Indexing functionality uses a manager to keep track of which documents - are in the vector store. - - This allows us to keep track of which documents were updated, and which - documents were deleted, which documents should be skipped. - - For the time being, documents are indexed using their hashes, and users - are not able to specify the uid of the document. - - IMPORTANT: - if auto_cleanup is set to True, the loader should be returning - the entire dataset, and not just a subset of the dataset. 
- Otherwise, the auto_cleanup will remove documents that it is not - supposed to. - - Args: - docs_source: Data loader or iterable of documents to index. - record_manager: Timestamped set to keep track of which documents were - updated. - vector_store: Vector store to index the documents into. - batch_size: Batch size to use when indexing. - cleanup: How to handle clean up of documents. - - Incremental: Cleans up all documents that haven't been updated AND - that are associated with source ids that were seen - during indexing. - Clean up is done continuously during indexing helping - to minimize the probability of users seeing duplicated - content. - - Full: Delete all documents that haven to been returned by the loader. - Clean up runs after all documents have been indexed. - This means that users may see duplicated content during indexing. - - None: Do not delete any documents. - source_id_key: Optional key that helps identify the original source - of the document. - cleanup_batch_size: Batch size to use when cleaning up documents. - force_update: Force update documents even if they are present in the - record manager. Useful if you are re-indexing with updated embeddings. - - Returns: - Indexing result which contains information about how many documents - were added, updated, deleted, or skipped. - """ - - if cleanup not in {"incremental", "full", None}: - raise ValueError( - f"cleanup should be one of 'incremental', 'full' or None. " - f"Got {cleanup}." - ) - - if cleanup == "incremental" and source_id_key is None: - raise ValueError("Source id key is required when cleanup mode is incremental.") - - # Check that the Vectorstore has required methods implemented - methods = ["adelete", "aadd_documents"] - - for method in methods: - if not hasattr(vector_store, method): - raise ValueError( - f"Vectorstore {vector_store} does not have required method {method}" - ) - - if type(vector_store).adelete == VectorStore.adelete: - # Checking if the vectorstore has overridden the default delete method - # implementation which just raises a NotImplementedError - raise ValueError("Vectorstore has not implemented the delete method") - - async_doc_iterator: AsyncIterator[Document] - if isinstance(docs_source, BaseLoader): - try: - async_doc_iterator = docs_source.alazy_load() - except NotImplementedError: - # Exception triggered when neither lazy_load nor alazy_load are implemented. - # * The default implementation of alazy_load uses lazy_load. - # * The default implementation of lazy_load raises NotImplementedError. - # In such a case, we use the load method and convert it to an async - # iterator. - async_doc_iterator = _to_async_iterator(docs_source.load()) - else: - if hasattr(docs_source, "__aiter__"): - async_doc_iterator = docs_source # type: ignore[assignment] - else: - async_doc_iterator = _to_async_iterator(docs_source) - - source_id_assigner = _get_source_id_assigner(source_id_key) - - # Mark when the update started. - index_start_dt = await record_manager.aget_time() - num_added = 0 - num_skipped = 0 - num_updated = 0 - num_deleted = 0 - - async for doc_batch in _abatch(batch_size, async_doc_iterator): - hashed_docs = list( - _deduplicate_in_order( - [_HashedDocument.from_document(doc) for doc in doc_batch] - ) - ) - - source_ids: Sequence[Optional[str]] = [ - source_id_assigner(doc) for doc in hashed_docs - ] - - if cleanup == "incremental": - # If the cleanup mode is incremental, source ids are required. 
- for source_id, hashed_doc in zip(source_ids, hashed_docs): - if source_id is None: - raise ValueError( - "Source ids are required when cleanup mode is incremental. " - f"Document that starts with " - f"content: {hashed_doc.page_content[:100]} was not assigned " - f"as source id." - ) - # source ids cannot be None after for loop above. - source_ids = cast(Sequence[str], source_ids) - - exists_batch = await record_manager.aexists([doc.uid for doc in hashed_docs]) - - # Filter out documents that already exist in the record store. - uids: list[str] = [] - docs_to_index: list[Document] = [] - uids_to_refresh = [] - seen_docs: Set[str] = set() - for hashed_doc, doc_exists in zip(hashed_docs, exists_batch): - if doc_exists: - if force_update: - seen_docs.add(hashed_doc.uid) - else: - uids_to_refresh.append(hashed_doc.uid) - continue - uids.append(hashed_doc.uid) - docs_to_index.append(hashed_doc.to_document()) - - if uids_to_refresh: - # Must be updated to refresh timestamp. - await record_manager.aupdate(uids_to_refresh, time_at_least=index_start_dt) - num_skipped += len(uids_to_refresh) - - # Be pessimistic and assume that all vector store write will fail. - # First write to vector store - if docs_to_index: - await vector_store.aadd_documents( - docs_to_index, ids=uids, batch_size=batch_size - ) - num_added += len(docs_to_index) - len(seen_docs) - num_updated += len(seen_docs) - - # And only then update the record store. - # Update ALL records, even if they already exist since we want to refresh - # their timestamp. - await record_manager.aupdate( - [doc.uid for doc in hashed_docs], - group_ids=source_ids, - time_at_least=index_start_dt, - ) - - # If source IDs are provided, we can do the deletion incrementally! - - if cleanup == "incremental": - # Get the uids of the documents that were not returned by the loader. - - # mypy isn't good enough to determine that source ids cannot be None - # here due to a check that's happening above, so we check again. - for source_id in source_ids: - if source_id is None: - raise AssertionError("Source ids cannot be None here.") - - _source_ids = cast(Sequence[str], source_ids) - - uids_to_delete = await record_manager.alist_keys( - group_ids=_source_ids, before=index_start_dt - ) - if uids_to_delete: - # Then delete from vector store. - await vector_store.adelete(uids_to_delete) - # First delete from record store. - await record_manager.adelete_keys(uids_to_delete) - num_deleted += len(uids_to_delete) - - if cleanup == "full": - while uids_to_delete := await record_manager.alist_keys( - before=index_start_dt, limit=cleanup_batch_size - ): - # First delete from record store. - await vector_store.adelete(uids_to_delete) - # Then delete from record manager. - await record_manager.adelete_keys(uids_to_delete) - num_deleted += len(uids_to_delete) - - return { - "num_added": num_added, - "num_updated": num_updated, - "num_skipped": num_skipped, - "num_deleted": num_deleted, - } +# Please do not use these in your application. These are private APIs. +# Here to avoid changing unit tests during a migration. 
+__all__ = ["_HashedDocument", "_abatch", "_batch"] diff --git a/libs/langchain/langchain/indexes/_sql_record_manager.py b/libs/langchain/langchain/indexes/_sql_record_manager.py index 435f34e11cc42..a61be7776c9dc 100644 --- a/libs/langchain/langchain/indexes/_sql_record_manager.py +++ b/libs/langchain/langchain/indexes/_sql_record_manager.py @@ -18,6 +18,7 @@ import uuid from typing import Any, AsyncGenerator, Dict, Generator, List, Optional, Sequence, Union +from langchain_core.indexing import RecordManager from sqlalchemy import ( URL, Column, @@ -41,8 +42,6 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Query, Session, sessionmaker -from langchain.indexes.base import RecordManager - Base = declarative_base()
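
Taken together, the last three hunks settle where things are imported from after this patch: `langchain.indexes` re-exports the public helpers from `langchain_core.indexing.api`, `langchain.indexes._api` keeps only private re-exports so the existing unit tests keep passing, and `_sql_record_manager.py` now takes `RecordManager` from core. A short, illustrative sketch of the resulting import surface (nothing here is new API beyond what the hunks above add):

# Canonical location after this patch (langchain-core):
from langchain_core.indexing import IndexingResult, RecordManager, aindex, index

# Backwards-compatible re-exports kept in langchain:
from langchain.indexes import SQLRecordManager, aindex, index

# Private helpers, re-exported only for the existing unit tests;
# not intended for application code:
from langchain.indexes._api import _HashedDocument, _abatch, _batch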
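
For context on how the behaviour exercised by the tests looks from user code, here is a minimal sketch against the in-memory helpers this PR adds under `libs/core/tests/unit_tests/indexing/in_memory.py`. The helper import path and constructor details (`namespace=`, the no-op `create_schema()`) are assumptions based on how the fixtures use them, and the helpers are test-only; an application would use a persistent `RecordManager` such as `SQLRecordManager` together with a vector store that implements `add_documents` and overrides `delete`.

from langchain_core.documents import Document
from langchain_core.indexing import index

# Test-only helpers added by this PR; import path and constructor are assumed
# from the fixtures (run from within libs/core so the tests package resolves).
from tests.unit_tests.indexing.in_memory import (
    InMemoryRecordManager,
    InMemoryVectorStore,
)

record_manager = InMemoryRecordManager(namespace="example")
record_manager.create_schema()
vector_store = InMemoryVectorStore()

docs = [
    Document(page_content="This is a test document.", metadata={"source": "1"}),
    Document(page_content="This is another document.", metadata={"source": "2"}),
]

# First pass: both documents are new, so both are added.
result = index(
    docs,
    record_manager,
    vector_store,
    cleanup="incremental",
    source_id_key="source",
)
# -> {"num_added": 2, "num_updated": 0, "num_skipped": 0, "num_deleted": 0}
print(result)

# Second pass with unchanged content: both are skipped and nothing is deleted,
# since incremental cleanup only removes stale records for sources it saw.
result = index(
    docs,
    record_manager,
    vector_store,
    cleanup="incremental",
    source_id_key="source",
)
# -> {"num_added": 0, "num_updated": 0, "num_skipped": 2, "num_deleted": 0}
print(result)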
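
`aindex` accepts loaders, sync iterables, and plain async iterators of documents. A small sketch under the same assumptions about the in-memory test helpers (the `acreate_schema()` call is likewise assumed from the base interface), streaming documents from an async generator with `cleanup="full"`:

import asyncio
from typing import AsyncIterator

from langchain_core.documents import Document
from langchain_core.indexing import aindex

# Test-only helpers; assumed import path and constructor, as above.
from tests.unit_tests.indexing.in_memory import (
    InMemoryRecordManager,
    InMemoryVectorStore,
)


async def doc_stream() -> AsyncIterator[Document]:
    # Stand-in for e.g. a loader's alazy_load() stream.
    for source in ("1", "2"):
        yield Document(page_content=f"document {source}", metadata={"source": source})


async def main() -> None:
    record_manager = InMemoryRecordManager(namespace="async-example")
    await record_manager.acreate_schema()
    vector_store = InMemoryVectorStore()

    result = await aindex(doc_stream(), record_manager, vector_store, cleanup="full")
    # Expected: {"num_added": 2, "num_updated": 0, "num_skipped": 0, "num_deleted": 0}
    print(result)


asyncio.run(main())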