From 1cfc2df60be5a6d7bb6fd847eab9a1125aeea3db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= <36934780+Bouncheck@users.noreply.github.com> Date: Fri, 22 Nov 2024 15:56:57 +0100 Subject: [PATCH] Add option to consider initial contact points during reconnection (#344) When control connection tries to reconnect usually it considers only nodes provided by load balancing policy. Usually those do not include what was initially passed to the driver but the recently seen alive nodes. In some setups the IPs can keep changing so it may be useful to have an option to try initial contact points as one of the options during reconnection. Mainly if the contact point is a hostname. This change adds the option to the `QueryOptions` to control that behaviour and adds necessary logic to `ControlConnection` class. It is disabled by default, meaning that default behaviour remains unchanged. Additionally adds org.burningwave tools dependency. This dependency has features that allow for easier host resolution mocking. Adds MappedHostResolverProvider class for testing as a single entry point for controlling hostname resolution. Adds an option to CcmBridge Builder to specify cluster name. Driver checks the cluster name when reconnecting so it will refuse to reconnect to a different CcmBridge auto-generated name. --- driver-core/pom.xml | 7 ++ .../driver/core/ControlConnection.java | 10 ++ .../datastax/driver/core/QueryOptions.java | 22 ++++ .../com/datastax/driver/core/CCMBridge.java | 12 ++ .../driver/core/DnsEndpointTests.java | 105 ++++++++++++++++++ .../core/MappedHostResolverProvider.java | 34 ++++++ .../resources/burningwave.static.properties | 4 + pom.xml | 7 ++ 8 files changed, 201 insertions(+) create mode 100644 driver-core/src/test/java/com/datastax/driver/core/DnsEndpointTests.java create mode 100644 driver-core/src/test/java/com/datastax/driver/core/MappedHostResolverProvider.java create mode 100644 driver-core/src/test/resources/burningwave.static.properties diff --git a/driver-core/pom.xml b/driver-core/pom.xml index f7d727012dd..03b427bd8e6 100644 --- a/driver-core/pom.xml +++ b/driver-core/pom.xml @@ -195,6 +195,13 @@ 1.78.1 + + + org.burningwave + tools + test + + diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index cb2f424df72..7f3c3bde636 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -34,6 +34,7 @@ import com.datastax.driver.core.utils.MoreFutures; import com.datastax.driver.core.utils.MoreObjects; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -160,6 +161,15 @@ protected Connection tryReconnect() throws ConnectionException { if (isShutdown) throw new ConnectionException(null, "Control connection was shut down"); try { + if (cluster + .configuration + .getQueryOptions() + .shouldAddOriginalContactsToReconnectionPlan()) { + List initialContacts = cluster.metadata.getContactPoints(); + Collections.shuffle(initialContacts); + return reconnectInternal( + Iterators.concat(queryPlan(), initialContacts.iterator()), false); + } return reconnectInternal(queryPlan(), false); } catch (NoHostAvailableException e) { throw new ConnectionException(null, e.getMessage()); diff --git a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java index 9fcd6a437d4..137c56aa747 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java +++ b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java @@ -72,6 +72,8 @@ public class QueryOptions { private volatile boolean schemaQueriesPaged = true; + private volatile boolean addOriginalContactsToReconnectionPlan = false; + /** * Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL}, * {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}. @@ -499,6 +501,26 @@ public int getMaxPendingRefreshNodeRequests() { return maxPendingRefreshNodeRequests; } + /** + * Whether the driver should use original contact points when reconnecting to a control node. In + * practice this forces driver to manually add original contact points to the end of the query + * plan. It is possible that it may introduce duplicates (but under differnet Host class + * instances) in the query plan. If this is set to false it does not mean that original contact + * points will be excluded. + * + *

One use case of this feature is that if the original contact point is defined by hostname + * and its IP address changes then setting this to {@code true} allows trying reconnecting to the + * new IP if all connection was lost. + */ + public QueryOptions setAddOriginalContactsToReconnectionPlan(boolean enabled) { + this.addOriginalContactsToReconnectionPlan = enabled; + return this; + } + + public boolean shouldAddOriginalContactsToReconnectionPlan() { + return this.addOriginalContactsToReconnectionPlan; + } + @Override public boolean equals(Object that) { if (that == null || !(that instanceof QueryOptions)) { diff --git a/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java b/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java index 4c9ba61fc57..dc9b807c023 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java +++ b/driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java @@ -949,6 +949,7 @@ public static class Builder { private static final Pattern RANDOM_PORT_PATTERN = Pattern.compile(RANDOM_PORT); private String ipPrefix = TestUtils.IP_PREFIX; + private String providedClusterName = null; int[] nodes = {1}; private int[] jmxPorts = {}; private boolean start = true; @@ -991,6 +992,15 @@ public Builder withSniProxy() { return this; } + /** + * Builder takes care of naming and numbering clusters on its own. Use if you really need a + * specific name + */ + public Builder withClusterName(String clusterName) { + this.providedClusterName = clusterName; + return this; + } + /** Enables SSL encryption. */ public Builder withSSL() { cassandraConfiguration.put("client_encryption_options.enabled", "true"); @@ -1115,6 +1125,8 @@ public CCMBridge build() { // be careful NOT to alter internal state (hashCode/equals) during build! String clusterName = TestUtils.generateIdentifier("ccm_"); + if (providedClusterName != null) clusterName = providedClusterName; + VersionNumber dseVersion; VersionNumber cassandraVersion; boolean versionConfigured = this.version != null; diff --git a/driver-core/src/test/java/com/datastax/driver/core/DnsEndpointTests.java b/driver-core/src/test/java/com/datastax/driver/core/DnsEndpointTests.java new file mode 100644 index 00000000000..150f7a3a2dc --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/DnsEndpointTests.java @@ -0,0 +1,105 @@ +package com.datastax.driver.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertTrue; + +import java.net.InetSocketAddress; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +public class DnsEndpointTests { + + private static final Logger logger = LoggerFactory.getLogger(DnsEndpointTests.class); + + @Test(groups = "long") + public void replace_cluster_test() { + // Configure host resolution + MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.1.1.1"); + + Cluster cluster = null; + Session session = null; + CCMBridge bridgeA = null; + try { + bridgeA = + CCMBridge.builder() + .withNodes(1) + .withIpPrefix("127.1.1.") + .withBinaryPort(9042) + .withClusterName("same_name") + .build(); + bridgeA.start(); + + cluster = + Cluster.builder() + .addContactPointsWithPorts( + InetSocketAddress.createUnresolved("control.reconnect.test", 9042)) + .withPort(9042) + .withoutAdvancedShardAwareness() + .withQueryOptions(new QueryOptions().setAddOriginalContactsToReconnectionPlan(true)) + .build(); + session = cluster.connect(); + + ResultSet rs = session.execute("select * from system.local"); + Row row = rs.one(); + String address = row.getInet("broadcast_address").toString(); + logger.info("Queried node has broadcast_address: {}}", address); + System.out.flush(); + } finally { + assert bridgeA != null; + bridgeA.close(); + } + + CCMBridge bridgeB = null; + // Overwrite host resolution + MappedHostResolverProvider.removeResolverEntry("control.reconnect.test"); + MappedHostResolverProvider.addResolverEntry("control.reconnect.test", "127.2.2.1"); + try { + bridgeB = + CCMBridge.builder() + .withNodes(1) + .withIpPrefix("127.2.2.") + .withBinaryPort(9042) + .withClusterName("same_name") + .build(); + bridgeB.start(); + Thread.sleep(1000 * 92); + ResultSet rs = session.execute("select * from system.local"); + Row row = rs.one(); + String address = row.getInet("broadcast_address").toString(); + logger.info("Queried node has broadcast_address: {}}", address); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + assert bridgeB != null; + bridgeB.close(); + } + } + + @Test(groups = "long") + public void should_connect_with_mocked_hostname() { + MappedHostResolverProvider.addResolverEntry("mocked.hostname.test", "127.0.1.1"); + try (CCMBridge ccmBridge = + CCMBridge.builder().withNodes(1).withIpPrefix("127.0.1.").withBinaryPort(9042).build(); + Cluster cluster = + Cluster.builder() + .addContactPointsWithPorts( + InetSocketAddress.createUnresolved("mocked.hostname.test", 9042)) + .withPort(9042) + .withoutAdvancedShardAwareness() + .build()) { + ccmBridge.start(); + Session session = cluster.connect(); + ResultSet rs = session.execute("SELECT * FROM system.local"); + List rows = rs.all(); + assertThat(rows).hasSize(1); + Row row = rows.get(0); + assertThat(row.getInet("broadcast_address").toString()).contains("127.0.1.1"); + assertTrue( + session.getCluster().getMetadata().getAllHosts().stream() + .map(Host::toString) + .anyMatch(hostString -> hostString.contains("mocked.hostname.test"))); + } + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/MappedHostResolverProvider.java b/driver-core/src/test/java/com/datastax/driver/core/MappedHostResolverProvider.java new file mode 100644 index 00000000000..1bc4b1884ba --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/MappedHostResolverProvider.java @@ -0,0 +1,34 @@ +package com.datastax.driver.core; + +import org.burningwave.tools.net.DefaultHostResolver; +import org.burningwave.tools.net.HostResolutionRequestInterceptor; +import org.burningwave.tools.net.MappedHostResolver; + +public class MappedHostResolverProvider { + private static volatile MappedHostResolver resolver = null; + + private MappedHostResolverProvider() {} + + public static synchronized boolean setResolver(MappedHostResolver newResolver) { + if (resolver != null) { + return false; + } + resolver = newResolver; + HostResolutionRequestInterceptor.INSTANCE.install(resolver, DefaultHostResolver.INSTANCE); + return true; + } + + public static synchronized void addResolverEntry(String hostname, String address) { + if (resolver == null) { + setResolver(new MappedHostResolver()); + } + resolver.putHost(hostname, address); + } + + public static synchronized void removeResolverEntry(String hostname) { + if (resolver == null) { + return; + } + resolver.removeHost(hostname); + } +} diff --git a/driver-core/src/test/resources/burningwave.static.properties b/driver-core/src/test/resources/burningwave.static.properties new file mode 100644 index 00000000000..7108b42c0fb --- /dev/null +++ b/driver-core/src/test/resources/burningwave.static.properties @@ -0,0 +1,4 @@ +managed-logger.repository=autodetect +managed-logger.repository.enabled=false +banner.hide=true +priority-of-this-configuration=1000 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 6c500d4c022..19c200f5eb9 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ 1.1.2 1.2.13 3.0.8 + 0.26.2 127.0.1. unit @@ -398,6 +399,12 @@ ${groovy.version} + + org.burningwave + tools + ${burningwave.tools.version} + +