Skip to content

Commit

Permalink
Connect to all DNS records of unresolved endpoint
Browse files Browse the repository at this point in the history
netty bootstrap.connect uses only first address of unresolved InetSocketAddress.
That causes 4.x to not even try to connect to other when it first one fails.

This PR makes driver to resolve unresolved endpoint, instead of leaving
to to netty.
Making it possible to connect to any ip address from DNS contact endpoint.
  • Loading branch information
dkropachev committed Oct 8, 2024
1 parent d69bdd4 commit f937a30
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.shaded.guava.common.base.Charsets;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
Expand Down Expand Up @@ -171,6 +174,12 @@ public SocketAddress resolve() {
return new InetSocketAddress("127.0.0.1", 9042);
}

@NonNull
@Override
public List<EndPoint> resolveAll() {
return ImmutableList.of(this);
}

@NonNull
@Override
public String asMetricPrefix() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.List;

/**
* Encapsulates the information needed to open connections to a node.
Expand All @@ -40,6 +42,14 @@ public interface EndPoint {
@NonNull
SocketAddress resolve();

/**
* Resolves this instance to a socket address.
*
* <p>This will be called each time the driver opens a new connection to the node. The returned
* address cannot be null.
*/
@NonNull
List<EndPoint> resolveAll();
/**
* Returns an alternate string representation for use in node-level metric names.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
import com.datastax.oss.driver.internal.core.metadata.UnresolvedEndPoint;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
import java.net.InetAddress;
Expand All @@ -41,18 +42,17 @@ public static Set<EndPoint> merge(

Set<EndPoint> result = Sets.newHashSet(programmaticContactPoints);
for (String spec : configContactPoints) {
for (InetSocketAddress address : extract(spec, resolve)) {
DefaultEndPoint endPoint = new DefaultEndPoint(address);
for (EndPoint endPoint : extract(spec, resolve)) {
boolean wasNew = result.add(endPoint);
if (!wasNew) {
LOG.warn("Duplicate contact point {}", address);
LOG.warn("Duplicate contact point {}", endPoint);
}
}
}
return ImmutableSet.copyOf(result);
}

private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
private static Set<EndPoint> extract(String spec, boolean resolve) {
int separator = spec.lastIndexOf(':');
if (separator < 0) {
LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec);
Expand All @@ -69,7 +69,7 @@ private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
return Collections.emptySet();
}
if (!resolve) {
return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port));
return ImmutableSet.of(new UnresolvedEndPoint(host, port));
} else {
try {
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
Expand All @@ -79,9 +79,9 @@ private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
spec,
Arrays.deepToString(inetAddresses));
}
Set<InetSocketAddress> result = new HashSet<>();
Set<EndPoint> result = new HashSet<>();
for (InetAddress inetAddress : inetAddresses) {
result.add(new InetSocketAddress(inetAddress, port));
result.add(new DefaultEndPoint(new InetSocketAddress(inetAddress, port)));
}
return result;
} catch (UnknownHostException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;

public class DefaultEndPoint implements EndPoint, Serializable {
Expand All @@ -41,6 +43,12 @@ public InetSocketAddress resolve() {
return address;
}

@NonNull
@Override
public List<EndPoint> resolveAll() {
return ImmutableList.of(this);
}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,26 @@ public Result compute(
+ "keeping only the first one",
logPrefix,
hostId);
continue;
}
EndPoint endPoint = nodeInfo.getEndPoint();
DefaultNode node = findIn(contactPoints, endPoint);
if (node == null) {
node = new DefaultNode(endPoint, context);
LOG.debug("[{}] Adding new node {}", logPrefix, node);
} else {
EndPoint endPoint = nodeInfo.getEndPoint();
DefaultNode node = findIn(contactPoints, endPoint);
if (node == null) {
node = new DefaultNode(endPoint, context);
LOG.debug("[{}] Adding new node {}", logPrefix, node);
} else {
LOG.debug("[{}] Copying contact point {}", logPrefix, node);
}
if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) {
LOG.debug("[{}] Copying contact point {}", logPrefix, node);
}
copyInfos(nodeInfo, node, context);
newNodes.put(hostId, node);
}

if (tokenMapEnabled) {
for (NodeInfo nodeInfo : nodeInfos) {
if (nodeInfo.getPartitioner() != null) {
tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());
break;
}
copyInfos(nodeInfo, node, context);
newNodes.put(hostId, node);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,26 @@
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.netty.util.concurrent.EventExecutor;

import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArraySet;

import jnr.ffi.annotations.Synchronized;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Holds the immutable instance of the {@link Metadata}, and handles requests to update it. */
/**
* Holds the immutable instance of the {@link Metadata}, and handles requests to update it.
*/
@ThreadSafe
public class MetadataManager implements AsyncAutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MetadataManager.class);
Expand All @@ -80,7 +87,8 @@ public class MetadataManager implements AsyncAutoCloseable {
private volatile KeyspaceFilter keyspaceFilter;
private volatile Boolean schemaEnabledProgrammatically;
private volatile boolean tokenMapEnabled;
private volatile Set<DefaultNode> contactPoints;
private volatile Set<EndPoint> contactPoints;
private volatile Set<DefaultNode> resolvedContactPoints;
private volatile boolean wasImplicitContactPoint;
private volatile TypeCodec<TupleValue> tabletPayloadCodec = null;

Expand All @@ -102,7 +110,7 @@ protected MetadataManager(InternalDriverContext context, DefaultMetadata initial
DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES, Collections.emptyList());
this.keyspaceFilter = KeyspaceFilter.newInstance(logPrefix, refreshedKeyspaces);
this.tokenMapEnabled = config.getBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED);

this.resolvedContactPoints = new CopyOnWriteArraySet<>();
context.getEventBus().register(ConfigChangeEvent.class, this::onConfigChanged);
}

Expand All @@ -119,8 +127,8 @@ private void onConfigChanged(@SuppressWarnings("unused") ConfigChangeEvent event
this.tokenMapEnabled = config.getBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED);

if ((!schemaEnabledBefore
|| !keyspacesBefore.equals(refreshedKeyspaces)
|| (!tokenMapEnabledBefore && tokenMapEnabled))
|| !keyspacesBefore.equals(refreshedKeyspaces)
|| (!tokenMapEnabledBefore && tokenMapEnabled))
&& isSchemaEnabled()) {
refreshSchema(null, false, true)
.whenComplete(
Expand All @@ -145,18 +153,19 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
// Convert the EndPoints to Nodes, but we can't put them into the Metadata yet, because we
// don't know their host_id. So store them in a volatile field instead, they will get copied
// during the first node refresh.
ImmutableSet.Builder<DefaultNode> contactPointsBuilder = ImmutableSet.builder();
ImmutableSet.Builder<EndPoint> contactPointsBuilder = ImmutableSet.builder();
if (providedContactPoints == null || providedContactPoints.isEmpty()) {
LOG.info(
"[{}] No contact points provided, defaulting to {}", logPrefix, DEFAULT_CONTACT_POINT);
this.wasImplicitContactPoint = true;
contactPointsBuilder.add(new DefaultNode(DEFAULT_CONTACT_POINT, context));
contactPointsBuilder.add(DEFAULT_CONTACT_POINT);
} else {
for (EndPoint endPoint : providedContactPoints) {
contactPointsBuilder.add(new DefaultNode(endPoint, context));
contactPointsBuilder.add(endPoint);
}
}
this.contactPoints = contactPointsBuilder.build();
this.resolveContactPoints();
LOG.debug("[{}] Adding initial contact points {}", logPrefix, contactPoints);
}

Expand All @@ -167,10 +176,36 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
* @see #wasImplicitContactPoint()
*/
public Set<DefaultNode> getContactPoints() {
return contactPoints;
return resolvedContactPoints;
}

/** Whether the default contact point was used (because none were provided explicitly). */
@Synchronized
public void resolveContactPoints() {
ImmutableSet.Builder<EndPoint> resultBuilder = ImmutableSet.builder();
for (EndPoint endPoint : contactPoints) {
List<EndPoint> resolveEndpoints = endPoint.resolveAll();
if (!resolveEndpoints.isEmpty()) {
LOG.error("failed to resolve contact endpoint {}", endPoint);
} else {
resultBuilder.addAll(resolveEndpoints);
}
}

Set<EndPoint> result = resultBuilder.build();
for (EndPoint endPoint : result) {
if (resolvedContactPoints.stream()
.anyMatch(resolved -> resolved.getEndPoint().equals(endPoint))) {
continue;
}
this.resolvedContactPoints.add(new DefaultNode(endPoint, context));
}

this.resolvedContactPoints.removeIf(endPoint -> !result.contains(endPoint.getEndPoint()));
}

/**
* Whether the default contact point was used (because none were provided explicitly).
*/
public boolean wasImplicitContactPoint() {
return wasImplicitContactPoint;
}
Expand Down Expand Up @@ -230,12 +265,12 @@ public void removeNode(InetSocketAddress broadcastRpcAddress) {
}

/**
* @param keyspace if this refresh was triggered by an event, that event's keyspace, otherwise
* null (this is only used to discard the event if it targets a keyspace that we're ignoring)
* @param keyspace if this refresh was triggered by an event, that event's keyspace, otherwise
* null (this is only used to discard the event if it targets a keyspace that we're ignoring)
* @param evenIfDisabled force the refresh even if schema is currently disabled (used for user
* request)
* @param flushNow bypass the debouncer and force an immediate refresh (used to avoid a delay at
* startup)
* request)
* @param flushNow bypass the debouncer and force an immediate refresh (used to avoid a delay at
* startup)
*/
public CompletionStage<RefreshSchemaResult> refreshSchema(
String keyspace, boolean evenIfDisabled, boolean flushNow) {
Expand Down Expand Up @@ -311,7 +346,7 @@ private class SingleThreaded {
private boolean closeWasCalled;
private final CompletableFuture<Void> firstSchemaRefreshFuture = new CompletableFuture<>();
private final Debouncer<
CompletableFuture<RefreshSchemaResult>, CompletableFuture<RefreshSchemaResult>>
CompletableFuture<RefreshSchemaResult>, CompletableFuture<RefreshSchemaResult>>
schemaRefreshDebouncer;
private final SchemaQueriesFactory schemaQueriesFactory;
private final SchemaParserFactory schemaParserFactory;
Expand All @@ -337,10 +372,13 @@ private SingleThreaded(InternalDriverContext context, DriverExecutionProfile con
}

private Void refreshNodes(Iterable<NodeInfo> nodeInfos) {
if (!didFirstNodeListRefresh) {
resolveContactPoints();
}
MetadataRefresh refresh =
didFirstNodeListRefresh
? new FullNodeListRefresh(nodeInfos)
: new InitialNodeListRefresh(nodeInfos, contactPoints);
: new InitialNodeListRefresh(nodeInfos, resolvedContactPoints);
didFirstNodeListRefresh = true;
return apply(refresh);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.driver.shaded.guava.common.primitives.UnsignedBytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -72,6 +74,12 @@ public InetSocketAddress resolve() {
}
}

@NonNull
@Override
public List<EndPoint> resolveAll() {
return ImmutableList.of(this);
}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand Down
Loading

0 comments on commit f937a30

Please sign in to comment.