From f85bfa6bf65851fd42a5bed2c73df8631fba0242 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pawe=C5=82=20Chmielak?=
Date: Tue, 26 Nov 2024 09:47:24 +0100
Subject: [PATCH 01/10] Add Ray distributed processing strategy

---
 examples/document-search/distributed.py       | 83 +++++++++++++++++++
 .../ragbits-document-search/pyproject.toml    |  3 +
 .../processor_strategies/distributed.py       | 49 +++++++++++
 uv.lock                                       | 38 +++++++++
 4 files changed, 173 insertions(+)
 create mode 100644 examples/document-search/distributed.py
 create mode 100644 packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py

diff --git a/examples/document-search/distributed.py b/examples/document-search/distributed.py
new file mode 100644
index 00000000..873b1cbe
--- /dev/null
+++ b/examples/document-search/distributed.py
@@ -0,0 +1,83 @@
+"""
+Ragbits Document Search Example: Basic with distributed ingestion
+
+This example is based on the "Basic" example, but it demonstrates how to ingest documents in a distributed manner.
+The distributed ingestion is provided by "DistributedProcessing", which uses Ray to parallelize the ingestion process.
+
+The script performs the following steps:
+
+    1. Create a list of documents.
+    2. Initialize the `LiteLLMEmbeddings` class with the OpenAI `text-embedding-3-small` embedding model.
+    3. Initialize the `InMemoryVectorStore` class.
+    4. Initialize the `DocumentSearch` class with the embedder and the vector store.
+    5. Ingest the documents into the `DocumentSearch` instance in a distributed manner.
+    6. Search for documents using a query.
+    7. Print the search results.
+
+To run the script, execute the following command:
+
+    ```bash
+    uv run examples/document-search/distributed.py
+    ```
+"""
+
+# /// script
+# requires-python = ">=3.10"
+# dependencies = [
+#     "ragbits-document-search[cluster]",
+#     "ragbits-core[litellm]",
+# ]
+# ///

+import asyncio
+
+from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
+from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
+from ragbits.document_search import DocumentSearch
+from ragbits.document_search.documents.document import DocumentMeta
+
+documents = [
+    DocumentMeta.create_text_document_from_literal(
+        """
+        RIP boiled water. You will be mist.
+        """
+    ),
+    DocumentMeta.create_text_document_from_literal(
+        """
+        Why doesn't James Bond fart in bed? Because it would blow his cover.
+        """
+    ),
+    DocumentMeta.create_text_document_from_literal(
+        """
+        Why don't programmers like to swim? Because they're scared of the floating points.
+        """
+    ),
+    DocumentMeta.create_text_document_from_literal(
+        """
+        This one is completely unrelated.
+        """
+    ),
+]
+
+
+async def main() -> None:
+    """
+    Run the example.
+    """
+    embedder = LiteLLMEmbeddings(
+        model="text-embedding-3-small",
+    )
+    vector_store = InMemoryVectorStore()
+    document_search = DocumentSearch(
+        embedder=embedder,
+        vector_store=vector_store,
+    )
+
+    await document_search.ingest(documents)
+
+    results = await document_search.search("I'm boiling my water and I need a joke")
+    print(results)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/packages/ragbits-document-search/pyproject.toml b/packages/ragbits-document-search/pyproject.toml
index d0e1db25..33047a48 100644
--- a/packages/ragbits-document-search/pyproject.toml
+++ b/packages/ragbits-document-search/pyproject.toml
@@ -40,6 +40,9 @@ gcs = [
 huggingface = [
     "datasets~=3.0.1",
 ]
+distributed = [
+    "ray>=2.39.0",
+]
 
 [tool.uv]
 dev-dependencies = [
diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py
new file mode 100644
index 00000000..c68742e7
--- /dev/null
+++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py
@@ -0,0 +1,49 @@
+import asyncio
+from collections.abc import Sequence
+
+import ray
+
+from ragbits.document_search.documents.document import Document, DocumentMeta
+from ragbits.document_search.documents.element import Element
+from ragbits.document_search.documents.sources import Source
+from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter
+from ragbits.document_search.ingestion.processor_strategies.base import ProcessingExecutionStrategy
+from ragbits.document_search.ingestion.providers.base import BaseProvider
+
+
+class DistributedProcessing(ProcessingExecutionStrategy):
+    """
+    A processing execution strategy that processes documents on a cluster, using Ray.
+    """
+
+    async def process_documents(
+        self,
+        documents: Sequence[DocumentMeta | Document | Source],
+        processor_router: DocumentProcessorRouter,
+        processor_overwrite: BaseProvider | None = None,
+    ) -> list[Element]:
+        """Process multiple documents asynchronously using the Ray distributed computing framework.
+
+        This method processes a sequence of documents in parallel using Ray's distributed computing capabilities.
+        Each document is processed remotely as a separate Ray task.
+
+        Args:
+            documents (Sequence[DocumentMeta | Document | Source]): A sequence of documents to process,
+                can be DocumentMeta, Document or Source objects
+            processor_router (DocumentProcessorRouter): Router that determines which processor to use for each document
+            processor_overwrite (BaseProvider | None, optional): Provider to override the default processor.
+ + Returns: + list[Element]: List of processed elements, one for each input document + + Raises: + RayNotInitializedException: If Ray framework is not initialized + """ + process_document_remotely = ray.remote(self.process_document) + + tasks = [ + process_document_remotely.remote(document, processor_router, processor_overwrite) # type: ignore[call-arg] + for document in documents + ] + results = await asyncio.gather(*tasks) + return results # type: ignore[return-value] diff --git a/uv.lock b/uv.lock index 0d7e6720..6eb89c72 100644 --- a/uv.lock +++ b/uv.lock @@ -3901,6 +3901,9 @@ dependencies = [ ] [package.optional-dependencies] +distributed = [ + { name = "ray" }, +] gcs = [ { name = "gcloud-aio-storage" }, ] @@ -3924,6 +3927,7 @@ requires-dist = [ { name = "gcloud-aio-storage", marker = "extra == 'gcs'", specifier = "~=9.3.0" }, { name = "pdf2image", specifier = ">=1.17.0" }, { name = "ragbits-core", editable = "packages/ragbits-core" }, + { name = "ray", marker = "extra == 'distributed'", specifier = ">=2.39.0" }, { name = "unstructured", specifier = ">=0.15.13" }, { name = "unstructured-client", specifier = ">=0.26.0" }, ] @@ -4153,6 +4157,40 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/22/88/a38224c3059a464e7f32115af6eb9cfae755e2a6bb1a4e95cf74227b656f/rapidfuzz-3.10.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:50484d563f8bfa723c74c944b0bb15b9e054db9c889348c8c307abcbee75ab92", size = 1544256 }, ] +[[package]] +name = "ray" +version = "2.39.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiosignal" }, + { name = "click" }, + { name = "filelock" }, + { name = "frozenlist" }, + { name = "jsonschema" }, + { name = "msgpack" }, + { name = "packaging" }, + { name = "protobuf" }, + { name = "pyyaml" }, + { name = "requests" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/96/4b/63877b7f78c324675dfa4c0d29e65f61f8ad3edde590eba6400a33bea1c0/ray-2.39.0-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:13d62cead910f433817ca5b41eda75d9c24e81a6b727e0d4e9c5817da86eca5b", size = 66813312 }, + { url = "https://files.pythonhosted.org/packages/6d/20/5c5046798812c2d5569869ed7508adadf0397ea709003c58a245352296eb/ray-2.39.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:74219fade4acaf722d34a2630008220a2a5b2ba856e874cd5a8c24ab2f2b2412", size = 64190359 }, + { url = "https://files.pythonhosted.org/packages/d4/eb/595b1f5d3c9562779d6dfc6294d8210f41de2377d748e147a919e7d8bb70/ray-2.39.0-cp310-cp310-manylinux2014_aarch64.whl", hash = "sha256:54ed235b4542ad6d0e317988dc4feaf46af99902f3dfd2097600e0294751bf88", size = 65375308 }, + { url = "https://files.pythonhosted.org/packages/56/41/df809b6a516c0285f7d9aaeefd040910df088508d5471f85e0409f52f070/ray-2.39.0-cp310-cp310-manylinux2014_x86_64.whl", hash = "sha256:6298fb981cd0fa8607f1917deb27925ab8add48c60ba5bd0f6cf40d4cc5dace4", size = 66282109 }, + { url = "https://files.pythonhosted.org/packages/c7/76/080614f743dd4e114e9531a23a672251eb5db7fd75558d7a7107f1a43e3c/ray-2.39.0-cp310-cp310-win_amd64.whl", hash = "sha256:c9d1a26fa3c4d32555c483fab57f54c4ba017f7552732fe9841396aaa24ee6ea", size = 25122397 }, + { url = "https://files.pythonhosted.org/packages/9a/5d/d79ea469070fd220de55c7fe6320c7f89d9c5dc1524eacc522a678c8e278/ray-2.39.0-cp311-cp311-macosx_10_15_x86_64.whl", hash = "sha256:5547f2e6cf3b5d5aaea8aabea2d223a65c9566db198349c0aac668f454710f1a", size = 66748078 }, + { url = 
"https://files.pythonhosted.org/packages/08/0b/ad824cb0c7637e0ced75766e00a8134b0a756ce532cbb1437ad6d3074a4c/ray-2.39.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7f8a83c2b7719386b3f8d6e3120aae49d9aa4cf49050acaee059b45df92eb281", size = 64121471 }, + { url = "https://files.pythonhosted.org/packages/22/a7/19f327c104b796144048b761ebbb863138cddfc048afcef0ce0a1c41a081/ray-2.39.0-cp311-cp311-manylinux2014_aarch64.whl", hash = "sha256:413488eb2f8bfced8ecc269b120321f33106cbe412a69c3e23ce20c6d5b6f702", size = 65494591 }, + { url = "https://files.pythonhosted.org/packages/1d/c6/6dd145b3952d1454793056849292c6f8b9e1cf4b010cbd3536ef154d9ea1/ray-2.39.0-cp311-cp311-manylinux2014_x86_64.whl", hash = "sha256:21aee127ae1a9cf6193001ab41d2551bcc81331ba3b7196d000f16d10f15c705", size = 66393165 }, + { url = "https://files.pythonhosted.org/packages/6a/0e/f2a02a709c07cbb347a0c32646e67a5ebb0cb57753213b1f0159e98c5ecf/ray-2.39.0-cp311-cp311-win_amd64.whl", hash = "sha256:fdcb7ad51883d194f7b49f23533d29b3c96d78034f829b6cde1e24b6783dff9d", size = 25064437 }, + { url = "https://files.pythonhosted.org/packages/14/8c/e2baa89afb52cf0c82f3796620f8f2a8775dd8bee7f82a51dba2f4928be8/ray-2.39.0-cp312-cp312-macosx_10_15_x86_64.whl", hash = "sha256:77fbcf0002cfbb673b2832e273ee8a834358a2a2bff77e2ff5c97924fcd2b389", size = 66735531 }, + { url = "https://files.pythonhosted.org/packages/36/d4/e5be22aa78e845b60143a9968d37eb0b1ab4211be44c2318d8a02e02b8cd/ray-2.39.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a10cfca3a2f05d285ba1ab3cdd3ce43ec2934b05eb91516a9766bcfc4c070425", size = 64111065 }, + { url = "https://files.pythonhosted.org/packages/be/b8/2ef8ef9c6e3f30c2ad066c8254bd2b6463a6314cd47113f375720ee95dfc/ray-2.39.0-cp312-cp312-manylinux2014_aarch64.whl", hash = "sha256:f8d01550f718a65e0be48da578fa2a3f2e1be85a5453d4b98c3576e1cfaab01b", size = 65516719 }, + { url = "https://files.pythonhosted.org/packages/85/d3/9bc39aa29733a15c553fb23bd97054f8c562590fe47a3cc6096e0bbe1946/ray-2.39.0-cp312-cp312-manylinux2014_x86_64.whl", hash = "sha256:016930e6ba74b91b40117a64b24f7bfff48a6a780f23d2b064a7a3f43bc4e1a2", size = 66442043 }, + { url = "https://files.pythonhosted.org/packages/1d/11/5e0eae513d7bbc5644c56f5bcdb8a32a6aece7487dfa749c7f2b5f0ea3b6/ray-2.39.0-cp312-cp312-win_amd64.whl", hash = "sha256:4893cc7fd8b3c48c68c3d90bc5fe2023ee2732f91e9664ee79e8272b18ddb170", size = 25053175 }, +] + [[package]] name = "referencing" version = "0.35.1" From 2bb0ddaf62b54dc58b32ad53ef3c422ae0b7f879 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielak?= Date: Tue, 26 Nov 2024 12:52:11 +0100 Subject: [PATCH 02/10] Ray logic ready --- examples/document-search/distributed.py | 5 +++- .../processor_strategies/__init__.py | 3 ++- .../processor_strategies/distributed.py | 27 ++++++++++--------- pyproject.toml | 2 +- uv.lock | 4 +-- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/examples/document-search/distributed.py b/examples/document-search/distributed.py index 873b1cbe..82d14896 100644 --- a/examples/document-search/distributed.py +++ b/examples/document-search/distributed.py @@ -24,7 +24,7 @@ # /// script # requires-python = ">=3.10" # dependencies = [ -# "ragbits-document-search[cluster]", +# "ragbits-document-search[distributed]", # "ragbits-core[litellm]", # ] # /// @@ -35,6 +35,7 @@ from ragbits.core.vector_stores.in_memory import InMemoryVectorStore from ragbits.document_search import DocumentSearch from ragbits.document_search.documents.document import DocumentMeta +from 
ragbits.document_search.ingestion.processor_strategies import DistributedProcessing
 
 documents = [
     DocumentMeta.create_text_document_from_literal(
         """
@@ -68,9 +69,11 @@ async def main() -> None:
         model="text-embedding-3-small",
     )
     vector_store = InMemoryVectorStore()
+    processing_strategy = DistributedProcessing()
     document_search = DocumentSearch(
         embedder=embedder,
         vector_store=vector_store,
+        processing_strategy=processing_strategy,
     )
 
     await document_search.ingest(documents)
diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py
index 4924d7be..6a1eb593 100644
--- a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py
+++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py
@@ -5,8 +5,9 @@
 from .base import ProcessingExecutionStrategy
 from .batched import BatchedAsyncProcessing
 from .sequential import SequentialProcessing
+from .distributed import DistributedProcessing
 
-__all__ = ["BatchedAsyncProcessing", "ProcessingExecutionStrategy", "SequentialProcessing"]
+__all__ = ["BatchedAsyncProcessing", "ProcessingExecutionStrategy", "SequentialProcessing", "DistributedProcessing"]
 
 
 def get_processing_strategy(config: dict | None = None) -> ProcessingExecutionStrategy:
diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py
index c68742e7..f010994a 100644
--- a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py
+++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py
@@ -1,8 +1,6 @@
 import asyncio
 from collections.abc import Sequence
 
-import ray
-
 from ragbits.document_search.documents.document import Document, DocumentMeta
 from ragbits.document_search.documents.element import Element
 from ragbits.document_search.documents.sources import Source
@@ -22,28 +20,31 @@ async def process_documents(
         processor_router: DocumentProcessorRouter,
         processor_overwrite: BaseProvider | None = None,
     ) -> list[Element]:
-        """Process multiple documents asynchronously using the Ray distributed computing framework.
+        """Process multiple documents in parallel using the Ray distributed computing framework.
 
         This method processes a sequence of documents in parallel using Ray's distributed computing capabilities.
         Each document is processed remotely as a separate Ray task.
 
         Args:
-            documents (Sequence[DocumentMeta | Document | Source]): A sequence of documents to process,
-                can be DocumentMeta, Document or Source objects
-            processor_router (DocumentProcessorRouter): Router that determines which processor to use for each document
-            processor_overwrite (BaseProvider | None, optional): Provider to override the default processor.
+            documents: The documents to process.
+            processor_router: The document processor router to use.
+            processor_overwrite: Forces the use of a specific processor, instead of the one provided by the router.
 
         Returns:
-            list[Element]: List of processed elements, one for each input document
+            A list of elements.
Raises: - RayNotInitializedException: If Ray framework is not initialized + ModuleNotFoundError: If Ray is not installed """ - process_document_remotely = ray.remote(self.process_document) + import ray + + @ray.remote + def process_document_remotely(document: DocumentMeta | Document | Source) -> list[Element]: + return asyncio.run(self.process_document(document, processor_router, processor_overwrite)) tasks = [ - process_document_remotely.remote(document, processor_router, processor_overwrite) # type: ignore[call-arg] + process_document_remotely.remote(document) for document in documents ] - results = await asyncio.gather(*tasks) - return results # type: ignore[return-value] + + return sum(await asyncio.gather(*tasks), []) diff --git a/pyproject.toml b/pyproject.toml index 0ee989d7..c15c0f82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.10" dependencies = [ "ragbits-cli", "ragbits-core[chroma,lab,litellm,local,otel,qdrant]", - "ragbits-document-search[gcs,huggingface]", + "ragbits-document-search[gcs,huggingface,distributed]", "ragbits-evaluate[relari]", "ragbits-guardrails[openai]", ] diff --git a/uv.lock b/uv.lock index 6eb89c72..ef4b1bcd 100644 --- a/uv.lock +++ b/uv.lock @@ -4031,7 +4031,7 @@ source = { virtual = "." } dependencies = [ { name = "ragbits-cli" }, { name = "ragbits-core", extra = ["chroma", "lab", "litellm", "local", "otel", "qdrant"] }, - { name = "ragbits-document-search", extra = ["gcs", "huggingface"] }, + { name = "ragbits-document-search", extra = ["distributed", "gcs", "huggingface"] }, { name = "ragbits-evaluate", extra = ["relari"] }, { name = "ragbits-guardrails", extra = ["openai"] }, ] @@ -4059,7 +4059,7 @@ dev = [ requires-dist = [ { name = "ragbits-cli", editable = "packages/ragbits-cli" }, { name = "ragbits-core", extras = ["chroma", "lab", "litellm", "local", "otel", "qdrant"], editable = "packages/ragbits-core" }, - { name = "ragbits-document-search", extras = ["gcs", "huggingface"], editable = "packages/ragbits-document-search" }, + { name = "ragbits-document-search", extras = ["gcs", "huggingface", "distributed"], editable = "packages/ragbits-document-search" }, { name = "ragbits-evaluate", extras = ["relari"], editable = "packages/ragbits-evaluate" }, { name = "ragbits-guardrails", extras = ["openai"], editable = "packages/ragbits-guardrails" }, ] From 74238162b0cd09bd6deed7450b54611d0e060cb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielak?= Date: Tue, 26 Nov 2024 12:53:48 +0100 Subject: [PATCH 03/10] Apply ruff --- .../ingestion/processor_strategies/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py index 6a1eb593..b231ed3c 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/__init__.py @@ -4,10 +4,10 @@ from .base import ProcessingExecutionStrategy from .batched import BatchedAsyncProcessing -from .sequential import SequentialProcessing from .distributed import DistributedProcessing +from .sequential import SequentialProcessing -__all__ = ["BatchedAsyncProcessing", "ProcessingExecutionStrategy", "SequentialProcessing", "DistributedProcessing"] +__all__ = ["BatchedAsyncProcessing", 
"DistributedProcessing", "ProcessingExecutionStrategy", "SequentialProcessing"] def get_processing_strategy(config: dict | None = None) -> ProcessingExecutionStrategy: From 0667fb0b8f206304ea9f4ae470a05b45b3a6bff0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielak?= Date: Wed, 27 Nov 2024 12:23:28 +0100 Subject: [PATCH 04/10] Unit test --- .../tests/unit/test_processing_strategies.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/ragbits-document-search/tests/unit/test_processing_strategies.py b/packages/ragbits-document-search/tests/unit/test_processing_strategies.py index 3a9ae497..86ae09fb 100644 --- a/packages/ragbits-document-search/tests/unit/test_processing_strategies.py +++ b/packages/ragbits-document-search/tests/unit/test_processing_strategies.py @@ -3,6 +3,7 @@ from ragbits.document_search.documents.document import DocumentMeta, DocumentType from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter from ragbits.document_search.ingestion.processor_strategies.batched import BatchedAsyncProcessing +from ragbits.document_search.ingestion.processor_strategies.distributed import DistributedProcessing from ragbits.document_search.ingestion.processor_strategies.sequential import SequentialProcessing from ragbits.document_search.ingestion.providers.dummy import DummyProvider @@ -30,3 +31,9 @@ async def test_batched_strategy(documents: list[DocumentMeta]): strategy = BatchedAsyncProcessing(batch_size=2) elements = await strategy.process_documents(documents, router) assert len(elements) == 5 + +async def test_distributed_strategy(documents: list[DocumentMeta]): + router = DocumentProcessorRouter.from_config({DocumentType.TXT: DummyProvider()}) + strategy = DistributedProcessing() + elements = await strategy.process_documents(documents, router) + assert len(elements) == 5 \ No newline at end of file From 2867bd5d005bc214a09c7c8a1fcbb9344d94dedc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielak?= Date: Wed, 27 Nov 2024 12:29:14 +0100 Subject: [PATCH 05/10] Empty line --- .../tests/unit/test_processing_strategies.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/ragbits-document-search/tests/unit/test_processing_strategies.py b/packages/ragbits-document-search/tests/unit/test_processing_strategies.py index 86ae09fb..0883b7d1 100644 --- a/packages/ragbits-document-search/tests/unit/test_processing_strategies.py +++ b/packages/ragbits-document-search/tests/unit/test_processing_strategies.py @@ -36,4 +36,4 @@ async def test_distributed_strategy(documents: list[DocumentMeta]): router = DocumentProcessorRouter.from_config({DocumentType.TXT: DummyProvider()}) strategy = DistributedProcessing() elements = await strategy.process_documents(documents, router) - assert len(elements) == 5 \ No newline at end of file + assert len(elements) == 5 From 828f42e35b3e6b87b724910dee3c18f36d385f89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielak?= Date: Wed, 27 Nov 2024 12:30:03 +0100 Subject: [PATCH 06/10] Ruff reformat --- .../ingestion/processor_strategies/distributed.py | 5 +---- .../tests/unit/test_processing_strategies.py | 1 + 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py index f010994a..828be889 100644 --- 
a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py
+++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py
@@ -42,9 +42,6 @@ async def process_documents(
         def process_document_remotely(document: DocumentMeta | Document | Source) -> list[Element]:
             return asyncio.run(self.process_document(document, processor_router, processor_overwrite))
 
-        tasks = [
-            process_document_remotely.remote(document)
-            for document in documents
-        ]
+        tasks = [process_document_remotely.remote(document) for document in documents]
 
         return sum(await asyncio.gather(*tasks), [])
diff --git a/packages/ragbits-document-search/tests/unit/test_processing_strategies.py b/packages/ragbits-document-search/tests/unit/test_processing_strategies.py
index 0883b7d1..5b0bdced 100644
--- a/packages/ragbits-document-search/tests/unit/test_processing_strategies.py
+++ b/packages/ragbits-document-search/tests/unit/test_processing_strategies.py
@@ -32,6 +32,7 @@ async def test_batched_strategy(documents: list[DocumentMeta]):
     elements = await strategy.process_documents(documents, router)
     assert len(elements) == 5
 
+
 async def test_distributed_strategy(documents: list[DocumentMeta]):
     router = DocumentProcessorRouter.from_config({DocumentType.TXT: DummyProvider()})
     strategy = DistributedProcessing()

From 8ce4344e42ea8a676b7e8802161753305d05541a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pawe=C5=82=20Chmielak?=
Date: Wed, 27 Nov 2024 15:46:56 +0100
Subject: [PATCH 07/10] How-to added

---
 .../document_search/distributed_ingestion.md  | 69 +++++++++++++++++++
 1 file changed, 69 insertions(+)
 create mode 100644 docs/how-to/document_search/distributed_ingestion.md

diff --git a/docs/how-to/document_search/distributed_ingestion.md b/docs/how-to/document_search/distributed_ingestion.md
new file mode 100644
index 00000000..a94fa82b
--- /dev/null
+++ b/docs/how-to/document_search/distributed_ingestion.md
@@ -0,0 +1,69 @@
+# How to Ingest Documents in a Distributed Fashion
+
+Ragbits Document Search can ingest documents in a distributed fashion if it is installed with the `distributed` extra. This can be set up by specifying the `DistributedProcessing` execution strategy when creating the `DocumentSearch` instance.
+
+```python
+from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
+from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
+from ragbits.document_search import DocumentSearch
+from ragbits.document_search.documents.document import DocumentMeta
+from ragbits.document_search.ingestion.processor_strategies.distributed import DistributedProcessing
+
+documents = [
+    DocumentMeta.create_text_document_from_literal("Example document 1"),
+    DocumentMeta.create_text_document_from_literal("Example document 2"),
+]
+
+embedder = LiteLLMEmbeddings(
+    model="text-embedding-3-small",
+)
+
+vector_store = InMemoryVectorStore()
+
+processing_strategy = DistributedProcessing()
+
+document_search = DocumentSearch(
+    embedder=embedder,
+    vector_store=vector_store,
+    processing_strategy=processing_strategy
+)
+```
+
+## Local document ingestion
+
+By default, when run outside of a Ray cluster, the Ray Core library will parallelize the processing of documents on the local machine, using the cores available on the machine. If that is acceptable, you can use the code above as it is.
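+
+For instance, the ingestion from the snippet above can be run locally like this (a minimal sketch: the explicit `ray.init` call and the `num_cpus` value are illustrative, as Ray initializes itself automatically on first use):
+
+```python
+import asyncio
+
+import ray
+
+# Optional: cap local parallelism at four workers.
+# Without this call, Ray auto-initializes and uses all available cores.
+ray.init(num_cpus=4)
+
+asyncio.run(document_search.ingest(documents))
+```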
+
+## Remote document ingestion
+
+When run inside a Ray cluster, the Ray Core library will parallelize the processing of documents across the nodes in the cluster. There are several ways of sending documents to the Ray cluster for processing, but using the Ray Jobs API is the recommended one. 
+To use the Ray Jobs API, you should prepare the processing script and the documents to be processed, and then submit the job to the Ray cluster.
+Make sure to replace `<cluster-address>` with the address of your Ray cluster and adjust the `entrypoint` and `runtime_env` parameters to match your setup.
+
+```python
+from ray.job_submission import JobSubmissionClient
+
+client = JobSubmissionClient("http://<cluster-address>:8265")
+job_id = client.submit_job(
+    entrypoint="python script.py",
+    runtime_env={
+        "working_dir": "./", 
+        "pip": [
+            "ragbits-core[litellm]",
+            "ragbits-document-search[distributed]"
+        ]
+    },
+)
+print(job_id)
+```
+
+The Ray Jobs API is also available as a CLI. You can submit a job using the following command:
+
+```bash
+ray job submit \
+    --address http://<cluster-address>:8265 \
+    --runtime-env '{"pip": ["ragbits-core[litellm]", "ragbits-document-search[distributed]"]}' \
+    --working-dir . \
+    -- python script.py
+```
+
+There are also other ways to submit jobs to the Ray cluster. For more information, please refer to the [Ray documentation](https://docs.ray.io/en/latest/ray-overview/index.html).
\ No newline at end of file

From 32ac95e6863c0e8d8d6f80f7e0fe0fbb7f0c8018 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pawe=C5=82=20Chmielak?=
Date: Wed, 27 Nov 2024 15:49:49 +0100
Subject: [PATCH 08/10] Remove whitespaces

---
 docs/how-to/document_search/distributed_ingestion.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/docs/how-to/document_search/distributed_ingestion.md b/docs/how-to/document_search/distributed_ingestion.md
index a94fa82b..63b49ebf 100644
--- a/docs/how-to/document_search/distributed_ingestion.md
+++ b/docs/how-to/document_search/distributed_ingestion.md
@@ -35,7 +35,8 @@ By default, when run outside of a Ray cluster, the Ray Core library will paralle
 
 ## Remote document ingestion
 
-When run inside a Ray cluster, the Ray Core library will parallelize the processing of documents across the nodes in the cluster. There are several ways of sending documents to the Ray cluster for processing, but using the Ray Jobs API is the recommended one. 
+When run inside a Ray cluster, the Ray Core library will parallelize the processing of documents across the nodes in the cluster. There are several ways of sending documents to the Ray cluster for processing, but using the Ray Jobs API is the recommended one.
+
 To use the Ray Jobs API, you should prepare the processing script and the documents to be processed, and then submit the job to the Ray cluster.
 Make sure to replace `<cluster-address>` with the address of your Ray cluster and adjust the `entrypoint` and `runtime_env` parameters to match your setup.
@@ -46,7 +47,7 @@ client = JobSubmissionClient("http://:8265") job_id = client.submit_job( entrypoint="python script.py", runtime_env={ - "working_dir": "./", + "working_dir": "./", "pip": [ "ragbits-core[litellm]", "ragbits-document-search[distributed]" From 3afcbbeedbea9143e0d9cf01e37ef40632d5100e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielak?= Date: Thu, 28 Nov 2024 12:36:01 +0100 Subject: [PATCH 09/10] Introduce batching --- .../processor_strategies/distributed.py | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py index 828be889..a7305e3d 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py @@ -1,6 +1,13 @@ import asyncio from collections.abc import Sequence +try: + import ray + + HAS_RAY = True +except ImportError: + HAS_RAY = False + from ragbits.document_search.documents.document import Document, DocumentMeta from ragbits.document_search.documents.element import Element from ragbits.document_search.documents.sources import Source @@ -14,6 +21,16 @@ class DistributedProcessing(ProcessingExecutionStrategy): A processing execution strategy that processes documents on a cluster, using Ray. """ + def __init__(self, batch_size: int = 10): + """ + Initialize the DistributedProcessing instance. + + Args: + batch_size: The size of the batch to process documents in. + It defaults to 10, but should be increased if the document processing is trivial (< 1s per batch). 
+ """ + self.batch_size = batch_size + async def process_documents( self, documents: Sequence[DocumentMeta | Document | Source], @@ -36,12 +53,24 @@ async def process_documents( Raises: ModuleNotFoundError: If Ray is not installed """ - import ray + if not HAS_RAY: + raise ModuleNotFoundError( + "You need to install the 'distributed' extra requirements to use Ray distributed computing" + ) @ray.remote - def process_document_remotely(document: DocumentMeta | Document | Source) -> list[Element]: - return asyncio.run(self.process_document(document, processor_router, processor_overwrite)) + def process_document_remotely(documents: Sequence[DocumentMeta | Document | Source]) -> list[Element]: + async def process_batch() -> list[list[Element]]: + tasks = [ + self.process_document(document, processor_router, processor_overwrite) for document in documents + ] + return await asyncio.gather(*tasks) + + results = asyncio.run(process_batch()) + return sum(results, []) - tasks = [process_document_remotely.remote(document) for document in documents] + tasks = [] + for i in range(0, len(documents), self.batch_size): + tasks.append(process_document_remotely.remote(documents[i : i + self.batch_size])) return sum(await asyncio.gather(*tasks), []) From ae3f23a8c7ffc8b442ce83ac68297ad6e9a5c280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielak?= Date: Thu, 28 Nov 2024 13:03:51 +0100 Subject: [PATCH 10/10] Move module check to __init__ --- .../ingestion/processor_strategies/distributed.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py index a7305e3d..aae636da 100644 --- a/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py +++ b/packages/ragbits-document-search/src/ragbits/document_search/ingestion/processor_strategies/distributed.py @@ -28,7 +28,15 @@ def __init__(self, batch_size: int = 10): Args: batch_size: The size of the batch to process documents in. It defaults to 10, but should be increased if the document processing is trivial (< 1s per batch). + + Raises: + ModuleNotFoundError: If Ray is not installed. """ + if not HAS_RAY: + raise ModuleNotFoundError( + "You need to install the 'distributed' extra requirements to use Ray distributed computing" + ) + self.batch_size = batch_size async def process_documents( @@ -49,14 +57,7 @@ async def process_documents( Returns: A list of elements. - - Raises: - ModuleNotFoundError: If Ray is not installed """ - if not HAS_RAY: - raise ModuleNotFoundError( - "You need to install the 'distributed' extra requirements to use Ray distributed computing" - ) @ray.remote def process_document_remotely(documents: Sequence[DocumentMeta | Document | Source]) -> list[Element]: