From b7b7c970a811ac09baff10911f8bc291a0ce7395 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 26 Sep 2024 00:18:01 -0400 Subject: [PATCH] Add more information to NoNodeAvailableException and AllNodesFailedException Sometimes users see these exceptions on production in elusive rare cases. Since it is PROD it is imposible to enable DEBUG logs. In order to debug these cases we need these errors to contain all the information that can help to investigate what is happening. This information needs to be as recent as it is possible and be collected as close as possible to the place when it is being read by driver. --- .../ContinuousRequestHandlerBase.java | 7 +- .../core/graph/GraphRequestHandler.java | 13 +-- .../api/core/AllNodesFailedException.java | 61 +++++++++++-- .../api/core/NoNodeAvailableException.java | 20 ++++- .../api/core/config/DefaultDriverOption.java | 6 ++ .../driver/api/core/config/OptionsMap.java | 1 + .../api/core/config/TypedDriverOption.java | 5 ++ .../core/control/ControlConnection.java | 7 +- .../internal/core/cql/CqlPrepareHandler.java | 7 +- .../internal/core/cql/CqlRequestHandler.java | 8 +- .../BasicLoadBalancingPolicy.java | 65 +++++++++++++- .../DefaultLoadBalancingPolicy.java | 60 ++++++++++++- .../nodeset/DcAgnosticNodeSet.java | 34 +++++++ .../nodeset/LazyCopyQueryPlan.java | 82 +++++++++++++++++ .../loadbalancing/nodeset/MultiDcNodeSet.java | 42 +++++++++ .../core/loadbalancing/nodeset/NodeSet.java | 3 + .../loadbalancing/nodeset/NodeSetInfo.java | 47 ++++++++++ .../nodeset/SingleDcNodeSet.java | 42 +++++++++ .../core/metadata/DefaultNodeInfo.java | 90 +++++++++++++++++++ .../metadata/LoadBalancingPolicyWrapper.java | 58 +++++++++++- .../internal/core/metadata/NodeInfo.java | 24 +++++ .../core/util/collection/DebugQueryPlan.java | 88 ++++++++++++++++++ .../core/util/collection/EmptyQueryPlan.java | 5 ++ core/src/main/resources/reference.conf | 6 ++ .../nodeset/LazyCopyQueryPlanTest.java | 54 +++++++++++ .../LoadBalancingPolicyWrapperTest.java | 5 ++ .../driver/api/testinfra/ccm/BaseCcmRule.java | 4 + .../api/testinfra/ccm/CustomCcmRule.java | 1 + 28 files changed, 817 insertions(+), 28 deletions(-) create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlan.java create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSetInfo.java create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/DebugQueryPlan.java create mode 100644 core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlanTest.java diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java index 44df3b3a03d..e62935fe2f6 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java @@ -65,6 +65,7 @@ import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.session.RepreparePayload; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import com.datastax.oss.protocol.internal.Frame; @@ -360,7 +361,11 @@ private void sendRequest( // We've reached the end of the query plan without finding any node to write to; abort the // continuous paging session. if (activeExecutionsCount.decrementAndGet() == 0) { - abortGlobalRequestOrChosenCallback(AllNodesFailedException.fromErrors(errors)); + if (queryPlan instanceof DebugQueryPlan) { + abortGlobalRequestOrChosenCallback(AllNodesFailedException.fromErrors(errors, queryPlan)); + } else { + abortGlobalRequestOrChosenCallback(AllNodesFailedException.fromErrors(errors)); + } } } else if (!chosenCallback.isDone()) { NodeResponseCallback nodeResponseCallback = diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java index c2298458805..03ffd6b5a7d 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java @@ -60,6 +60,7 @@ import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; import com.datastax.oss.driver.internal.core.tracker.RequestLogger; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.Message; @@ -265,11 +266,13 @@ private void sendRequest( // We've reached the end of the query plan without finding any node to write to if (!result.isDone() && activeExecutionsCount.decrementAndGet() == 0) { // We're the last execution so fail the result - setFinalError( - statement, - AllNodesFailedException.fromErrors(this.errors), - null, - NO_SUCCESSFUL_EXECUTION); + AllNodesFailedException exception; + if (queryPlan instanceof DebugQueryPlan) { + exception = AllNodesFailedException.fromErrors(this.errors, queryPlan); + } else { + exception = AllNodesFailedException.fromErrors(this.errors); + } + setFinalError(statement, exception, null, NO_SUCCESSFUL_EXECUTION); } } else { NodeResponseCallback nodeResponseCallback = diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/AllNodesFailedException.java b/core/src/main/java/com/datastax/oss/driver/api/core/AllNodesFailedException.java index b6f1bf93838..a539eae0a82 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/AllNodesFailedException.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/AllNodesFailedException.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; /** * Thrown when a query failed on all the coordinators it was tried on. This exception may wrap @@ -37,7 +38,6 @@ * exceptions}, or via {@link #getAllErrors()} where they are grouped by node. */ public class AllNodesFailedException extends DriverException { - /** @deprecated Use {@link #fromErrors(List)} instead. */ @NonNull @Deprecated @@ -45,7 +45,17 @@ public static AllNodesFailedException fromErrors(@Nullable Map if (errors == null || errors.isEmpty()) { return new NoNodeAvailableException(); } else { - return new AllNodesFailedException(groupByNode(errors)); + return new AllNodesFailedException(groupByNode(errors), null); + } + } + + @NonNull + public static AllNodesFailedException fromErrors( + @Nullable List> errors, Queue queryPlan) { + if (errors == null || errors.isEmpty()) { + return new NoNodeAvailableException(queryPlan); + } else { + return new AllNodesFailedException(groupByNode(errors), queryPlan); } } @@ -59,6 +69,7 @@ public static AllNodesFailedException fromErrors(@Nullable List> errors; + private final Queue queryPlan; /** @deprecated Use {@link #AllNodesFailedException(String, ExecutionInfo, Iterable)} instead. */ @Deprecated @@ -68,6 +79,7 @@ protected AllNodesFailedException( @NonNull Map errors) { super(message, executionInfo, null, true); this.errors = toDeepImmutableMap(groupByNode(errors)); + this.queryPlan = null; addSuppressedErrors(); } @@ -77,6 +89,18 @@ protected AllNodesFailedException( @NonNull Iterable>> errors) { super(message, executionInfo, null, true); this.errors = toDeepImmutableMap(errors); + this.queryPlan = null; + addSuppressedErrors(); + } + + protected AllNodesFailedException( + @NonNull String message, + @Nullable ExecutionInfo executionInfo, + @NonNull Iterable>> errors, + @Nullable Queue queryPlan) { + super(message, executionInfo, null, true); + this.errors = toDeepImmutableMap(errors); + this.queryPlan = queryPlan; addSuppressedErrors(); } @@ -91,12 +115,26 @@ private void addSuppressedErrors() { private AllNodesFailedException(Map> errors) { this( buildMessage( - String.format("All %d node(s) tried for the query failed", errors.size()), errors), + String.format("All %d node(s) tried for the query failed", errors.size()), + errors, + null), null, errors.entrySet()); } - private static String buildMessage(String baseMessage, Map> errors) { + private AllNodesFailedException(Map> errors, Queue queryPlan) { + this( + buildMessage( + String.format("All %d node(s) tried for the query failed", errors.size()), + errors, + queryPlan), + null, + errors.entrySet(), + queryPlan); + } + + private static String buildMessage( + String baseMessage, Map> errors, Queue queryPlan) { int limit = Math.min(errors.size(), 3); Iterator>> iterator = Iterables.limit(errors.entrySet(), limit).iterator(); @@ -108,9 +146,14 @@ private static String buildMessage(String baseMessage, Map details.append(", "); } } + if (queryPlan == null) { + return String.format( + "%s (showing first %d nodes, use getAllErrors() for more): %s", + baseMessage, limit, details); + } return String.format( - "%s (showing first %d nodes, use getAllErrors() for more): %s", - baseMessage, limit, details); + "%s\nQuery Plan: %s\n(showing first %d nodes, use getAllErrors() for more): %s", + baseMessage, queryPlan, limit, details); } /** @@ -131,6 +174,10 @@ public Map getErrors() { return builder.build(); } + protected Queue getQueryPlan() { + return this.queryPlan; + } + /** An immutable map containing all errors on each tried node. */ @NonNull public Map> getAllErrors() { @@ -146,7 +193,7 @@ public DriverException copy() { @NonNull public AllNodesFailedException reword(String newMessage) { return new AllNodesFailedException( - buildMessage(newMessage, errors), getExecutionInfo(), errors.entrySet()); + buildMessage(newMessage, errors, queryPlan), getExecutionInfo(), errors.entrySet()); } private static Map> groupByNode(Map errors) { diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/NoNodeAvailableException.java b/core/src/main/java/com/datastax/oss/driver/api/core/NoNodeAvailableException.java index 9ef51fb99b6..cfccc8c8d1f 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/NoNodeAvailableException.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/NoNodeAvailableException.java @@ -18,8 +18,10 @@ package com.datastax.oss.driver.api.core; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.metadata.Node; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Collections; +import java.util.Queue; /** * Specialization of {@code AllNodesFailedException} when no coordinators were tried. @@ -32,13 +34,25 @@ public NoNodeAvailableException() { this(null); } - private NoNodeAvailableException(ExecutionInfo executionInfo) { - super("No node was available to execute the query", executionInfo, Collections.emptySet()); + private NoNodeAvailableException( + String message, ExecutionInfo executionInfo, Queue queryPlan) { + super(message, executionInfo, Collections.emptySet(), queryPlan); + } + + public NoNodeAvailableException(Queue queryPlan) { + this(buildMessage(queryPlan), null, queryPlan); + } + + private static String buildMessage(Queue queryPlan) { + if (queryPlan == null) { + return "No node was available to execute the query"; + } + return "No node was available to execute the query. Query Plan: " + queryPlan; } @NonNull @Override public DriverException copy() { - return new NoNodeAvailableException(getExecutionInfo()); + return new NoNodeAvailableException(getMessage(), getExecutionInfo(), getQueryPlan()); } } diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index 55e8d53dc66..93a6705ab83 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -141,6 +141,12 @@ public enum DefaultDriverOption implements DriverOption { *

Value-type: boolean */ CONNECTION_WARN_INIT_ERROR("advanced.connection.warn-on-init-error"), + /** + * Provide more details when query execution has failed. + * + *

Value-Type: boolean + */ + CONNECTION_QUERY_PLAN_EXCEPTIONS("advanced.connection.detailed-query-plan-exceptions"), /** * The number of connections in the LOCAL pool. * diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java index 8906e1dd349..a397995b077 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java @@ -381,6 +381,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) { map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC, 0); map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS, false); map.put(TypedDriverOption.METRICS_GENERATE_AGGREGABLE_HISTOGRAMS, true); + map.put(TypedDriverOption.CONNECTION_QUERY_PLAN_EXCEPTIONS, false); } @Immutable diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index 9be69d0424f..c3296db28c0 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -889,6 +889,11 @@ public String toString() { DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS, GenericType.BOOLEAN); + /** TBD. */ + public static final TypedDriverOption CONNECTION_QUERY_PLAN_EXCEPTIONS = + new TypedDriverOption<>( + DefaultDriverOption.CONNECTION_QUERY_PLAN_EXCEPTIONS, GenericType.BOOLEAN); + private static Iterable> introspectBuiltInValues() { try { ImmutableList.Builder> result = ImmutableList.builder(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 7e9592c64d3..96f85df6978 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -37,6 +37,7 @@ import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent; import com.datastax.oss.driver.internal.core.metadata.TopologyEvent; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.driver.internal.core.util.concurrent.Reconnection; import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule; @@ -357,7 +358,11 @@ private void connect( assert adminExecutor.inEventLoop(); Node node = nodes.poll(); if (node == null) { - onFailure.accept(AllNodesFailedException.fromErrors(errors)); + if (nodes instanceof DebugQueryPlan) { + onFailure.accept(AllNodesFailedException.fromErrors(errors, nodes)); + } else { + onFailure.accept(AllNodesFailedException.fromErrors(errors)); + } } else { LOG.debug("[{}] Trying to establish a connection to {}", logPrefix, node); context diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java index 53f730e9d4e..85c16fb8b6c 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java @@ -47,6 +47,7 @@ import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.Message; @@ -197,7 +198,11 @@ private void sendRequest(PrepareRequest request, Node node, int retryCount) { } } if (channel == null) { - setFinalError(AllNodesFailedException.fromErrors(this.errors)); + if (queryPlan instanceof DebugQueryPlan) { + setFinalError(AllNodesFailedException.fromErrors(this.errors, queryPlan)); + } else { + setFinalError(AllNodesFailedException.fromErrors(this.errors)); + } } else { InitialPrepareCallback initialPrepareCallback = new InitialPrepareCallback(request, node, channel, retryCount); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index b0cfff488f8..06012c17a82 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -72,6 +72,7 @@ import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; import com.datastax.oss.driver.internal.core.tracker.RequestLogger; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.Message; @@ -385,7 +386,12 @@ private void sendRequest( // We've reached the end of the query plan without finding any node to write to if (!result.isDone() && activeExecutionsCount.decrementAndGet() == 0) { // We're the last execution so fail the result - setFinalError(statement, AllNodesFailedException.fromErrors(this.errors), null, -1); + if (queryPlan instanceof DebugQueryPlan) { + setFinalError( + statement, AllNodesFailedException.fromErrors(this.errors, queryPlan), null, -1); + } else { + setFinalError(statement, AllNodesFailedException.fromErrors(this.errors), null, -1); + } } } else { NodeResponseCallback nodeResponseCallback = diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 996c967b7f8..72deaff388f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -48,18 +48,23 @@ import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.DcAgnosticNodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.MultiDcNodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet; +import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSetInfo; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.SingleDcNodeSet; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64; import com.datastax.oss.driver.internal.core.util.ArrayUtils; import com.datastax.oss.driver.internal.core.util.collection.CompositeQueryPlan; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.LazyQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.driver.shaded.guava.common.base.Predicates; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -88,7 +93,7 @@ * } * * - * See {@code reference.conf} (in the manual or core driver JAR) for more details. + *

See {@code reference.conf} (in the manual or core driver JAR) for more details. * *

Local datacenter: This implementation will only define a local datacenter if it is * explicitly set either through configuration or programmatically; if the local datacenter is @@ -121,6 +126,7 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy { private final int maxNodesPerRemoteDc; private final boolean allowDcFailoverForLocalCl; + protected final boolean detailedQueryPlanExceptions; private final ConsistencyLevel defaultConsistencyLevel; // private because they should be set in init() and never be modified after @@ -139,6 +145,8 @@ public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String allowDcFailoverForLocalCl = profile.getBoolean( DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS); + detailedQueryPlanExceptions = + profile.getBoolean(DefaultDriverOption.CONNECTION_QUERY_PLAN_EXCEPTIONS); defaultConsistencyLevel = this.context .getConsistencyLevelRegistry() @@ -249,12 +257,19 @@ protected NodeDistanceEvaluator createNodeDistanceEvaluator( @NonNull @Override public Queue newQueryPlan(@Nullable Request request, @Nullable Session session) { + BasicLoadBalancingPolicyDebugInfo debugInfo = + detailedQueryPlanExceptions ? new BasicLoadBalancingPolicyDebugInfo(this) : null; + // Take a snapshot since the set is concurrent: Object[] currentNodes = liveNodes.dc(localDc).toArray(); Set allReplicas = getReplicas(request, session); int replicaCount = 0; // in currentNodes + if (debugInfo != null) { + debugInfo.withReplicas(allReplicas); + } + if (!allReplicas.isEmpty()) { // Move replicas to the beginning for (int i = 0; i < currentNodes.length; i++) { @@ -280,7 +295,13 @@ public Queue newQueryPlan(@Nullable Request request, @Nullable Session ses roundRobinAmount.getAndUpdate(INCREMENT)); QueryPlan plan = currentNodes.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan(currentNodes); - return maybeAddDcFailover(request, plan); + Queue finalPlan = maybeAddDcFailover(request, plan); + if (debugInfo != null) { + DebugQueryPlan debugPlan = new DebugQueryPlan(finalPlan); + debugPlan.setLoadBalancingPolicyInfo(debugInfo); + return debugPlan; + } + return finalPlan; } @NonNull @@ -479,4 +500,44 @@ protected NodeDistance computeNodeDistance(@NonNull Node node) { public void close() { // nothing to do } + + protected static class BasicLoadBalancingPolicyDebugInfo implements Serializable { + protected int maxNodesPerRemoteDc; + protected boolean allowDcFailoverForLocalCl; + protected ConsistencyLevel defaultConsistencyLevel; + protected String localDc; + protected String localRack; + protected NodeSetInfo liveNodes; + protected Set replicas = null; + + protected BasicLoadBalancingPolicyDebugInfo(BasicLoadBalancingPolicy policy) { + this.maxNodesPerRemoteDc = policy.maxNodesPerRemoteDc; + this.allowDcFailoverForLocalCl = policy.allowDcFailoverForLocalCl; + this.defaultConsistencyLevel = policy.defaultConsistencyLevel; + this.localDc = policy.localDc; + this.localRack = policy.localRack; + this.liveNodes = policy.liveNodes.toInfo(); + } + + @Override + public String toString() { + return String.format( + "BasicLoadBalancingPolicyDebugInfo{localDc: %s, localRack: %s, liveNodes: %s, replicas: %s" + + "defaultConsistencyLevel: %s, allowDcFailoverForLocalCl: %s, maxNodesPerRemoteDc: %s}", + localDc, + localRack, + liveNodes, + replicas, + defaultConsistencyLevel, + allowDcFailoverForLocalCl, + maxNodesPerRemoteDc); + } + + public void withReplicas(Set nodes) { + replicas = new HashSet<>(); + for (Node node : nodes) { + replicas.add(new DefaultNodeInfo.Builder(node).build()); + } + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index 4f31ff012c1..390ce903d3a 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -28,14 +28,17 @@ import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.internal.core.loadbalancing.helper.MandatoryLocalDcHelper; +import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.LazyCopyQueryPlan; import com.datastax.oss.driver.internal.core.pool.ChannelPool; import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.util.ArrayUtils; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.BitSet; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -64,7 +67,7 @@ * } * * - * See {@code reference.conf} (in the manual or core driver JAR) for more details. + *

See {@code reference.conf} (in the manual or core driver JAR) for more details. * *

Local datacenter: This implementation requires a local datacenter to be defined, * otherwise it will throw an {@link IllegalStateException}. A local datacenter can be supplied @@ -126,8 +129,16 @@ protected Optional discoverLocalDc(@NonNull Map nodes) { @NonNull @Override public Queue newQueryPlan(@Nullable Request request, @Nullable Session session) { + DefaultLoadBalancingPolicyDebugInfo debugInfo = + detailedQueryPlanExceptions ? new DefaultLoadBalancingPolicyDebugInfo(this) : null; if (!avoidSlowReplicas) { - return super.newQueryPlan(request, session); + Queue basicPlan = super.newQueryPlan(request, session); + if (debugInfo == null) { + return basicPlan; + } + DebugQueryPlan debugPlan = new DebugQueryPlan(basicPlan); + debugPlan.setLoadBalancingPolicyInfo(debugInfo); + return debugPlan; } // Take a snapshot since the set is concurrent: @@ -246,8 +257,20 @@ > getInFlight((Node) currentNodes[1], session)) { currentNodes.length - replicaCount, roundRobinAmount.getAndUpdate(INCREMENT)); + if (debugInfo == null) { + QueryPlan plan = + currentNodes.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan(currentNodes); + return maybeAddDcFailover(request, plan); + } + QueryPlan plan = currentNodes.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan(currentNodes); - return maybeAddDcFailover(request, plan); + plan = new LazyCopyQueryPlan(plan); + Queue finalPlan = maybeAddDcFailover(request, plan); + + DebugQueryPlan debugPlan = new DebugQueryPlan(finalPlan); + debugPlan.setLocalPlan(plan); + debugPlan.setLoadBalancingPolicyInfo(debugInfo); + return debugPlan; } @Override @@ -334,4 +357,35 @@ protected int getInFlight(@NonNull Node node, @NonNull Session session) { // processing them). return (pool == null) ? 0 : pool.getInFlight(); } + + private static class DefaultLoadBalancingPolicyDebugInfo + extends BasicLoadBalancingPolicyDebugInfo { + protected final Map responseTimes = new HashMap<>(); + protected final Map upTimes = new HashMap<>(); + private final boolean avoidSlowReplicas; + + public DefaultLoadBalancingPolicyDebugInfo(DefaultLoadBalancingPolicy policy) { + super(policy); + responseTimes.putAll(policy.responseTimes); + upTimes.putAll(policy.upTimes); + avoidSlowReplicas = policy.avoidSlowReplicas; + } + + @Override + public String toString() { + return String.format( + "DefaultLoadBalancingPolicyDebugInfo{localDc: %s, localRack: %s, liveNodes: %s, replicas: %s" + + ", defaultConsistencyLevel: %s, allowDcFailoverForLocalCl: %s, maxNodesPerRemoteDc: %s, responseTimes: %s, upTimes: %s, avoidSlowReplicas: %s}", + localDc, + localRack, + liveNodes, + replicas, + defaultConsistencyLevel, + allowDcFailoverForLocalCl, + maxNodesPerRemoteDc, + responseTimes, + upTimes, + avoidSlowReplicas); + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/DcAgnosticNodeSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/DcAgnosticNodeSet.java index 2a6e79023de..816ce3d6b68 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/DcAgnosticNodeSet.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/DcAgnosticNodeSet.java @@ -18,9 +18,11 @@ package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import net.jcip.annotations.ThreadSafe; @@ -50,4 +52,36 @@ public Set dc(@Nullable String dc) { public Set dcs() { return Collections.emptySet(); } + + @Override + public NodeSetInfo toInfo() { + return new DcAgnosticNodeSetInfo(nodes); + } + + private static class DcAgnosticNodeSetInfo implements NodeSetInfo { + private final HashSet nodes; + + private DcAgnosticNodeSetInfo(Set nodes) { + this.nodes = new HashSet<>(); + for (Node node : nodes) { + this.nodes.add(new DefaultNodeInfo.Builder(node).build()); + } + } + + @Override + @NonNull + public Set dc(@Nullable String dc) { + return nodes; + } + + @Override + public String toString() { + return "DcAgnosticNodeSet(nodes: " + nodes.toString() + ")"; + } + + @Override + public Set dcs() { + return Collections.emptySet(); + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlan.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlan.java new file mode 100644 index 00000000000..d099231d892 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlan.java @@ -0,0 +1,82 @@ +package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; +import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import jnr.ffi.annotations.Synchronized; +import net.jcip.annotations.ThreadSafe; + +@ThreadSafe +public class LazyCopyQueryPlan extends AbstractQueue implements QueryPlan { + private final Queue originalPlan; + private final List itemsPulled = Collections.synchronizedList(new ArrayList<>()); + + public LazyCopyQueryPlan(@NonNull Queue originalPlan) { + this.originalPlan = originalPlan; + } + + @Nullable + @Override + @Synchronized + public Node poll() { + Node node = originalPlan.poll(); + if (node == null) { + return null; + } + this.itemsPulled.add(new DefaultNodeInfo.Builder(node).build()); + return node; + } + + @NonNull + @Override + @Synchronized + public Iterator iterator() { + return new NodeIterator(); + } + + @Override + @Synchronized + public int size() { + return originalPlan.size(); + } + + @Override + @Synchronized + public String toString() { + List inQueue = new ArrayList<>(); + for (Node node : originalPlan) { + inQueue.add(new DefaultNodeInfo.Builder(node).build()); + } + + return String.format( + "%s(inQueue: %s, itemsPulled: %s)", + originalPlan.getClass().getName(), inQueue, itemsPulled); + } + + private class NodeIterator implements Iterator { + @Override + @Synchronized + public boolean hasNext() { + return !originalPlan.isEmpty(); + } + + @Override + @Synchronized + public Node next() { + Node next = poll(); + if (next == null) { + throw new NoSuchElementException(); + } + return next; + } + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/MultiDcNodeSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/MultiDcNodeSet.java index 37f02bec878..973192ed02d 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/MultiDcNodeSet.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/MultiDcNodeSet.java @@ -18,9 +18,12 @@ package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -82,6 +85,11 @@ public Set dcs() { return nodes.keySet(); } + @Override + public NodeSetInfo toInfo() { + return new MultiDcNodeSetInfo(nodes); + } + @NonNull private String getMapKey(@NonNull Node node) { return getMapKey(node.getDatacenter()); @@ -91,4 +99,38 @@ private String getMapKey(@NonNull Node node) { private String getMapKey(@Nullable String dc) { return dc == null ? UNKNOWN_DC : dc; } + + private static class MultiDcNodeSetInfo implements NodeSetInfo { + private final Map> nodes; + + private MultiDcNodeSetInfo(Map> nodes) { + this.nodes = new HashMap<>(); + for (Map.Entry> entry : nodes.entrySet()) { + Set dcNodes = this.nodes.getOrDefault(entry.getKey(), null); + if (dcNodes == null) { + dcNodes = new HashSet<>(); + this.nodes.put(entry.getKey(), dcNodes); + } + for (Node node : entry.getValue()) { + dcNodes.add(new DefaultNodeInfo.Builder(node).build()); + } + } + } + + @Override + public String toString() { + return "MultiDcNodeSetInfo(nodes: " + nodes.toString() + ")"; + } + + @NonNull + @Override + public Set dc(@Nullable String dc) { + return Collections.emptySet(); + } + + @Override + public Set dcs() { + return Collections.emptySet(); + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSet.java index 66460e16a7c..be7a53586b3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSet.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSet.java @@ -68,4 +68,7 @@ public interface NodeSet { * disabled, this method returns an empty set. */ Set dcs(); + + /** Returns deep copy of the NodeSet. */ + NodeSetInfo toInfo(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSetInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSetInfo.java new file mode 100644 index 00000000000..06da376d721 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSetInfo.java @@ -0,0 +1,47 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; + +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.Serializable; +import java.util.Set; + +/** + * A thread-safe abstraction around a map of nodes per datacenter, to facilitate node management by + * load balancing policies. + */ +public interface NodeSetInfo extends Serializable { + /** + * Returns the current nodes in the given datacenter. + * + *

If this set was initialized with datacenter awareness, the returned set will contain only + * nodes pertaining to the given datacenter; otherwise, the given datacenter name is ignored and + * the returned set will contain all nodes in the cluster. + * + * @param dc The datacenter name, or null if the datacenter name is not known, or irrelevant. + * @return the current nodes in the given datacenter. + */ + @NonNull + Set dc(@Nullable String dc); + + /** + * Returns the current datacenter names known to this set. If datacenter awareness has been + * disabled, this method returns an empty set. + */ + Set dcs(); +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/SingleDcNodeSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/SingleDcNodeSet.java index 21c89d46927..a5b208b5caf 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/SingleDcNodeSet.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/SingleDcNodeSet.java @@ -18,10 +18,12 @@ package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Collections; +import java.util.HashSet; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -69,4 +71,44 @@ public Set dc(@Nullable String dc) { public Set dcs() { return dcs; } + + @Override + public NodeSetInfo toInfo() { + return new SingleDcNodeSetInfo(dc, dcs, nodes); + } + + private static class SingleDcNodeSetInfo implements NodeSetInfo { + private final Set nodes = new HashSet<>(); + + private final String dc; + private final Set dcs; + + private SingleDcNodeSetInfo(String dc, Set dcs, Set nodes) { + this.dc = dc; + this.dcs = dcs; + + for (Node node : nodes) { + this.nodes.add(new DefaultNodeInfo.Builder(node).build()); + } + } + + @Override + @NonNull + public Set dc(@Nullable String dc) { + if (Objects.equals(this.dc, dc)) { + return nodes; + } + return Collections.emptySet(); + } + + @Override + public Set dcs() { + return dcs; + } + + @Override + public String toString() { + return "SingleDcNodeSetInfo(dc:" + dc + ", dcs: " + dcs + ", nodes: " + nodes + ")"; + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java index 8908f0be078..366b648026b 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java @@ -17,7 +17,10 @@ */ package com.datastax.oss.driver.internal.core.metadata; +import com.datastax.oss.driver.api.core.Version; +import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.api.core.metadata.Node; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.net.InetSocketAddress; @@ -48,6 +51,9 @@ public static Builder builder() { private final Map extras; private final UUID hostId; private final UUID schemaVersion; + private final boolean isReconnecting; + private final NodeDistance distance; + private final int openConnections; private DefaultNodeInfo(Builder builder) { this.endPoint = builder.endPoint; @@ -62,6 +68,9 @@ private DefaultNodeInfo(Builder builder) { this.hostId = builder.hostId; this.schemaVersion = builder.schemaVersion; this.extras = (builder.extras == null) ? Collections.emptyMap() : builder.extras; + this.isReconnecting = builder.isReconnecting; + this.distance = builder.distance; + this.openConnections = builder.openConnections; } @NonNull @@ -129,6 +138,40 @@ public UUID getSchemaVersion() { return schemaVersion; } + @Override + public NodeDistance getDistance() { + return distance; + } + + @Override + public boolean isReconnecting() { + return isReconnecting; + } + + @Override + public int getOpenConnections() { + return openConnections; + } + + @Override + public String toString() { + return String.format( + "DefaultNodeInfo(hostId: %s, endPoint: %s, datacenter: %s, rack: %s, distance: %s, schemaVersion: %s, " + + "broadcastRpcAddress: %s, broadcastAddress: %s, listenAddress: %s, partitioner: %s, isReconnecting: %b, openConnections: %d)", + hostId, + endPoint, + datacenter, + rack, + distance, + schemaVersion, + broadcastRpcAddress, + broadcastAddress, + listenAddress, + partitioner, + isReconnecting, + openConnections); + } + @NotThreadSafe public static class Builder { private EndPoint endPoint; @@ -143,6 +186,9 @@ public static class Builder { private Map extras; private UUID hostId; private UUID schemaVersion; + private boolean isReconnecting; + private NodeDistance distance; + private int openConnections; public Builder withEndPoint(@NonNull EndPoint endPoint) { this.endPoint = endPoint; @@ -199,6 +245,50 @@ public Builder withSchemaVersion(@Nullable UUID schemaVersion) { return this; } + public Builder withReconnecting(boolean isReconnecting) { + this.isReconnecting = isReconnecting; + return this; + } + + public Builder withOpenConnections(int openConnections) { + this.openConnections = openConnections; + return this; + } + + public Builder withDistance(NodeDistance distance) { + this.distance = distance; + return this; + } + + public Builder() {} + + public Builder(Node node) { + this.withEndPoint(node.getEndPoint()); + this.withBroadcastAddress(node.getBroadcastAddress().orElse(null)); + this.withBroadcastRpcAddress(node.getBroadcastRpcAddress().orElse(null)); + this.withListenAddress(node.getListenAddress().orElse(null)); + this.withDatacenter(node.getDatacenter()); + this.withReconnecting(node.isReconnecting()); + this.withDistance(node.getDistance()); + this.withRack(node.getRack()); + this.withOpenConnections(node.getOpenConnections()); + + Version cassandraVersion = node.getCassandraVersion(); + if (cassandraVersion != null) { + this.withCassandraVersion(cassandraVersion.toString()); + } + + UUID hostID = node.getHostId(); + if (hostID != null) { + this.withHostId(hostID); + } + + UUID schemaVersion = node.getSchemaVersion(); + if (schemaVersion != null) { + this.withSchemaVersion(schemaVersion); + } + } + public Builder withExtra(@NonNull String key, @Nullable Object value) { if (value != null) { if (this.extras == null) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java index 20d045d4e72..2e8bfb25e91 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java @@ -17,6 +17,7 @@ */ package com.datastax.oss.driver.internal.core.metadata; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; @@ -26,14 +27,18 @@ import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; +import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; import com.datastax.oss.driver.internal.core.util.concurrent.ReplayingEventFilter; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; @@ -75,6 +80,7 @@ private enum State { private final Set policies; private final Map policiesPerProfile; private final Map reporters; + private final boolean detailedQueryPlanExceptions; private final Lock distancesLock = new ReentrantLock(); @@ -94,6 +100,13 @@ public LoadBalancingPolicyWrapper( this.context = context; this.policiesPerProfile = policiesPerProfile; + + detailedQueryPlanExceptions = + context + .getConfig() + .getDefaultProfile() + .getBoolean(DefaultDriverOption.CONNECTION_QUERY_PLAN_EXCEPTIONS); + ImmutableMap.Builder reportersBuilder = ImmutableMap.builder(); // ImmutableMap.values does not remove duplicates, do it now so that we won't invoke a policy @@ -142,13 +155,22 @@ public void init() { @NonNull public Queue newQueryPlan( @Nullable Request request, @NonNull String executionProfileName, @Nullable Session session) { - switch (stateRef.get()) { + State state = stateRef.get(); + switch (state) { case BEFORE_INIT: case DURING_INIT: - // The contact points are not stored in the metadata yet: List nodes = new ArrayList<>(context.getMetadataManager().getContactPoints()); Collections.shuffle(nodes); - return new ConcurrentLinkedQueue<>(nodes); + Queue plan = new ConcurrentLinkedQueue<>(nodes); + if (!detailedQueryPlanExceptions) { + // The contact points are not stored in the metadata yet: + return plan; + } + DebugQueryPlan debugPlan = new DebugQueryPlan(plan); + LoadBalancingPolicyWrapperInfo wrapperInfo = + new LoadBalancingPolicyWrapperInfo(this, state); + debugPlan.setLoadBalancingPolicyWrapperInfo(wrapperInfo); + return debugPlan; case RUNNING: LoadBalancingPolicy policy = policiesPerProfile.get(executionProfileName); if (policy == null) { @@ -156,7 +178,14 @@ public Queue newQueryPlan( } return policy.newQueryPlan(request, session); default: - return new ConcurrentLinkedQueue<>(); + if (!detailedQueryPlanExceptions) { + return QueryPlan.EMPTY; + } + plan = QueryPlan.EMPTY; + debugPlan = new DebugQueryPlan(plan); + wrapperInfo = new LoadBalancingPolicyWrapperInfo(this, state); + debugPlan.setLoadBalancingPolicyWrapperInfo(wrapperInfo); + return debugPlan; } } @@ -270,4 +299,25 @@ private NodeDistance aggregate(Map distances) return minimum; } } + + private static class LoadBalancingPolicyWrapperInfo implements Serializable { + private final InternalDriverContext context; + private final Set policies; + private final Map policiesPerProfile; + private final State state; + + private LoadBalancingPolicyWrapperInfo(LoadBalancingPolicyWrapper wrapper, State state) { + this.context = wrapper.context; + this.policies = new HashSet<>(); + this.policiesPerProfile = new HashMap<>(); + this.state = state; + } + + @Override + public String toString() { + return String.format( + "LoadBalancingPolicyWrapperInfo(state: %s, context: %s, policies: %s, policiesPerProfile: %s)", + state, context, policies, policiesPerProfile); + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeInfo.java index 6a9651d8376..3cdbf8d233c 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeInfo.java @@ -158,4 +158,28 @@ public interface NodeInfo { */ @Nullable UUID getSchemaVersion(); + + /** + * The current node distance. + * + *

This is not required; the driver reports it in {@link Node#getDistance()}, but for + * informational purposes only. It is not used anywhere internally. + */ + NodeDistance getDistance(); + + /** + * The current reconnecting status. + * + *

This is not required; the driver reports it in {@link Node#isReconnecting()}, but for + * informational purposes only. It is not used anywhere internally. + */ + boolean isReconnecting(); + + /** + * The current number of opened connections to the host. + * + *

This is not required; the driver reports it in {@link Node#getOpenConnections()}, but for + * informational purposes only. It is not used anywhere internally. + */ + int getOpenConnections(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/DebugQueryPlan.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/DebugQueryPlan.java new file mode 100644 index 00000000000..c41e0431e67 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/DebugQueryPlan.java @@ -0,0 +1,88 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.util.collection; + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.LazyCopyQueryPlan; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.Serializable; +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.Queue; +import net.jcip.annotations.ThreadSafe; + +/** A query plan that attaches debug information to original query plan. */ +@ThreadSafe +public class DebugQueryPlan extends AbstractQueue implements QueryPlan { + private final Queue plan; + private Queue localPlan; + private Serializable policyInfo; + private Serializable policyWrapperInfo; + + public DebugQueryPlan(@NonNull Queue originalPlan) { + if (originalPlan instanceof LazyCopyQueryPlan) { + this.plan = originalPlan; + } else { + this.plan = new LazyCopyQueryPlan(originalPlan); + } + } + + @Override + public Iterator iterator() { + return this.plan.iterator(); + } + + @Override + public int size() { + return this.plan.size(); + } + + public QueryPlan setLocalPlan(@NonNull QueryPlan plan) { + LazyCopyQueryPlan copyPlan = new LazyCopyQueryPlan(plan); + this.localPlan = copyPlan; + return copyPlan; + } + + public void setLoadBalancingPolicyInfo(Serializable policyInfo) { + this.policyInfo = policyInfo; + } + + public void setLoadBalancingPolicyWrapperInfo(Serializable policyWrapperInfo) { + this.policyWrapperInfo = policyWrapperInfo; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append(this.getClass().getName()).append("(plan: ").append(plan); + if (this.policyInfo != null) { + result.append(", policy: ").append(policyInfo); + } + if (this.policyWrapperInfo != null) { + result.append(", wrapper: ").append(policyWrapperInfo); + } + if (this.localPlan != null) { + result.append(", localPlan: ").append(localPlan); + } + result.append(")"); + return result.toString(); + } + + @Override + public Node poll() { + return plan.poll(); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/EmptyQueryPlan.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/EmptyQueryPlan.java index 53177147695..9dfb4622d5a 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/EmptyQueryPlan.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/EmptyQueryPlan.java @@ -42,4 +42,9 @@ public Iterator iterator() { public int size() { return 0; } + + @Override + public String toString() { + return "EmptyQueryPlan"; + } } diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 75bed97e498..423614558ed 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -535,6 +535,12 @@ datastax-java-driver { # change. # Overridable in a profile: no warn-on-init-error = true + + # TBD: Add more details here + # Required: no + # Modifiable at runtime: no + # Overridable in a profile: yes + detailed-query-plan-exceptions = false } # Advanced options for the built-in load-balancing policies. diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlanTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlanTest.java new file mode 100644 index 00000000000..586f9c2938c --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlanTest.java @@ -0,0 +1,54 @@ +package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.datastax.oss.driver.api.core.metadata.Node; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.junit.Test; + +public class LazyCopyQueryPlanTest { + @Test + public void toString_returns_proper_results_after_poll() { + Queue original = new ConcurrentLinkedQueue(); + original.add(mockNode("dc1")); + original.add(mockNode("dc2")); + Queue lc = new LazyCopyQueryPlan(original); + assertThat(lc.toString()) + .isEqualTo( + "java.util.concurrent.ConcurrentLinkedQueue(inQueue: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc1, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0), " + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc2, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)" + + "], itemsPulled: [])"); + lc.poll(); + assertThat(lc.toString()) + .isEqualTo( + "java.util.concurrent.ConcurrentLinkedQueue(inQueue: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc2, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)" + + "], itemsPulled: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc1, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)" + + "])"); + lc.poll(); + assertThat(lc.toString()) + .isEqualTo( + "java.util.concurrent.ConcurrentLinkedQueue(inQueue: [], itemsPulled: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc1, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0), " + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc2, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)])"); + + // Make sure that when queue is exhausted next poll does not break toString results + lc.poll(); + assertThat(lc.toString()) + .isEqualTo( + "java.util.concurrent.ConcurrentLinkedQueue(inQueue: [], itemsPulled: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc1, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0), " + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc2, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)])"); + } + + private Node mockNode(String dc) { + Node node = mock(Node.class); + when(node.getDatacenter()).thenReturn(dc); + return node; + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java index 1a0292e3947..047622e5b3b 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy.DistanceReporter; @@ -77,6 +78,9 @@ public class LoadBalancingPolicyWrapperTest { @Mock protected MetricsFactory metricsFactory; @Captor private ArgumentCaptor> initNodesCaptor; + private static final DriverConfigLoader configLoader = + DriverConfigLoader.programmaticBuilder().build(); + private LoadBalancingPolicyWrapper wrapper; @Before @@ -103,6 +107,7 @@ public void setup() { eventBus = spy(new EventBus("test")); when(context.getEventBus()).thenReturn(eventBus); + when(context.getConfig()).thenReturn(configLoader.getInitialConfig()); wrapper = new LoadBalancingPolicyWrapper( diff --git a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/BaseCcmRule.java b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/BaseCcmRule.java index 2e137b08543..d0bd46ad479 100644 --- a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/BaseCcmRule.java +++ b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/BaseCcmRule.java @@ -186,6 +186,10 @@ public Version getCassandraVersion() { return ccmBridge.getCassandraVersion(); } + public CcmBridge getCcmBridge() { + return ccmBridge; + } + public Optional getDseVersion() { return ccmBridge.getDseVersion(); } diff --git a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CustomCcmRule.java b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CustomCcmRule.java index 79cc0f7e601..ce40bc5fe1b 100644 --- a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CustomCcmRule.java +++ b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CustomCcmRule.java @@ -63,6 +63,7 @@ protected void after() { } } + @Override public CcmBridge getCcmBridge() { return ccmBridge; }