diff --git a/src/src/main/java/de/unibi/agbi/biodwh2/neo4j/server/Neo4jService.java b/src/src/main/java/de/unibi/agbi/biodwh2/neo4j/server/Neo4jService.java index 2299041..67c4b80 100644 --- a/src/src/main/java/de/unibi/agbi/biodwh2/neo4j/server/Neo4jService.java +++ b/src/src/main/java/de/unibi/agbi/biodwh2/neo4j/server/Neo4jService.java @@ -1,5 +1,9 @@ package de.unibi.agbi.biodwh2.neo4j.server; +import de.unibi.agbi.biodwh2.core.collections.BatchIterable; +import de.unibi.agbi.biodwh2.core.collections.Tuple2; +import de.unibi.agbi.biodwh2.core.io.mvstore.MVStoreCollection; +import de.unibi.agbi.biodwh2.core.io.mvstore.MVStoreModel; import de.unibi.agbi.biodwh2.core.model.graph.Edge; import de.unibi.agbi.biodwh2.core.model.graph.Graph; import de.unibi.agbi.biodwh2.core.model.graph.IndexDescription; @@ -24,10 +28,11 @@ import org.neo4j.kernel.internal.GraphDatabaseAPI; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; -import java.util.function.Consumer; class Neo4jService { private static final Logger LOGGER = LogManager.getLogger(Neo4jService.class); @@ -41,12 +46,26 @@ class Neo4jService { private final String importPath; private DatabaseManagementService managementService; private GraphDatabaseService dbService; + private Method getOrCreateNodeRepository; + private Method getOrCreateEdgeRepository; public Neo4jService(final String workspacePath) { this.workspacePath = workspacePath; neo4jPath = Paths.get(workspacePath, "neo4j").toString(); databasePath = Paths.get(neo4jPath, "neo4j.db"); importPath = Paths.get(neo4jPath, "import").toString(); + + try { + getOrCreateNodeRepository = Graph.class.getSuperclass().getDeclaredMethod("getOrCreateNodeRepository", + String.class); + getOrCreateNodeRepository.setAccessible(true); + getOrCreateEdgeRepository = Graph.class.getSuperclass().getDeclaredMethod("getOrCreateEdgeRepository", + String.class); + getOrCreateEdgeRepository.setAccessible(true); + } catch (NoSuchMethodException ignored) { + getOrCreateNodeRepository = null; + getOrCreateEdgeRepository = null; + } } public void startNeo4jService(final Integer boltPort) { @@ -54,7 +73,7 @@ public void startNeo4jService(final Integer boltPort) { Paths.get(importPath).toFile().mkdir(); var boltListenAddress = new SocketAddress("0.0.0.0", boltPort == null ? 8083 : boltPort); if (LOGGER.isInfoEnabled()) - LOGGER.info("Starting Neo4j DBMS on bolt://" + boltListenAddress + "..."); + LOGGER.info("Starting Neo4j DBMS on bolt://{}...", boltListenAddress); final var builder = new DatabaseManagementServiceBuilder(databasePath); builder.setConfig(GraphDatabaseSettings.pagecache_memory, 512 * 1024L); builder.setConfig(HttpConnector.enabled, false); @@ -106,7 +125,7 @@ public void deleteOldDatabase() { FileUtils.deleteQuietly(Paths.get(neo4jPath, "store_lock").toFile()); } catch (IOException e) { if (LOGGER.isErrorEnabled()) - LOGGER.error("Failed to remove old database '" + neo4jPath + "'", e); + LOGGER.error("Failed to remove old database '{}'", neo4jPath, e); } } @@ -123,7 +142,7 @@ public void createDatabase() { createNeo4jIndices(graph); } catch (Exception e) { if (LOGGER.isErrorEnabled()) - LOGGER.error("Failed to create neo4j database '" + databasePath + "'", e); + LOGGER.error("Failed to create neo4j database '{}'", databasePath, e); } } @@ -132,10 +151,11 @@ private HashMap createNeo4jNodes(final Graph graph) { final String[] labels = graph.getNodeLabels(); for (int i = 0; i < labels.length; i++) { if (LOGGER.isInfoEnabled()) - LOGGER.info("Creating nodes with label '" + labels[i] + "' (" + (i + 1) + "/" + labels.length + ")..."); - batchIterate(graph.getNodes(labels[i]), nodes -> { + LOGGER.info("Creating nodes with label '{}' ({}/{})...", labels[i], i + 1, labels.length); + final var nodes = getNodes(graph, labels[i]); + while (nodes.getFirst().hasNext()) { try (Transaction tx = dbService.beginTx()) { - for (final Node node : nodes) { + for (final Node node : nodes.getSecond()) { final org.neo4j.graphdb.Node neo4jNode = tx.createNode(); for (final String propertyKey : node.keySet()) setPropertySafe(node, neo4jNode, propertyKey); @@ -144,27 +164,42 @@ private HashMap createNeo4jNodes(final Graph graph) { } tx.commit(); } - }); + } } return nodeIdNeo4jIdMap; } - private void batchIterate(final Iterable iterable, final Consumer> consumer) { - final var currentBatch = new ArrayList(); - for (T element : iterable) { - currentBatch.add(element); - if (currentBatch.size() == 1000) { - consumer.accept(currentBatch); - currentBatch.clear(); + private Tuple2, BatchIterable> getNodes(final Graph graph, final String label) { + if (getOrCreateNodeRepository != null) { + try { + //noinspection unchecked + final var collection = (MVStoreCollection) getOrCreateNodeRepository.invoke(graph, label); + final var iterator = collection.unsafeIterator(); + return new Tuple2<>(iterator, new BatchIterable<>(iterator)); + } catch (IllegalAccessException | InvocationTargetException ignored) { + } + } + final var iterator = graph.getNodes(label).iterator(); + return new Tuple2<>(iterator, new BatchIterable<>(iterator)); + } + + private Tuple2, BatchIterable> getEdges(final Graph graph, final String label) { + if (getOrCreateEdgeRepository != null) { + try { + //noinspection unchecked + final var collection = (MVStoreCollection) getOrCreateEdgeRepository.invoke(graph, label); + final var iterator = collection.unsafeIterator(); + return new Tuple2<>(iterator, new BatchIterable<>(iterator)); + } catch (IllegalAccessException | InvocationTargetException ignored) { } } - if (!currentBatch.isEmpty()) - consumer.accept(currentBatch); + final var iterator = graph.getEdges(label).iterator(); + return new Tuple2<>(iterator, new BatchIterable<>(iterator)); } private void setPropertySafe(final Node node, final org.neo4j.graphdb.Node neo4jNode, final String propertyKey) { try { - if (!Node.IGNORED_FIELDS.contains(propertyKey)) { + if (MVStoreModel.ID_FIELD.equals(propertyKey) || !Node.IGNORED_FIELDS.contains(propertyKey)) { Object value = node.getProperty(propertyKey); if (value instanceof Integer[] array) for (int i = 0; i < array.length; i++) @@ -181,9 +216,8 @@ private void setPropertySafe(final Node node, final org.neo4j.graphdb.Node neo4j } } catch (IllegalArgumentException e) { if (LOGGER.isWarnEnabled()) - LOGGER.warn( - "Illegal property '" + propertyKey + "' -> '" + node.getProperty(propertyKey) + "' for node '" + - node.getId() + "[:" + node.getLabel() + "]'", e); + LOGGER.warn("Illegal property '{}' -> '{}' for node '{}[:{}]'", propertyKey, + node.getProperty(propertyKey), node.getId(), node.getLabel(), e); } } @@ -221,28 +255,33 @@ private void createNeo4jEdges(final Graph graph, final HashMap nod final String[] labels = graph.getEdgeLabels(); for (int i = 0; i < labels.length; i++) { if (LOGGER.isInfoEnabled()) - LOGGER.info("Creating edges with label '" + labels[i] + "' (" + (i + 1) + "/" + labels.length + ")..."); - batchIterate(graph.getEdges(labels[i]), edges -> { - try (Transaction tx = dbService.beginTx()) { - for (final Edge edge : edges) { + LOGGER.info("Creating edges with label '{}' ({}/{})...", labels[i], i + 1, labels.length); + final long edgeCount = graph.getNumberOfEdges(labels[i]); + final long[] counter = {0}; + final var edges = getEdges(graph, labels[i]); + while (edges.getFirst().hasNext()) { + try (final Transaction tx = dbService.beginTx()) { + for (final Edge edge : edges.getSecond()) { final RelationshipType relationshipType = RelationshipType.withName(edge.getLabel()); - final org.neo4j.graphdb.Node fromNode = tx.getNodeByElementId( - nodeIdNeo4jIdMap.get(edge.getFromId())); - final org.neo4j.graphdb.Node toNode = tx.getNodeByElementId( - nodeIdNeo4jIdMap.get(edge.getToId())); + final var fromNode = tx.getNodeByElementId(nodeIdNeo4jIdMap.get(edge.getFromId())); + final var toNode = tx.getNodeByElementId(nodeIdNeo4jIdMap.get(edge.getToId())); final Relationship relationship = fromNode.createRelationshipTo(toNode, relationshipType); for (final String propertyKey : edge.keySet()) - if (!Edge.IGNORED_FIELDS.contains(propertyKey)) { + if (MVStoreModel.ID_FIELD.equals(propertyKey) || !Edge.IGNORED_FIELDS.contains( + propertyKey)) { Object value = edge.getProperty(propertyKey); if (value instanceof Collection) value = convertCollectionToArray((Collection) value); if (value != null) relationship.setProperty(propertyKey, value); } + counter[0]++; + if (counter[0] % 100_000 == 0) + LOGGER.info("\tProgress: {}/{}...", counter[0], edgeCount); } tx.commit(); } - }); + } } } @@ -254,8 +293,8 @@ private void createNeo4jIndices(final Graph graph) { final Schema schema = tx.schema(); for (final IndexDescription index : indices) { if (LOGGER.isInfoEnabled()) - LOGGER.info("Creating " + index.getType() + " index on '" + index.getProperty() + "' field for " + - index.getTarget() + " label '" + index.getLabel() + "'..."); + LOGGER.info("Creating {} index on '{}' field for {} label '{}'...", index.getType(), + index.getProperty(), index.getTarget(), index.getLabel()); final Label label = Label.label(index.getLabel()); if (index.getTarget() == IndexDescription.Target.NODE) { if (index.getType() == IndexDescription.Type.UNIQUE)