diff --git a/src/neo4j_graphrag/experimental/pipeline/config/parser.py b/src/neo4j_graphrag/experimental/pipeline/config/parser.py
deleted file mode 100644
index b042e860..00000000
--- a/src/neo4j_graphrag/experimental/pipeline/config/parser.py
+++ /dev/null
@@ -1,199 +0,0 @@
-# Copyright (c) "Neo4j"
-# Neo4j Sweden AB [https://neo4j.com]
-# #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# #
-# https://www.apache.org/licenses/LICENSE-2.0
-# #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import importlib
-from pathlib import Path
-from typing import Any, Optional, Union
-
-import neo4j
-
-from neo4j_graphrag.experimental.pipeline.config.param_resolvers import PARAM_RESOLVERS
-from neo4j_graphrag.experimental.pipeline.config.reader import ConfigReader
-from neo4j_graphrag.experimental.pipeline.config.types import (
-    ClassConfig,
-    DriverConfig,
-    ParamConfig,
-    ParamToResolveConfig,
-    SimpleKGPipelineConfig,
-)
-from neo4j_graphrag.experimental.pipeline.kg_builder import (
-    SimpleKGPipeline,
-    SimpleKGPipelineModel,
-)
-
-
-class SimpleKGPipelineConfigParser:
-    """Parses a config object into the required objects
-    to instantiate a SimpleKGPipeline. For instance, config
-    contains LLM information in this format:
-
-    .. code-block:: json
-
-        {
-            "llm_config": {
-                "name_": "openai",
-                "class_": "OpenAILLM",
-                "params_": {
-                    "api_key": {
-                        "resolver_": "ENV",
-                        "var_": "OPENAI_API_KEY"
-                    },
-                    "model_name": "gpt-4o"
-                }
-            }
-
-    That must be parsed into an OpenAILLM instance with parameters
-    `model_name` and resolved `api_key`.
-
-    Args:
-        config (SimpleKGPipelineConfig): An instance of the SimpleKGPipelineConfig.
-    """
-
-    def __init__(self, config: SimpleKGPipelineConfig) -> None:
-        self.config = config
-
-    def _get_class(self, klass_path: str, optional_module: Optional[str] = None) -> Any:
-        """Get class type from string and an optional module"""
-        *modules, klass_name = klass_path.rsplit(".", 1)
-        module_name = modules[0] if modules else optional_module
-        if module_name is None:
-            raise ValueError("Must specify a module to import class from")
-        try:
-            module = importlib.import_module(module_name)
-            klass = getattr(module, klass_name)
-        except (ImportError, AttributeError):
-            if optional_module and module_name != optional_module:
-                full_klass_path = optional_module + "." + klass_path
-                return self._get_class(full_klass_path)
-            raise ValueError(f"Could not find {klass_name} in {module_name}")
-        return klass
-
-    def _parse_neo4j_config(self, neo4j_config: DriverConfig) -> neo4j.Driver:
-        driver_init_params = {
-            "uri": self._resolve_param(neo4j_config.uri),
-            "user": self._resolve_param(neo4j_config.user),
-            "password": self._resolve_param(neo4j_config.password),
-        }
-        # note: we could add a "class" parameter in the config to support async
-        # driver in the future. For now, since it's not supported anywhere in
-        # the pipeline, we're assuming sync driver is needed
-        driver = neo4j.GraphDatabase.driver(
-            driver_init_params.pop("uri"),
-            auth=(
-                driver_init_params.pop("user"),
-                driver_init_params.pop("password"),
-            ),
-            **driver_init_params,
-        )
-        # Note: driver connectivity checks are delegated to the class using it
-        return driver
-
-    def _get_object(
-        self, config: ClassConfig, optional_module: str | None = None
-    ) -> Any:
-        """Instantiates an object from a ClassConfig. This includes:
-        - Finding and importing the proper class
-        - Resolving all input parameters
-        - Creating an instance of the class
-        """
-        klass_name = config.class_
-        try:
-            klass = self._get_class(
-                klass_name,
-                optional_module=optional_module,
-            )
-        except ValueError:
-            raise ValueError(f"Class '{klass_name}' not found in '{optional_module}'")
-        init_params = {}
-        for key, param in config.params_.items():
-            init_params[key] = self._resolve_param(param)
-        embedder = klass(**init_params)
-        return embedder
-
-    def _resolve_param(self, param: ParamConfig) -> Union[float, str, dict[str, Any]]:
-        """Resolve parameter"""
-        if not isinstance(param, ParamToResolveConfig):
-            return param
-        resolver_name = param.resolver_
-        if resolver_name not in PARAM_RESOLVERS:
-            raise ValueError(
-                f"Resolver {resolver_name} not found in {PARAM_RESOLVERS.keys()}"
-            )
-        resolver_klass = PARAM_RESOLVERS[resolver_name]
-        resolver = resolver_klass()
-        return resolver.resolve(param)
-
-    def _parse_config(self) -> SimpleKGPipelineModel:
-        return SimpleKGPipelineModel(
-            driver=self._parse_neo4j_config(self.config.neo4j_config),
-            llm=self._get_object(
-                self.config.llm_config, optional_module="neo4j_graphrag.llm"
-            ),
-            embedder=self._get_object(
-                self.config.embedder_config, optional_module="neo4j_graphrag.embeddings"
-            ),
-            from_pdf=self.config.from_pdf,
-            entities=list(self.config.entities) if self.config.entities else [],
-            relations=list(self.config.relations) if self.config.relations else [],
-            potential_schema=list(self.config.potential_schema)
-            if self.config.potential_schema
-            else [],
-            pdf_loader=self._get_object(
-                self.config.pdf_loader,
-                optional_module="neo4j_graphrag.experimental.components.pdf_loader",
-            )
-            if self.config.pdf_loader
-            else None,
-            text_splitter=self._get_object(
-                self.config.text_splitter,
-                optional_module="neo4j_graphrag.experimental.components.text_splitters",
-            )
-            if self.config.text_splitter
-            else None,
-            kg_writer=self._get_object(
-                self.config.kg_writer,
-                optional_module="neo4j_graphrag.experimental.components.kg_writer",
-            )
-            if self.config.kg_writer
-            else None,
-            on_error=self.config.on_error,
-            prompt_template=self.config.prompt_template,
-            perform_entity_resolution=self.config.perform_entity_resolution,
-            lexical_graph_config=self.config.lexical_graph_config,
-            neo4j_database=self.config.neo4j_database,
-        )
-
-    def parse(self) -> SimpleKGPipeline:
-        """Parse a config object into a SimpleKGPipeline instance"""
-        input_model = self._parse_config()
-        return SimpleKGPipeline(**input_model.model_dump())
-
-
-class SimpleKGPipelineBuilder:
-    @classmethod
-    def from_config_file(cls, file_path: Union[str, Path]) -> SimpleKGPipeline:
-        """Instantiates a SimpleKGPipeline from a config file
-        (JSON or YAML format).
-
-        - SimpleKGPipelineConfig: validates the input format
-        - SimpleKGPipelineConfigParser: parses the config into a SimpleKGPipeline instance
-        """
-        if not isinstance(file_path, Path):
-            file_path = Path(file_path)
-        reader = ConfigReader()
-        config_dict = reader.read(file_path)
-        config = SimpleKGPipelineConfig(**config_dict)
-        parser = SimpleKGPipelineConfigParser(config)
-        return parser.parse()
diff --git a/src/neo4j_graphrag/experimental/pipeline/config/types.py b/src/neo4j_graphrag/experimental/pipeline/config/types.py
index a1ee2c1b..276f8030 100644
--- a/src/neo4j_graphrag/experimental/pipeline/config/types.py
+++ b/src/neo4j_graphrag/experimental/pipeline/config/types.py
@@ -14,22 +14,13 @@
 # limitations under the License.
 
 import enum
-from typing import Any, Literal, Optional, Sequence, Union
+from typing import Any, Literal, Union
 
-from pydantic import BaseModel, ConfigDict
-
-from neo4j_graphrag.experimental.components.entity_relation_extractor import OnError
-from neo4j_graphrag.experimental.components.types import LexicalGraphConfig
-from neo4j_graphrag.experimental.pipeline.types import (
-    EntityInputType,
-    RelationInputType,
-)
-from neo4j_graphrag.generation.prompts import ERExtractionTemplate
+from pydantic import BaseModel
 
 
 class ParamResolverEnum(str, enum.Enum):
     ENV = "ENV"
-    CONFIG_ARRAY = "CONFIG_ARRAY"
     CONFIG_KEY = "CONFIG_KEY"
 
 
@@ -54,43 +45,3 @@ class ParamFromKeyConfig(ParamToResolveConfig):
     ParamFromKeyConfig,
     dict[str, Any],
 ]
-
-
-class BasePipelineV1Config(BaseModel):
-    version_: Literal["1"] = "1"
-
-
-class DriverConfig(BaseModel):
-    uri: ParamConfig
-    user: ParamConfig
-    password: ParamConfig
-
-
-class ClassConfig(BaseModel):
-    class_: str
-    params_: dict[str, ParamConfig]
-
-
-class SimpleKGPipelineExposedParamConfig(BaseModel):
-    from_pdf: bool = False
-    potential_schema: Optional[list[tuple[str, str, str]]] = None
-    on_error: OnError = OnError.IGNORE
-    prompt_template: Union[ERExtractionTemplate, str] = ERExtractionTemplate()
-    perform_entity_resolution: bool = True
-    lexical_graph_config: Optional[LexicalGraphConfig] = None
-    neo4j_database: Optional[str] = None
-
-    model_config = ConfigDict(
-        arbitrary_types_allowed=True,
-    )
-
-
-class SimpleKGPipelineConfig(BasePipelineV1Config, SimpleKGPipelineExposedParamConfig):
-    neo4j_config: DriverConfig
-    llm_config: ClassConfig
-    embedder_config: ClassConfig
-    pdf_loader: ClassConfig | None = None
-    text_splitter: ClassConfig | None = None
-    kg_writer: ClassConfig | None = None
-    entities: Optional[Sequence[EntityInputType]] = None
-    relations: Optional[Sequence[RelationInputType]] = None
diff --git a/src/neo4j_graphrag/experimental/pipeline/kg_builder.py b/src/neo4j_graphrag/experimental/pipeline/kg_builder.py
index 2a086b92..3f8db254 100644
--- a/src/neo4j_graphrag/experimental/pipeline/kg_builder.py
+++ b/src/neo4j_graphrag/experimental/pipeline/kg_builder.py
@@ -18,34 +18,18 @@
 from typing import Any, List, Optional, Sequence, Union
 
 import neo4j
-from pydantic import ConfigDict, Field, field_validator
 
 from neo4j_graphrag.embeddings import Embedder
-from neo4j_graphrag.experimental.components.embedder import TextChunkEmbedder
-from neo4j_graphrag.experimental.components.entity_relation_extractor import (
-    LLMEntityRelationExtractor,
-    OnError,
-)
-from neo4j_graphrag.experimental.components.kg_writer import KGWriter, Neo4jWriter
-from neo4j_graphrag.experimental.components.pdf_loader import PdfLoader
-from neo4j_graphrag.experimental.components.resolver import (
-    SinglePropertyExactMatchResolver,
-)
 from neo4j_graphrag.experimental.components.schema import (
-    SchemaBuilder,
     SchemaEntity,
     SchemaRelation,
 )
-from neo4j_graphrag.experimental.components.text_splitters.base import TextSplitter
-from neo4j_graphrag.experimental.components.text_splitters.fixed_size_splitter import (
-    FixedSizeSplitter,
-)
 from neo4j_graphrag.experimental.components.types import LexicalGraphConfig
-from neo4j_graphrag.experimental.pipeline.config.types import (
-    SimpleKGPipelineExposedParamConfig,
+from neo4j_graphrag.experimental.pipeline.config.config_poc import (
+    PipelineRunner,
+    SimpleKGPipelineConfig,
 )
-from neo4j_graphrag.experimental.pipeline.exceptions import PipelineDefinitionError
-from neo4j_graphrag.experimental.pipeline.pipeline import Pipeline, PipelineResult
+from neo4j_graphrag.experimental.pipeline.pipeline import PipelineResult
 from neo4j_graphrag.experimental.pipeline.types import (
     EntityInputType,
     RelationInputType,
@@ -54,31 +38,6 @@
 from neo4j_graphrag.llm.base import LLMInterface
 
 
-class SimpleKGPipelineModel(SimpleKGPipelineExposedParamConfig):
-    llm: LLMInterface
-    driver: neo4j.Driver
-    embedder: Embedder
-    pdf_loader: PdfLoader | None = None
-    kg_writer: KGWriter | None = None
-    text_splitter: TextSplitter | None = None
-    entities: list[SchemaEntity] = Field(default_factory=list)
-    relations: list[SchemaRelation] = Field(default_factory=list)
-
-    model_config = ConfigDict(arbitrary_types_allowed=True)
-
-    @field_validator("entities", mode="before")
-    @classmethod
-    def validate_entities(cls, entities: list[SchemaEntity]) -> list[SchemaEntity]:
-        return [SchemaEntity.from_text_or_dict(e) for e in entities]
-
-    @field_validator("relations", mode="before")
-    @classmethod
-    def validate_relations(
-        cls, relations: list[SchemaRelation]
-    ) -> list[SchemaRelation]:
-        return [SchemaRelation.from_text_or_dict(r) for r in relations]
-
-
 class SimpleKGPipeline:
     """
     A class to simplify the process of building a knowledge graph from text documents.
@@ -129,121 +88,25 @@ def __init__(
         lexical_graph_config: Optional[LexicalGraphConfig] = None,
         neo4j_database: Optional[str] = None,
     ):
-        self.potential_schema = potential_schema or []
-        self.entities = entities or []
-        self.relations = relations or []
-
-        try:
-            on_error_enum = OnError(on_error)
-        except ValueError:
-            raise PipelineDefinitionError(
-                f"Invalid value for on_error: {on_error}. Expected one of {OnError.possible_values()}."
-            )
-
-        config = SimpleKGPipelineModel(
-            llm=llm,
-            driver=driver,
-            entities=self.entities,
-            relations=self.relations,
-            potential_schema=self.potential_schema,
+        config = SimpleKGPipelineConfig(
+            llm_config=llm,
+            neo4j_config=driver,
+            embedder_config=embedder,
+            entities=entities or [],
+            relations=relations or [],
+            potential_schema=potential_schema,
             from_pdf=from_pdf,
             pdf_loader=pdf_loader,
             kg_writer=kg_writer,
             text_splitter=text_splitter,
-            on_error=on_error_enum,
+            on_error=on_error,
             prompt_template=prompt_template,
             embedder=embedder,
             perform_entity_resolution=perform_entity_resolution,
             lexical_graph_config=lexical_graph_config,
             neo4j_database=neo4j_database,
         )
-
-        self.from_pdf = config.from_pdf
-        self.llm = config.llm
-        self.driver = config.driver
-        self.embedder = config.embedder
-        self.text_splitter = config.text_splitter or FixedSizeSplitter()
-        self.on_error = config.on_error
-        self.pdf_loader = config.pdf_loader if pdf_loader is not None else PdfLoader()
-        self.kg_writer = (
-            config.kg_writer
-            if kg_writer is not None
-            else Neo4jWriter(driver, neo4j_database=config.neo4j_database)
-        )
-        self.prompt_template = config.prompt_template
-        self.perform_entity_resolution = config.perform_entity_resolution
-        self.lexical_graph_config = config.lexical_graph_config
-        self.neo4j_database = config.neo4j_database
-
-        self.pipeline = self._build_pipeline()
-
-    def _build_pipeline(self) -> Pipeline:
-        pipe = Pipeline()
-
-        pipe.add_component(self.text_splitter, "splitter")
-        pipe.add_component(SchemaBuilder(), "schema")
-        pipe.add_component(
-            LLMEntityRelationExtractor(
-                llm=self.llm,
-                on_error=self.on_error,
-                prompt_template=self.prompt_template,
-            ),
-            "extractor",
-        )
-        pipe.add_component(TextChunkEmbedder(embedder=self.embedder), "chunk_embedder")
-        pipe.add_component(self.kg_writer, "writer")
-
-        if self.from_pdf:
-            pipe.add_component(self.pdf_loader, "pdf_loader")
-
-            pipe.connect(
-                "pdf_loader",
-                "splitter",
-                input_config={"text": "pdf_loader.text"},
-            )
-
-            pipe.connect(
-                "schema",
-                "extractor",
-                input_config={
-                    "schema": "schema",
-                    "document_info": "pdf_loader.document_info",
-                },
-            )
-        else:
-            pipe.connect(
-                "schema",
-                "extractor",
-                input_config={
-                    "schema": "schema",
-                },
-            )
-
-        pipe.connect(
-            "splitter", "chunk_embedder", input_config={"text_chunks": "splitter"}
-        )
-
-        pipe.connect(
-            "chunk_embedder", "extractor", input_config={"chunks": "chunk_embedder"}
-        )
-
-        # Connect extractor to writer
-        pipe.connect(
-            "extractor",
-            "writer",
-            input_config={"graph": "extractor"},
-        )
-
-        if self.perform_entity_resolution:
-            pipe.add_component(
-                SinglePropertyExactMatchResolver(
-                    self.driver, neo4j_database=self.neo4j_database
-                ),
-                "resolver",
-            )
-            pipe.connect("writer", "resolver", {})
-
-        return pipe
+        self.runner = PipelineRunner.from_config(config)
 
     async def run_async(
         self, file_path: Optional[str] = None, text: Optional[str] = None
@@ -258,39 +121,4 @@ async def run_async(
         self, file_path: Optional[str] = None, text: Optional[str] = None
     ) -> PipelineResult:
         """
        Run the knowledge graph building pipeline.
 
        Args:
            file_path (Optional[str]): The path to the PDF file to process. Required if `from_pdf` is True.
            text (Optional[str]): The text content to process. Required if `from_pdf` is False.
 
        Returns:
            PipelineResult: The result of the pipeline execution.
        """
-        pipe_inputs = self._prepare_inputs(file_path=file_path, text=text)
-        return await self.pipeline.run(pipe_inputs)
-
-    def _prepare_inputs(
-        self, file_path: Optional[str], text: Optional[str]
-    ) -> dict[str, Any]:
-        if self.from_pdf:
-            if file_path is None or text is not None:
-                raise PipelineDefinitionError(
-                    "Expected 'file_path' argument when 'from_pdf' is True."
-                )
-        else:
-            if text is None or file_path is not None:
-                raise PipelineDefinitionError(
-                    "Expected 'text' argument when 'from_pdf' is False."
-                )
-
-        pipe_inputs: dict[str, Any] = {
-            "schema": {
-                "entities": self.entities,
-                "relations": self.relations,
-                "potential_schema": self.potential_schema,
-            },
-        }
-
-        if self.from_pdf:
-            pipe_inputs["pdf_loader"] = {"filepath": file_path}
-        else:
-            pipe_inputs["splitter"] = {"text": text}
-
-        if self.lexical_graph_config:
-            pipe_inputs["extractor"] = {
-                "lexical_graph_config": self.lexical_graph_config
-            }
-
-        return pipe_inputs
+        return await self.runner.run({"file_path": file_path, "text": text})
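
For reviewers, a minimal usage sketch of the refactored builder (not part of this patch). It assumes an OpenAI-backed LLM and embedder from neo4j_graphrag.llm / neo4j_graphrag.embeddings plus hypothetical connection details; the point is only that SimpleKGPipeline.__init__ now assembles a SimpleKGPipelineConfig and hands it to PipelineRunner, while run_async keeps its file_path/text signature.

import asyncio

import neo4j

from neo4j_graphrag.embeddings import OpenAIEmbeddings
from neo4j_graphrag.experimental.pipeline.kg_builder import SimpleKGPipeline
from neo4j_graphrag.llm import OpenAILLM

# Hypothetical connection details, for illustration only.
driver = neo4j.GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))

# Same public constructor as before the refactor; internally it now builds a
# SimpleKGPipelineConfig and a PipelineRunner instead of wiring the Pipeline by hand.
kg_builder = SimpleKGPipeline(
    llm=OpenAILLM(model_name="gpt-4o"),
    driver=driver,
    embedder=OpenAIEmbeddings(),
    from_pdf=False,
)

# run_async forwards {"file_path": ..., "text": ...} to PipelineRunner.run.
asyncio.run(kg_builder.run_async(text="Neo4j is a graph database written in Java."))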