From e9ae2c1c32f55d1151cf5726b3ca6a43b7cba35d Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Fri, 4 Oct 2024 15:23:38 -0400 Subject: [PATCH] Connect to all DNS records of unresolved endpoint netty bootstrap.connect uses only first address of unresolved InetSocketAddress. That causes 4.x to not even try to connect to other when it first one fails. This PR makes driver to resolve unresolved endpoint, instead of leaving to to netty. Making it possible to connect to any ip address from DNS contact endpoint. --- .../core/auth/PlainTextAuthProviderBase.java | 8 ++ .../driver/api/core/metadata/EndPoint.java | 8 ++ .../driver/internal/core/ContactPoints.java | 14 +-- .../core/metadata/DefaultEndPoint.java | 8 ++ .../core/metadata/InitialNodeListRefresh.java | 28 +++-- .../core/metadata/MetadataManager.java | 44 ++++++-- .../internal/core/metadata/SniEndPoint.java | 8 ++ .../core/metadata/UnresolvedEndPoint.java | 101 ++++++++++++++++++ .../internal/core/ContactPointsTest.java | 5 +- .../core/channel/EmbeddedEndPoint.java | 7 ++ .../internal/core/channel/LocalEndPoint.java | 8 ++ .../driver/core/resolver/MockResolverIT.java | 52 +++++++-- 12 files changed, 253 insertions(+), 38 deletions(-) create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java b/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java index fb85797af9e..68f74ae3825 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/auth/PlainTextAuthProviderBase.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.shaded.guava.common.base.Charsets; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.net.InetSocketAddress; @@ -29,6 +30,7 @@ import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.List; import java.util.Objects; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; @@ -171,6 +173,12 @@ public SocketAddress resolve() { return new InetSocketAddress("127.0.0.1", 9042); } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @NonNull @Override public String asMetricPrefix() { diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java index 530f2ad38ac..907d2265c2e 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java @@ -20,6 +20,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.List; /** * Encapsulates the information needed to open connections to a node. @@ -40,6 +41,13 @@ public interface EndPoint { @NonNull SocketAddress resolve(); + /** + * Resolves this instance to a list of {@link EndPoint}. + * + *

This is called occasionally to resolve unresolved endpoints to their resolved counterparts. + */ + @NonNull + List resolveAll(); /** * Returns an alternate string representation for use in node-level metric names. * diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java b/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java index 1ed2a1cebf3..821b3ca2f79 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java @@ -19,6 +19,7 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; +import com.datastax.oss.driver.internal.core.metadata.UnresolvedEndPoint; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import com.datastax.oss.driver.shaded.guava.common.collect.Sets; import java.net.InetAddress; @@ -41,18 +42,17 @@ public static Set merge( Set result = Sets.newHashSet(programmaticContactPoints); for (String spec : configContactPoints) { - for (InetSocketAddress address : extract(spec, resolve)) { - DefaultEndPoint endPoint = new DefaultEndPoint(address); + for (EndPoint endPoint : extract(spec, resolve)) { boolean wasNew = result.add(endPoint); if (!wasNew) { - LOG.warn("Duplicate contact point {}", address); + LOG.warn("Duplicate contact point {}", endPoint); } } } return ImmutableSet.copyOf(result); } - private static Set extract(String spec, boolean resolve) { + private static Set extract(String spec, boolean resolve) { int separator = spec.lastIndexOf(':'); if (separator < 0) { LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec); @@ -69,7 +69,7 @@ private static Set extract(String spec, boolean resolve) { return Collections.emptySet(); } if (!resolve) { - return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port)); + return ImmutableSet.of(new UnresolvedEndPoint(host, port)); } else { try { InetAddress[] inetAddresses = InetAddress.getAllByName(host); @@ -79,9 +79,9 @@ private static Set extract(String spec, boolean resolve) { spec, Arrays.deepToString(inetAddresses)); } - Set result = new HashSet<>(); + Set result = new HashSet<>(); for (InetAddress inetAddress : inetAddresses) { - result.add(new InetSocketAddress(inetAddress, port)); + result.add(new DefaultEndPoint(new InetSocketAddress(inetAddress, port))); } return result; } catch (UnknownHostException e) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java index 7ffbee8e4bb..905c8c9f16b 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java @@ -18,9 +18,11 @@ package com.datastax.oss.driver.internal.core.metadata; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.Serializable; import java.net.InetSocketAddress; +import java.util.List; import java.util.Objects; public class DefaultEndPoint implements EndPoint, Serializable { @@ -41,6 +43,12 @@ public InetSocketAddress resolve() { return address; } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java index c21d5d8171e..1d17d334378 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java @@ -72,20 +72,26 @@ public Result compute( + "keeping only the first one", logPrefix, hostId); + continue; + } + EndPoint endPoint = nodeInfo.getEndPoint(); + DefaultNode node = findIn(contactPoints, endPoint); + if (node == null) { + node = new DefaultNode(endPoint, context); + LOG.debug("[{}] Adding new node {}", logPrefix, node); } else { - EndPoint endPoint = nodeInfo.getEndPoint(); - DefaultNode node = findIn(contactPoints, endPoint); - if (node == null) { - node = new DefaultNode(endPoint, context); - LOG.debug("[{}] Adding new node {}", logPrefix, node); - } else { - LOG.debug("[{}] Copying contact point {}", logPrefix, node); - } - if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) { + LOG.debug("[{}] Copying contact point {}", logPrefix, node); + } + copyInfos(nodeInfo, node, context); + newNodes.put(hostId, node); + } + + if (tokenMapEnabled) { + for (NodeInfo nodeInfo : nodeInfos) { + if (nodeInfo.getPartitioner() != null) { tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner()); + break; } - copyInfos(nodeInfo, node, context); - newNodes.put(hostId, node); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index 7aa2fb13bcd..7e95bf80d6e 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -55,6 +55,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArraySet; +import jnr.ffi.annotations.Synchronized; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +82,8 @@ public class MetadataManager implements AsyncAutoCloseable { private volatile KeyspaceFilter keyspaceFilter; private volatile Boolean schemaEnabledProgrammatically; private volatile boolean tokenMapEnabled; - private volatile Set contactPoints; + private volatile Set contactPoints; + private volatile Set resolvedContactPoints; private volatile boolean wasImplicitContactPoint; private volatile TypeCodec tabletPayloadCodec = null; @@ -102,7 +105,7 @@ protected MetadataManager(InternalDriverContext context, DefaultMetadata initial DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES, Collections.emptyList()); this.keyspaceFilter = KeyspaceFilter.newInstance(logPrefix, refreshedKeyspaces); this.tokenMapEnabled = config.getBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED); - + this.resolvedContactPoints = new CopyOnWriteArraySet<>(); context.getEventBus().register(ConfigChangeEvent.class, this::onConfigChanged); } @@ -145,18 +148,19 @@ public void addContactPoints(Set providedContactPoints) { // Convert the EndPoints to Nodes, but we can't put them into the Metadata yet, because we // don't know their host_id. So store them in a volatile field instead, they will get copied // during the first node refresh. - ImmutableSet.Builder contactPointsBuilder = ImmutableSet.builder(); + ImmutableSet.Builder contactPointsBuilder = ImmutableSet.builder(); if (providedContactPoints == null || providedContactPoints.isEmpty()) { LOG.info( "[{}] No contact points provided, defaulting to {}", logPrefix, DEFAULT_CONTACT_POINT); this.wasImplicitContactPoint = true; - contactPointsBuilder.add(new DefaultNode(DEFAULT_CONTACT_POINT, context)); + contactPointsBuilder.add(DEFAULT_CONTACT_POINT); } else { for (EndPoint endPoint : providedContactPoints) { - contactPointsBuilder.add(new DefaultNode(endPoint, context)); + contactPointsBuilder.add(endPoint); } } this.contactPoints = contactPointsBuilder.build(); + this.resolveContactPoints(); LOG.debug("[{}] Adding initial contact points {}", logPrefix, contactPoints); } @@ -167,7 +171,30 @@ public void addContactPoints(Set providedContactPoints) { * @see #wasImplicitContactPoint() */ public Set getContactPoints() { - return contactPoints; + return resolvedContactPoints; + } + + public synchronized void resolveContactPoints() { + ImmutableSet.Builder resultBuilder = ImmutableSet.builder(); + for (EndPoint endPoint : contactPoints) { + List resolveEndpoints = endPoint.resolveAll(); + if (resolveEndpoints.isEmpty()) { + LOG.error("failed to resolve contact endpoint {}", endPoint); + } else { + resultBuilder.addAll(resolveEndpoints); + } + } + + Set result = resultBuilder.build(); + for (EndPoint endPoint : result) { + if (resolvedContactPoints.stream() + .anyMatch(resolved -> resolved.getEndPoint().equals(endPoint))) { + continue; + } + this.resolvedContactPoints.add(new DefaultNode(endPoint, context)); + } + + this.resolvedContactPoints.removeIf(endPoint -> !result.contains(endPoint.getEndPoint())); } /** Whether the default contact point was used (because none were provided explicitly). */ @@ -337,10 +364,13 @@ private SingleThreaded(InternalDriverContext context, DriverExecutionProfile con } private Void refreshNodes(Iterable nodeInfos) { + if (!didFirstNodeListRefresh) { + resolveContactPoints(); + } MetadataRefresh refresh = didFirstNodeListRefresh ? new FullNodeListRefresh(nodeInfos) - : new InitialNodeListRefresh(nodeInfos, contactPoints); + : new InitialNodeListRefresh(nodeInfos, resolvedContactPoints); didFirstNodeListRefresh = true; return apply(refresh); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java index ace4e82617d..f5085df5ab8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SniEndPoint.java @@ -18,6 +18,7 @@ package com.datastax.oss.driver.internal.core.metadata; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import com.datastax.oss.driver.shaded.guava.common.primitives.UnsignedBytes; import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetAddress; @@ -25,6 +26,7 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Comparator; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; @@ -72,6 +74,12 @@ public InetSocketAddress resolve() { } } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @Override public boolean equals(Object other) { if (other == this) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java new file mode 100644 index 00000000000..d55ae047951 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/UnresolvedEndPoint.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.metadata; + +import com.datastax.oss.driver.api.core.metadata.EndPoint; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class UnresolvedEndPoint implements EndPoint, Serializable { + private final String metricPrefix; + String host; + int port; + + private final List EMPTY = new ArrayList<>(); + + public UnresolvedEndPoint(String host, int port) { + this.host = host; + this.port = port; + this.metricPrefix = buildMetricPrefix(host, port); + } + + @NonNull + @Override + public SocketAddress resolve() { + throw new RuntimeException( + String.format( + "This endpoint %s should never been resolved, but it happened, it somehow leaked to downstream code.", + this)); + } + + @NonNull + @Override + public List resolveAll() { + try { + InetAddress[] inetAddresses = InetAddress.getAllByName(host); + Set result = new HashSet<>(); + for (InetAddress inetAddress : inetAddresses) { + result.add(new DefaultEndPoint(new InetSocketAddress(inetAddress, port))); + } + return new ArrayList<>(result); + } catch (UnknownHostException e) { + return EMPTY; + } + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (other instanceof UnresolvedEndPoint) { + UnresolvedEndPoint that = (UnresolvedEndPoint) other; + return this.host.equals(that.host) && this.port == that.port; + } + return false; + } + + @Override + public int hashCode() { + return host.toLowerCase().hashCode() + port; + } + + @Override + public String toString() { + return host + ":" + port; + } + + @NonNull + @Override + public String asMetricPrefix() { + return metricPrefix; + } + + private static String buildMetricPrefix(String host, int port) { + // Append the port since Cassandra 4 supports nodes with different ports + return host.replace('.', '_') + ':' + port; + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java index 9e0d8737619..81093d35134 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java @@ -29,6 +29,7 @@ import ch.qos.logback.core.Appender; import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; +import com.datastax.oss.driver.internal.core.metadata.UnresolvedEndPoint; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import java.net.InetAddress; @@ -94,9 +95,7 @@ public void should_parse_host_and_port_in_configuration_and_create_unresolved() Set endPoints = ContactPoints.merge(Collections.emptySet(), ImmutableList.of("localhost:9042"), false); - assertThat(endPoints) - .containsExactly( - new DefaultEndPoint(InetSocketAddress.createUnresolved("localhost", 9042))); + assertThat(endPoints).containsExactly(new UnresolvedEndPoint("localhost", 9042)); } @Test diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java index 5e463299a66..0d9895374d3 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/EmbeddedEndPoint.java @@ -20,6 +20,7 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import edu.umd.cs.findbugs.annotations.NonNull; import java.net.SocketAddress; +import java.util.List; /** Endpoint implementation for unit tests that use an embedded Netty channel. */ public class EmbeddedEndPoint implements EndPoint { @@ -30,6 +31,12 @@ public SocketAddress resolve() { throw new UnsupportedOperationException("This should not get called from unit tests"); } + @NonNull + @Override + public List resolveAll() { + throw new UnsupportedOperationException("This should not get called from unit tests"); + } + @NonNull @Override public String asMetricPrefix() { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java index c90731eece9..8836ae607d7 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/LocalEndPoint.java @@ -18,9 +18,11 @@ package com.datastax.oss.driver.internal.core.channel; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.NonNull; import io.netty.channel.local.LocalAddress; import java.net.SocketAddress; +import java.util.List; /** Endpoint implementation for unit tests that use the local Netty transport. */ public class LocalEndPoint implements EndPoint { @@ -37,6 +39,12 @@ public SocketAddress resolve() { return localAddress; } + @NonNull + @Override + public List resolveAll() { + return ImmutableList.of(this); + } + @NonNull @Override public String asMetricPrefix() { diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java index 93ecbf1815c..e30afd56293 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java @@ -24,8 +24,6 @@ package com.datastax.oss.driver.core.resolver; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.datastax.oss.driver.api.core.CqlSession; @@ -40,7 +38,7 @@ import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge; import com.datastax.oss.driver.categories.IsolatedTests; import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder; -import java.net.InetSocketAddress; + import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -48,6 +46,7 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; + import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; @@ -96,9 +95,6 @@ public void should_connect_with_mocked_hostname() { .filter(x -> x.toString().contains("test.cluster.fake")) .collect(Collectors.toSet()); assertThat(filteredNodes).hasSize(1); - InetSocketAddress address = - (InetSocketAddress) filteredNodes.iterator().next().getEndPoint().resolve(); - assertTrue(address.isUnresolved()); } } } @@ -119,7 +115,7 @@ public void replace_cluster_test() { CqlSession session; try (CcmBridge ccmBridge = - CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) { + CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) { MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake"); MultimapHostResolverProvider.addResolverEntry( "test.cluster.fake", ccmBridge.getNodeIpAddress(1)); @@ -172,10 +168,10 @@ public void replace_cluster_test() { nodes.stream() .filter(x -> x.toString().contains("test.cluster.fake")) .collect(Collectors.toSet()); - assertThat(filteredNodes).hasSize(1); + assertThat(filteredNodes).hasSize(3); } try (CcmBridge ccmBridge = - CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) { + CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) { ccmBridge.create(); ccmBridge.start(); boolean allNodesUp = false; @@ -320,7 +316,7 @@ public void cannot_reconnect_with_resolved_socket() { "Launching another cluster until we lose resolved socket from metadata (run {}).", counter); try (CcmBridge ccmBridge = - CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) { + CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) { MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake"); MultimapHostResolverProvider.addResolverEntry( "test.cluster.fake", ccmBridge.getNodeIpAddress(1)); @@ -413,4 +409,40 @@ public void cannot_reconnect_with_resolved_socket() { } session.close(); } + + @Test + public void should_connect_when_first_node_is_unavailable() { + // Reproduce case when dns first record points to the node that is unresponsive + // With RESOLVE_CONTACT_POINTS set to false + DriverConfigLoader loader = + new DefaultProgrammaticDriverConfigLoaderBuilder() + .withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false) + .withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true) + .withStringList( + TypedDriverOption.CONTACT_POINTS.getRawOption(), + Collections.singletonList("test.cluster.fake:9042")) + .build(); + + CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader); + CqlSession session; + try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) { + MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake"); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(11)); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(2)); + MultimapHostResolverProvider.addResolverEntry( + "test.cluster.fake", ccmBridge.getNodeIpAddress(3)); + ccmBridge.create(); + ccmBridge.start(); + session = builder.build(); + SimpleStatement statement = + new SimpleStatementBuilder("SELECT * FROM system.local") + .setTimeout(Duration.ofSeconds(3)) + .build(); + session.execute(statement); + ccmBridge.stop(2); + session.execute(statement); + } + } }