-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
63a8b61
commit 5e3b830
Showing
7 changed files
with
165 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# Execution Strategies | ||
|
||
::: ragbits.document_search.ingestion.processor_strategies.ProcessingExecutionStrategy | ||
|
||
::: ragbits.document_search.ingestion.processor_strategies.SequentialProcessing | ||
|
||
::: ragbits.document_search.ingestion.processor_strategies.BatchedAsyncProcessing |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
# How to Ingest Documents Asynchronously | ||
|
||
In Ragbits, a component called "processing execution strategy" controls how document processing is executed during ingestion. There are multiple execution strategies available in Ragbits that can be easily interchanged. You can also [create new custom execution strategies](create_custom_execution_strategy.md) to meet your specific needs. | ||
|
||
!!! note | ||
It's important to note that processing execution strategies are a separate concept from processors. While the former manage how the processing is executed, the latter deals with the actual processing of documents. Processors are managed by [DocumentProcessorRouter][ragbits.document_search.ingestion.document_processor.DocumentProcessorRouter]. | ||
|
||
# The Synchronous Execution Strategy | ||
|
||
The default execution strategy in Ragbits is [`SequentialProcessing`][ragbits.document_search.ingestion.processor_strategies.SequentialProcessing]. This strategy processes documents one by one, waiting for each document to be processed before moving on to the next. Although it's the simplest and most straightforward strategy, it may be slow when processing a large number of documents. | ||
|
||
Unless you specify a different strategy, Ragbits will use the `SequentialProcessing` strategy by default when ingesting documents: | ||
|
||
```python | ||
documents = [ | ||
DocumentMeta.create_text_document_from_literal("Example document 1"), | ||
DocumentMeta.create_text_document_from_literal("Example document 2"), | ||
] | ||
|
||
embedder = LiteLLMEmbeddings( | ||
model="text-embedding-3-small", | ||
) | ||
vector_store = InMemoryVectorStore() | ||
|
||
document_search = DocumentSearch( | ||
embedder=embedder, | ||
vector_store=vector_store, | ||
) | ||
|
||
await document_search.ingest(documents) | ||
``` | ||
|
||
# The Asynchronous Execution Strategy | ||
|
||
If you need to process documents simultaneously, you can use the [`BatchedAsyncProcessing`][ragbits.document_search.ingestion.processor_strategies.BatchedAsyncProcessing] execution strategy. This strategy uses Python's built-in `asyncio` library to process documents in parallel, making it faster than the `SequentialProcessing` strategy, especially with large document volumes. | ||
|
||
To use the `BatchedAsyncProcessing` strategy, specify it when creating the [`DocumentSearch`][ragbits.document_search.DocumentSearch] instance: | ||
|
||
```python | ||
documents = [ | ||
DocumentMeta.create_text_document_from_literal("Example document 1"), | ||
DocumentMeta.create_text_document_from_literal("Example document 2"), | ||
] | ||
|
||
embedder = LiteLLMEmbeddings( | ||
model="text-embedding-3-small", | ||
) | ||
vector_store = InMemoryVectorStore() | ||
processing_strategy = BatchedAsyncProcessing() | ||
|
||
document_search = DocumentSearch( | ||
embedder=embedder, | ||
vector_store=vector_store, | ||
processing_strategy=processing_strategy | ||
) | ||
``` | ||
|
||
Also, you can adjust the batch size for the `BatchedAsyncProcessing` strategy. The batch size controls how many documents are processed at once. By default, the batch size is 10, but you can modify it by passing the `batch_size` parameter to the `BatchedAsyncProcessing` constructor: | ||
|
||
```python | ||
processing_strategy = BatchedAsyncProcessing(batch_size=64) | ||
``` |
85 changes: 85 additions & 0 deletions
85
docs/how-to/document_search/create_custom_execution_strategy.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
# How to Create a Custom Execution Strategy | ||
|
||
!!! note | ||
To learn how to use a built-in asynchronous execution strategy, see [How to Ingest Documents Asynchronously](async_processing.md). | ||
|
||
In Ragbits, document processing during ingestion is controlled by a component known as "processing execution strategy". It doesn't deal with the actual processing of documents, but rather, it orchestrates how the processing is executed. | ||
|
||
Ragbits provides several built-in execution strategies that can be easily interchanged. You can also create your own custom execution strategy to fulfill your specific needs. This guide will show you how to develop a custom execution strategy using a somewhat impractical example of a strategy that processes documents one by one, but with a delay between each document. | ||
|
||
## Implementing a Custom Execution Strategy | ||
To create a custom execution strategy, you need to create a new class that inherits from [`ProcessingExecutionStrategy`][ragbits.document_search.ingestion.processor_strategies.ProcessingExecutionStrategy] and implement the abstract method `execute`. This method should take a list of documents and process them asynchronously. It should also implement the abstract method `process_documents`. | ||
|
||
While implementing the `process_documents` method, you can use the built-in `process_document` method, which has the same signature and performs the actual processing of a single document. | ||
|
||
```python | ||
import asyncio | ||
|
||
from ragbits.document_search.ingestion.processor_strategies import ProcessingExecutionStrategy | ||
|
||
class DelayedExecutionStrategy(ProcessingExecutionStrategy): | ||
async def process_documents( | ||
self, | ||
documents: Sequence[DocumentMeta | Document | Source], | ||
processor_router: DocumentProcessorRouter, | ||
processor_overwrite: BaseProvider | None = None, | ||
) -> list[Element]: | ||
elements = [] | ||
for document in documents: | ||
await asyncio.sleep(1) | ||
element = await self.process_document(document, processor_router, processor_overwrite) | ||
elements.append(element) | ||
return elements | ||
``` | ||
|
||
## Implementing an Advanced Custom Execution Strategy | ||
Alternatively, instead of using the `process_document` method, you can process documents directly using the `processor_router` and `processor_overwrite` parameters. This gives you more control over the processing of documents. | ||
|
||
```python | ||
import asyncio | ||
|
||
from ragbits.document_search.ingestion.processor_strategies import ProcessingExecutionStrategy | ||
|
||
class DelayedExecutionStrategy(ProcessingExecutionStrategy): | ||
async def process_documents( | ||
self, | ||
documents: Sequence[DocumentMeta | Document | Source], | ||
processor_router: DocumentProcessorRouter, | ||
processor_overwrite: BaseProvider | None = None, | ||
) -> list[Element]: | ||
elements = [] | ||
for document in documents: | ||
# Convert the document to DocumentMeta | ||
document_meta = await self.to_document_meta(document) | ||
|
||
# Get the processor for the document | ||
processor = processor_overwrite or processor_router.get_processor(document) | ||
|
||
await asyncio.sleep(1) | ||
|
||
element = await processor.process(document_meta) | ||
elements.append(element) | ||
return elements | ||
``` | ||
|
||
## Using the Custom Execution Strategy | ||
To use your custom execution strategy, you need to specify it when creating the [`DocumentSearch`][ragbits.document_search.DocumentSearch] instance: | ||
|
||
```python | ||
documents = [ | ||
DocumentMeta.create_text_document_from_literal("Example document 1"), | ||
DocumentMeta.create_text_document_from_literal("Example document 2"), | ||
] | ||
|
||
embedder = LiteLLMEmbeddings( | ||
model="text-embedding-3-small", | ||
) | ||
vector_store = InMemoryVectorStore() | ||
processing_strategy = DelayedExecutionStrategy() | ||
|
||
document_search = DocumentSearch( | ||
embedder=embedder, | ||
vector_store=vector_store, | ||
processing_strategy=processing_strategy | ||
) | ||
``` |
2 changes: 1 addition & 1 deletion
2
docs/integrations/promptfoo.md → docs/how-to/integrations/promptfoo.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters