diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java index 44df3b3a03d..e62935fe2f6 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java @@ -65,6 +65,7 @@ import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.session.RepreparePayload; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import com.datastax.oss.protocol.internal.Frame; @@ -360,7 +361,11 @@ private void sendRequest( // We've reached the end of the query plan without finding any node to write to; abort the // continuous paging session. if (activeExecutionsCount.decrementAndGet() == 0) { - abortGlobalRequestOrChosenCallback(AllNodesFailedException.fromErrors(errors)); + if (queryPlan instanceof DebugQueryPlan) { + abortGlobalRequestOrChosenCallback(AllNodesFailedException.fromErrors(errors, queryPlan)); + } else { + abortGlobalRequestOrChosenCallback(AllNodesFailedException.fromErrors(errors)); + } } } else if (!chosenCallback.isDone()) { NodeResponseCallback nodeResponseCallback = diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java index c2298458805..03ffd6b5a7d 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java @@ -60,6 +60,7 @@ import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; import com.datastax.oss.driver.internal.core.tracker.RequestLogger; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.Message; @@ -265,11 +266,13 @@ private void sendRequest( // We've reached the end of the query plan without finding any node to write to if (!result.isDone() && activeExecutionsCount.decrementAndGet() == 0) { // We're the last execution so fail the result - setFinalError( - statement, - AllNodesFailedException.fromErrors(this.errors), - null, - NO_SUCCESSFUL_EXECUTION); + AllNodesFailedException exception; + if (queryPlan instanceof DebugQueryPlan) { + exception = AllNodesFailedException.fromErrors(this.errors, queryPlan); + } else { + exception = AllNodesFailedException.fromErrors(this.errors); + } + setFinalError(statement, exception, null, NO_SUCCESSFUL_EXECUTION); } } else { NodeResponseCallback nodeResponseCallback = diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/AllNodesFailedException.java b/core/src/main/java/com/datastax/oss/driver/api/core/AllNodesFailedException.java index b6f1bf93838..a539eae0a82 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/AllNodesFailedException.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/AllNodesFailedException.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; /** * Thrown when a query failed on all the coordinators it was tried on. This exception may wrap @@ -37,7 +38,6 @@ * exceptions}, or via {@link #getAllErrors()} where they are grouped by node. */ public class AllNodesFailedException extends DriverException { - /** @deprecated Use {@link #fromErrors(List)} instead. */ @NonNull @Deprecated @@ -45,7 +45,17 @@ public static AllNodesFailedException fromErrors(@Nullable Map if (errors == null || errors.isEmpty()) { return new NoNodeAvailableException(); } else { - return new AllNodesFailedException(groupByNode(errors)); + return new AllNodesFailedException(groupByNode(errors), null); + } + } + + @NonNull + public static AllNodesFailedException fromErrors( + @Nullable List> errors, Queue queryPlan) { + if (errors == null || errors.isEmpty()) { + return new NoNodeAvailableException(queryPlan); + } else { + return new AllNodesFailedException(groupByNode(errors), queryPlan); } } @@ -59,6 +69,7 @@ public static AllNodesFailedException fromErrors(@Nullable List> errors; + private final Queue queryPlan; /** @deprecated Use {@link #AllNodesFailedException(String, ExecutionInfo, Iterable)} instead. */ @Deprecated @@ -68,6 +79,7 @@ protected AllNodesFailedException( @NonNull Map errors) { super(message, executionInfo, null, true); this.errors = toDeepImmutableMap(groupByNode(errors)); + this.queryPlan = null; addSuppressedErrors(); } @@ -77,6 +89,18 @@ protected AllNodesFailedException( @NonNull Iterable>> errors) { super(message, executionInfo, null, true); this.errors = toDeepImmutableMap(errors); + this.queryPlan = null; + addSuppressedErrors(); + } + + protected AllNodesFailedException( + @NonNull String message, + @Nullable ExecutionInfo executionInfo, + @NonNull Iterable>> errors, + @Nullable Queue queryPlan) { + super(message, executionInfo, null, true); + this.errors = toDeepImmutableMap(errors); + this.queryPlan = queryPlan; addSuppressedErrors(); } @@ -91,12 +115,26 @@ private void addSuppressedErrors() { private AllNodesFailedException(Map> errors) { this( buildMessage( - String.format("All %d node(s) tried for the query failed", errors.size()), errors), + String.format("All %d node(s) tried for the query failed", errors.size()), + errors, + null), null, errors.entrySet()); } - private static String buildMessage(String baseMessage, Map> errors) { + private AllNodesFailedException(Map> errors, Queue queryPlan) { + this( + buildMessage( + String.format("All %d node(s) tried for the query failed", errors.size()), + errors, + queryPlan), + null, + errors.entrySet(), + queryPlan); + } + + private static String buildMessage( + String baseMessage, Map> errors, Queue queryPlan) { int limit = Math.min(errors.size(), 3); Iterator>> iterator = Iterables.limit(errors.entrySet(), limit).iterator(); @@ -108,9 +146,14 @@ private static String buildMessage(String baseMessage, Map details.append(", "); } } + if (queryPlan == null) { + return String.format( + "%s (showing first %d nodes, use getAllErrors() for more): %s", + baseMessage, limit, details); + } return String.format( - "%s (showing first %d nodes, use getAllErrors() for more): %s", - baseMessage, limit, details); + "%s\nQuery Plan: %s\n(showing first %d nodes, use getAllErrors() for more): %s", + baseMessage, queryPlan, limit, details); } /** @@ -131,6 +174,10 @@ public Map getErrors() { return builder.build(); } + protected Queue getQueryPlan() { + return this.queryPlan; + } + /** An immutable map containing all errors on each tried node. */ @NonNull public Map> getAllErrors() { @@ -146,7 +193,7 @@ public DriverException copy() { @NonNull public AllNodesFailedException reword(String newMessage) { return new AllNodesFailedException( - buildMessage(newMessage, errors), getExecutionInfo(), errors.entrySet()); + buildMessage(newMessage, errors, queryPlan), getExecutionInfo(), errors.entrySet()); } private static Map> groupByNode(Map errors) { diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/NoNodeAvailableException.java b/core/src/main/java/com/datastax/oss/driver/api/core/NoNodeAvailableException.java index 9ef51fb99b6..cfccc8c8d1f 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/NoNodeAvailableException.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/NoNodeAvailableException.java @@ -18,8 +18,10 @@ package com.datastax.oss.driver.api.core; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.metadata.Node; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Collections; +import java.util.Queue; /** * Specialization of {@code AllNodesFailedException} when no coordinators were tried. @@ -32,13 +34,25 @@ public NoNodeAvailableException() { this(null); } - private NoNodeAvailableException(ExecutionInfo executionInfo) { - super("No node was available to execute the query", executionInfo, Collections.emptySet()); + private NoNodeAvailableException( + String message, ExecutionInfo executionInfo, Queue queryPlan) { + super(message, executionInfo, Collections.emptySet(), queryPlan); + } + + public NoNodeAvailableException(Queue queryPlan) { + this(buildMessage(queryPlan), null, queryPlan); + } + + private static String buildMessage(Queue queryPlan) { + if (queryPlan == null) { + return "No node was available to execute the query"; + } + return "No node was available to execute the query. Query Plan: " + queryPlan; } @NonNull @Override public DriverException copy() { - return new NoNodeAvailableException(getExecutionInfo()); + return new NoNodeAvailableException(getMessage(), getExecutionInfo(), getQueryPlan()); } } diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index 55e8d53dc66..93a6705ab83 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -141,6 +141,12 @@ public enum DefaultDriverOption implements DriverOption { *

Value-type: boolean */ CONNECTION_WARN_INIT_ERROR("advanced.connection.warn-on-init-error"), + /** + * Provide more details when query execution has failed. + * + *

Value-Type: boolean + */ + CONNECTION_QUERY_PLAN_EXCEPTIONS("advanced.connection.detailed-query-plan-exceptions"), /** * The number of connections in the LOCAL pool. * diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java index 8906e1dd349..a397995b077 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java @@ -381,6 +381,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) { map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC, 0); map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS, false); map.put(TypedDriverOption.METRICS_GENERATE_AGGREGABLE_HISTOGRAMS, true); + map.put(TypedDriverOption.CONNECTION_QUERY_PLAN_EXCEPTIONS, false); } @Immutable diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index 9be69d0424f..c3296db28c0 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -889,6 +889,11 @@ public String toString() { DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS, GenericType.BOOLEAN); + /** TBD. */ + public static final TypedDriverOption CONNECTION_QUERY_PLAN_EXCEPTIONS = + new TypedDriverOption<>( + DefaultDriverOption.CONNECTION_QUERY_PLAN_EXCEPTIONS, GenericType.BOOLEAN); + private static Iterable> introspectBuiltInValues() { try { ImmutableList.Builder> result = ImmutableList.builder(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 7e9592c64d3..96f85df6978 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -37,6 +37,7 @@ import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent; import com.datastax.oss.driver.internal.core.metadata.TopologyEvent; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.driver.internal.core.util.concurrent.Reconnection; import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule; @@ -357,7 +358,11 @@ private void connect( assert adminExecutor.inEventLoop(); Node node = nodes.poll(); if (node == null) { - onFailure.accept(AllNodesFailedException.fromErrors(errors)); + if (nodes instanceof DebugQueryPlan) { + onFailure.accept(AllNodesFailedException.fromErrors(errors, nodes)); + } else { + onFailure.accept(AllNodesFailedException.fromErrors(errors)); + } } else { LOG.debug("[{}] Trying to establish a connection to {}", logPrefix, node); context diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java index 53f730e9d4e..85c16fb8b6c 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java @@ -47,6 +47,7 @@ import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.Message; @@ -197,7 +198,11 @@ private void sendRequest(PrepareRequest request, Node node, int retryCount) { } } if (channel == null) { - setFinalError(AllNodesFailedException.fromErrors(this.errors)); + if (queryPlan instanceof DebugQueryPlan) { + setFinalError(AllNodesFailedException.fromErrors(this.errors, queryPlan)); + } else { + setFinalError(AllNodesFailedException.fromErrors(this.errors)); + } } else { InitialPrepareCallback initialPrepareCallback = new InitialPrepareCallback(request, node, channel, retryCount); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index b0cfff488f8..06012c17a82 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -72,6 +72,7 @@ import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; import com.datastax.oss.driver.internal.core.tracker.RequestLogger; import com.datastax.oss.driver.internal.core.util.Loggers; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.Message; @@ -385,7 +386,12 @@ private void sendRequest( // We've reached the end of the query plan without finding any node to write to if (!result.isDone() && activeExecutionsCount.decrementAndGet() == 0) { // We're the last execution so fail the result - setFinalError(statement, AllNodesFailedException.fromErrors(this.errors), null, -1); + if (queryPlan instanceof DebugQueryPlan) { + setFinalError( + statement, AllNodesFailedException.fromErrors(this.errors, queryPlan), null, -1); + } else { + setFinalError(statement, AllNodesFailedException.fromErrors(this.errors), null, -1); + } } } else { NodeResponseCallback nodeResponseCallback = diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 996c967b7f8..7c8d37be7e8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -48,23 +48,28 @@ import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.DcAgnosticNodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.MultiDcNodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet; +import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSetInfo; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.SingleDcNodeSet; import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; import com.datastax.oss.driver.internal.core.util.ArrayUtils; import com.datastax.oss.driver.internal.core.util.collection.CompositeQueryPlan; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.LazyQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.driver.shaded.guava.common.base.Predicates; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.Map; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; +import java.util.Map; import java.util.Optional; import java.util.Queue; -import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntUnaryOperator; @@ -88,7 +93,7 @@ * } * * - * See {@code reference.conf} (in the manual or core driver JAR) for more details. + *

See {@code reference.conf} (in the manual or core driver JAR) for more details. * *

Local datacenter: This implementation will only define a local datacenter if it is * explicitly set either through configuration or programmatically; if the local datacenter is @@ -121,6 +126,7 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy { private final int maxNodesPerRemoteDc; private final boolean allowDcFailoverForLocalCl; + protected final boolean detailedQueryPlanExceptions; private final ConsistencyLevel defaultConsistencyLevel; // private because they should be set in init() and never be modified after @@ -139,6 +145,8 @@ public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String allowDcFailoverForLocalCl = profile.getBoolean( DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS); + detailedQueryPlanExceptions = + profile.getBoolean(DefaultDriverOption.CONNECTION_QUERY_PLAN_EXCEPTIONS); defaultConsistencyLevel = this.context .getConsistencyLevelRegistry() @@ -249,12 +257,19 @@ protected NodeDistanceEvaluator createNodeDistanceEvaluator( @NonNull @Override public Queue newQueryPlan(@Nullable Request request, @Nullable Session session) { + BasicLoadBalancingPolicyDebugInfo debugInfo = + detailedQueryPlanExceptions ? new BasicLoadBalancingPolicyDebugInfo(this) : null; + // Take a snapshot since the set is concurrent: Object[] currentNodes = liveNodes.dc(localDc).toArray(); Set allReplicas = getReplicas(request, session); int replicaCount = 0; // in currentNodes + if (debugInfo != null) { + debugInfo.withReplicas(allReplicas); + } + if (!allReplicas.isEmpty()) { // Move replicas to the beginning for (int i = 0; i < currentNodes.length; i++) { @@ -280,7 +295,13 @@ public Queue newQueryPlan(@Nullable Request request, @Nullable Session ses roundRobinAmount.getAndUpdate(INCREMENT)); QueryPlan plan = currentNodes.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan(currentNodes); - return maybeAddDcFailover(request, plan); + Queue finalPlan = maybeAddDcFailover(request, plan); + if (debugInfo != null) { + DebugQueryPlan debugPlan = new DebugQueryPlan(finalPlan); + debugPlan.setLoadBalancingPolicyInfo(debugInfo); + return debugPlan; + } + return finalPlan; } @NonNull @@ -479,4 +500,44 @@ protected NodeDistance computeNodeDistance(@NonNull Node node) { public void close() { // nothing to do } + + protected static class BasicLoadBalancingPolicyDebugInfo implements Serializable { + protected int maxNodesPerRemoteDc; + protected boolean allowDcFailoverForLocalCl; + protected ConsistencyLevel defaultConsistencyLevel; + protected String localDc; + protected String localRack; + protected NodeSetInfo liveNodes; + protected Set replicas = null; + + protected BasicLoadBalancingPolicyDebugInfo(BasicLoadBalancingPolicy policy) { + this.maxNodesPerRemoteDc = policy.maxNodesPerRemoteDc; + this.allowDcFailoverForLocalCl = policy.allowDcFailoverForLocalCl; + this.defaultConsistencyLevel = policy.defaultConsistencyLevel; + this.localDc = policy.localDc; + this.localRack = policy.localRack; + this.liveNodes = policy.liveNodes.toInfo(); + } + + @Override + public String toString() { + return String.format( + "BasicLoadBalancingPolicyDebugInfo{localDc: %s, localRack: %s, liveNodes: %s, replicas: %s" + + "defaultConsistencyLevel: %s, allowDcFailoverForLocalCl: %s, maxNodesPerRemoteDc: %s}", + localDc, + localRack, + liveNodes, + replicas, + defaultConsistencyLevel, + allowDcFailoverForLocalCl, + maxNodesPerRemoteDc); + } + + public void withReplicas(Set nodes) { + replicas = new HashSet<>(); + for (Node node : nodes) { + replicas.add(new DefaultNodeInfo.Builder(node).build()); + } + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index 4f31ff012c1..390ce903d3a 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -28,14 +28,17 @@ import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.internal.core.loadbalancing.helper.MandatoryLocalDcHelper; +import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.LazyCopyQueryPlan; import com.datastax.oss.driver.internal.core.pool.ChannelPool; import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.util.ArrayUtils; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.BitSet; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -64,7 +67,7 @@ * } * * - * See {@code reference.conf} (in the manual or core driver JAR) for more details. + *

See {@code reference.conf} (in the manual or core driver JAR) for more details. * *

Local datacenter: This implementation requires a local datacenter to be defined, * otherwise it will throw an {@link IllegalStateException}. A local datacenter can be supplied @@ -126,8 +129,16 @@ protected Optional discoverLocalDc(@NonNull Map nodes) { @NonNull @Override public Queue newQueryPlan(@Nullable Request request, @Nullable Session session) { + DefaultLoadBalancingPolicyDebugInfo debugInfo = + detailedQueryPlanExceptions ? new DefaultLoadBalancingPolicyDebugInfo(this) : null; if (!avoidSlowReplicas) { - return super.newQueryPlan(request, session); + Queue basicPlan = super.newQueryPlan(request, session); + if (debugInfo == null) { + return basicPlan; + } + DebugQueryPlan debugPlan = new DebugQueryPlan(basicPlan); + debugPlan.setLoadBalancingPolicyInfo(debugInfo); + return debugPlan; } // Take a snapshot since the set is concurrent: @@ -246,8 +257,20 @@ > getInFlight((Node) currentNodes[1], session)) { currentNodes.length - replicaCount, roundRobinAmount.getAndUpdate(INCREMENT)); + if (debugInfo == null) { + QueryPlan plan = + currentNodes.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan(currentNodes); + return maybeAddDcFailover(request, plan); + } + QueryPlan plan = currentNodes.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan(currentNodes); - return maybeAddDcFailover(request, plan); + plan = new LazyCopyQueryPlan(plan); + Queue finalPlan = maybeAddDcFailover(request, plan); + + DebugQueryPlan debugPlan = new DebugQueryPlan(finalPlan); + debugPlan.setLocalPlan(plan); + debugPlan.setLoadBalancingPolicyInfo(debugInfo); + return debugPlan; } @Override @@ -334,4 +357,35 @@ protected int getInFlight(@NonNull Node node, @NonNull Session session) { // processing them). return (pool == null) ? 0 : pool.getInFlight(); } + + private static class DefaultLoadBalancingPolicyDebugInfo + extends BasicLoadBalancingPolicyDebugInfo { + protected final Map responseTimes = new HashMap<>(); + protected final Map upTimes = new HashMap<>(); + private final boolean avoidSlowReplicas; + + public DefaultLoadBalancingPolicyDebugInfo(DefaultLoadBalancingPolicy policy) { + super(policy); + responseTimes.putAll(policy.responseTimes); + upTimes.putAll(policy.upTimes); + avoidSlowReplicas = policy.avoidSlowReplicas; + } + + @Override + public String toString() { + return String.format( + "DefaultLoadBalancingPolicyDebugInfo{localDc: %s, localRack: %s, liveNodes: %s, replicas: %s" + + ", defaultConsistencyLevel: %s, allowDcFailoverForLocalCl: %s, maxNodesPerRemoteDc: %s, responseTimes: %s, upTimes: %s, avoidSlowReplicas: %s}", + localDc, + localRack, + liveNodes, + replicas, + defaultConsistencyLevel, + allowDcFailoverForLocalCl, + maxNodesPerRemoteDc, + responseTimes, + upTimes, + avoidSlowReplicas); + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/DcAgnosticNodeSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/DcAgnosticNodeSet.java index 2a6e79023de..816ce3d6b68 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/DcAgnosticNodeSet.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/DcAgnosticNodeSet.java @@ -18,9 +18,11 @@ package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import net.jcip.annotations.ThreadSafe; @@ -50,4 +52,36 @@ public Set dc(@Nullable String dc) { public Set dcs() { return Collections.emptySet(); } + + @Override + public NodeSetInfo toInfo() { + return new DcAgnosticNodeSetInfo(nodes); + } + + private static class DcAgnosticNodeSetInfo implements NodeSetInfo { + private final HashSet nodes; + + private DcAgnosticNodeSetInfo(Set nodes) { + this.nodes = new HashSet<>(); + for (Node node : nodes) { + this.nodes.add(new DefaultNodeInfo.Builder(node).build()); + } + } + + @Override + @NonNull + public Set dc(@Nullable String dc) { + return nodes; + } + + @Override + public String toString() { + return "DcAgnosticNodeSet(nodes: " + nodes.toString() + ")"; + } + + @Override + public Set dcs() { + return Collections.emptySet(); + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlan.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlan.java new file mode 100644 index 00000000000..8b00d7bfcd9 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlan.java @@ -0,0 +1,82 @@ +package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; +import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import jnr.ffi.annotations.Synchronized; +import net.jcip.annotations.ThreadSafe; +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.NoSuchElementException; + +@ThreadSafe +public class LazyCopyQueryPlan extends AbstractQueue implements QueryPlan { + private final Queue originalPlan; + private final List itemsPulled = Collections.synchronizedList(new ArrayList<>()); + + public LazyCopyQueryPlan(@NonNull Queue originalPlan) { + this.originalPlan = originalPlan; + } + + @Nullable + @Override + @Synchronized + public Node poll() { + Node node = originalPlan.poll(); + if (node == null) { + return null; + } + this.itemsPulled.add(new DefaultNodeInfo.Builder(node).build()); + return node; + } + + @NonNull + @Override + @Synchronized + public Iterator iterator() { + return new NodeIterator(); + } + + @Override + @Synchronized + public int size() { + return originalPlan.size(); + } + + @Override + @Synchronized + public String toString() { + List inQueue = new ArrayList<>(); + for (Node node : originalPlan) { + inQueue.add(new DefaultNodeInfo.Builder(node).build()); + } + + return String.format( + "%s(inQueue: %s, itemsPulled: %s)", + originalPlan.getClass().getName(), inQueue, itemsPulled); + } + + private class NodeIterator implements Iterator { + @Override + @Synchronized + public boolean hasNext() { + return !originalPlan.isEmpty(); + } + + @Override + @Synchronized + public Node next() { + Node next = poll(); + if (next == null) { + throw new NoSuchElementException(); + } + return next; + } + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/MultiDcNodeSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/MultiDcNodeSet.java index 37f02bec878..255b8c38e80 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/MultiDcNodeSet.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/MultiDcNodeSet.java @@ -18,9 +18,13 @@ package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; + import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -82,6 +86,11 @@ public Set dcs() { return nodes.keySet(); } + @Override + public NodeSetInfo toInfo() { + return new MultiDcNodeSetInfo(nodes); + } + @NonNull private String getMapKey(@NonNull Node node) { return getMapKey(node.getDatacenter()); @@ -91,4 +100,38 @@ private String getMapKey(@NonNull Node node) { private String getMapKey(@Nullable String dc) { return dc == null ? UNKNOWN_DC : dc; } + + private static class MultiDcNodeSetInfo implements NodeSetInfo { + private final Map> nodes; + + private MultiDcNodeSetInfo(Map> nodes) { + this.nodes = new HashMap<>(); + for (Map.Entry> entry : nodes.entrySet()) { + Set dcNodes = this.nodes.getOrDefault(entry.getKey(), null); + if (dcNodes == null) { + dcNodes = new HashSet<>(); + this.nodes.put(entry.getKey(), dcNodes); + } + for (Node node : entry.getValue()) { + dcNodes.add(new DefaultNodeInfo.Builder(node).build()); + } + } + } + + @Override + public String toString() { + return "MultiDcNodeSetInfo(nodes: " + nodes.toString() + ")"; + } + + @NonNull + @Override + public Set dc(@Nullable String dc) { + return Collections.emptySet(); + } + + @Override + public Set dcs() { + return Collections.emptySet(); + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSet.java index 66460e16a7c..be7a53586b3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSet.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSet.java @@ -68,4 +68,7 @@ public interface NodeSet { * disabled, this method returns an empty set. */ Set dcs(); + + /** Returns deep copy of the NodeSet. */ + NodeSetInfo toInfo(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSetInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSetInfo.java new file mode 100644 index 00000000000..06da376d721 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/NodeSetInfo.java @@ -0,0 +1,47 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; + +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.Serializable; +import java.util.Set; + +/** + * A thread-safe abstraction around a map of nodes per datacenter, to facilitate node management by + * load balancing policies. + */ +public interface NodeSetInfo extends Serializable { + /** + * Returns the current nodes in the given datacenter. + * + *

If this set was initialized with datacenter awareness, the returned set will contain only + * nodes pertaining to the given datacenter; otherwise, the given datacenter name is ignored and + * the returned set will contain all nodes in the cluster. + * + * @param dc The datacenter name, or null if the datacenter name is not known, or irrelevant. + * @return the current nodes in the given datacenter. + */ + @NonNull + Set dc(@Nullable String dc); + + /** + * Returns the current datacenter names known to this set. If datacenter awareness has been + * disabled, this method returns an empty set. + */ + Set dcs(); +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/SingleDcNodeSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/SingleDcNodeSet.java index 21c89d46927..a5b208b5caf 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/SingleDcNodeSet.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/SingleDcNodeSet.java @@ -18,10 +18,12 @@ package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.metadata.DefaultNodeInfo; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Collections; +import java.util.HashSet; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -69,4 +71,44 @@ public Set dc(@Nullable String dc) { public Set dcs() { return dcs; } + + @Override + public NodeSetInfo toInfo() { + return new SingleDcNodeSetInfo(dc, dcs, nodes); + } + + private static class SingleDcNodeSetInfo implements NodeSetInfo { + private final Set nodes = new HashSet<>(); + + private final String dc; + private final Set dcs; + + private SingleDcNodeSetInfo(String dc, Set dcs, Set nodes) { + this.dc = dc; + this.dcs = dcs; + + for (Node node : nodes) { + this.nodes.add(new DefaultNodeInfo.Builder(node).build()); + } + } + + @Override + @NonNull + public Set dc(@Nullable String dc) { + if (Objects.equals(this.dc, dc)) { + return nodes; + } + return Collections.emptySet(); + } + + @Override + public Set dcs() { + return dcs; + } + + @Override + public String toString() { + return "SingleDcNodeSetInfo(dc:" + dc + ", dcs: " + dcs + ", nodes: " + nodes + ")"; + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java index 8908f0be078..366b648026b 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java @@ -17,7 +17,10 @@ */ package com.datastax.oss.driver.internal.core.metadata; +import com.datastax.oss.driver.api.core.Version; +import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; import com.datastax.oss.driver.api.core.metadata.EndPoint; +import com.datastax.oss.driver.api.core.metadata.Node; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.net.InetSocketAddress; @@ -48,6 +51,9 @@ public static Builder builder() { private final Map extras; private final UUID hostId; private final UUID schemaVersion; + private final boolean isReconnecting; + private final NodeDistance distance; + private final int openConnections; private DefaultNodeInfo(Builder builder) { this.endPoint = builder.endPoint; @@ -62,6 +68,9 @@ private DefaultNodeInfo(Builder builder) { this.hostId = builder.hostId; this.schemaVersion = builder.schemaVersion; this.extras = (builder.extras == null) ? Collections.emptyMap() : builder.extras; + this.isReconnecting = builder.isReconnecting; + this.distance = builder.distance; + this.openConnections = builder.openConnections; } @NonNull @@ -129,6 +138,40 @@ public UUID getSchemaVersion() { return schemaVersion; } + @Override + public NodeDistance getDistance() { + return distance; + } + + @Override + public boolean isReconnecting() { + return isReconnecting; + } + + @Override + public int getOpenConnections() { + return openConnections; + } + + @Override + public String toString() { + return String.format( + "DefaultNodeInfo(hostId: %s, endPoint: %s, datacenter: %s, rack: %s, distance: %s, schemaVersion: %s, " + + "broadcastRpcAddress: %s, broadcastAddress: %s, listenAddress: %s, partitioner: %s, isReconnecting: %b, openConnections: %d)", + hostId, + endPoint, + datacenter, + rack, + distance, + schemaVersion, + broadcastRpcAddress, + broadcastAddress, + listenAddress, + partitioner, + isReconnecting, + openConnections); + } + @NotThreadSafe public static class Builder { private EndPoint endPoint; @@ -143,6 +186,9 @@ public static class Builder { private Map extras; private UUID hostId; private UUID schemaVersion; + private boolean isReconnecting; + private NodeDistance distance; + private int openConnections; public Builder withEndPoint(@NonNull EndPoint endPoint) { this.endPoint = endPoint; @@ -199,6 +245,50 @@ public Builder withSchemaVersion(@Nullable UUID schemaVersion) { return this; } + public Builder withReconnecting(boolean isReconnecting) { + this.isReconnecting = isReconnecting; + return this; + } + + public Builder withOpenConnections(int openConnections) { + this.openConnections = openConnections; + return this; + } + + public Builder withDistance(NodeDistance distance) { + this.distance = distance; + return this; + } + + public Builder() {} + + public Builder(Node node) { + this.withEndPoint(node.getEndPoint()); + this.withBroadcastAddress(node.getBroadcastAddress().orElse(null)); + this.withBroadcastRpcAddress(node.getBroadcastRpcAddress().orElse(null)); + this.withListenAddress(node.getListenAddress().orElse(null)); + this.withDatacenter(node.getDatacenter()); + this.withReconnecting(node.isReconnecting()); + this.withDistance(node.getDistance()); + this.withRack(node.getRack()); + this.withOpenConnections(node.getOpenConnections()); + + Version cassandraVersion = node.getCassandraVersion(); + if (cassandraVersion != null) { + this.withCassandraVersion(cassandraVersion.toString()); + } + + UUID hostID = node.getHostId(); + if (hostID != null) { + this.withHostId(hostID); + } + + UUID schemaVersion = node.getSchemaVersion(); + if (schemaVersion != null) { + this.withSchemaVersion(schemaVersion); + } + } + public Builder withExtra(@NonNull String key, @Nullable Object value) { if (value != null) { if (this.extras == null) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java index 20d045d4e72..2e8bfb25e91 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java @@ -17,6 +17,7 @@ */ package com.datastax.oss.driver.internal.core.metadata; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; @@ -26,14 +27,18 @@ import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import com.datastax.oss.driver.internal.core.util.collection.DebugQueryPlan; +import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; import com.datastax.oss.driver.internal.core.util.concurrent.ReplayingEventFilter; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; @@ -75,6 +80,7 @@ private enum State { private final Set policies; private final Map policiesPerProfile; private final Map reporters; + private final boolean detailedQueryPlanExceptions; private final Lock distancesLock = new ReentrantLock(); @@ -94,6 +100,13 @@ public LoadBalancingPolicyWrapper( this.context = context; this.policiesPerProfile = policiesPerProfile; + + detailedQueryPlanExceptions = + context + .getConfig() + .getDefaultProfile() + .getBoolean(DefaultDriverOption.CONNECTION_QUERY_PLAN_EXCEPTIONS); + ImmutableMap.Builder reportersBuilder = ImmutableMap.builder(); // ImmutableMap.values does not remove duplicates, do it now so that we won't invoke a policy @@ -142,13 +155,22 @@ public void init() { @NonNull public Queue newQueryPlan( @Nullable Request request, @NonNull String executionProfileName, @Nullable Session session) { - switch (stateRef.get()) { + State state = stateRef.get(); + switch (state) { case BEFORE_INIT: case DURING_INIT: - // The contact points are not stored in the metadata yet: List nodes = new ArrayList<>(context.getMetadataManager().getContactPoints()); Collections.shuffle(nodes); - return new ConcurrentLinkedQueue<>(nodes); + Queue plan = new ConcurrentLinkedQueue<>(nodes); + if (!detailedQueryPlanExceptions) { + // The contact points are not stored in the metadata yet: + return plan; + } + DebugQueryPlan debugPlan = new DebugQueryPlan(plan); + LoadBalancingPolicyWrapperInfo wrapperInfo = + new LoadBalancingPolicyWrapperInfo(this, state); + debugPlan.setLoadBalancingPolicyWrapperInfo(wrapperInfo); + return debugPlan; case RUNNING: LoadBalancingPolicy policy = policiesPerProfile.get(executionProfileName); if (policy == null) { @@ -156,7 +178,14 @@ public Queue newQueryPlan( } return policy.newQueryPlan(request, session); default: - return new ConcurrentLinkedQueue<>(); + if (!detailedQueryPlanExceptions) { + return QueryPlan.EMPTY; + } + plan = QueryPlan.EMPTY; + debugPlan = new DebugQueryPlan(plan); + wrapperInfo = new LoadBalancingPolicyWrapperInfo(this, state); + debugPlan.setLoadBalancingPolicyWrapperInfo(wrapperInfo); + return debugPlan; } } @@ -270,4 +299,25 @@ private NodeDistance aggregate(Map distances) return minimum; } } + + private static class LoadBalancingPolicyWrapperInfo implements Serializable { + private final InternalDriverContext context; + private final Set policies; + private final Map policiesPerProfile; + private final State state; + + private LoadBalancingPolicyWrapperInfo(LoadBalancingPolicyWrapper wrapper, State state) { + this.context = wrapper.context; + this.policies = new HashSet<>(); + this.policiesPerProfile = new HashMap<>(); + this.state = state; + } + + @Override + public String toString() { + return String.format( + "LoadBalancingPolicyWrapperInfo(state: %s, context: %s, policies: %s, policiesPerProfile: %s)", + state, context, policies, policiesPerProfile); + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeInfo.java index 6a9651d8376..3cdbf8d233c 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeInfo.java @@ -158,4 +158,28 @@ public interface NodeInfo { */ @Nullable UUID getSchemaVersion(); + + /** + * The current node distance. + * + *

This is not required; the driver reports it in {@link Node#getDistance()}, but for + * informational purposes only. It is not used anywhere internally. + */ + NodeDistance getDistance(); + + /** + * The current reconnecting status. + * + *

This is not required; the driver reports it in {@link Node#isReconnecting()}, but for + * informational purposes only. It is not used anywhere internally. + */ + boolean isReconnecting(); + + /** + * The current number of opened connections to the host. + * + *

This is not required; the driver reports it in {@link Node#getOpenConnections()}, but for + * informational purposes only. It is not used anywhere internally. + */ + int getOpenConnections(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/DebugQueryPlan.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/DebugQueryPlan.java new file mode 100644 index 00000000000..db9266289ca --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/DebugQueryPlan.java @@ -0,0 +1,89 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.util.collection; + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.LazyCopyQueryPlan; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.Serializable; +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.Queue; + +import net.jcip.annotations.ThreadSafe; + +/** A query plan that attaches debug information to original query plan. */ +@ThreadSafe +public class DebugQueryPlan extends AbstractQueue implements QueryPlan { + private final Queue plan; + private Queue localPlan; + private Serializable policyInfo; + private Serializable policyWrapperInfo; + + public DebugQueryPlan(@NonNull Queue originalPlan) { + if (originalPlan instanceof LazyCopyQueryPlan) { + this.plan = originalPlan; + } else { + this.plan = new LazyCopyQueryPlan(originalPlan); + } + } + + @Override + public Iterator iterator() { + return this.plan.iterator(); + } + + @Override + public int size() { + return this.plan.size(); + } + + public QueryPlan setLocalPlan(@NonNull QueryPlan plan) { + LazyCopyQueryPlan copyPlan = new LazyCopyQueryPlan(plan); + this.localPlan = copyPlan; + return copyPlan; + } + + public void setLoadBalancingPolicyInfo(Serializable policyInfo) { + this.policyInfo = policyInfo; + } + + public void setLoadBalancingPolicyWrapperInfo(Serializable policyWrapperInfo) { + this.policyWrapperInfo = policyWrapperInfo; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append(this.getClass().getName()).append("(plan: ").append(plan); + if (this.policyInfo != null) { + result.append(", policy: ").append(policyInfo); + } + if (this.policyWrapperInfo != null) { + result.append(", wrapper: ").append(policyWrapperInfo); + } + if (this.localPlan != null) { + result.append(", localPlan: ").append(localPlan); + } + result.append(")"); + return result.toString(); + } + + @Override + public Node poll() { + return plan.poll(); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/EmptyQueryPlan.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/EmptyQueryPlan.java index 53177147695..9dfb4622d5a 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/EmptyQueryPlan.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/collection/EmptyQueryPlan.java @@ -42,4 +42,9 @@ public Iterator iterator() { public int size() { return 0; } + + @Override + public String toString() { + return "EmptyQueryPlan"; + } } diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 75bed97e498..423614558ed 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -535,6 +535,12 @@ datastax-java-driver { # change. # Overridable in a profile: no warn-on-init-error = true + + # TBD: Add more details here + # Required: no + # Modifiable at runtime: no + # Overridable in a profile: yes + detailed-query-plan-exceptions = false } # Advanced options for the built-in load-balancing policies. diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlanTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlanTest.java new file mode 100644 index 00000000000..586f9c2938c --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/nodeset/LazyCopyQueryPlanTest.java @@ -0,0 +1,54 @@ +package com.datastax.oss.driver.internal.core.loadbalancing.nodeset; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.datastax.oss.driver.api.core.metadata.Node; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.junit.Test; + +public class LazyCopyQueryPlanTest { + @Test + public void toString_returns_proper_results_after_poll() { + Queue original = new ConcurrentLinkedQueue(); + original.add(mockNode("dc1")); + original.add(mockNode("dc2")); + Queue lc = new LazyCopyQueryPlan(original); + assertThat(lc.toString()) + .isEqualTo( + "java.util.concurrent.ConcurrentLinkedQueue(inQueue: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc1, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0), " + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc2, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)" + + "], itemsPulled: [])"); + lc.poll(); + assertThat(lc.toString()) + .isEqualTo( + "java.util.concurrent.ConcurrentLinkedQueue(inQueue: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc2, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)" + + "], itemsPulled: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc1, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)" + + "])"); + lc.poll(); + assertThat(lc.toString()) + .isEqualTo( + "java.util.concurrent.ConcurrentLinkedQueue(inQueue: [], itemsPulled: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc1, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0), " + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc2, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)])"); + + // Make sure that when queue is exhausted next poll does not break toString results + lc.poll(); + assertThat(lc.toString()) + .isEqualTo( + "java.util.concurrent.ConcurrentLinkedQueue(inQueue: [], itemsPulled: [" + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc1, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0), " + + "DefaultNodeInfo(hostId: null, endPoint: null, datacenter: dc2, rack: null, distance: null, schemaVersion: null, broadcastRpcAddress: null, broadcastAddress: null, listenAddress: null, partitioner: null, isReconnecting: false, openConnections: 0)])"); + } + + private Node mockNode(String dc) { + Node node = mock(Node.class); + when(node.getDatacenter()).thenReturn(dc); + return node; + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java index 1a0292e3947..047622e5b3b 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy.DistanceReporter; @@ -77,6 +78,9 @@ public class LoadBalancingPolicyWrapperTest { @Mock protected MetricsFactory metricsFactory; @Captor private ArgumentCaptor> initNodesCaptor; + private static final DriverConfigLoader configLoader = + DriverConfigLoader.programmaticBuilder().build(); + private LoadBalancingPolicyWrapper wrapper; @Before @@ -103,6 +107,7 @@ public void setup() { eventBus = spy(new EventBus("test")); when(context.getEventBus()).thenReturn(eventBus); + when(context.getConfig()).thenReturn(configLoader.getInitialConfig()); wrapper = new LoadBalancingPolicyWrapper( diff --git a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/BaseCcmRule.java b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/BaseCcmRule.java index 2e137b08543..d0bd46ad479 100644 --- a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/BaseCcmRule.java +++ b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/BaseCcmRule.java @@ -186,6 +186,10 @@ public Version getCassandraVersion() { return ccmBridge.getCassandraVersion(); } + public CcmBridge getCcmBridge() { + return ccmBridge; + } + public Optional getDseVersion() { return ccmBridge.getDseVersion(); } diff --git a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CustomCcmRule.java b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CustomCcmRule.java index 79cc0f7e601..ce40bc5fe1b 100644 --- a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CustomCcmRule.java +++ b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CustomCcmRule.java @@ -63,6 +63,7 @@ protected void after() { } } + @Override public CcmBridge getCcmBridge() { return ccmBridge; }