From 695628999458206fe582001ebc3a88d2573ae189 Mon Sep 17 00:00:00 2001
From: Selim Youssry
Date: Thu, 27 Jun 2024 11:42:22 +0000
Subject: [PATCH] load a services.Dataset into a Neo4J instance

---
 .env.template            |   1 +
 Makefile                 |   6 ++
 dds_glossary/neo.py      | 167 +++++++++++++++++++++++++++++++++++++++
 dds_glossary/services.py |   2 +-
 docker-compose.neo4j.yml |  14 ++++
 pyproject.toml           |   1 +
 6 files changed, 190 insertions(+), 1 deletion(-)
 create mode 100644 Makefile
 create mode 100644 dds_glossary/neo.py
 create mode 100644 docker-compose.neo4j.yml

diff --git a/.env.template b/.env.template
index b289c85..f9f189b 100644
--- a/.env.template
+++ b/.env.template
@@ -5,3 +5,4 @@ POSTGRES_DB=glossary
 API_KEY="123456789"
 DATABASE_URL="postgresql+psycopg://admin:123456789@postgres:5432/glossary"
 SENTRY_DSN="https://…"
+NEO4J_AUTH=
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..28d76fc
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,6 @@
+start-neo4j:
+	docker compose -f docker-compose.neo4j.yml --env-file .env up -d
+
+stop-neo4j:
+	docker compose -f docker-compose.neo4j.yml down
+
diff --git a/dds_glossary/neo.py b/dds_glossary/neo.py
new file mode 100644
index 0000000..e57387c
--- /dev/null
+++ b/dds_glossary/neo.py
"""Load a services.Dataset into a Neo4J instance."""

import logging
from typing import Any, Dict, Iterator, List, Tuple

import neo4j
from owlready2 import get_ontology

from . import services

logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)


def load_services_dataset(
    driver: neo4j.Driver,
    dataset: services.Dataset,
    dataset_path: str,
    import_chunk_size: int = 1000,
) -> None:
    """
    Load a given dataset into a Neo4J instance.

    Fetches the ontology at ``dataset.url``, dumps it as RDF/XML to
    ``dataset_path``, parses it into concept schemes / concepts /
    semantic relations, then MERGEs nodes, indexes, and relationships
    into the Neo4J instance behind ``driver``.

    :param driver: an open Neo4J driver used to run the import queries.
    :param dataset: the dataset to load; only its ``url`` is read here.
    :param dataset_path: local file path where the RDF/XML dump is written.
    :param import_chunk_size: number of nodes merged per batched query.
    """
    ontology = get_ontology(dataset.url).load()
    ontology.save(file=str(dataset_path), format="rdfxml")
    (
        concept_schemes_raw,
        concepts_raw,
        _,
        semantic_relations_raw,
    ) = services.GlossaryController.parse_dataset(dataset_path)

    logging.info("Loaded the dataset in memory")

    concept_schemes = [c.to_dict() for c in concept_schemes_raw]
    concepts = [c.to_dict() for c in concepts_raw]
    semantic_relations = [c.to_dict() for c in semantic_relations_raw]

    # One services.Dataset has exactly one concept scheme.
    main_concept_scheme = concept_schemes[0]

    logging.info("Importing concept schemes...")

    # Load concept schemes.
    # NOTE(review): these two node-import loops were commented out in the
    # original patch even though the log lines claim the import happened and
    # the edge MATCH clauses below require the nodes to exist; re-enabled.
    for chunk in chunk_list(concept_schemes, import_chunk_size):
        query, args = build_nodes_import_query_and_args(["ConceptScheme"], chunk)
        driver.execute_query(query, args)

    logging.info("Imported concept schemes")

    logging.info("Importing concepts...")

    # Load concepts.
    for chunk in chunk_list(concepts, import_chunk_size):
        query, args = build_nodes_import_query_and_args(["Concept"], chunk)
        driver.execute_query(query, args)

    # BUG FIX: this previously logged "Imported concept schemes" again.
    logging.info("Imported concepts")

    logging.info("Adding indices...")

    # Index both node labels by IRI so the per-edge MATCH lookups are fast.
    # BUG FIX: the pairs were previously unpacked as (key, label), which
    # produced indexes like ``iri_ConceptScheme_index ... FOR (c:iri)``.
    for label, key in [("ConceptScheme", "iri"), ("Concept", "iri")]:
        driver.execute_query(build_index_query(key=key, label=label))

    logging.info("Added indices...")

    logging.info("Importing concept -> concept scheme relationships...")

    # Load concept -> concept scheme relationships.
    for concept in concepts:
        edge = ("Concept", concept["iri"], "ConceptScheme", main_concept_scheme["iri"])
        query, args = build_edges_import_query_and_args(["IsFrom"], [edge])
        driver.execute_query(query, args)

    logging.info("Imported concept -> concept scheme relationships")

    logging.info("Importing concept -> concept 'broader' relationships...")

    # Load concept -> concept semantic ("broader" etc.) relationships.
    for semantic_relation in semantic_relations:
        # BUG FIX: the edge type was previously always read from
        # semantic_relations[0] instead of the current relation.
        edge_type = semantic_relation["type"]
        edge = (
            "Concept",
            semantic_relation["source_concept_iri"],
            "Concept",
            semantic_relation["target_concept_iri"],
        )
        query, args = build_edges_import_query_and_args([edge_type], [edge])
        driver.execute_query(query, args)

    logging.info("Imported concept -> concept 'broader' relationships")


def build_nodes_import_query_and_args(
    labels: List[str], nodes: List[Dict[str, Any]]
) -> Tuple[str, Dict[str, Any]]:
    """
    Build a single Cypher query (and its parameter dict) that bulk-MERGEs nodes.

    Every node gets all the given ``labels``; each node's properties become
    positionally-suffixed query parameters so one query can carry the batch.

    ## Example

    > build_nodes_import_query_and_args(["Hello", "World"], [{"a": 1, "b": 2}, {"a": 1, "c": 10}])
    (
        "MERGE (e_0:Hello:World {a: $a_0, b: $b_0})\nMERGE (e_1:Hello:World {a: $a_1, c: $c_1})",
        {'a_0': 1, 'b_0': 2, 'a_1': 1, 'c_1': 10}
    )
    """
    # Flatten every node's properties into index-suffixed parameters.
    query_args: Dict[str, Any] = {}
    for idx, node in enumerate(nodes):
        for k, v in node.items():
            query_args[f"{k}_{idx}"] = v

    node_labels_str = ":".join(labels)

    # One MERGE clause per node, referencing its own parameter suffix.
    query_rows = []
    for idx, node in enumerate(nodes):
        schema_kv = [f"{k}: ${k}_{idx}" for k in node.keys()]
        query_rows.append(f"MERGE (e_{idx}:{node_labels_str} {{{', '.join(schema_kv)}}})")

    return "\n".join(query_rows), query_args


def build_edges_import_query_and_args(
    labels: List[str], edges: List[Tuple[str, str, str, str]]
) -> Tuple[str, Dict[str, Any]]:
    """
    Build a single Cypher query (and its parameter dict) that bulk-MERGEs edges.

    Each edge is ``(source_label, source_iri, target_label, target_iri)``;
    endpoints are matched by their ``iri`` property, so the node index built
    by :func:`build_index_query` should exist first.

    ## Example

    > build_edges_import_query_and_args(["IsFrom"], [("Concept", "def", "ConceptScheme", "abcd")])
    (
        "MATCH (src_0:Concept {iri: $iri_src_0}), (tgt_0:ConceptScheme {iri: $iri_tgt_0})\nWITH src_0, tgt_0\nMERGE (src_0)-[r_0:IsFrom]->(tgt_0)",
        {'iri_src_0': 'def', 'iri_tgt_0': 'abcd'}
    )
    """
    query_args: Dict[str, Any] = {}
    for idx, edge in enumerate(edges):
        _, source_iri, _, target_iri = edge
        query_args[f"iri_src_{idx}"] = source_iri
        query_args[f"iri_tgt_{idx}"] = target_iri

    edge_labels_str = ":".join(labels)

    matches = []
    withs = []
    merges = []
    for idx, edge in enumerate(edges):
        source_label, _, target_label, _ = edge
        matches.extend(
            [
                f"(src_{idx}:{source_label} {{iri: $iri_src_{idx}}})",
                f"(tgt_{idx}:{target_label} {{iri: $iri_tgt_{idx}}})",
            ]
        )
        withs.extend([f"src_{idx}", f"tgt_{idx}"])
        merges.append(f"(src_{idx})-[r_{idx}:{edge_labels_str}]->(tgt_{idx})")

    # BUG FIX: the original never emitted the WITH clause its docstring
    # promised, and concatenated MERGE clauses with no separator, which is
    # invalid Cypher as soon as more than one edge is in the batch.
    query = (
        "MATCH " + ", ".join(matches)
        + "\nWITH " + ", ".join(withs)
        + "\n" + "\n".join(f"MERGE {merge}" for merge in merges)
    )

    return query, query_args


def build_index_query(label: str, key: str) -> str:
    """
    Build an idempotent CREATE INDEX query for property ``key`` on ``label`` nodes.
    """
    return f"CREATE INDEX {label}_{key}_index IF NOT EXISTS FOR (c:{label}) ON (c.{key})"


def chunk_list(lst: list, n: int) -> Iterator[list]:
    """
    Yield successive n-sized chunks from list `lst`.
    """
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
diff --git a/dds_glossary/services.py b/dds_glossary/services.py
index e90a65c..50cc100 100644
--- a/dds_glossary/services.py
+++ b/dds_glossary/services.py
@@ -118,8 +118,8 @@ def get_scheme_members(
         """
         return [member for member in members if member.member_type == member_type]

+    @staticmethod
     def parse_dataset(
-        self,
         dataset_path: Path,
     ) -> tuple[
         list[ConceptScheme],
diff --git a/docker-compose.neo4j.yml b/docker-compose.neo4j.yml
new file mode 100644
index 0000000..286be58
--- /dev/null
+++ b/docker-compose.neo4j.yml
@@ -0,0 +1,14 @@
+services:
+  neo4j:
+    image: "neo4j:5.20.0-community-bullseye"
+    volumes:
+      - neo4j_data:/data
+      - neo4j_logs:/logs
+    ports:
+      - 7474:7474
+      - 7687:7687
+    environment:
+      - NEO4J_AUTH=${NEO4J_AUTH}
+volumes:
+  neo4j_data:
+  neo4j_logs:
diff --git a/pyproject.toml b/pyproject.toml
index b916a01..bd5c591 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,6 +39,7 @@ dependencies = [
     "SQLAlchemy>=2.0.0",
     "sqlalchemy-utils",
     "uvicorn[standard]",
+    "neo4j"
 ]

 [project.urls]