Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Reconnect to original contact points #344

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions driver-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@
<version>1.78.1</version>
</dependency>

<!-- added for easier DNS hostname resolution mocking -->
<dependency>
<groupId>org.burningwave</groupId>
<artifactId>tools</artifactId>
<scope>test</scope>
</dependency>


</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.datastax.driver.core.utils.MoreFutures;
import com.datastax.driver.core.utils.MoreObjects;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
Expand Down Expand Up @@ -160,6 +161,15 @@ protected Connection tryReconnect() throws ConnectionException {
if (isShutdown) throw new ConnectionException(null, "Control connection was shut down");

try {
if (cluster
.configuration
.getQueryOptions()
.shouldAddOriginalContactsToReconnectionPlan()) {
List<Host> initialContacts = cluster.metadata.getContactPoints();
Collections.shuffle(initialContacts);
return reconnectInternal(
Iterators.concat(queryPlan(), initialContacts.iterator()), false);
}
return reconnectInternal(queryPlan(), false);
} catch (NoHostAvailableException e) {
throw new ConnectionException(null, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class QueryOptions {

private volatile boolean schemaQueriesPaged = true;

private volatile boolean addOriginalContactsToReconnectionPlan = false;

/**
* Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL},
* {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}.
Expand Down Expand Up @@ -499,6 +501,26 @@ public int getMaxPendingRefreshNodeRequests() {
return maxPendingRefreshNodeRequests;
}

/**
* Whether the driver should use original contact points when reconnecting to a control node. In
* practice this forces driver to manually add original contact points to the end of the query
* plan. It is possible that it may introduce duplicates (but under differnet Host class
* instances) in the query plan. If this is set to false it does not mean that original contact
* points will be excluded.
*
* <p>One use case of this feature is that if the original contact point is defined by hostname
* and its IP address changes then setting this to {@code true} allows trying reconnecting to the
* new IP if all connection was lost.
*/
public QueryOptions setAddOriginalContactsToReconnectionPlan(boolean enabled) {
this.addOriginalContactsToReconnectionPlan = enabled;
return this;
}

public boolean shouldAddOriginalContactsToReconnectionPlan() {
return this.addOriginalContactsToReconnectionPlan;
}

@Override
public boolean equals(Object that) {
if (that == null || !(that instanceof QueryOptions)) {
Expand Down
12 changes: 12 additions & 0 deletions driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,7 @@ public static class Builder {
private static final Pattern RANDOM_PORT_PATTERN = Pattern.compile(RANDOM_PORT);

private String ipPrefix = TestUtils.IP_PREFIX;
private String providedClusterName = null;
int[] nodes = {1};
private int[] jmxPorts = {};
private boolean start = true;
Expand Down Expand Up @@ -991,6 +992,15 @@ public Builder withSniProxy() {
return this;
}

/**
* Builder takes care of naming and numbering clusters on its own. Use if you really need a
* specific name
*/
public Builder withClusterName(String clusterName) {
this.providedClusterName = clusterName;
return this;
}

/** Enables SSL encryption. */
public Builder withSSL() {
cassandraConfiguration.put("client_encryption_options.enabled", "true");
Expand Down Expand Up @@ -1115,6 +1125,8 @@ public CCMBridge build() {
// be careful NOT to alter internal state (hashCode/equals) during build!
String clusterName = TestUtils.generateIdentifier("ccm_");

if (providedClusterName != null) clusterName = providedClusterName;

VersionNumber dseVersion;
VersionNumber cassandraVersion;
boolean versionConfigured = this.version != null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.datastax.driver.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertTrue;

import java.net.InetSocketAddress;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

public class DnsEndpointTests {

private static final Logger logger = LoggerFactory.getLogger(DnsEndpointTests.class);

@Test(groups = "long")
public void replace_cluster_test() {
// Configure host resolution
MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.1.1.1");
dkropachev marked this conversation as resolved.
Show resolved Hide resolved

Cluster cluster = null;
Session session = null;
CCMBridge bridgeA = null;
try {
bridgeA =
CCMBridge.builder()
.withNodes(1)
.withIpPrefix("127.1.1.")
.withBinaryPort(9042)
.withClusterName("same_name")
.build();
bridgeA.start();

cluster =
Cluster.builder()
.addContactPointsWithPorts(
InetSocketAddress.createUnresolved("control.reconnect.test", 9042))
.withPort(9042)
.withoutAdvancedShardAwareness()
.withQueryOptions(new QueryOptions().setAddOriginalContactsToReconnectionPlan(true))
.build();
session = cluster.connect();

ResultSet rs = session.execute("select * from system.local");
Row row = rs.one();
String address = row.getInet("broadcast_address").toString();
logger.info("Queried node has broadcast_address: {}}", address);
System.out.flush();
} finally {
assert bridgeA != null;
bridgeA.close();
}

CCMBridge bridgeB = null;
// Overwrite host resolution
MappedHostResolverProvider.removeResolverEntry("control.reconnect.test");
MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.2.2.1");
try {
bridgeB =
CCMBridge.builder()
.withNodes(1)
.withIpPrefix("127.2.2.")
.withBinaryPort(9042)
.withClusterName("same_name")
.build();
bridgeB.start();
Thread.sleep(1000 * 92);
ResultSet rs = session.execute("select * from system.local");
Row row = rs.one();
String address = row.getInet("broadcast_address").toString();
logger.info("Queried node has broadcast_address: {}}", address);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
assert bridgeB != null;
bridgeB.close();
}
}

@Test(groups = "long")
public void should_connect_with_mocked_hostname() {
MappedHostResolverProvider.addResolverEntry("mocked.hostname.test", "127.0.1.1");
try (CCMBridge ccmBridge =
CCMBridge.builder().withNodes(1).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
Cluster cluster =
Cluster.builder()
.addContactPointsWithPorts(
InetSocketAddress.createUnresolved("mocked.hostname.test", 9042))
.withPort(9042)
.withoutAdvancedShardAwareness()
.build()) {
ccmBridge.start();
Session session = cluster.connect();
ResultSet rs = session.execute("SELECT * FROM system.local");
List<Row> rows = rs.all();
assertThat(rows).hasSize(1);
Row row = rows.get(0);
assertThat(row.getInet("broadcast_address").toString()).contains("127.0.1.1");
assertTrue(
session.getCluster().getMetadata().getAllHosts().stream()
.map(Host::toString)
.anyMatch(hostString -> hostString.contains("mocked.hostname.test")));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.datastax.driver.core;

import org.burningwave.tools.net.DefaultHostResolver;
import org.burningwave.tools.net.HostResolutionRequestInterceptor;
import org.burningwave.tools.net.MappedHostResolver;

public class MappedHostResolverProvider {
private static volatile MappedHostResolver resolver = null;

private MappedHostResolverProvider() {}

public static synchronized boolean setResolver(MappedHostResolver newResolver) {
if (resolver != null) {
return false;
}
resolver = newResolver;
HostResolutionRequestInterceptor.INSTANCE.install(resolver, DefaultHostResolver.INSTANCE);
return true;
}

public static synchronized void addResolverEntry(String hostname, String address) {
if (resolver == null) {
setResolver(new MappedHostResolver());
}
resolver.putHost(hostname, address);
}

public static synchronized void removeResolverEntry(String hostname) {
if (resolver == null) {
return;
}
resolver.removeHost(hostname);
}
}
4 changes: 4 additions & 0 deletions driver-core/src/test/resources/burningwave.static.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
managed-logger.repository=autodetect
managed-logger.repository.enabled=false
banner.hide=true
priority-of-this-configuration=1000
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<scassandra.version>1.1.2</scassandra.version>
<logback.version>1.2.13</logback.version>
<byteman.version>3.0.8</byteman.version>
<burningwave.tools.version>0.26.2</burningwave.tools.version>
<ipprefix>127.0.1.</ipprefix>
<!-- defaults below are overridden by profiles and/or submodules -->
<test.groups>unit</test.groups>
Expand Down Expand Up @@ -398,6 +399,12 @@
<version>${groovy.version}</version>
</dependency>

<dependency>
<groupId>org.burningwave</groupId>
<artifactId>tools</artifactId>
<version>${burningwave.tools.version}</version>
</dependency>

</dependencies>

</dependencyManagement>
Expand Down
Loading