This repository has been archived by the owner on Oct 1, 2024. It is now read-only.

Ability to load a services.Dataset into a Neo4J instance #76

Open · wants to merge 1 commit into base: main
1 change: 1 addition & 0 deletions .env.template
@@ -5,3 +5,4 @@ POSTGRES_DB=glossary
API_KEY="123456789"
DATABASE_URL="postgresql+psycopg://admin:123456789@postgres:5432/glossary"
SENTRY_DSN="https://…"
NEO4J_AUTH=
6 changes: 6 additions & 0 deletions Makefile
@@ -0,0 +1,6 @@
start-neo4j:
Member: make works for now, but ideally this would later move to tasks.py.

	docker compose -f docker-compose.neo4j.yml --env-file .env up -d

stop-neo4j:
	docker compose -f docker-compose.neo4j.yml down
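Per the review comment above, a tasks.py equivalent might look like the following sketch; it assumes the project would adopt the invoke package, which is not currently a dependency:

    from invoke import task


    @task
    def start_neo4j(c):
        """Start the Neo4j container defined in docker-compose.neo4j.yml."""
        c.run("docker compose -f docker-compose.neo4j.yml --env-file .env up -d")


    @task
    def stop_neo4j(c):
        """Stop and remove the Neo4j container."""
        c.run("docker compose -f docker-compose.neo4j.yml down")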

167 changes: 167 additions & 0 deletions dds_glossary/neo.py
@@ -0,0 +1,167 @@
import logging
from typing import Any, Dict, List, Tuple

import neo4j
from owlready2 import get_ontology

from . import services

logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)
Member: It would probably be better to structure the log.
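For instance, a minimal structured (JSON) setup using only the standard library could look like this sketch; the field names are illustrative, not a project convention:

    import json
    import logging


    class JSONFormatter(logging.Formatter):
        """Render each log record as a single JSON object."""

        def format(self, record: logging.LogRecord) -> str:
            return json.dumps({
                "time": self.formatTime(record),
                "level": record.levelname,
                "logger": record.name,
                "message": record.getMessage(),
            })


    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    logging.basicConfig(level=logging.INFO, handlers=[handler])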

logger = logging.getLogger(__name__)



def load_services_dataset(
    driver: neo4j.Driver,
    dataset: services.Dataset,
    dataset_path: str,
    import_chunk_size: int = 1000,
):
    """
    Load a given dataset into a Neo4J instance.
    """
    ontology = get_ontology(dataset.url).load()
    ontology.save(file=str(dataset_path), format="rdfxml")
    concept_schemes_raw, concepts_raw, _, semantic_relations_raw = (
        services.GlossaryController.parse_dataset(dataset_path)
    )

    logger.info("Loaded the dataset in memory")

    concept_schemes = [c.to_dict() for c in concept_schemes_raw]
    concepts = [c.to_dict() for c in concepts_raw]
    semantic_relations = [r.to_dict() for r in semantic_relations_raw]

    # One services.Dataset has exactly one concept scheme.
    main_concept_scheme = concept_schemes[0]

    logger.info("Importing concept schemes...")

    # Load concept schemes
    for chunk in chunk_list(concept_schemes, import_chunk_size):
        query, args = build_nodes_import_query_and_args(["ConceptScheme"], chunk)
        driver.execute_query(query, args)

    logger.info("Imported concept schemes")

    logger.info("Importing concepts...")

    # Load concepts
    for chunk in chunk_list(concepts, import_chunk_size):
        query, args = build_nodes_import_query_and_args(["Concept"], chunk)
        driver.execute_query(query, args)

    logger.info("Imported concepts")

    logger.info("Adding indices...")

    # Index each label by its IRI
    for label, key in [("ConceptScheme", "iri"), ("Concept", "iri")]:
        driver.execute_query(build_index_query(key=key, label=label))

    logger.info("Added indices")

    logger.info("Importing concept -> concept scheme relationships...")

    # Load concept -> concept scheme relationships
    for concept in concepts:
        edge_type = "inScheme"
        edge = ("Concept", concept["iri"], "ConceptScheme", main_concept_scheme["iri"])
        query, args = build_edges_import_query_and_args([edge_type], [edge])
        driver.execute_query(query, args)

    logger.info("Imported concept -> concept scheme relationships")

    logger.info("Importing concept -> concept 'broader' relationships...")

    # Load concept -> concept semantic relationships (e.g. 'broader')
    for semantic_relation in semantic_relations:
        edge_type = semantic_relation["type"]

        edge = (
            "Concept",
            semantic_relation["source_concept_iri"],
            "Concept",
            semantic_relation["target_concept_iri"],
        )

        query, args = build_edges_import_query_and_args([edge_type], [edge])
        driver.execute_query(query, args)

    logger.info("Imported concept -> concept 'broader' relationships")


def build_nodes_import_query_and_args(labels: List[str], nodes: List[Dict[str, Any]]):
    """
    Build a bulk MERGE query and its arguments for importing nodes into Neo4J.

    ## Example

    > build_nodes_import_query_and_args(["Hello", "World"], [{"a": 1, "b": 2}, {"a": 1, "c": 10}])
    (
        "MERGE (e_0:Hello:World {a: $a_0, b: $b_0})\nMERGE (e_1:Hello:World {a: $a_1, c: $c_1})",
        {'a_0': 1, 'b_0': 2, 'a_1': 1, 'c_1': 10}
    )
    """
    query_args = {}
    for idx, node in enumerate(nodes):
        for k, v in node.items():
            query_args[f"{k}_{idx}"] = v

    node_labels_str = ":".join(labels)

    query_rows = []
    for idx, node in enumerate(nodes):
        schema_kv = [f"{k}: ${k}_{idx}" for k in node.keys()]
        query_rows.append(f"MERGE (e_{idx}:{node_labels_str} {{{', '.join(schema_kv)}}})")

    query = "\n".join(query_rows)
    return query, query_args


def build_edges_import_query_and_args(labels: List[str], edges: List[Tuple[str, str, str, str]]):
    """
    Build a bulk MERGE query and its arguments for importing edges into Neo4J.

    ## Example

    > build_edges_import_query_and_args(["IsFrom"], [("Concept", "def", "ConceptScheme", "abcd")])
    (
        "MATCH (src_0:Concept {iri: $iri_src_0}), (tgt_0:ConceptScheme {iri: $iri_tgt_0})\nWITH src_0, tgt_0\nMERGE (src_0)-[r_0:IsFrom]->(tgt_0)",
        {'iri_src_0': 'def', 'iri_tgt_0': 'abcd'}
    )
    """
    query_args = {}
    for idx, edge in enumerate(edges):
        _, source_iri, _, target_iri = edge
        query_args[f"iri_src_{idx}"] = source_iri
        query_args[f"iri_tgt_{idx}"] = target_iri

    edge_labels_str = ":".join(labels)

    matches = []
    withs = []
    merges = []
    for idx, edge in enumerate(edges):
        source_label, source_iri, target_label, target_iri = edge
        matches.extend([
            f"(src_{idx}:{source_label} {{iri: $iri_src_{idx}}})",
            f"(tgt_{idx}:{target_label} {{iri: $iri_tgt_{idx}}})",
        ])
        withs.extend([f"src_{idx}", f"tgt_{idx}"])
        merges.append(f"(src_{idx})-[r_{idx}:{edge_labels_str}]->(tgt_{idx})")

    # Join the clauses with newlines so each MERGE is a separate Cypher clause.
    query = "\n".join([
        f"MATCH {', '.join(matches)}",
        f"WITH {', '.join(withs)}",
        *[f"MERGE {merge}" for merge in merges],
    ])

    return query, query_args


def build_index_query(label: str, key: str):
    """
    Build a query that creates an index for `key` on nodes with label `label`.
    """
    return f"CREATE INDEX {label}_{key}_index IF NOT EXISTS FOR (c:{label}) ON (c.{key})"
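

# Example of the generated query (for illustration, not part of this PR's diff):
#   build_index_query(label="Concept", key="iri")
#   -> 'CREATE INDEX Concept_iri_index IF NOT EXISTS FOR (c:Concept) ON (c.iri)'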


def chunk_list(lst: list, n: int):
    """
    Yield successive n-sized chunks from list `lst`.
    """
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
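Taken together, loading a dataset end to end might look like the sketch below; the connection URI, credentials, and the Dataset constructor fields shown are assumptions for illustration, not part of this PR:

    import neo4j

    from dds_glossary import neo, services

    # Assumed local connection details; match them to your NEO4J_AUTH value.
    driver = neo4j.GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "123456789"))

    # Hypothetical dataset; the real Dataset fields live in dds_glossary.services.
    dataset = services.Dataset(name="estat", url="https://example.org/dataset.rdf")

    neo.load_services_dataset(driver, dataset, dataset_path="/tmp/dataset.rdf")
    driver.close()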
2 changes: 1 addition & 1 deletion dds_glossary/services.py
@@ -118,8 +118,8 @@ def get_scheme_members(
         """
         return [member for member in members if member.member_type == member_type]
 
+    @staticmethod
     def parse_dataset(
-        self,
         dataset_path: Path,
     ) -> tuple[
         list[ConceptScheme],
14 changes: 14 additions & 0 deletions docker-compose.neo4j.yml
@@ -0,0 +1,14 @@
services:
  neo4j:
    image: "neo4j:5.20.0-community-bullseye"
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
    ports:
      - 7474:7474
      - 7687:7687
    environment:
      - NEO4J_AUTH=${NEO4J_AUTH}

volumes:
  neo4j_data:
  neo4j_logs:
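Note that the official Neo4j image expects NEO4J_AUTH in the form username/password (or the literal value none to disable authentication), so the .env entry would look something like:

    NEO4J_AUTH=neo4j/123456789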
1 change: 1 addition & 0 deletions pyproject.toml
@@ -39,6 +39,7 @@ dependencies = [
    "SQLAlchemy>=2.0.0",
    "sqlalchemy-utils",
    "uvicorn[standard]",
    "neo4j"
]

[project.urls]