Skip to content

Commit

Permalink
Reorder name in Component arguments (neo4j#111)
Browse files Browse the repository at this point in the history
* Reorder name in Component arguments

* Replace deprecated .dict() with .model_dump()
  • Loading branch information
willtai authored Aug 29, 2024
1 parent b89b932 commit cced13b
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 47 deletions.
10 changes: 5 additions & 5 deletions examples/pipeline/kg_builder_from_pdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,16 @@ async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]:

# Set up the pipeline
pipe = Pipeline()
pipe.add_component("pdf_loader", PdfLoader())
pipe.add_component(PdfLoader(), "pdf_loader")
pipe.add_component(
"splitter",
LangChainTextSplitterAdapter(
# chunk_size=50 for the sake of this demo
CharacterTextSplitter(chunk_size=50, chunk_overlap=10, separator=".")
),
"splitter",
)
pipe.add_component("schema", SchemaBuilder())
pipe.add_component(SchemaBuilder(), "schema")
pipe.add_component(
"extractor",
LLMEntityRelationExtractor(
llm=OpenAILLM(
model_name="gpt-4o",
Expand All @@ -160,8 +159,9 @@ async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]:
),
on_error=OnError.RAISE,
),
"extractor",
)
pipe.add_component("writer", Neo4jWriter(neo4j_driver))
pipe.add_component(Neo4jWriter(neo4j_driver), "writer")
pipe.connect("pdf_loader", "splitter", input_config={"text": "pdf_loader.text"})
pipe.connect("splitter", "extractor", input_config={"chunks": "splitter"})
pipe.connect("schema", "extractor", input_config={"schema": "schema"})
Expand Down
11 changes: 4 additions & 7 deletions examples/pipeline/kg_builder_from_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import neo4j
from langchain_text_splitters import CharacterTextSplitter
from neo4j_genai.embeddings.openai import OpenAIEmbeddings
from neo4j_genai.experimental.components.embedder import TextChunkEmbedder
from neo4j_genai.experimental.components.entity_relation_extractor import (
LLMEntityRelationExtractor,
OnError,
Expand Down Expand Up @@ -76,16 +74,14 @@ async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]:
pipe = Pipeline()
# define the components
pipe.add_component(
"splitter",
LangChainTextSplitterAdapter(
# chunk_size=50 for the sake of this demo
CharacterTextSplitter(chunk_size=50, chunk_overlap=10, separator=".")
),
"splitter",
)
pipe.add_component("chunk_embedder", TextChunkEmbedder(embedder=OpenAIEmbeddings()))
pipe.add_component("schema", SchemaBuilder())
pipe.add_component(SchemaBuilder(), "schema")
pipe.add_component(
"extractor",
LLMEntityRelationExtractor(
llm=OpenAILLM(
model_name="gpt-4o",
Expand All @@ -96,8 +92,9 @@ async def main(neo4j_driver: neo4j.Driver) -> dict[str, Any]:
),
on_error=OnError.RAISE,
),
"extractor",
)
pipe.add_component("writer", Neo4jWriter(neo4j_driver))
pipe.add_component(Neo4jWriter(neo4j_driver), "writer")
# define the execution order of component
# and how the output of previous components must be used
pipe.connect("splitter", "chunk_embedder", input_config={"text_chunks": "splitter"})
Expand Down
6 changes: 4 additions & 2 deletions src/neo4j_genai/experimental/components/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,10 @@ def create_schema_model(
Returns:
SchemaConfig: A configured schema object.
"""
entity_dict = {entity.label: entity.dict() for entity in entities}
relation_dict = {relation.label: relation.dict() for relation in relations}
entity_dict = {entity.label: entity.model_dump() for entity in entities}
relation_dict = {
relation.label: relation.model_dump() for relation in relations
}

try:
return SchemaConfig(
Expand Down
7 changes: 5 additions & 2 deletions src/neo4j_genai/experimental/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ def from_template(
"""Create a Pipeline from a pydantic model defining the components and their connections"""
pipeline = Pipeline(store=store)
for component in pipeline_template.components:
pipeline.add_component(component.name, component.component)
pipeline.add_component(
component.component,
component.name,
)
for edge in pipeline_template.connections:
pipeline_edge = PipelineEdge(
edge.start, edge.end, data={"input_config": edge.input_config}
Expand Down Expand Up @@ -363,7 +366,7 @@ def show_as_dict(self) -> dict[str, Any]:
)
return pipeline_config.model_dump()

def add_component(self, name: str, component: Component) -> None:
def add_component(self, component: Component, name: str) -> None:
task = TaskPipelineNode(name, component, self)
self.add_node(task)

Expand Down
19 changes: 14 additions & 5 deletions tests/e2e/test_kb_builder_pipeline_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,25 @@ def kg_builder_pipeline(
pipe = Pipeline()
# define the components
pipe.add_component(
"splitter",
text_splitter,
"splitter",
)
pipe.add_component(
chunk_embedder,
"embedder",
)
pipe.add_component(
schema_builder,
"schema",
)
pipe.add_component("embedder", chunk_embedder)
pipe.add_component("schema", schema_builder)
pipe.add_component(
"extractor",
entity_relation_extractor,
"extractor",
)
pipe.add_component(
kg_writer,
"writer",
)
pipe.add_component("writer", kg_writer)
# define the execution order of component
# and how the output of previous components must be used
pipe.connect("splitter", "embedder", input_config={"text_chunks": "splitter"})
Expand Down
12 changes: 6 additions & 6 deletions tests/unit/experimental/pipeline/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
@pytest.fixture(scope="function")
def pipeline_branch() -> Pipeline:
pipe = Pipeline()
pipe.add_component("a", Component()) # type: ignore
pipe.add_component("b", Component()) # type: ignore
pipe.add_component("c", Component()) # type: ignore
pipe.add_component(Component(), "a") # type: ignore
pipe.add_component(Component(), "b") # type: ignore
pipe.add_component(Component(), "c") # type: ignore
pipe.connect("a", "b")
pipe.connect("a", "c")
return pipe
Expand All @@ -31,9 +31,9 @@ def pipeline_branch() -> Pipeline:
@pytest.fixture(scope="function")
def pipeline_aggregation() -> Pipeline:
pipe = Pipeline()
pipe.add_component("a", Component()) # type: ignore
pipe.add_component("b", Component()) # type: ignore
pipe.add_component("c", Component()) # type: ignore
pipe.add_component(Component(), "a") # type: ignore
pipe.add_component(Component(), "b") # type: ignore
pipe.add_component(Component(), "c") # type: ignore
pipe.connect("a", "b")
pipe.connect("a", "c")
return pipe
Expand Down
94 changes: 74 additions & 20 deletions tests/unit/experimental/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@ async def test_simple_pipeline_two_components() -> None:
pipe = Pipeline()
component_a = ComponentNoParam()
component_b = ComponentNoParam()
pipe.add_component("a", component_a)
pipe.add_component("b", component_b)
pipe.add_component(
component_a,
"a",
)
pipe.add_component(
component_b,
"b",
)
pipe.connect("a", "b", {})
with mock.patch(
"tests.unit.experimental.pipeline.test_pipeline.ComponentNoParam.run"
Expand All @@ -57,8 +63,14 @@ async def test_pipeline_parameter_propagation() -> None:
pipe = Pipeline()
component_a = ComponentPassThrough()
component_b = ComponentPassThrough()
pipe.add_component("a", component_a)
pipe.add_component("b", component_b)
pipe.add_component(
component_a,
"a",
)
pipe.add_component(
component_b,
"b",
)
# first component output product goes to second component input number1
pipe.connect(
"a",
Expand Down Expand Up @@ -90,9 +102,15 @@ async def test_pipeline_branches() -> None:
component_c = AsyncMock(spec=Component)
component_c.run = AsyncMock(return_value={})

pipe.add_component("a", component_a)
pipe.add_component("b", component_b)
pipe.add_component("c", component_c)
pipe.add_component(
component_a,
"a",
)
pipe.add_component(
component_b,
"b",
)
pipe.add_component(component_c, "c")
pipe.connect("a", "b")
pipe.connect("a", "c")
res = await pipe.run({})
Expand All @@ -110,9 +128,15 @@ async def test_pipeline_aggregation() -> None:
component_c = AsyncMock(spec=Component)
component_c.run = AsyncMock(return_value={})

pipe.add_component("a", component_a)
pipe.add_component("b", component_b)
pipe.add_component("c", component_c)
pipe.add_component(
component_a,
"a",
)
pipe.add_component(
component_b,
"b",
)
pipe.add_component(component_c, "c")
pipe.connect("a", "c")
pipe.connect("b", "c")
res = await pipe.run({})
Expand All @@ -124,8 +148,14 @@ async def test_pipeline_missing_param_on_init() -> None:
pipe = Pipeline()
component_a = ComponentAdd()
component_b = ComponentAdd()
pipe.add_component("a", component_a)
pipe.add_component("b", component_b)
pipe.add_component(
component_a,
"a",
)
pipe.add_component(
component_b,
"b",
)
pipe.connect("a", "b", {"number1": "a.result"})
with pytest.raises(PipelineDefinitionError) as excinfo:
await pipe.run({"a": {"number1": 1}})
Expand All @@ -140,8 +170,14 @@ async def test_pipeline_missing_param_on_connect() -> None:
pipe = Pipeline()
component_a = ComponentAdd()
component_b = ComponentAdd()
pipe.add_component("a", component_a)
pipe.add_component("b", component_b)
pipe.add_component(
component_a,
"a",
)
pipe.add_component(
component_b,
"b",
)
pipe.connect("a", "b", {"number1": "a.result"})
with pytest.raises(PipelineDefinitionError) as excinfo:
await pipe.run({"a": {"number1": 1, "number2": 2}})
Expand All @@ -156,8 +192,14 @@ async def test_pipeline_with_default_params() -> None:
pipe = Pipeline()
component_a = ComponentAdd()
component_b = ComponentMultiply()
pipe.add_component("a", component_a)
pipe.add_component("b", component_b)
pipe.add_component(
component_a,
"a",
)
pipe.add_component(
component_b,
"b",
)
pipe.connect("a", "b", {"number1": "a.result"})
res = await pipe.run({"a": {"number1": 1, "number2": 2}})
assert res == {"b": {"result": 6}} # (1+2)*2
Expand All @@ -168,8 +210,14 @@ async def test_pipeline_cycle() -> None:
pipe = Pipeline()
component_a = ComponentNoParam()
component_b = ComponentNoParam()
pipe.add_component("a", component_a)
pipe.add_component("b", component_b)
pipe.add_component(
component_a,
"a",
)
pipe.add_component(
component_b,
"b",
)
pipe.connect("a", "b", {})
with pytest.raises(PipelineDefinitionError) as excinfo:
pipe.connect("b", "a", {})
Expand All @@ -181,8 +229,14 @@ async def test_pipeline_wrong_component_name() -> None:
pipe = Pipeline()
component_a = ComponentNoParam()
component_b = ComponentNoParam()
pipe.add_component("a", component_a)
pipe.add_component("b", component_b)
pipe.add_component(
component_a,
"a",
)
pipe.add_component(
component_b,
"b",
)
with pytest.raises(PipelineDefinitionError) as excinfo:
pipe.connect("a", "c", {})
assert "a or c not in the Pipeline" in str(excinfo.value)

0 comments on commit cced13b

Please sign in to comment.