Skip to content

Commit

Permalink
Speedup database creation and log edge import progress
Browse files Browse the repository at this point in the history
  • Loading branch information
AstrorEnales committed May 24, 2024
1 parent 6f1d507 commit 5eb0230
Showing 1 changed file with 72 additions and 33 deletions.
105 changes: 72 additions & 33 deletions src/src/main/java/de/unibi/agbi/biodwh2/neo4j/server/Neo4jService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package de.unibi.agbi.biodwh2.neo4j.server;

import de.unibi.agbi.biodwh2.core.collections.BatchIterable;
import de.unibi.agbi.biodwh2.core.collections.Tuple2;
import de.unibi.agbi.biodwh2.core.io.mvstore.MVStoreCollection;
import de.unibi.agbi.biodwh2.core.io.mvstore.MVStoreModel;
import de.unibi.agbi.biodwh2.core.model.graph.Edge;
import de.unibi.agbi.biodwh2.core.model.graph.Graph;
import de.unibi.agbi.biodwh2.core.model.graph.IndexDescription;
Expand All @@ -24,10 +28,11 @@
import org.neo4j.kernel.internal.GraphDatabaseAPI;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.function.Consumer;

class Neo4jService {
private static final Logger LOGGER = LogManager.getLogger(Neo4jService.class);
Expand All @@ -41,20 +46,34 @@ class Neo4jService {
private final String importPath;
private DatabaseManagementService managementService;
private GraphDatabaseService dbService;
private Method getOrCreateNodeRepository;
private Method getOrCreateEdgeRepository;

public Neo4jService(final String workspacePath) {
this.workspacePath = workspacePath;
neo4jPath = Paths.get(workspacePath, "neo4j").toString();
databasePath = Paths.get(neo4jPath, "neo4j.db");
importPath = Paths.get(neo4jPath, "import").toString();

try {
getOrCreateNodeRepository = Graph.class.getSuperclass().getDeclaredMethod("getOrCreateNodeRepository",
String.class);
getOrCreateNodeRepository.setAccessible(true);
getOrCreateEdgeRepository = Graph.class.getSuperclass().getDeclaredMethod("getOrCreateEdgeRepository",
String.class);
getOrCreateEdgeRepository.setAccessible(true);
} catch (NoSuchMethodException ignored) {
getOrCreateNodeRepository = null;
getOrCreateEdgeRepository = null;
}
}

public void startNeo4jService(final Integer boltPort) {
Paths.get(neo4jPath).toFile().mkdir();
Paths.get(importPath).toFile().mkdir();
var boltListenAddress = new SocketAddress("0.0.0.0", boltPort == null ? 8083 : boltPort);
if (LOGGER.isInfoEnabled())
LOGGER.info("Starting Neo4j DBMS on bolt://" + boltListenAddress + "...");
LOGGER.info("Starting Neo4j DBMS on bolt://{}...", boltListenAddress);
final var builder = new DatabaseManagementServiceBuilder(databasePath);
builder.setConfig(GraphDatabaseSettings.pagecache_memory, 512 * 1024L);
builder.setConfig(HttpConnector.enabled, false);
Expand Down Expand Up @@ -106,7 +125,7 @@ public void deleteOldDatabase() {
FileUtils.deleteQuietly(Paths.get(neo4jPath, "store_lock").toFile());
} catch (IOException e) {
if (LOGGER.isErrorEnabled())
LOGGER.error("Failed to remove old database '" + neo4jPath + "'", e);
LOGGER.error("Failed to remove old database '{}'", neo4jPath, e);
}
}

Expand All @@ -123,7 +142,7 @@ public void createDatabase() {
createNeo4jIndices(graph);
} catch (Exception e) {
if (LOGGER.isErrorEnabled())
LOGGER.error("Failed to create neo4j database '" + databasePath + "'", e);
LOGGER.error("Failed to create neo4j database '{}'", databasePath, e);
}
}

Expand All @@ -132,10 +151,11 @@ private HashMap<Long, String> createNeo4jNodes(final Graph graph) {
final String[] labels = graph.getNodeLabels();
for (int i = 0; i < labels.length; i++) {
if (LOGGER.isInfoEnabled())
LOGGER.info("Creating nodes with label '" + labels[i] + "' (" + (i + 1) + "/" + labels.length + ")...");
batchIterate(graph.getNodes(labels[i]), nodes -> {
LOGGER.info("Creating nodes with label '{}' ({}/{})...", labels[i], i + 1, labels.length);
final var nodes = getNodes(graph, labels[i]);
while (nodes.getFirst().hasNext()) {
try (Transaction tx = dbService.beginTx()) {
for (final Node node : nodes) {
for (final Node node : nodes.getSecond()) {
final org.neo4j.graphdb.Node neo4jNode = tx.createNode();
for (final String propertyKey : node.keySet())
setPropertySafe(node, neo4jNode, propertyKey);
Expand All @@ -144,27 +164,42 @@ private HashMap<Long, String> createNeo4jNodes(final Graph graph) {
}
tx.commit();
}
});
}
}
return nodeIdNeo4jIdMap;
}

private <T> void batchIterate(final Iterable<T> iterable, final Consumer<List<T>> consumer) {
final var currentBatch = new ArrayList<T>();
for (T element : iterable) {
currentBatch.add(element);
if (currentBatch.size() == 1000) {
consumer.accept(currentBatch);
currentBatch.clear();
private Tuple2<Iterator<Node>, BatchIterable<Node>> getNodes(final Graph graph, final String label) {
if (getOrCreateNodeRepository != null) {
try {
//noinspection unchecked
final var collection = (MVStoreCollection<Node>) getOrCreateNodeRepository.invoke(graph, label);
final var iterator = collection.unsafeIterator();
return new Tuple2<>(iterator, new BatchIterable<>(iterator));
} catch (IllegalAccessException | InvocationTargetException ignored) {
}
}
final var iterator = graph.getNodes(label).iterator();
return new Tuple2<>(iterator, new BatchIterable<>(iterator));
}

private Tuple2<Iterator<Edge>, BatchIterable<Edge>> getEdges(final Graph graph, final String label) {
if (getOrCreateEdgeRepository != null) {
try {
//noinspection unchecked
final var collection = (MVStoreCollection<Edge>) getOrCreateEdgeRepository.invoke(graph, label);
final var iterator = collection.unsafeIterator();
return new Tuple2<>(iterator, new BatchIterable<>(iterator));
} catch (IllegalAccessException | InvocationTargetException ignored) {
}
}
if (!currentBatch.isEmpty())
consumer.accept(currentBatch);
final var iterator = graph.getEdges(label).iterator();
return new Tuple2<>(iterator, new BatchIterable<>(iterator));
}

private void setPropertySafe(final Node node, final org.neo4j.graphdb.Node neo4jNode, final String propertyKey) {
try {
if (!Node.IGNORED_FIELDS.contains(propertyKey)) {
if (MVStoreModel.ID_FIELD.equals(propertyKey) || !Node.IGNORED_FIELDS.contains(propertyKey)) {
Object value = node.getProperty(propertyKey);
if (value instanceof Integer[] array)
for (int i = 0; i < array.length; i++)
Expand All @@ -181,9 +216,8 @@ private void setPropertySafe(final Node node, final org.neo4j.graphdb.Node neo4j
}
} catch (IllegalArgumentException e) {
if (LOGGER.isWarnEnabled())
LOGGER.warn(
"Illegal property '" + propertyKey + "' -> '" + node.getProperty(propertyKey) + "' for node '" +
node.getId() + "[:" + node.getLabel() + "]'", e);
LOGGER.warn("Illegal property '{}' -> '{}' for node '{}[:{}]'", propertyKey,
node.getProperty(propertyKey), node.getId(), node.getLabel(), e);
}
}

Expand Down Expand Up @@ -221,28 +255,33 @@ private void createNeo4jEdges(final Graph graph, final HashMap<Long, String> nod
final String[] labels = graph.getEdgeLabels();
for (int i = 0; i < labels.length; i++) {
if (LOGGER.isInfoEnabled())
LOGGER.info("Creating edges with label '" + labels[i] + "' (" + (i + 1) + "/" + labels.length + ")...");
batchIterate(graph.getEdges(labels[i]), edges -> {
try (Transaction tx = dbService.beginTx()) {
for (final Edge edge : edges) {
LOGGER.info("Creating edges with label '{}' ({}/{})...", labels[i], i + 1, labels.length);
final long edgeCount = graph.getNumberOfEdges(labels[i]);
final long[] counter = {0};
final var edges = getEdges(graph, labels[i]);
while (edges.getFirst().hasNext()) {
try (final Transaction tx = dbService.beginTx()) {
for (final Edge edge : edges.getSecond()) {
final RelationshipType relationshipType = RelationshipType.withName(edge.getLabel());
final org.neo4j.graphdb.Node fromNode = tx.getNodeByElementId(
nodeIdNeo4jIdMap.get(edge.getFromId()));
final org.neo4j.graphdb.Node toNode = tx.getNodeByElementId(
nodeIdNeo4jIdMap.get(edge.getToId()));
final var fromNode = tx.getNodeByElementId(nodeIdNeo4jIdMap.get(edge.getFromId()));
final var toNode = tx.getNodeByElementId(nodeIdNeo4jIdMap.get(edge.getToId()));
final Relationship relationship = fromNode.createRelationshipTo(toNode, relationshipType);
for (final String propertyKey : edge.keySet())
if (!Edge.IGNORED_FIELDS.contains(propertyKey)) {
if (MVStoreModel.ID_FIELD.equals(propertyKey) || !Edge.IGNORED_FIELDS.contains(
propertyKey)) {
Object value = edge.getProperty(propertyKey);
if (value instanceof Collection)
value = convertCollectionToArray((Collection<?>) value);
if (value != null)
relationship.setProperty(propertyKey, value);
}
counter[0]++;
if (counter[0] % 100_000 == 0)
LOGGER.info("\tProgress: {}/{}...", counter[0], edgeCount);
}
tx.commit();
}
});
}
}
}

Expand All @@ -254,8 +293,8 @@ private void createNeo4jIndices(final Graph graph) {
final Schema schema = tx.schema();
for (final IndexDescription index : indices) {
if (LOGGER.isInfoEnabled())
LOGGER.info("Creating " + index.getType() + " index on '" + index.getProperty() + "' field for " +
index.getTarget() + " label '" + index.getLabel() + "'...");
LOGGER.info("Creating {} index on '{}' field for {} label '{}'...", index.getType(),
index.getProperty(), index.getTarget(), index.getLabel());
final Label label = Label.label(index.getLabel());
if (index.getTarget() == IndexDescription.Target.NODE) {
if (index.getType() == IndexDescription.Type.UNIQUE)
Expand Down

0 comments on commit 5eb0230

Please sign in to comment.