From cced13bbf2b8c3133adde60bcc9f9e2052f032c5 Mon Sep 17 00:00:00 2001 From: willtai Date: Thu, 29 Aug 2024 10:50:21 +0100 Subject: [PATCH] Reorder name in Component arguments (#111) * Reorder name in Component arguments * Replace deprecated .dict() with .model_dump() --- examples/pipeline/kg_builder_from_pdf.py | 10 +- examples/pipeline/kg_builder_from_text.py | 11 +-- .../experimental/components/schema.py | 6 +- .../experimental/pipeline/pipeline.py | 7 +- tests/e2e/test_kb_builder_pipeline_e2e.py | 19 +++- .../pipeline/test_orchestrator.py | 12 +-- .../experimental/pipeline/test_pipeline.py | 94 +++++++++++++++---- 7 files changed, 112 insertions(+), 47 deletions(-) diff --git a/examples/pipeline/kg_builder_from_pdf.py b/examples/pipeline/kg_builder_from_pdf.py index b5e02977e..a5e290160 100644 --- a/examples/pipeline/kg_builder_from_pdf.py +++ b/examples/pipeline/kg_builder_from_pdf.py @@ -139,17 +139,16 @@ async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]: # Set up the pipeline pipe = Pipeline() - pipe.add_component("pdf_loader", PdfLoader()) + pipe.add_component(PdfLoader(), "pdf_loader") pipe.add_component( - "splitter", LangChainTextSplitterAdapter( # chunk_size=50 for the sake of this demo CharacterTextSplitter(chunk_size=50, chunk_overlap=10, separator=".") ), + "splitter", ) - pipe.add_component("schema", SchemaBuilder()) + pipe.add_component(SchemaBuilder(), "schema") pipe.add_component( - "extractor", LLMEntityRelationExtractor( llm=OpenAILLM( model_name="gpt-4o", @@ -160,8 +159,9 @@ async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]: ), on_error=OnError.RAISE, ), + "extractor", ) - pipe.add_component("writer", Neo4jWriter(neo4j_driver)) + pipe.add_component(Neo4jWriter(neo4j_driver), "writer") pipe.connect("pdf_loader", "splitter", input_config={"text": "pdf_loader.text"}) pipe.connect("splitter", "extractor", input_config={"chunks": "splitter"}) pipe.connect("schema", "extractor", input_config={"schema": "schema"}) diff --git a/examples/pipeline/kg_builder_from_text.py b/examples/pipeline/kg_builder_from_text.py index dcaa102fd..5e7175787 100644 --- a/examples/pipeline/kg_builder_from_text.py +++ b/examples/pipeline/kg_builder_from_text.py @@ -20,8 +20,6 @@ import neo4j from langchain_text_splitters import CharacterTextSplitter -from neo4j_genai.embeddings.openai import OpenAIEmbeddings -from neo4j_genai.experimental.components.embedder import TextChunkEmbedder from neo4j_genai.experimental.components.entity_relation_extractor import ( LLMEntityRelationExtractor, OnError, @@ -76,16 +74,14 @@ async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]: pipe = Pipeline() # define the components pipe.add_component( - "splitter", LangChainTextSplitterAdapter( # chunk_size=50 for the sake of this demo CharacterTextSplitter(chunk_size=50, chunk_overlap=10, separator=".") ), + "splitter", ) - pipe.add_component("chunk_embedder", TextChunkEmbedder(embedder=OpenAIEmbeddings())) - pipe.add_component("schema", SchemaBuilder()) + pipe.add_component(SchemaBuilder(), "schema") pipe.add_component( - "extractor", LLMEntityRelationExtractor( llm=OpenAILLM( model_name="gpt-4o", @@ -96,8 +92,9 @@ async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]: ), on_error=OnError.RAISE, ), + "extractor", ) - pipe.add_component("writer", Neo4jWriter(neo4j_driver)) + pipe.add_component(Neo4jWriter(neo4j_driver), "writer") # define the execution order of component # and how the output of previous components must be used pipe.connect("splitter", "chunk_embedder", input_config={"text_chunks": "splitter"}) diff --git a/src/neo4j_genai/experimental/components/schema.py b/src/neo4j_genai/experimental/components/schema.py index 83ecf0ba9..eed586d3d 100644 --- a/src/neo4j_genai/experimental/components/schema.py +++ b/src/neo4j_genai/experimental/components/schema.py @@ -175,8 +175,10 @@ def create_schema_model( Returns: SchemaConfig: A configured schema object. """ - entity_dict = {entity.label: entity.dict() for entity in entities} - relation_dict = {relation.label: relation.dict() for relation in relations} + entity_dict = {entity.label: entity.model_dump() for entity in entities} + relation_dict = { + relation.label: relation.model_dump() for relation in relations + } try: return SchemaConfig( diff --git a/src/neo4j_genai/experimental/pipeline/pipeline.py b/src/neo4j_genai/experimental/pipeline/pipeline.py index b8243aa62..61bc97954 100644 --- a/src/neo4j_genai/experimental/pipeline/pipeline.py +++ b/src/neo4j_genai/experimental/pipeline/pipeline.py @@ -335,7 +335,10 @@ def from_template( """Create a Pipeline from a pydantic model defining the components and their connections""" pipeline = Pipeline(store=store) for component in pipeline_template.components: - pipeline.add_component(component.name, component.component) + pipeline.add_component( + component.component, + component.name, + ) for edge in pipeline_template.connections: pipeline_edge = PipelineEdge( edge.start, edge.end, data={"input_config": edge.input_config} @@ -363,7 +366,7 @@ def show_as_dict(self) -> dict[str, Any]: ) return pipeline_config.model_dump() - def add_component(self, name: str, component: Component) -> None: + def add_component(self, component: Component, name: str) -> None: task = TaskPipelineNode(name, component, self) self.add_node(task) diff --git a/tests/e2e/test_kb_builder_pipeline_e2e.py b/tests/e2e/test_kb_builder_pipeline_e2e.py index 6d9a52488..ee6b8a856 100644 --- a/tests/e2e/test_kb_builder_pipeline_e2e.py +++ b/tests/e2e/test_kb_builder_pipeline_e2e.py @@ -97,16 +97,25 @@ def kg_builder_pipeline( pipe = Pipeline() # define the components pipe.add_component( - "splitter", text_splitter, + "splitter", + ) + pipe.add_component( + chunk_embedder, + "embedder", + ) + pipe.add_component( + schema_builder, + "schema", ) - pipe.add_component("embedder", chunk_embedder) - pipe.add_component("schema", schema_builder) pipe.add_component( - "extractor", entity_relation_extractor, + "extractor", + ) + pipe.add_component( + kg_writer, + "writer", ) - pipe.add_component("writer", kg_writer) # define the execution order of component # and how the output of previous components must be used pipe.connect("splitter", "embedder", input_config={"text_chunks": "splitter"}) diff --git a/tests/unit/experimental/pipeline/test_orchestrator.py b/tests/unit/experimental/pipeline/test_orchestrator.py index 8e319df9f..1e546877a 100644 --- a/tests/unit/experimental/pipeline/test_orchestrator.py +++ b/tests/unit/experimental/pipeline/test_orchestrator.py @@ -20,9 +20,9 @@ @pytest.fixture(scope="function") def pipeline_branch() -> Pipeline: pipe = Pipeline() - pipe.add_component("a", Component()) # type: ignore - pipe.add_component("b", Component()) # type: ignore - pipe.add_component("c", Component()) # type: ignore + pipe.add_component(Component(), "a") # type: ignore + pipe.add_component(Component(), "b") # type: ignore + pipe.add_component(Component(), "c") # type: ignore pipe.connect("a", "b") pipe.connect("a", "c") return pipe @@ -31,9 +31,9 @@ def pipeline_branch() -> Pipeline: @pytest.fixture(scope="function") def pipeline_aggregation() -> Pipeline: pipe = Pipeline() - pipe.add_component("a", Component()) # type: ignore - pipe.add_component("b", Component()) # type: ignore - pipe.add_component("c", Component()) # type: ignore + pipe.add_component(Component(), "a") # type: ignore + pipe.add_component(Component(), "b") # type: ignore + pipe.add_component(Component(), "c") # type: ignore pipe.connect("a", "b") pipe.connect("a", "c") return pipe diff --git a/tests/unit/experimental/pipeline/test_pipeline.py b/tests/unit/experimental/pipeline/test_pipeline.py index f226dc348..2bf1713cd 100644 --- a/tests/unit/experimental/pipeline/test_pipeline.py +++ b/tests/unit/experimental/pipeline/test_pipeline.py @@ -35,8 +35,14 @@ async def test_simple_pipeline_two_components() -> None: pipe = Pipeline() component_a = ComponentNoParam() component_b = ComponentNoParam() - pipe.add_component("a", component_a) - pipe.add_component("b", component_b) + pipe.add_component( + component_a, + "a", + ) + pipe.add_component( + component_b, + "b", + ) pipe.connect("a", "b", {}) with mock.patch( "tests.unit.experimental.pipeline.test_pipeline.ComponentNoParam.run" @@ -57,8 +63,14 @@ async def test_pipeline_parameter_propagation() -> None: pipe = Pipeline() component_a = ComponentPassThrough() component_b = ComponentPassThrough() - pipe.add_component("a", component_a) - pipe.add_component("b", component_b) + pipe.add_component( + component_a, + "a", + ) + pipe.add_component( + component_b, + "b", + ) # first component output product goes to second component input number1 pipe.connect( "a", @@ -90,9 +102,15 @@ async def test_pipeline_branches() -> None: component_c = AsyncMock(spec=Component) component_c.run = AsyncMock(return_value={}) - pipe.add_component("a", component_a) - pipe.add_component("b", component_b) - pipe.add_component("c", component_c) + pipe.add_component( + component_a, + "a", + ) + pipe.add_component( + component_b, + "b", + ) + pipe.add_component(component_c, "c") pipe.connect("a", "b") pipe.connect("a", "c") res = await pipe.run({}) @@ -110,9 +128,15 @@ async def test_pipeline_aggregation() -> None: component_c = AsyncMock(spec=Component) component_c.run = AsyncMock(return_value={}) - pipe.add_component("a", component_a) - pipe.add_component("b", component_b) - pipe.add_component("c", component_c) + pipe.add_component( + component_a, + "a", + ) + pipe.add_component( + component_b, + "b", + ) + pipe.add_component(component_c, "c") pipe.connect("a", "c") pipe.connect("b", "c") res = await pipe.run({}) @@ -124,8 +148,14 @@ async def test_pipeline_missing_param_on_init() -> None: pipe = Pipeline() component_a = ComponentAdd() component_b = ComponentAdd() - pipe.add_component("a", component_a) - pipe.add_component("b", component_b) + pipe.add_component( + component_a, + "a", + ) + pipe.add_component( + component_b, + "b", + ) pipe.connect("a", "b", {"number1": "a.result"}) with pytest.raises(PipelineDefinitionError) as excinfo: await pipe.run({"a": {"number1": 1}}) @@ -140,8 +170,14 @@ async def test_pipeline_missing_param_on_connect() -> None: pipe = Pipeline() component_a = ComponentAdd() component_b = ComponentAdd() - pipe.add_component("a", component_a) - pipe.add_component("b", component_b) + pipe.add_component( + component_a, + "a", + ) + pipe.add_component( + component_b, + "b", + ) pipe.connect("a", "b", {"number1": "a.result"}) with pytest.raises(PipelineDefinitionError) as excinfo: await pipe.run({"a": {"number1": 1, "number2": 2}}) @@ -156,8 +192,14 @@ async def test_pipeline_with_default_params() -> None: pipe = Pipeline() component_a = ComponentAdd() component_b = ComponentMultiply() - pipe.add_component("a", component_a) - pipe.add_component("b", component_b) + pipe.add_component( + component_a, + "a", + ) + pipe.add_component( + component_b, + "b", + ) pipe.connect("a", "b", {"number1": "a.result"}) res = await pipe.run({"a": {"number1": 1, "number2": 2}}) assert res == {"b": {"result": 6}} # (1+2)*2 @@ -168,8 +210,14 @@ async def test_pipeline_cycle() -> None: pipe = Pipeline() component_a = ComponentNoParam() component_b = ComponentNoParam() - pipe.add_component("a", component_a) - pipe.add_component("b", component_b) + pipe.add_component( + component_a, + "a", + ) + pipe.add_component( + component_b, + "b", + ) pipe.connect("a", "b", {}) with pytest.raises(PipelineDefinitionError) as excinfo: pipe.connect("b", "a", {}) @@ -181,8 +229,14 @@ async def test_pipeline_wrong_component_name() -> None: pipe = Pipeline() component_a = ComponentNoParam() component_b = ComponentNoParam() - pipe.add_component("a", component_a) - pipe.add_component("b", component_b) + pipe.add_component( + component_a, + "a", + ) + pipe.add_component( + component_b, + "b", + ) with pytest.raises(PipelineDefinitionError) as excinfo: pipe.connect("a", "c", {}) assert "a or c not in the Pipeline" in str(excinfo.value)