
feat(document-search): add Ray processing strategy #207

Merged
70 changes: 70 additions & 0 deletions docs/how-to/document_search/distributed_ingestion.md
@@ -0,0 +1,70 @@
# How to Ingest Documents in a Distributed Fashion

Ragbits Document Search can ingest documents in a distributed fashion when installed with the `distributed` extra. To enable this, specify the `DistributedProcessing` execution strategy when creating the `DocumentSearch` instance:

```python
from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta
from ragbits.document_search.ingestion.processor_strategies.distributed import DistributedProcessing

documents = [
DocumentMeta.create_text_document_from_literal("Example document 1"),
DocumentMeta.create_text_document_from_literal("Example document 2"),
]

embedder = LiteLLMEmbeddings(
model="text-embedding-3-small",
)

vector_store = InMemoryVectorStore()

processing_strategy = DistributedProcessing()

document_search = DocumentSearch(
embedder=embedder,
vector_store=vector_store,
processing_strategy=processing_strategy
)
```

## Local document ingestion

By default, when run outside a Ray cluster, the Ray Core library parallelizes document processing on the local machine, using the CPU cores available there. If that is acceptable, the code above is all you need: the documents will be processed in parallel locally.
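Conceptually, the strategy splits the document list into batches and processes each batch concurrently. A minimal, Ray-free sketch of that pattern (the `make_batches` and `fake_process` names are illustrative stand-ins, not part of the ragbits API):

```python
import asyncio


def make_batches(items: list, batch_size: int = 10) -> list[list]:
    """Split a list into consecutive batches, mirroring how documents
    are grouped before being dispatched for processing."""
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]


async def fake_process(doc: str) -> str:
    # Stand-in for real document parsing/embedding work.
    await asyncio.sleep(0)
    return f"processed:{doc}"


async def process_all(docs: list[str], batch_size: int = 2) -> list[str]:
    results: list[str] = []
    for batch in make_batches(docs, batch_size):
        # Each batch's documents are processed concurrently.
        results.extend(await asyncio.gather(*(fake_process(d) for d in batch)))
    return results


out = asyncio.run(process_all(["a", "b", "c"]))
print(out)  # → ['processed:a', 'processed:b', 'processed:c']
```

In the real `DistributedProcessing` strategy, each batch is additionally shipped to a Ray worker as a remote task, so the same pattern scales from local cores to a whole cluster.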

## Remote document ingestion

When run inside a Ray cluster, the Ray Core library parallelizes document processing across the nodes of the cluster. There are several ways to send documents to a Ray cluster for processing; using the Ray Jobs API is the recommended one.

To use the Ray Jobs API, prepare the processing script and the documents to be processed, then submit the job to the Ray cluster.
Make sure to replace `<cluster_address>` with the address of your Ray cluster, and adjust the `entrypoint` and `runtime_env` parameters to match your setup.

```python
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://<cluster_address>:8265")
job_id = client.submit_job(
entrypoint="python script.py",
runtime_env={
"working_dir": "./",
"pip": [
"ragbits-core[litellm]",
"ragbits-document-search[distributed]"
]
},
)
print(job_id)
```

The Ray Jobs API is also available as a CLI. You can submit a job using the following command:

```bash
ray job submit \
--address http://<cluster_address>:8265 \
--runtime-env '{"pip": ["ragbits-core[litellm]", "ragbits-document-search[distributed]"]}' \
--working-dir . \
-- python script.py
```
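After submission, the job can be monitored with the same CLI. This is a sketch; replace `<cluster_address>` and `<job_id>` with your own values (the job id is printed when the job is submitted):

```bash
# Check the current status of a submitted job
ray job status --address http://<cluster_address>:8265 <job_id>

# Stream the job's logs
ray job logs --address http://<cluster_address>:8265 --follow <job_id>
```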

There are also other ways to submit jobs to the Ray cluster. For more information, please refer to the [Ray documentation](https://docs.ray.io/en/latest/ray-overview/index.html).
86 changes: 86 additions & 0 deletions examples/document-search/distributed.py
@@ -0,0 +1,86 @@
"""
Ragbits Document Search Example: Basic with distributed ingestion

This example is based on the "Basic" example, but it demonstrates how to ingest documents in a distributed manner.
The distributed ingestion is provided by "DistributedProcessing" which uses Ray to parallelize the ingestion process.

The script performs the following steps:

1. Create a list of documents.
2. Initialize the `LiteLLMEmbeddings` class with the OpenAI `text-embedding-3-small` embedding model.
3. Initialize the `InMemoryVectorStore` class.
4. Initialize the `DocumentSearch` class with the embedder and the vector store.
5. Ingest the documents into the `DocumentSearch` instance in a distributed manner.
6. Search for documents using a query.
7. Print the search results.

To run the script, execute the following command:

```bash
uv run examples/document-search/distributed.py
```
"""

# /// script
# requires-python = ">=3.10"
# dependencies = [
# "ragbits-document-search[distributed]",
# "ragbits-core[litellm]",
# ]
# ///

import asyncio

from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta
from ragbits.document_search.ingestion.processor_strategies import DistributedProcessing

documents = [
DocumentMeta.create_text_document_from_literal(
"""
RIP boiled water. You will be mist.
"""
),
DocumentMeta.create_text_document_from_literal(
"""
Why doesn't James Bond fart in bed? Because it would blow his cover.
"""
),
DocumentMeta.create_text_document_from_literal(
"""
Why don't programmers like to swim? Because they're scared of the floating points.
"""
),
DocumentMeta.create_text_document_from_literal(
"""
This one is completely unrelated.
"""
),
]


async def main() -> None:
"""
Run the example.
"""
embedder = LiteLLMEmbeddings(
model="text-embedding-3-small",
)
vector_store = InMemoryVectorStore()
processing_strategy = DistributedProcessing()
document_search = DocumentSearch(
embedder=embedder,
vector_store=vector_store,
processing_strategy=processing_strategy,
)

await document_search.ingest(documents)

results = await document_search.search("I'm boiling my water and I need a joke")
print(results)


if __name__ == "__main__":
asyncio.run(main())
3 changes: 3 additions & 0 deletions packages/ragbits-document-search/pyproject.toml
@@ -40,6 +40,9 @@ gcs = [
huggingface = [
"datasets~=3.0.1",
]
distributed = [
"ray>=2.39.0",
]

[tool.uv]
dev-dependencies = [
@@ -4,9 +4,10 @@

from .base import ProcessingExecutionStrategy
from .batched import BatchedAsyncProcessing
from .distributed import DistributedProcessing
from .sequential import SequentialProcessing

__all__ = ["BatchedAsyncProcessing", "ProcessingExecutionStrategy", "SequentialProcessing"]
__all__ = ["BatchedAsyncProcessing", "DistributedProcessing", "ProcessingExecutionStrategy", "SequentialProcessing"]


def get_processing_strategy(config: dict | None = None) -> ProcessingExecutionStrategy:
@@ -0,0 +1,77 @@
import asyncio
from collections.abc import Sequence

try:
import ray

HAS_RAY = True
except ImportError:
HAS_RAY = False

from ragbits.document_search.documents.document import Document, DocumentMeta
from ragbits.document_search.documents.element import Element
from ragbits.document_search.documents.sources import Source
from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter
from ragbits.document_search.ingestion.processor_strategies.base import ProcessingExecutionStrategy
from ragbits.document_search.ingestion.providers.base import BaseProvider


class DistributedProcessing(ProcessingExecutionStrategy):
"""
A processing execution strategy that processes documents on a cluster, using Ray.
"""

def __init__(self, batch_size: int = 10):
"""
Initialize the DistributedProcessing instance.

Args:
batch_size: The size of the batch to process documents in.
It defaults to 10, but should be increased if the document processing is trivial (< 1s per batch).

Raises:
ModuleNotFoundError: If Ray is not installed.
"""
if not HAS_RAY:
raise ModuleNotFoundError(
"You need to install the 'distributed' extra requirements to use Ray distributed computing"
)

self.batch_size = batch_size

async def process_documents(
self,
documents: Sequence[DocumentMeta | Document | Source],
processor_router: DocumentProcessorRouter,
processor_overwrite: BaseProvider | None = None,
) -> list[Element]:
"""Process multiple documents in parallel using Ray distributed computing framework.

This method processes a sequence of documents in parallel using Ray distributed computing capabilities.
Each document is processed remotely as a separate Ray task.

Args:
documents: The documents to process.
processor_router: The document processor router to use.
processor_overwrite: Forces the use of a specific processor, instead of the one provided by the router.

Returns:
A list of elements.
"""

@ray.remote
def process_document_remotely(documents: Sequence[DocumentMeta | Document | Source]) -> list[Element]:
async def process_batch() -> list[list[Element]]:
tasks = [
self.process_document(document, processor_router, processor_overwrite) for document in documents
]
return await asyncio.gather(*tasks)

results = asyncio.run(process_batch())
return sum(results, [])

tasks = []
for i in range(0, len(documents), self.batch_size):
tasks.append(process_document_remotely.remote(documents[i : i + self.batch_size]))

return sum(await asyncio.gather(*tasks), [])
@@ -3,6 +3,7 @@
from ragbits.document_search.documents.document import DocumentMeta, DocumentType
from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter
from ragbits.document_search.ingestion.processor_strategies.batched import BatchedAsyncProcessing
from ragbits.document_search.ingestion.processor_strategies.distributed import DistributedProcessing
from ragbits.document_search.ingestion.processor_strategies.sequential import SequentialProcessing
from ragbits.document_search.ingestion.providers.dummy import DummyProvider

@@ -30,3 +31,10 @@ async def test_batched_strategy(documents: list[DocumentMeta]):
strategy = BatchedAsyncProcessing(batch_size=2)
elements = await strategy.process_documents(documents, router)
assert len(elements) == 5


async def test_distributed_strategy(documents: list[DocumentMeta]):
router = DocumentProcessorRouter.from_config({DocumentType.TXT: DummyProvider()})
strategy = DistributedProcessing()
elements = await strategy.process_documents(documents, router)
assert len(elements) == 5
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -7,7 +7,7 @@ requires-python = ">=3.10"
dependencies = [
"ragbits-cli",
"ragbits-core[chroma,lab,litellm,local,otel,qdrant]",
"ragbits-document-search[gcs,huggingface]",
"ragbits-document-search[gcs,huggingface,distributed]",
"ragbits-evaluate[relari]",
"ragbits-guardrails[openai]",
]
42 changes: 40 additions & 2 deletions uv.lock
