Skip to content

Commit

Permalink
JAVA-2959: Don't throw NoNodeAvailableException when all connections …
Browse files Browse the repository at this point in the history
…busy (apache#1570)

For cases in which there are no connections available to
send requests to a Node in the query plan, collect the error
rather than silently skipping over the node. The error will
be thrown as part of an AllNodesFailedException if all nodes
fail.
This can happen when we've saturated the max in-flight
requests across all nodes or when the request is directed to
a particular node and it has no connections available (or
all its connections are saturated).
Note that in the latter case we used to throw a
NoNodeAvailableException but we now throw
AllNodesFailedException.
  • Loading branch information
akhaku authored Aug 24, 2021
1 parent 2a1e37a commit 0a7b1c8
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 6 deletions.
4 changes: 4 additions & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

<!-- Note: contrary to 3.x, insert new entries *first* in their section -->

### 4.14.0 (in progress)

- [improvement] JAVA-2959: Don't throw NoNodeAvailableException when all connections busy

### 4.13.0

- [improvement] JAVA-2940: Add GraalVM native image build configurations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.NodeUnavailableException;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
Expand Down Expand Up @@ -348,6 +349,8 @@ private void sendRequest(
channel = session.getChannel(node, logPrefix);
if (channel != null) {
break;
} else {
recordError(node, new NodeUnavailableException(node));
}
}
}
Expand Down Expand Up @@ -455,6 +458,10 @@ CompletableFuture<ResultSetT> getPendingResult() {
}
}

private void recordError(@NonNull Node node, @NonNull Throwable error) {
errors.add(new AbstractMap.SimpleEntry<>(node, error));
}

/**
* Handles the interaction with a single node in the query plan.
*
Expand Down Expand Up @@ -1433,10 +1440,6 @@ private void reenableAutoReadIfNeeded() {

// ERROR HANDLING

private void recordError(@NonNull Node node, @NonNull Throwable error) {
errors.add(new AbstractMap.SimpleEntry<>(node, error));
}

private void trackNodeError(@NonNull Node node, @NonNull Throwable error) {
if (nodeErrorReported.compareAndSet(false, true)) {
long latencyNanos = System.nanoTime() - this.messageStartTimeNanos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.NodeUnavailableException;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
Expand Down Expand Up @@ -253,6 +254,8 @@ private void sendRequest(
channel = session.getChannel(node, logPrefix);
if (channel != null) {
break;
} else {
recordError(node, new NodeUnavailableException(node));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.api.core;

import com.datastax.oss.driver.api.core.metadata.Node;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;

/**
* Indicates that a {@link Node} was selected in a query plan, but it had no connection available.
*
* <p>A common reason to encounter this error is when the configured number of connections per node
* and requests per connection is not high enough to absorb the overall request rate. This can be
* mitigated by tuning the following options:
*
* <ul>
* <li>{@code advanced.connection.pool.local.size};
* <li>{@code advanced.connection.pool.remote.size};
* <li>{@code advanced.connection.max-requests-per-connection}.
* </ul>
*
* See {@code reference.conf} for more details.
*
* <p>Another possibility is when you are trying to direct a request {@linkplain
* com.datastax.oss.driver.api.core.cql.Statement#setNode(Node) to a particular node}, but that node
* has no connections available.
*/
public class NodeUnavailableException extends DriverException {

private final Node node;

public NodeUnavailableException(Node node) {
super("No connection was available to " + node, null, null, true);
this.node = Objects.requireNonNull(node);
}

@NonNull
public Node getNode() {
return node;
}

@Override
@NonNull
public DriverException copy() {
return new NodeUnavailableException(node);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.NodeUnavailableException;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
Expand Down Expand Up @@ -188,6 +189,8 @@ private void sendRequest(PrepareRequest request, Node node, int retryCount) {
channel = session.getChannel(node, logPrefix);
if (channel != null) {
break;
} else {
recordError(node, new NodeUnavailableException(node));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.NodeUnavailableException;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
Expand Down Expand Up @@ -251,6 +252,8 @@ private void sendRequest(
channel = session.getChannel(node, logPrefix);
if (channel != null) {
break;
} else {
recordError(node, new NodeUnavailableException(node));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@
import com.datastax.dse.driver.DseTestFixtures;
import com.datastax.dse.driver.api.core.DseProtocolVersion;
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.NodeUnavailableException;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.internal.core.cql.RequestHandlerTestHarness;
import com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper;
import com.tngtech.java.junit.dataprovider.UseDataProvider;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.junit.Test;
import org.mockito.InOrder;
Expand Down Expand Up @@ -67,7 +71,12 @@ public void should_fail_if_targeted_node_not_available(DseProtocolVersion versio
assertThatStage(resultSetFuture)
.isFailed(
error -> {
assertThat(error).isInstanceOf(NoNodeAvailableException.class);
assertThat(error).isInstanceOf(AllNodesFailedException.class);
Map<Node, List<Throwable>> errors =
((AllNodesFailedException) error).getAllErrors();
assertThat(errors).hasSize(1);
List<Throwable> nodeErrors = errors.values().iterator().next();
assertThat(nodeErrors).singleElement().isInstanceOf(NodeUnavailableException.class);
invocations
.verify(loadBalancingPolicy, never())
.newQueryPlan(any(Request.class), anyString(), any(Session.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.NodeUnavailableException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
Expand All @@ -47,6 +49,7 @@
import com.datastax.oss.protocol.internal.response.result.RowsMetadata;
import com.datastax.oss.protocol.internal.util.Bytes;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.junit.Before;
Expand Down Expand Up @@ -228,6 +231,39 @@ public void should_not_retry_initial_prepare_if_unrecoverable_error() {
}
}

@Test
public void should_fail_if_nodes_unavailable() {
RequestHandlerTestHarness.Builder harnessBuilder = RequestHandlerTestHarness.builder();
try (RequestHandlerTestHarness harness =
harnessBuilder.withEmptyPool(node1).withEmptyPool(node2).build()) {
CompletionStage<PreparedStatement> prepareFuture =
new CqlPrepareHandler(PREPARE_REQUEST, harness.getSession(), harness.getContext(), "test")
.handle();
assertThatStage(prepareFuture)
.isFailed(
error -> {
assertThat(error).isInstanceOf(AllNodesFailedException.class);
Map<Node, List<Throwable>> allErrors =
((AllNodesFailedException) error).getAllErrors();
assertThat(allErrors).hasSize(2);
assertThat(allErrors)
.hasEntrySatisfying(
node1,
nodeErrors ->
assertThat(nodeErrors)
.singleElement()
.isInstanceOf(NodeUnavailableException.class));
assertThat(allErrors)
.hasEntrySatisfying(
node2,
nodeErrors ->
assertThat(nodeErrors)
.singleElement()
.isInstanceOf(NodeUnavailableException.class));
});
}
}

@Test
public void should_fail_if_retry_policy_ignores_error() {
RequestHandlerTestHarness.Builder harnessBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
import com.datastax.oss.driver.api.core.NodeUnavailableException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
Expand All @@ -32,6 +34,7 @@
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.util.concurrent.CapturingTimer.CapturedTimeout;
import com.datastax.oss.protocol.internal.request.Prepare;
Expand All @@ -43,6 +46,8 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -105,6 +110,43 @@ public void should_fail_if_no_node_available() {
}
}

@Test
public void should_fail_if_nodes_unavailable() {
RequestHandlerTestHarness.Builder harnessBuilder = RequestHandlerTestHarness.builder();
try (RequestHandlerTestHarness harness =
harnessBuilder.withEmptyPool(node1).withEmptyPool(node2).build()) {
CompletionStage<AsyncResultSet> resultSetFuture =
new CqlRequestHandler(
UNDEFINED_IDEMPOTENCE_STATEMENT,
harness.getSession(),
harness.getContext(),
"test")
.handle();
assertThatStage(resultSetFuture)
.isFailed(
error -> {
assertThat(error).isInstanceOf(AllNodesFailedException.class);
Map<Node, List<Throwable>> allErrors =
((AllNodesFailedException) error).getAllErrors();
assertThat(allErrors).hasSize(2);
assertThat(allErrors)
.hasEntrySatisfying(
node1,
nodeErrors ->
assertThat(nodeErrors)
.singleElement()
.isInstanceOf(NodeUnavailableException.class));
assertThat(allErrors)
.hasEntrySatisfying(
node2,
nodeErrors ->
assertThat(nodeErrors)
.singleElement()
.isInstanceOf(NodeUnavailableException.class));
});
}
}

@Test
public void should_time_out_if_first_node_takes_too_long_to_respond() throws Exception {
RequestHandlerTestHarness.Builder harnessBuilder = RequestHandlerTestHarness.builder();
Expand Down
9 changes: 9 additions & 0 deletions upgrade_guide/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
## Upgrade guide

### 4.14.0

#### AllNodesFailedException instead of NoNodeAvailableException in certain cases

[JAVA-2959](https://datastax-oss.atlassian.net/browse/JAVA-2959) changed the behavior for when a
request cannot be executed because all nodes tried were busy. Previously you would get back a
`NoNodeAvailableException` but you will now get back an `AllNodesFailedException` where the
`getAllErrors` map contains a `NodeUnavailableException` for that node.

### 4.13.0

#### Enhanced support for GraalVM native images
Expand Down

0 comments on commit 0a7b1c8

Please sign in to comment.