diff --git a/integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_document_embedder.py b/integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_document_embedder.py index b913b0de4..8f7dd8cd0 100644 --- a/integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_document_embedder.py +++ b/integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_document_embedder.py @@ -57,6 +57,7 @@ def __init__( suffix: str = "", batch_size: int = 256, progress_bar: bool = True, + parallel: Optional[int] = None, meta_fields_to_embed: Optional[List[str]] = None, embedding_separator: str = "\n", ): @@ -69,6 +70,10 @@ def __init__( :param suffix: A string to add to the end of each text. :param batch_size: Number of strings to encode at once. :param progress_bar: If true, displays progress bar during embedding. + :param parallel: + If > 1, data-parallel encoding will be used, recommended for offline encoding of large datasets. + If 0, use all available cores. + If None, don't use data-parallel processing, use default onnxruntime threading instead. :param meta_fields_to_embed: List of meta fields that should be embedded along with the Document content. :param embedding_separator: Separator used to concatenate the meta fields to the Document content. """ @@ -78,6 +83,7 @@ def __init__( self.suffix = suffix self.batch_size = batch_size self.progress_bar = progress_bar + self.parallel = parallel self.meta_fields_to_embed = meta_fields_to_embed or [] self.embedding_separator = embedding_separator @@ -92,6 +98,7 @@ def to_dict(self) -> Dict[str, Any]: suffix=self.suffix, batch_size=self.batch_size, progress_bar=self.progress_bar, + parallel=self.parallel, meta_fields_to_embed=self.meta_fields_to_embed, embedding_separator=self.embedding_separator, ) @@ -139,6 +146,7 @@ def run(self, documents: List[Document]): texts_to_embed, batch_size=self.batch_size, show_progress_bar=self.progress_bar, + parallel=self.parallel, ) for doc, emb in zip(documents, embeddings): diff --git a/integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_text_embedder.py b/integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_text_embedder.py index 455a1f94b..2f0b3ae62 100644 --- a/integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_text_embedder.py +++ b/integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_text_embedder.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from haystack import component, default_to_dict @@ -35,6 +35,7 @@ def __init__( suffix: str = "", batch_size: int = 256, progress_bar: bool = True, + parallel: Optional[int] = None, ): """ Create a FastembedTextEmbedder component. @@ -44,6 +45,11 @@ def __init__( :param batch_size: Number of strings to encode at once. :param prefix: A string to add to the beginning of each text. :param suffix: A string to add to the end of each text. + :param progress_bar: If true, displays progress bar during embedding. + :param parallel: + If > 1, data-parallel encoding will be used, recommended for offline encoding of large datasets. + If 0, use all available cores. + If None, don't use data-parallel processing, use default onnxruntime threading instead. """ # TODO add parallel @@ -53,6 +59,7 @@ def __init__( self.suffix = suffix self.batch_size = batch_size self.progress_bar = progress_bar + self.parallel = parallel def to_dict(self) -> Dict[str, Any]: """ @@ -65,6 +72,7 @@ def to_dict(self) -> Dict[str, Any]: suffix=self.suffix, batch_size=self.batch_size, progress_bar=self.progress_bar, + parallel=self.parallel, ) def warm_up(self): @@ -93,6 +101,7 @@ def run(self, text: str): text_to_embed, batch_size=self.batch_size, show_progress_bar=self.progress_bar, + parallel=self.parallel, )[0] ) return {"embedding": embedding} diff --git a/integrations/fastembed/tests/test_fastembed_document_embedder.py b/integrations/fastembed/tests/test_fastembed_document_embedder.py index 597999354..baaf250f2 100644 --- a/integrations/fastembed/tests/test_fastembed_document_embedder.py +++ b/integrations/fastembed/tests/test_fastembed_document_embedder.py @@ -19,6 +19,7 @@ def test_init_default(self): assert embedder.suffix == "" assert embedder.batch_size == 256 assert embedder.progress_bar is True + assert embedder.parallel is None assert embedder.meta_fields_to_embed == [] assert embedder.embedding_separator == "\n" @@ -32,6 +33,7 @@ def test_init_with_parameters(self): suffix="suffix", batch_size=64, progress_bar=False, + parallel=1, meta_fields_to_embed=["test_field"], embedding_separator=" | ", ) @@ -40,6 +42,7 @@ def test_init_with_parameters(self): assert embedder.suffix == "suffix" assert embedder.batch_size == 64 assert embedder.progress_bar is False + assert embedder.parallel == 1 assert embedder.meta_fields_to_embed == ["test_field"] assert embedder.embedding_separator == " | " @@ -57,6 +60,7 @@ def test_to_dict(self): "suffix": "", "batch_size": 256, "progress_bar": True, + "parallel": None, "embedding_separator": "\n", "meta_fields_to_embed": [], }, @@ -72,6 +76,7 @@ def test_to_dict_with_custom_init_parameters(self): suffix="suffix", batch_size=64, progress_bar=False, + parallel=1, meta_fields_to_embed=["test_field"], embedding_separator=" | ", ) @@ -84,6 +89,7 @@ def test_to_dict_with_custom_init_parameters(self): "suffix": "suffix", "batch_size": 64, "progress_bar": False, + "parallel": 1, "meta_fields_to_embed": ["test_field"], "embedding_separator": " | ", }, @@ -101,6 +107,7 @@ def test_from_dict(self): "suffix": "", "batch_size": 256, "progress_bar": True, + "parallel": None, "meta_fields_to_embed": [], "embedding_separator": "\n", }, @@ -111,6 +118,7 @@ def test_from_dict(self): assert embedder.suffix == "" assert embedder.batch_size == 256 assert embedder.progress_bar is True + assert embedder.parallel is None assert embedder.meta_fields_to_embed == [] assert embedder.embedding_separator == "\n" @@ -126,6 +134,7 @@ def test_from_dict_with_custom_init_parameters(self): "suffix": "suffix", "batch_size": 64, "progress_bar": False, + "parallel": 1, "meta_fields_to_embed": ["test_field"], "embedding_separator": " | ", }, @@ -136,6 +145,7 @@ def test_from_dict_with_custom_init_parameters(self): assert embedder.suffix == "suffix" assert embedder.batch_size == 64 assert embedder.progress_bar is False + assert embedder.parallel == 1 assert embedder.meta_fields_to_embed == ["test_field"] assert embedder.embedding_separator == " | " @@ -232,6 +242,7 @@ def test_embed_metadata(self): ], batch_size=256, show_progress_bar=True, + parallel=None, ) @pytest.mark.integration diff --git a/integrations/fastembed/tests/test_fastembed_text_embedder.py b/integrations/fastembed/tests/test_fastembed_text_embedder.py index 3a7588263..42134a60e 100644 --- a/integrations/fastembed/tests/test_fastembed_text_embedder.py +++ b/integrations/fastembed/tests/test_fastembed_text_embedder.py @@ -19,6 +19,7 @@ def test_init_default(self): assert embedder.suffix == "" assert embedder.batch_size == 256 assert embedder.progress_bar is True + assert embedder.parallel is None def test_init_with_parameters(self): """ @@ -30,12 +31,14 @@ def test_init_with_parameters(self): suffix="suffix", batch_size=64, progress_bar=False, + parallel=1, ) assert embedder.model_name == "BAAI/bge-small-en-v1.5" assert embedder.prefix == "prefix" assert embedder.suffix == "suffix" assert embedder.batch_size == 64 assert embedder.progress_bar is False + assert embedder.parallel == 1 def test_to_dict(self): """ @@ -51,6 +54,7 @@ def test_to_dict(self): "suffix": "", "batch_size": 256, "progress_bar": True, + "parallel": None, }, } @@ -64,6 +68,7 @@ def test_to_dict_with_custom_init_parameters(self): suffix="suffix", batch_size=64, progress_bar=False, + parallel=1, ) embedder_dict = embedder.to_dict() assert embedder_dict == { @@ -74,6 +79,7 @@ def test_to_dict_with_custom_init_parameters(self): "suffix": "suffix", "batch_size": 64, "progress_bar": False, + "parallel": 1, }, } @@ -89,6 +95,7 @@ def test_from_dict(self): "suffix": "", "batch_size": 256, "progress_bar": True, + "parallel": None, }, } embedder = default_from_dict(FastembedTextEmbedder, embedder_dict) @@ -97,6 +104,7 @@ def test_from_dict(self): assert embedder.suffix == "" assert embedder.batch_size == 256 assert embedder.progress_bar is True + assert embedder.parallel is None def test_from_dict_with_custom_init_parameters(self): """ @@ -110,6 +118,7 @@ def test_from_dict_with_custom_init_parameters(self): "suffix": "suffix", "batch_size": 64, "progress_bar": False, + "parallel": 1, }, } embedder = default_from_dict(FastembedTextEmbedder, embedder_dict) @@ -118,6 +127,7 @@ def test_from_dict_with_custom_init_parameters(self): assert embedder.suffix == "suffix" assert embedder.batch_size == 64 assert embedder.progress_bar is False + assert embedder.parallel == 1 @patch( "haystack_integrations.components.embedders.fastembed.fastembed_text_embedder._FastembedEmbeddingBackendFactory"