Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
failfast
Browse files Browse the repository at this point in the history
  • Loading branch information
Jolyon-S committed Mar 2, 2022
1 parent 6c3f727 commit 7495396
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,17 @@ default int socketQueryTimeoutMillis() {
return 62 * 1000;
}

/**
* When a Cassandra node is down or acting malignantly, it is plausible that we succeed in creating a socket but
* subsequently do not read anything, and thus are at the mercy of the {@link #socketQueryTimeoutMillis()}. This is
* particularly problematic on the creation of transaction managers and client pools in general: we can end up in
* a state where a query to a bad host blocks us for _at least_ {@link #socketQueryTimeoutMillis()}.
* <p>
* This setting is a separate, shorter socket read timeout, in milliseconds, meant for the initial phase of a
* connection so that an unresponsive host fails fast instead. It defaults to 2 seconds.
* <p>
* NOTE(review): the first paragraph originally stopped after "_at least_"; its ending here is inferred from the
* preceding sentence - please confirm.
*/
@Value.Default
default int initialSocketQueryTimeoutMillis() {
return 2 * 1000;
}

@Value.Default
default int cqlPoolTimeoutMillis() {
return 20 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.util.TimedRunner;
import com.palantir.util.TimedRunner.TaskContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Map;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Cassandra.Client;
Expand All @@ -46,14 +53,6 @@
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Map;

public class CassandraClientFactory extends BasePooledObjectFactory<CassandraClient> {
private static final SafeLogger log = SafeLoggerFactory.get(CassandraClientFactory.class);
private static final LoadingCache<SslConfiguration, SSLSocketFactory> sslSocketFactoryCache =
Expand Down Expand Up @@ -140,15 +139,13 @@ private static Cassandra.Client getRawClient(
throws TException {
TSocket thriftSocket = new TSocket(addr.getHostString(), addr.getPort(), config.socketTimeoutMillis());
thriftSocket.open();
try {
thriftSocket.getSocket().setKeepAlive(true);
thriftSocket.getSocket().setSoTimeout(config.socketQueryTimeoutMillis());
} catch (SocketException e) {
log.error(
"Couldn't set socket keep alive for host {}",
SafeArg.of("address", CassandraLogHelper.host(addr)),
e);
}
setSocketOptions(
thriftSocket,
socket -> {
socket.getSocket().setKeepAlive(true);
socket.getSocket().setSoTimeout(config.initialSocketQueryTimeoutMillis());
},
addr);

if (config.usingSsl()) {
boolean success = false;
Expand All @@ -172,6 +169,8 @@ private static Cassandra.Client getRawClient(

try {
login(client, config.credentials());
setSocketOptions(
thriftSocket, socket -> socket.getSocket().setSoTimeout(config.socketQueryTimeoutMillis()), addr);
} catch (TException e) {
client.getOutputProtocol().getTransport().close();
log.error("Exception thrown attempting to authenticate with config provided credentials", e);
Expand Down Expand Up @@ -249,4 +248,18 @@ public Exception getCause() {
return (Exception) super.getCause();
}
}

/**
 * Applies socket options to {@code thriftSocket} on a best-effort basis.
 *
 * <p>The supplied {@code consumer} is invoked with the socket. If it throws a {@link SocketException}, the
 * failure is logged at error level together with the host derived from {@code addr}, and is not rethrown.
 *
 * @param thriftSocket the socket handed to {@code consumer}
 * @param consumer the option-setting action to run against the socket
 * @param addr address of the host, used only for the log message
 */
private static void setSocketOptions(TSocket thriftSocket, SocketConsumer consumer, InetSocketAddress addr) {
    try {
        consumer.accept(thriftSocket);
    } catch (SocketException socketException) {
        // Deliberately swallowed: the caller carries on with whatever options the socket currently has.
        log.error(
                "Couldn't set socket options for host {}",
                SafeArg.of("address", CassandraLogHelper.host(addr)),
                socketException);
    }
}

/**
 * An action performed on a {@link TSocket} that is allowed to throw {@link SocketException}, so that
 * socket-option setters can be written as lambdas without wrapping the checked exception.
 */
@FunctionalInterface
private interface SocketConsumer {
/**
 * Performs this action on the given socket.
 *
 * @param value the socket to operate on
 * @throws SocketException if the action fails on the socket
 */
void accept(TSocket value) throws SocketException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.util.Pair;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.thrift.TException;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
Expand All @@ -54,6 +49,10 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.thrift.TException;

public final class CassandraKeyValueServices {
private static final SafeLogger log = SafeLoggerFactory.get(CassandraKeyValueServices.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
import com.palantir.atlasdb.logging.LoggingArgs;
import com.palantir.tritium.metrics.registry.MetricName;
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

@SuppressWarnings({"all"}) // thrift variable names.
public class InstrumentedCassandraClient implements AutoDelegate_CassandraClient {
private final CassandraClient delegate;
Expand Down

0 comments on commit 7495396

Please sign in to comment.