Facing java.lang.OutOfMemoryError: Java heap space issue when using multi-threading with JanusGraph #4174
Unanswered
Prabha2025 asked this question in Q&A
I am currently working on data ingestion into JanusGraph using a multi-threading approach. In my use case, I am reading data from Kafka streams, and the goal is to ingest data into a graph based on the client name. The data is client-specific, and there are separate graphs for different clients. Each thread is responsible for processing and ingesting one event at a time. However, there is a scenario where multiple events with the same client name are sent to the Kafka stream, leading to multiple threads attempting to ingest data into the same graph.
I have implemented two different approaches for creating graph instances in this context. I am seeking advice and insights from the JanusGraph community on how to handle this scenario effectively and ensure correct data ingestion into the graph in a multi-threaded environment.
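Roughly, the consumer side looks like the following simplified sketch (not my exact code; the topic name, the Kafka properties in kafkaProps, and the IngestionTask class are placeholders for my actual classes):
ExecutorService pool = Executors.newFixedThreadPool(8);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
consumer.subscribe(Collections.singletonList("client-events"));
while (running) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
        String client = record.key();                            // the client name decides the target graph
        pool.submit(new IngestionTask(client, record.value()));  // each task ingests one event
    }
}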
First Approach:
JanusGraphFactory.Builder builder = JanusGraphFactory.build()
        .set("storage.backend", config.getString("app.janus.storage.backend"))
        .set("storage.hostname", config.getString("app.janus.storage.hostname"))
        .set("storage.cql.keyspace", client)
        .set("storage.batch-loading", config.getBoolean("app.janus.storage.batch-loading"))
        .set("storage.buffer-size", config.getInt("app.janus.storage.buffer-size"));
if (config.getBoolean("app.janus.index.storage.configured")) {
    builder.set("index.search.backend", "elasticsearch")
           .set("index.search.hostname", config.getString("app.janus.index.storage.hosts"));
}
graph = builder.open();
LOGGER.info("Thread id ::::::: " + this.threadID +":::"+"******JanusGraph: " + this.graph);
this.graphSource = this.graph.traversal();
LOGGER.info("Thread id ::::::: " + this.threadID +":::"+"graphSource : " + graphSource);
if (!graphSource.tx().isOpen()||graphSource.tx() == null ) {
LOGGER.info("Thread id ::::::: " + threadID +":::"+" graphSource.tx() : " + graphSource.tx() );
graphSource.tx().open();
}
The code above opens the graph instance and a transaction.
@Override
public void close() throws Exception {
    if (this.graphSource.tx().isOpen()) {
        this.graphSource.tx().commit();
        this.graphSource.tx().close();
        LOGGER.info("Thread id ::::::: " + this.threadID + ":::" + "closed graph txn");
    }
    if (this.graphSource != null) {
        this.graphSource.close();
        LOGGER.info("Thread id ::::::: " + this.threadID + ":::" + "closed graph source");
    }
    if (graph != null) {
        this.graph.close();
        LOGGER.info("Thread id ::::::: " + this.threadID + ":::" + "closed the graph instance");
    }
}
This is the implementation of the AutoCloseable interface, where graph transactions and instances are closed after control exits the scope.
In this approach, multiple threads connect to the same graph instance and share the same graph transaction. When one thread completes execution, it closes the graph instance and transaction, and the other threads then throw exceptions because they can no longer commit their transactions.
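From the documentation, my understanding is that transactions are thread-bound by default, so each worker should be able to commit only its own transaction against a shared, long-lived graph instance. The sketch below reflects that understanding; it is not code I have verified, and ingestEvent is just an illustrative name.
// Each worker uses the shared graph and commits only its own thread-bound
// transaction; the shared graph itself is never closed by the workers.
public void ingestEvent(JanusGraph graph, String client, String payload) {
    GraphTraversalSource g = graph.traversal();
    try {
        g.addV("event")
                .property("client", client)
                .property("payload", payload)
                .iterate();
        g.tx().commit();      // commits this thread's transaction only
    } catch (Exception e) {
        g.tx().rollback();
        throw e;
    }
    // No graph.close() here; closing would tear the graph down for every thread.
}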
Second Approach:
WriteConfiguration configuration = new CommonsConfiguration();
configuration.set("storage.backend", config.getString("app.janus.storage.backend"));
configuration.set("storage.hostname", config.getString("app.janus.storage.hostname"));
configuration.set("storage.cql.keyspace", client);
configuration.set("storage.batch-loading", config.getBoolean("app.janus.storage.batch-loading"));
configuration.set("storage.buffer-size", config.getInt("app.janus.storage.buffer-size"));
configuration.set("index.search.backend", "elasticsearch");
configuration.set("storage.lock.wait-time", 5000);
configuration.set("storage.write-consistency-level", config.getString("app.janus.storage.write-consistency-level"));
configuration.set("index.search.hostname", config.getString("app.janus.index.storage.hosts"));
configuration.set("storage.cql.executor-service.core-pool-size", 5);
graph = new StandardJanusGraph(new GraphDatabaseConfigurationBuilder().build(configuration));
This worked for a certain amount of time, but eventually threads started getting blocked or stuck in the waiting state, causing them to retain resources. This led to an OutOfMemoryError; the relevant log output (newest entries first) is as follows:
2023-12-04 14:55:11.589 1202598 [cluster-rtt-ClusterId{value='656d96454e315779b58dbbe6', description='null'}-mcs-mongo01.stg.phenom.local:27017] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:11, serverValue:4254937}] to mcs-mongo01.stg.phenom.local:27017
2023-12-04 14:55:11.589 1202598 [pool-1-thread-5] INFO org.janusgraph.diskstorage.Backend - Configuring index [search]
2023-12-04 14:55:11.588 1202597 [pool-1-thread-5] INFO org.janusgraph.diskstorage.configuration.ExecutorServiceBuilder - Initiated fixed thread pool of size 10
2023-12-04 14:55:11.587 java.lang.OutOfMemoryError: Java heap space
2023-12-04 14:55:11.587 Exception in thread "JanusGraph Session-io-1" Exception in thread "JanusGraph Session-io-0" java.lang.OutOfMemoryError: Java heap space
2023-12-04 14:55:11.586 java.lang.OutOfMemoryError: Java heap space
2023-12-04 14:55:11.586 1202587 [JanusGraph Session-io-0] WARN io.netty.util.concurrent.SingleThreadEventExecutor - Unexpected exception from an event executor:
2023-12-04 14:55:11.586 java.lang.OutOfMemoryError: Java heap space
2023-12-04 14:55:11.586 1202587 [JanusGraph Session-io-1] WARN io.netty.util.concurrent.AbstractEventExecutor - A task raised an exception. Task: ScheduledFutureTask@54e32fb4(uncancellable, task: io.netty.handler.timeout.IdleStateHandler$ReaderIdleTimeoutTask@5691129c, deadline: 0, period: 0)
2023-12-04 14:55:11.586 Caused by: java.lang.OutOfMemoryError: Java heap space
2023-12-04 14:55:11.586 at java.base/java.lang.Thread.run(Thread.java:829)
2023-12-04 14:55:11.586 at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:152)
2023-12-04 14:55:11.586 at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:216)
2023-12-04 14:55:11.586 at com.mongodb.internal.connection.InternalStreamConnection.receive(InternalStreamConnection.java:374)
2023-12-04 14:55:11.586 at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:415)
2023-12-04 14:55:11.586 at com.mongodb.internal.connection.InternalStreamConnection.receiveMessageWithAdditionalTimeout(InternalStreamConnection.java:579)
2023-12-04 14:55:11.586 at com.mongodb.internal.connection.InternalStreamConnection.translateReadException(InternalStreamConnection.java:713)
2023-12-04 14:55:11.586 com.mongodb.MongoInternalException: Unexpected exception
2023-12-04 14:55:11.586 1202588 [cluster-ClusterId{value='656d96454e315779b58dbbe6', description='null'}-mcs-mongo02.stg.phenom.local:27017] INFO org.mongodb.driver.cluster - Exception in monitor thread while connecting to server mcs-mongo02.stg.phenom.local:27017
2023-12-04 14:55:11.585 1202589 [JanusGraph Session-admin-0] WARN com.datastax.oss.driver.internal.core.pool.ChannelPool - [JanusGraph Session|etg-cassandra03.stg.phenom.local/172.29.9.94:9042] Error while opening new channel (ConnectionInitException: [JanusGraph Session|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=de76ec99-4a88-49e8-b8bd-363d5ce88cd1}): failed to send request (java.nio.channels.NotYetConnectedException))
2023-12-04 14:55:11.585 Caused by: java.lang.OutOfMemoryError: Java heap space
2023-12-04 14:55:11.585 at java.base/java.lang.Thread.run(Thread.java:829)
2023-12-04 14:55:11.585 at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
2023-12-04 14:55:11.585 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
2023-12-04 14:55:11.585 at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:359)
2023-12-04 14:55:11.575 org.apache.http.nio.reactor.IOReactorException: I/O dispatch worker terminated abnormally
2023-12-04 14:55:11.549 1202558 [pool-485-thread-1] ERROR org.apache.http.impl.nio.client.InternalHttpAsyncClient - I/O reactor terminated abnormally
2023-12-04 14:55:09.796 1200805 [JanusGraph Session-admin-0] WARN com.datastax.oss.driver.internal.core.session.DefaultSession - [JanusGraph Session] Unexpected error while refreshing schema during initialization, proceeding without schema metadata (CompletionException: java.lang.OutOfMemoryError: Java heap space)
2023-12-04 14:55:01.610 Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 24"
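My suspicion is that building a new StandardJanusGraph for every event opens a fresh CQL session, Netty event loops, and thread pools each time (the pool-485-thread-1 name in the log suggests several hundred pools had been created), and that these accumulate until the heap is exhausted. One idea I am considering, but have not verified, is to cache a single graph instance per client and share it across all threads, along the lines of the sketch below (buildGraphForClient stands for the builder code from the first approach):
private static final ConcurrentMap<String, JanusGraph> GRAPHS = new ConcurrentHashMap<>();

// One graph per client, created lazily and shared by all ingestion threads.
public static JanusGraph graphFor(String client) {
    return GRAPHS.computeIfAbsent(client, c -> buildGraphForClient(c));
}

// Called once on application shutdown, not after every event.
public static void closeAll() {
    GRAPHS.values().forEach(g -> {
        try {
            g.close();
        } catch (Exception e) {
            LOGGER.warn("Failed to close graph", e);
        }
    });
    GRAPHS.clear();
}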
Can someone help me with how to implement multi-threading for data ingestion into the graph?