URGENT: Multiple Requests Handle Issue #66
Comments
Thanks for the report. I'll work on reproducing the issue tomorrow.
@datalatics-official Could you please describe your workload? Are you sending multiple requests asynchronously? Also, those numbers for connections per host are really high. The driver coalesces requests, and having more connections can reduce the benefit of that.
I'm unable to reproduce this with driver v2.1.10.3, using the following to generate load:

package org.example;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.ConstantSpeculativeExecutionPolicy;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.LatencyAwarePolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.google.common.util.concurrent.Futures;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class App {
    private static final int numThreads = 10;
    private static final MetricRegistry metrics = new MetricRegistry();
    private static final Timer requests = metrics.timer("requests");
    private static final ExecutorService executor = Executors.newFixedThreadPool(numThreads);

    public static void main(String[] args) throws InterruptedException {
        // Print request timings to the console every 2 seconds.
        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build();
        reporter.start(2, TimeUnit.SECONDS);

        Cluster cluster = null;
        try {
            // Same pooling, timeout, and policy settings as in the report.
            cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")
                .withPort(9042)
                .withPoolingOptions(new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 40, 10000).setConnectionsPerHost(HostDistance.REMOTE, 20, 5000))
                .withSocketOptions(new SocketOptions().setReadTimeoutMillis(5000))
                .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new TokenAwarePolicy(new RoundRobinPolicy())).build())
                .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
                .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(500, 2))
                .build();

            Session session = cluster.connect("testks1");
            PreparedStatement preparedStatement = session.prepare("INSERT INTO testks1.test (pk) VALUES(?)")
                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

            for (int i = 0; i < numThreads; ++i) {
                final long value = i;
                executor.submit(() -> {
                    while (true) {
                        try {
                            // Time each batch of 100 asynchronous inserts.
                            try (Context ignored = requests.time()) {
                                final int numRequests = 100;
                                List<ResultSetFuture> futures = new ArrayList<>(numRequests);
                                for (int j = 0; j < numRequests; ++j) {
                                    futures.add(
                                        session.executeAsync(preparedStatement.bind(value)));
                                }
                                // Wait for the whole batch before starting the next one.
                                Futures.successfulAsList(futures).get();
                            }
                        } catch (Exception e) {
                            System.err.println(e);
                        }
                    }
                });
            }

            // The workers loop forever, so this effectively blocks until the process is killed.
            while (!executor.awaitTermination(9999, TimeUnit.DAYS)) {
            }
        } finally {
            if (cluster != null) cluster.close();
        }
    }
}
@mpenick Thanks for the quick response. Let me check this in detail. I'll get back to you soon with my feedback.
Hello @mpenick
I'm now facing another issue, related to a timeout. Here is the error message:
Query (SELECT * FROM table_name WHERE id=?;) failed com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: host:9042 (com.datastax.driver.core.exceptions.DriverException: Timeout while trying to acquire available connection (you may want to increase the driver number of per-host connections)))
I've even increased the per-host connections. Here is that part of the code:
private val cluster = Cluster.builder()
  .addContactPoints(configuration.astraHostnames: _*)
  .withPort(9042)
  .withPoolingOptions(new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 40, 10000).setConnectionsPerHost(HostDistance.REMOTE, 20, 5000))
  .withSocketOptions(new SocketOptions().setReadTimeoutMillis(configuration.cassandraReadTimeout))
  .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new TokenAwarePolicy(new RoundRobinPolicy())).build)
  .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
  .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(500, 2))
  .build()
I think the driver (v2.1.10.3) is working properly, but my cql-proxy service is unable to handle hundreds of requests at the same time.
Could you please tell me how many requests cql-proxy can handle at a time, and how I can change that number?
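
For reference, one client-side way to keep a burst of asynchronous requests from exhausting the connection pool is to cap how many are in flight at once. This is independent of whatever limit cql-proxy has, which this thread does not state. Below is a minimal sketch using only the JDK: `BoundedInFlight`, `runDemo`, and the limit of 32 are hypothetical names and values, and `CompletableFuture` stands in for the driver's `ResultSetFuture`. With the real driver, the permit would instead be released in a listener attached to the future returned by `session.executeAsync(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: caps the number of async calls in flight at once. */
public class BoundedInFlight {
    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    public BoundedInFlight(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Blocks the caller until a permit is free, then starts the async call. */
    public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> call)
            throws InterruptedException {
        permits.acquire();
        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            // The call never started, so give the permit back immediately.
            inFlight.decrementAndGet();
            permits.release();
            throw e;
        }
        // Release the permit when the call finishes, successfully or not.
        return future.whenComplete((result, error) -> {
            inFlight.decrementAndGet();
            permits.release();
        });
    }

    public int peakInFlight() {
        return peak.get();
    }

    /** Sends totalRequests fake requests and returns the peak concurrency observed. */
    public static int runDemo(int totalRequests, int maxInFlight) throws Exception {
        // Stand-in for the remote service; a real client would call the driver here.
        ExecutorService fakeServer = Executors.newFixedThreadPool(8);
        BoundedInFlight limiter = new BoundedInFlight(maxInFlight);
        List<CompletableFuture<Integer>> futures = new ArrayList<>(totalRequests);
        try {
            for (int i = 0; i < totalRequests; i++) {
                final int id = i;
                Supplier<CompletableFuture<Integer>> call =
                    () -> CompletableFuture.supplyAsync(() -> id, fakeServer);
                futures.add(limiter.submit(call));
            }
            // Wait for every request, including its permit-release callback.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } finally {
            fakeServer.shutdown();
        }
        return limiter.peakInFlight();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak in-flight: " + runDemo(500, 32));
    }
}
```

The semaphore makes the submitting thread wait instead of queuing unbounded work, so the peak concurrency never exceeds the chosen limit. A suitable limit depends on the deployment and would have to be found by measurement.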