From c453926c2848f03e0605b8cc1f6f06ac4b2b990c Mon Sep 17 00:00:00 2001 From: Ludwik Trammer Date: Mon, 18 Nov 2024 17:33:20 +0100 Subject: [PATCH] feat(document-search): batch ingestion of documents (#185) --- .../document_search/execution_strategies.md | 7 ++ .../{ingestion.md => processing.md} | 0 .../document_search/async_processing.md | 73 +++++++++++++++ .../create_custom_execution_strategy.md | 91 +++++++++++++++++++ docs/{ => how-to}/integrations/promptfoo.md | 2 +- docs/how-to/use_guardrails.md | 2 +- mkdocs.yml | 12 ++- .../src/ragbits/document_search/_main.py | 48 +++------- .../processor_strategies/__init__.py | 33 +++++++ .../ingestion/processor_strategies/base.py | 91 +++++++++++++++++++ .../ingestion/processor_strategies/batched.py | 67 ++++++++++++++ .../processor_strategies/sequential.py | 40 ++++++++ .../ingestion/providers/dummy.py | 2 +- .../tests/unit/test_document_search.py | 38 ++++++++ .../tests/unit/test_processing_strategies.py | 32 +++++++ 15 files changed, 498 insertions(+), 40 deletions(-) create mode 100644 docs/api_reference/document_search/execution_strategies.md rename docs/api_reference/document_search/{ingestion.md => processing.md} (100%) create mode 100644 docs/how-to/document_search/async_processing.md create mode 100644 docs/how-to/document_search/create_custom_execution_strategy.md rename docs/{ => how-to}/integrations/promptfoo.md (95%) create mode 100644 packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py create mode 100644 packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py create mode 100644 packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/batched.py create mode 100644 packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/sequential.py create mode 100644 packages/ragbits-document-search/tests/unit/test_processing_strategies.py diff --git 
a/docs/api_reference/document_search/execution_strategies.md b/docs/api_reference/document_search/execution_strategies.md new file mode 100644 index 00000000..0d09d18c --- /dev/null +++ b/docs/api_reference/document_search/execution_strategies.md @@ -0,0 +1,7 @@ +# Execution Strategies + +::: ragbits.document_search.ingestion.processor_strategies.ProcessingExecutionStrategy + +::: ragbits.document_search.ingestion.processor_strategies.SequentialProcessing + +::: ragbits.document_search.ingestion.processor_strategies.BatchedAsyncProcessing \ No newline at end of file diff --git a/docs/api_reference/document_search/ingestion.md b/docs/api_reference/document_search/processing.md similarity index 100% rename from docs/api_reference/document_search/ingestion.md rename to docs/api_reference/document_search/processing.md diff --git a/docs/how-to/document_search/async_processing.md b/docs/how-to/document_search/async_processing.md new file mode 100644 index 00000000..4846a298 --- /dev/null +++ b/docs/how-to/document_search/async_processing.md @@ -0,0 +1,73 @@ +# How to Ingest Documents Asynchronously + +In Ragbits, a component called "processing execution strategy" controls how document processing is executed during ingestion. There are multiple execution strategies available in Ragbits that can be easily interchanged. You can also [create new custom execution strategies](create_custom_execution_strategy.md) to meet your specific needs. + +!!! note + It's important to note that processing execution strategies are a separate concept from processors. While the former manage how the processing is executed, the latter deals with the actual processing of documents. Processors are managed by [DocumentProcessorRouter][ragbits.document_search.ingestion.document_processor.DocumentProcessorRouter]. 
+ +# The Synchronous Execution Strategy + +The default execution strategy in Ragbits is [`SequentialProcessing`][ragbits.document_search.ingestion.processor_strategies.SequentialProcessing]. This strategy processes documents one by one, waiting for each document to be processed before moving on to the next. Although it's the simplest and most straightforward strategy, it may be slow when processing a large number of documents. + +Unless you specify a different strategy, Ragbits will use the `SequentialProcessing` strategy by default when ingesting documents: + +```python +from ragbits.core.embeddings.litellm import LiteLLMEmbeddings +from ragbits.core.vector_stores.in_memory import InMemoryVectorStore +from ragbits.document_search import DocumentSearch +from ragbits.document_search.documents.document import DocumentMeta + +documents = [ + DocumentMeta.create_text_document_from_literal("Example document 1"), + DocumentMeta.create_text_document_from_literal("Example document 2"), +] + +embedder = LiteLLMEmbeddings( + model="text-embedding-3-small", +) +vector_store = InMemoryVectorStore() + +document_search = DocumentSearch( + embedder=embedder, + vector_store=vector_store, +) + +await document_search.ingest(documents) +``` + +# The Asynchronous Execution Strategy + +If you need to process documents simultaneously, you can use the [`BatchedAsyncProcessing`][ragbits.document_search.ingestion.processor_strategies.BatchedAsyncProcessing] execution strategy. This strategy uses Python's built-in `asyncio` library to process documents in parallel, making it faster than the `SequentialProcessing` strategy, especially with large document volumes. 
+ +To use the `BatchedAsyncProcessing` strategy, specify it when creating the [`DocumentSearch`][ragbits.document_search.DocumentSearch] instance: + +```python +from ragbits.core.embeddings.litellm import LiteLLMEmbeddings +from ragbits.core.vector_stores.in_memory import InMemoryVectorStore +from ragbits.document_search import DocumentSearch +from ragbits.document_search.documents.document import DocumentMeta +from ragbits.document_search.ingestion.processor_strategies import BatchedAsyncProcessing + +documents = [ + DocumentMeta.create_text_document_from_literal("Example document 1"), + DocumentMeta.create_text_document_from_literal("Example document 2"), +] + +embedder = LiteLLMEmbeddings( + model="text-embedding-3-small", +) +vector_store = InMemoryVectorStore() +processing_strategy = BatchedAsyncProcessing() + +document_search = DocumentSearch( + embedder=embedder, + vector_store=vector_store, + processing_strategy=processing_strategy +) +``` + +Also, you can adjust the batch size for the `BatchedAsyncProcessing` strategy. The batch size controls how many documents are processed at once. By default, the batch size is 10, but you can modify it by passing the `batch_size` parameter to the `BatchedAsyncProcessing` constructor: + +```python +processing_strategy = BatchedAsyncProcessing(batch_size=64) +``` \ No newline at end of file diff --git a/docs/how-to/document_search/create_custom_execution_strategy.md b/docs/how-to/document_search/create_custom_execution_strategy.md new file mode 100644 index 00000000..14772c37 --- /dev/null +++ b/docs/how-to/document_search/create_custom_execution_strategy.md @@ -0,0 +1,91 @@ +# How to Create a Custom Execution Strategy + +!!! note + To learn how to use a built-in asynchronous execution strategy, see [How to Ingest Documents Asynchronously](async_processing.md). + +In Ragbits, document processing during ingestion is controlled by a component known as "processing execution strategy". 
It doesn't deal with the actual processing of documents, but rather, it orchestrates how the processing is executed. + +Ragbits provides several built-in execution strategies that can be easily interchanged. You can also create your own custom execution strategy to fulfill your specific needs. This guide will show you how to develop a custom execution strategy using a somewhat impractical example of a strategy that processes documents one by one, but with a delay between each document. + +## Implementing a Custom Execution Strategy +To create a custom execution strategy, you need to create a new class that inherits from [`ProcessingExecutionStrategy`][ragbits.document_search.ingestion.processor_strategies.ProcessingExecutionStrategy] and implement the abstract method `process_documents`. This method should take a sequence of documents, process them asynchronously, and return a single flat list of the resulting elements. + +While implementing the `process_documents` method, you can use the built-in `process_document` method, which takes the same arguments except that it accepts a single document instead of a sequence, and performs the actual processing of that document. It returns a list of elements, because one document can produce many elements. + +```python +import asyncio + +from ragbits.document_search.ingestion.processor_strategies import ProcessingExecutionStrategy + +class DelayedExecutionStrategy(ProcessingExecutionStrategy): + async def process_documents( + self, + documents: Sequence[DocumentMeta | Document | Source], + processor_router: DocumentProcessorRouter, + processor_overwrite: BaseProvider | None = None, + ) -> list[Element]: + elements = [] + for document in documents: + await asyncio.sleep(1) + document_elements = await self.process_document(document, processor_router, processor_overwrite) + elements.extend(document_elements) + return elements +``` + +## Implementing an Advanced Custom Execution Strategy +Alternatively, instead of using the `process_document` method, you can process documents directly using the `processor_router` and `processor_overwrite` parameters. 
This gives you more control over the processing of documents. + +```python +import asyncio + +from ragbits.document_search.ingestion.processor_strategies import ProcessingExecutionStrategy + +class DelayedExecutionStrategy(ProcessingExecutionStrategy): + async def process_documents( + self, + documents: Sequence[DocumentMeta | Document | Source], + processor_router: DocumentProcessorRouter, + processor_overwrite: BaseProvider | None = None, + ) -> list[Element]: + elements = [] + for document in documents: + # Convert the document to DocumentMeta + document_meta = await self.to_document_meta(document) + + # Get the processor for the document + processor = processor_overwrite or processor_router.get_provider(document_meta) + + await asyncio.sleep(1) + + document_elements = await processor.process(document_meta) + elements.extend(document_elements) + return elements +``` + +## Using the Custom Execution Strategy +To use your custom execution strategy, you need to specify it when creating the [`DocumentSearch`][ragbits.document_search.DocumentSearch] instance: + +```python +from ragbits.core.embeddings.litellm import LiteLLMEmbeddings +from ragbits.core.vector_stores.in_memory import InMemoryVectorStore +from ragbits.document_search import DocumentSearch +from ragbits.document_search.documents.document import DocumentMeta + + +documents = [ + DocumentMeta.create_text_document_from_literal("Example document 1"), + DocumentMeta.create_text_document_from_literal("Example document 2"), +] + +embedder = LiteLLMEmbeddings( + model="text-embedding-3-small", +) +vector_store = InMemoryVectorStore() +processing_strategy = DelayedExecutionStrategy() + +document_search = DocumentSearch( + embedder=embedder, + vector_store=vector_store, + processing_strategy=processing_strategy +) +``` \ No newline at end of file diff --git a/docs/integrations/promptfoo.md b/docs/how-to/integrations/promptfoo.md similarity index 95% rename from docs/integrations/promptfoo.md rename to 
docs/how-to/integrations/promptfoo.md index 05eeb7b6..f899c93d 100644 --- a/docs/integrations/promptfoo.md +++ b/docs/how-to/integrations/promptfoo.md @@ -1,4 +1,4 @@ -## Promptfoo Integration +# How to integrate Promptfoo with Ragbits Ragbits' `Prompt` abstraction can be seamlessly integrated with the `promptfoo` tool. After installing `promptfoo` as specified in the [promptfoo documentation](https://www.promptfoo.dev/docs/installation/), you can generate promptfoo diff --git a/docs/how-to/use_guardrails.md b/docs/how-to/use_guardrails.md index 430e883c..6350b8f1 100644 --- a/docs/how-to/use_guardrails.md +++ b/docs/how-to/use_guardrails.md @@ -1,4 +1,4 @@ -# How-To: Use Guardrails +# How to use Guardrails Ragbits offers an expandable guardrails system. You can use one of the available guardrails or create your own to prevent toxic language, PII leaks etc. diff --git a/mkdocs.yml b/mkdocs.yml index 4e9f1f42..ae87765f 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -7,7 +7,12 @@ copyright: Copyright © 2024 deepsense.ai nav: - rabgbits: index.md - How-to Guides: - - integrations/promptfoo.md + - how-to/optimize.md + - how-to/use_guardrails.md + - how-to/integrations/promptfoo.md + - Document Search: + - how-to/document_search/async_processing.md + - how-to/document_search/create_custom_execution_strategy.md - API Reference: - Core: - api_reference/core/prompt.md @@ -17,8 +22,9 @@ nav: - Document Search: - api_reference/document_search/index.md - api_reference/document_search/documents.md - - api_reference/document_search/ingestion.md - + - Ingestion: + - api_reference/document_search/processing.md + - api_reference/document_search/execution_strategies.md theme: name: material icon: diff --git a/packages/ragbits-document-search/src/ragbits/document_search/_main.py b/packages/ragbits-document-search/src/ragbits/document_search/_main.py index daa56dc0..706e1e86 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/_main.py +++ 
b/packages/ragbits-document-search/src/ragbits/document_search/_main.py @@ -12,6 +12,11 @@ from ragbits.document_search.documents.element import Element, ImageElement from ragbits.document_search.documents.sources import Source from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter +from ragbits.document_search.ingestion.processor_strategies import ( + ProcessingExecutionStrategy, + SequentialProcessing, + get_processing_strategy, +) from ragbits.document_search.ingestion.providers.base import BaseProvider from ragbits.document_search.retrieval.rephrasers import get_rephraser from ragbits.document_search.retrieval.rephrasers.base import QueryRephraser @@ -48,6 +53,8 @@ class DocumentSearch: vector_store: VectorStore query_rephraser: QueryRephraser reranker: Reranker + document_processor_router: DocumentProcessorRouter + processing_strategy: ProcessingExecutionStrategy def __init__( self, @@ -56,12 +63,14 @@ def __init__( query_rephraser: QueryRephraser | None = None, reranker: Reranker | None = None, document_processor_router: DocumentProcessorRouter | None = None, + processing_strategy: ProcessingExecutionStrategy | None = None, ) -> None: self.embedder = embedder self.vector_store = vector_store self.query_rephraser = query_rephraser or NoopQueryRephraser() self.reranker = reranker or NoopReranker() self.document_processor_router = document_processor_router or DocumentProcessorRouter.from_config() + self.processing_strategy = processing_strategy or SequentialProcessing() @classmethod def from_config(cls, config: dict) -> "DocumentSearch": @@ -78,12 +87,13 @@ def from_config(cls, config: dict) -> "DocumentSearch": query_rephraser = get_rephraser(config.get("rephraser")) reranker = get_reranker(config.get("reranker")) vector_store = get_vector_store(config["vector_store"]) + processing_strategy = get_processing_strategy(config.get("processing_strategy")) providers_config_dict: dict = config.get("providers", {}) providers_config = 
DocumentProcessorRouter.from_dict_to_providers_config(providers_config_dict) document_processor_router = DocumentProcessorRouter.from_config(providers_config) - return cls(embedder, vector_store, query_rephraser, reranker, document_processor_router) + return cls(embedder, vector_store, query_rephraser, reranker, document_processor_router, processing_strategy) @traceable async def search(self, query: str, config: SearchConfig | None = None) -> Sequence[Element]: @@ -114,35 +124,6 @@ async def search(self, query: str, config: SearchConfig | None = None) -> Sequen options=RerankerOptions(**config.reranker_kwargs), ) - async def _process_document( - self, - document: DocumentMeta | Document | Source, - document_processor: BaseProvider | None = None, - ) -> list[Element]: - """ - Process a document and return the elements. - - Args: - document: The document to process. - document_processor: The document processor to use. If not provided, the document processor will be - determined based on the document metadata. - - Returns: - The elements. - """ - if isinstance(document, Source): - document_meta = await DocumentMeta.from_source(document) - elif isinstance(document, DocumentMeta): - document_meta = document - else: - document_meta = document.metadata - - if document_processor is None: - document_processor = self.document_processor_router.get_provider(document_meta) - - document_processor = self.document_processor_router.get_provider(document_meta) - return await document_processor.process(document_meta) - @traceable async def ingest( self, @@ -157,10 +138,9 @@ async def ingest( document_processor: The document processor to use. If not provided, the document processor will be determined based on the document metadata. 
""" - elements = [] - # TODO: Parallelize - for document in documents: - elements.extend(await self._process_document(document, document_processor)) + elements = await self.processing_strategy.process_documents( + documents, self.document_processor_router, document_processor + ) await self.insert_elements(elements) async def insert_elements(self, elements: list[Element]) -> None: diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py new file mode 100644 index 00000000..4924d7be --- /dev/null +++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py @@ -0,0 +1,33 @@ +import sys + +from ragbits.core.utils.config_handling import get_cls_from_config + +from .base import ProcessingExecutionStrategy +from .batched import BatchedAsyncProcessing +from .sequential import SequentialProcessing + +__all__ = ["BatchedAsyncProcessing", "ProcessingExecutionStrategy", "SequentialProcessing"] + + +def get_processing_strategy(config: dict | None = None) -> ProcessingExecutionStrategy: + """ + Initializes and returns a ProcessingExecutionStrategy object based on the provided configuration. + + Args: + config: A dictionary containing configuration details for the ProcessingExecutionStrategy. + + Returns: + An instance of the specified ProcessingExecutionStrategy class, initialized with the provided config + (if any) or default arguments. + + Raises: + KeyError: If the provided configuration does not contain a valid "type" key. + InvalidConfigurationError: If the provided configuration is invalid. + NotImplementedError: If the specified ProcessingExecutionStrategy class cannot be created from + the provided configuration. 
+ """ + if config is None: + return SequentialProcessing() + + strategy_cls = get_cls_from_config(config["type"], sys.modules[__name__]) + return strategy_cls.from_config(config.get("config", {})) diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py new file mode 100644 index 00000000..f2c94936 --- /dev/null +++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/base.py @@ -0,0 +1,91 @@ +from abc import ABC, abstractmethod +from collections.abc import Sequence + +from typing_extensions import Self + +from ragbits.document_search.documents.document import Document, DocumentMeta +from ragbits.document_search.documents.element import Element +from ragbits.document_search.documents.sources import Source +from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter +from ragbits.document_search.ingestion.providers.base import BaseProvider + + +class ProcessingExecutionStrategy(ABC): + """ + Base class for processing execution strategies that define how documents are processed to become elements. + + Processing execution strategies are responsible for processing documents using the appropriate processor, + which means that they don't usually determine the business logic of the processing itself, but rather how + the processing is executed. + """ + + @classmethod + def from_config(cls, config: dict) -> Self: + """ + Creates and returns an instance of the ProcessingExecutionStrategy subclass from the given configuration. + + Args: + config: A dictionary containing the configuration for initializing the instance. + + Returns: + An initialized instance of the ProcessingExecutionStrategy subclass. 
+ """ + return cls(**config) + + @staticmethod + async def to_document_meta(document: DocumentMeta | Document | Source) -> DocumentMeta: + """ + Convert a document, document meta or source to a document meta object. + + Args: + document: The document to convert. + + Returns: + The document meta object. + """ + if isinstance(document, Source): + return await DocumentMeta.from_source(document) + elif isinstance(document, DocumentMeta): + return document + else: + return document.metadata + + async def process_document( + self, + document: DocumentMeta | Document | Source, + processor_router: DocumentProcessorRouter, + processor_overwrite: BaseProvider | None = None, + ) -> list[Element]: + """ + Process a single document and return the elements. + + Args: + document: The document to process. + processor_router: The document processor router to use. + processor_overwrite: Forces the use of a specific processor, instead of the one provided by the router. + + Returns: + A list of elements. + """ + document_meta = await self.to_document_meta(document) + processor = processor_overwrite or processor_router.get_provider(document_meta) + return await processor.process(document_meta) + + @abstractmethod + async def process_documents( + self, + documents: Sequence[DocumentMeta | Document | Source], + processor_router: DocumentProcessorRouter, + processor_overwrite: BaseProvider | None = None, + ) -> list[Element]: + """ + Process documents using the given processor and return the resulting elements. + + Args: + documents: The documents to process. + processor_router: The document processor router to use. + processor_overwrite: Forces the use of a specific processor, instead of the one provided by the router. + + Returns: + A list of elements. 
+ """ diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/batched.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/batched.py new file mode 100644 index 00000000..56f3729c --- /dev/null +++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/batched.py @@ -0,0 +1,67 @@ +import asyncio +from collections.abc import Sequence + +from ragbits.document_search.documents.document import Document, DocumentMeta +from ragbits.document_search.documents.element import Element +from ragbits.document_search.documents.sources import Source +from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter +from ragbits.document_search.ingestion.providers.base import BaseProvider + +from .base import ProcessingExecutionStrategy + + +class BatchedAsyncProcessing(ProcessingExecutionStrategy): + """ + A processing execution strategy that processes documents asynchronously in batches. + """ + + def __init__(self, batch_size: int = 10): + """ + Initialize the BatchedAsyncProcessing instance. + + Args: + batch_size: The size of the batch to process documents in. + """ + self.batch_size = batch_size + + async def _process_with_semaphore( + self, + semaphore: asyncio.Semaphore, + document: DocumentMeta | Document | Source, + processor_router: DocumentProcessorRouter, + processor_overwrite: BaseProvider | None = None, + ) -> list[Element]: + async with semaphore: + return await self.process_document(document, processor_router, processor_overwrite) + + async def process_documents( + self, + documents: Sequence[DocumentMeta | Document | Source], + processor_router: DocumentProcessorRouter, + processor_overwrite: BaseProvider | None = None, + ) -> list[Element]: + """ + Process documents using the given processor and return the resulting elements. + + Args: + documents: The documents to process. 
+ processor_router: The document processor router to use. + processor_overwrite: Forces the use of a specific processor, instead of the one provided by the router. + + Returns: + A list of elements. + + Returns: + A list of elements. + """ + semaphore = asyncio.Semaphore(self.batch_size) + + responses = await asyncio.gather( + *[ + self._process_with_semaphore(semaphore, document, processor_router, processor_overwrite) + for document in documents + ] + ) + + # Return a flattened list of elements + return [element for response in responses for element in response] diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/sequential.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/sequential.py new file mode 100644 index 00000000..a0ea57da --- /dev/null +++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/sequential.py @@ -0,0 +1,40 @@ +from collections.abc import Sequence + +from ragbits.document_search.documents.document import Document, DocumentMeta +from ragbits.document_search.documents.element import Element +from ragbits.document_search.documents.sources import Source +from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter +from ragbits.document_search.ingestion.providers.base import BaseProvider + +from .base import ProcessingExecutionStrategy + + +class SequentialProcessing(ProcessingExecutionStrategy): + """ + A processing execution strategy that processes documents in sequence, one at a time. + """ + + async def process_documents( + self, + documents: Sequence[DocumentMeta | Document | Source], + processor_router: DocumentProcessorRouter, + processor_overwrite: BaseProvider | None = None, + ) -> list[Element]: + """ + Process documents using the given processor and return the resulting elements. + + Args: + documents: The documents to process. 
+ processor_router: The document processor router to use. + processor_overwrite: Forces the use of a specific processor, instead of the one provided by the router. + + Returns: + A list of elements. + + Returns: + A list of elements. + """ + elements = [] + for document in documents: + elements.extend(await self.process_document(document, processor_router, processor_overwrite)) + return elements diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/providers/dummy.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/providers/dummy.py index d65e84b2..a646d374 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/providers/dummy.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/providers/dummy.py @@ -13,7 +13,7 @@ class DummyProvider(BaseProvider): It should be used for testing purposes only. """ - SUPPORTED_DOCUMENT_TYPES = {DocumentType.TXT} + SUPPORTED_DOCUMENT_TYPES = {DocumentType.TXT, DocumentType.MD} async def process(self, document_meta: DocumentMeta) -> list[Element]: """ diff --git a/packages/ragbits-document-search/tests/unit/test_document_search.py b/packages/ragbits-document-search/tests/unit/test_document_search.py index 3b93c9b7..b544ca47 100644 --- a/packages/ragbits-document-search/tests/unit/test_document_search.py +++ b/packages/ragbits-document-search/tests/unit/test_document_search.py @@ -11,6 +11,7 @@ from ragbits.document_search.documents.element import TextElement from ragbits.document_search.documents.sources import LocalFileSource from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter +from ragbits.document_search.ingestion.processor_strategies.batched import BatchedAsyncProcessing from ragbits.document_search.ingestion.providers import BaseProvider from ragbits.document_search.ingestion.providers.dummy import DummyProvider @@ -19,6 +20,7 @@ "vector_store": {"type": 
"ragbits.core.vector_stores.in_memory:InMemoryVectorStore"}, "reranker": {"type": "NoopReranker"}, "providers": {"txt": {"type": "DummyProvider"}}, + "processing_strategy": {"type": "SequentialProcessing"}, } @@ -163,3 +165,39 @@ async def test_document_search_ingest_multiple_from_sources(): assert len(results) == 2 assert {result.content for result in results} == {"foo", "bar"} # type: ignore + + +async def test_document_search_with_batched(): + documents = [ + DocumentMeta.create_text_document_from_literal("Name of Peppa's brother is George"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's father is Daddy Pig"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's mother is Mummy Pig"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's friend is Suzy Sheep"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's friend is Danny Dog"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's friend is Pedro Pony"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's friend is Emily Elephant"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's friend is Candy Cat"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's teacher is Madame Gazelle"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's doctor is Dr. 
Brown Bear"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's cousin is Chloe Pig"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's cousin is Alexander Pig"), + ] + + embeddings_mock = AsyncMock() + embeddings_mock.embed_text.return_value = [[0.1, 0.1]] * len(documents) + + processing_strategy = BatchedAsyncProcessing(batch_size=5) + vectore_store = InMemoryVectorStore() + + document_search = DocumentSearch( + embedder=embeddings_mock, + vector_store=vectore_store, + processing_strategy=processing_strategy, + ) + + await document_search.ingest(documents) + + results = await document_search.search("Peppa's brother", config=SearchConfig(vector_store_kwargs={"k": 100})) + + assert len(await vectore_store.list()) == 12 + assert len(results) == 12 diff --git a/packages/ragbits-document-search/tests/unit/test_processing_strategies.py b/packages/ragbits-document-search/tests/unit/test_processing_strategies.py new file mode 100644 index 00000000..3a9ae497 --- /dev/null +++ b/packages/ragbits-document-search/tests/unit/test_processing_strategies.py @@ -0,0 +1,32 @@ +import pytest + +from ragbits.document_search.documents.document import DocumentMeta, DocumentType +from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter +from ragbits.document_search.ingestion.processor_strategies.batched import BatchedAsyncProcessing +from ragbits.document_search.ingestion.processor_strategies.sequential import SequentialProcessing +from ragbits.document_search.ingestion.providers.dummy import DummyProvider + + +@pytest.fixture(name="documents") +def documents_fixture() -> list[DocumentMeta]: + return [ + DocumentMeta.create_text_document_from_literal("Name of Peppa's brother is George"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's mother is Mummy Pig"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's father is Daddy Pig"), + DocumentMeta.create_text_document_from_literal("Name of 
Peppa's grandfather is Grandpa Pig"), + DocumentMeta.create_text_document_from_literal("Name of Peppa's grandmother is Granny Pig"), + ] + + +async def test_sequential_strategy(documents: list[DocumentMeta]): + router = DocumentProcessorRouter.from_config({DocumentType.TXT: DummyProvider()}) + strategy = SequentialProcessing() + elements = await strategy.process_documents(documents, router) + assert len(elements) == 5 + + +async def test_batched_strategy(documents: list[DocumentMeta]): + router = DocumentProcessorRouter.from_config({DocumentType.TXT: DummyProvider()}) + strategy = BatchedAsyncProcessing(batch_size=2) + elements = await strategy.process_documents(documents, router) + assert len(elements) == 5