YOLO_KEY = Attributes.Key.create("yolo");
+ private static Attributes ATTRIBUTES = Attributes.newBuilder()
+ .set(YOLO_KEY, "To be, or not to be?").build();
+ private static ConfigOrError CONFIG = ConfigOrError.fromConfig("foo");
+
+ @Rule
+ public final MockitoRule mocks = MockitoJUnit.rule();
private final int defaultPort = 293;
private final ProxyDetector proxyDetector = mock(ProxyDetector.class);
private final SynchronizationContext syncContext =
@@ -41,6 +64,7 @@ public class NameResolverTest {
private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor();
private final String overrideAuthority = "grpc.io";
+ @Mock NameResolver.Listener mockListener;
@Test
public void args() {
@@ -80,4 +104,90 @@ private NameResolver.Args createArgs() {
.setOverrideAuthority(overrideAuthority)
.build();
}
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void startOnOldListener_wrapperListener2UsedToStart() {
+ final Listener2[] listener2 = new Listener2[1];
+ NameResolver nameResolver = new NameResolver() {
+ @Override
+ public String getServiceAuthority() {
+ return null;
+ }
+
+ @Override
+ public void shutdown() {}
+
+ @Override
+ public void start(Listener2 listener2Arg) {
+ listener2[0] = listener2Arg;
+ }
+ };
+ nameResolver.start(mockListener);
+
+ listener2[0].onResult(ResolutionResult.newBuilder().setAddresses(ADDRESSES)
+ .setAttributes(ATTRIBUTES).build());
+ verify(mockListener).onAddresses(eq(ADDRESSES), eq(ATTRIBUTES));
+ listener2[0].onError(Status.CANCELLED);
+ verify(mockListener).onError(Status.CANCELLED);
+ }
+
+ @Test
+ @SuppressWarnings({"deprecation", "InlineMeInliner"})
+ public void listener2AddressesToListener2ResolutionResultConversion() {
+ final ResolutionResult[] resolutionResult = new ResolutionResult[1];
+ NameResolver.Listener2 listener2 = new Listener2() {
+ @Override
+ public void onResult(ResolutionResult resolutionResultArg) {
+ resolutionResult[0] = resolutionResultArg;
+ }
+
+ @Override
+ public void onError(Status error) {}
+ };
+
+ listener2.onAddresses(ADDRESSES, ATTRIBUTES);
+
+ assertThat(resolutionResult[0].getAddressesOrError().getValue()).isEqualTo(ADDRESSES);
+ assertThat(resolutionResult[0].getAttributes()).isEqualTo(ATTRIBUTES);
+ }
+
+ @Test
+ public void resolutionResult_toString_addressesAttributesAndConfig() {
+ ResolutionResult resolutionResult = ResolutionResult.newBuilder()
+ .setAddressesOrError(StatusOr.fromValue(ADDRESSES))
+ .setAttributes(ATTRIBUTES)
+ .setServiceConfig(CONFIG)
+ .build();
+
+ assertThat(resolutionResult.toString()).isEqualTo(
+ "ResolutionResult{addressesOrError=StatusOr{value="
+ + "[[[FakeSocketAddress-fake-address-1]/{}]]}, attributes={yolo=To be, or not to be?}, "
+ + "serviceConfigOrError=ConfigOrError{config=foo}}");
+ }
+
+ @Test
+ public void resolutionResult_hashCode() {
+ ResolutionResult resolutionResult = ResolutionResult.newBuilder()
+ .setAddressesOrError(StatusOr.fromValue(ADDRESSES))
+ .setAttributes(ATTRIBUTES)
+ .setServiceConfig(CONFIG)
+ .build();
+
+ assertThat(resolutionResult.hashCode()).isEqualTo(
+ Objects.hashCode(StatusOr.fromValue(ADDRESSES), ATTRIBUTES, CONFIG));
+ }
+
+ private static class FakeSocketAddress extends SocketAddress {
+ final String name;
+
+ FakeSocketAddress(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return "FakeSocketAddress-" + name;
+ }
+ }
}
diff --git a/api/src/test/java/io/grpc/StatusOrTest.java b/api/src/test/java/io/grpc/StatusOrTest.java
new file mode 100644
index 00000000000..f63a314a2bb
--- /dev/null
+++ b/api/src/test/java/io/grpc/StatusOrTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static junit.framework.TestCase.fail;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link StatusOr}. **/
+@RunWith(JUnit4.class)
+public class StatusOrTest {
+
+ @Test
+ public void getValue_throwsIfNoValuePresent() {
+ try {
+ StatusOr.fromStatus(Status.ABORTED).getValue();
+
+ fail("Expected exception.");
+ } catch (IllegalStateException expected) { }
+ }
+
+ @Test
+ @SuppressWarnings("TruthIncompatibleType")
+ public void equals_differentValueTypes() {
+ assertThat(StatusOr.fromValue(1)).isNotEqualTo(StatusOr.fromValue("1"));
+ }
+
+ @Test
+ public void equals_differentValues() {
+ assertThat(StatusOr.fromValue(1)).isNotEqualTo(StatusOr.fromValue(2));
+ }
+
+ @Test
+ public void equals_sameValues() {
+ assertThat(StatusOr.fromValue(1)).isEqualTo(StatusOr.fromValue(1));
+ }
+
+ @Test
+ public void equals_differentStatuses() {
+ assertThat(StatusOr.fromStatus(Status.ABORTED)).isNotEqualTo(
+ StatusOr.fromStatus(Status.CANCELLED));
+ }
+
+ @Test
+ public void equals_sameStatuses() {
+ assertThat(StatusOr.fromStatus(Status.ABORTED)).isEqualTo(StatusOr.fromStatus(Status.ABORTED));
+ }
+
+ @Test
+ public void toString_value() {
+ assertThat(StatusOr.fromValue(1).toString()).isEqualTo("StatusOr{value=1}");
+ }
+
+ @Test
+ public void toString_nullValue() {
+ assertThat(StatusOr.fromValue(null).toString()).isEqualTo("StatusOr{value=null}");
+ }
+
+ @Test
+ public void toString_errorStatus() {
+ assertThat(StatusOr.fromStatus(Status.ABORTED).toString()).isEqualTo(
+ "StatusOr{error=Status{code=ABORTED, description=null, cause=null}}");
+ }
+}
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index 740f534e136..6afe010de4d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -500,3 +500,4 @@ configurations {
}
tasks.register('checkForUpdates', CheckForUpdatesTask, project.configurations.checkForUpdates, "libs")
+
diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java
index df51d6f2c5c..b59de833d7c 100644
--- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java
+++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java
@@ -32,6 +32,7 @@
import io.grpc.ProxiedSocketAddress;
import io.grpc.ProxyDetector;
import io.grpc.Status;
+import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.internal.SharedResourceHolder.Resource;
import java.io.IOException;
@@ -59,7 +60,7 @@
* A DNS-based {@link NameResolver}.
*
* Each {@code A} or {@code AAAA} record emits an {@link EquivalentAddressGroup} in the list
- * passed to {@link NameResolver.Listener2#onResult(ResolutionResult)}.
+ * passed to {@link NameResolver.Listener2#onResult2(ResolutionResult)}.
*
* @see DnsNameResolverProvider
*/
@@ -313,15 +314,20 @@ public void run() {
if (logger.isLoggable(Level.FINER)) {
logger.finer("Using proxy address " + proxiedAddr);
}
- resolutionResultBuilder.setAddresses(Collections.singletonList(proxiedAddr));
+ resolutionResultBuilder.setAddressesOrError(
+ StatusOr.fromValue(Collections.singletonList(proxiedAddr)));
} else {
result = doResolve(false);
if (result.error != null) {
- savedListener.onError(result.error);
+ InternalResolutionResult finalResult = result;
+ syncContext.execute(() ->
+ savedListener.onResult2(ResolutionResult.newBuilder()
+ .setAddressesOrError(StatusOr.fromStatus(finalResult.error))
+ .build()));
return;
}
if (result.addresses != null) {
- resolutionResultBuilder.setAddresses(result.addresses);
+ resolutionResultBuilder.setAddressesOrError(StatusOr.fromValue(result.addresses));
}
if (result.config != null) {
resolutionResultBuilder.setServiceConfig(result.config);
@@ -334,8 +340,12 @@ public void run() {
savedListener.onResult2(resolutionResultBuilder.build());
});
} catch (IOException e) {
- savedListener.onError(
- Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
+ syncContext.execute(() ->
+ savedListener.onResult2(ResolutionResult.newBuilder()
+ .setAddressesOrError(
+ StatusOr.fromStatus(
+ Status.UNAVAILABLE.withDescription(
+ "Unable to resolve host " + host).withCause(e))).build()));
} finally {
final boolean succeed = result != null && result.error == null;
syncContext.execute(new Runnable() {
diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
index 70e42e2f5f1..27a80f7c191 100644
--- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java
+++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
@@ -45,6 +45,7 @@
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
+import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
@@ -77,6 +78,7 @@ final class InternalSubchannel implements InternalInstrumented, Tr
private final CallTracer callsTracer;
private final ChannelTracer channelTracer;
private final ChannelLogger channelLogger;
+ private final boolean reconnectDisabled;
private final List transportFilters;
@@ -159,13 +161,15 @@ protected void handleNotInUse() {
private volatile Attributes connectedAddressAttributes;
- InternalSubchannel(List addressGroups, String authority, String userAgent,
- BackoffPolicy.Provider backoffPolicyProvider,
- ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
- Supplier stopwatchSupplier, SynchronizationContext syncContext, Callback callback,
- InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer,
- InternalLogId logId, ChannelLogger channelLogger,
- List transportFilters) {
+ InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
+ BackoffPolicy.Provider backoffPolicyProvider,
+ ClientTransportFactory transportFactory,
+ ScheduledExecutorService scheduledExecutor,
+ Supplier stopwatchSupplier, SynchronizationContext syncContext,
+ Callback callback, InternalChannelz channelz, CallTracer callsTracer,
+ ChannelTracer channelTracer, InternalLogId logId,
+ ChannelLogger channelLogger, List transportFilters) {
+ List addressGroups = args.getAddresses();
Preconditions.checkNotNull(addressGroups, "addressGroups");
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
@@ -187,6 +191,7 @@ protected void handleNotInUse() {
this.logId = Preconditions.checkNotNull(logId, "logId");
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.transportFilters = transportFilters;
+ this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
}
ChannelLogger getChannelLogger() {
@@ -289,6 +294,11 @@ public void run() {
}
gotoState(ConnectivityStateInfo.forTransientFailure(status));
+
+ if (reconnectDisabled) {
+ return;
+ }
+
if (reconnectPolicy == null) {
reconnectPolicy = backoffPolicyProvider.get();
}
@@ -337,7 +347,11 @@ private void gotoState(final ConnectivityStateInfo newState) {
if (state.getState() != newState.getState()) {
Preconditions.checkState(state.getState() != SHUTDOWN,
"Cannot transition out of SHUTDOWN to " + newState);
- state = newState;
+ if (reconnectDisabled && newState.getState() == TRANSIENT_FAILURE) {
+ state = ConnectivityStateInfo.forNonError(IDLE);
+ } else {
+ state = newState;
+ }
callback.onStateChange(InternalSubchannel.this, newState);
}
}
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 7e36086ac94..cda4299acec 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -81,6 +81,7 @@
import io.grpc.NameResolverRegistry;
import io.grpc.ProxyDetector;
import io.grpc.Status;
+import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
@@ -1483,7 +1484,7 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
}
final InternalSubchannel internalSubchannel = new InternalSubchannel(
- addressGroup,
+ CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
authority, userAgent, backoffPolicyProvider, oobTransportFactory,
oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
// All callback methods are run from syncContext
@@ -1701,7 +1702,13 @@ public Status onResult2(final ResolutionResult resolutionResult) {
return Status.OK;
}
- List servers = resolutionResult.getAddresses();
+ StatusOr> serversOrError =
+ resolutionResult.getAddressesOrError();
+ if (!serversOrError.hasValue()) {
+ handleErrorInSyncContext(serversOrError.getStatus());
+ return serversOrError.getStatus();
+ }
+ List servers = serversOrError.getValue();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Resolved address: {0}, config={1}",
@@ -1709,10 +1716,10 @@ public Status onResult2(final ResolutionResult resolutionResult) {
resolutionResult.getAttributes());
if (lastResolutionState != ResolutionState.SUCCESS) {
- channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
+ channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}",
+ servers);
lastResolutionState = ResolutionState.SUCCESS;
}
-
ConfigOrError configOrError = resolutionResult.getServiceConfig();
InternalConfigSelector resolvedConfigSelector =
resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
@@ -1788,7 +1795,7 @@ public Status onResult2(final ResolutionResult resolutionResult) {
}
try {
- // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
+ // TODO(creamsoup): when `serversOrError` is empty and lastResolutionStateCopy == SUCCESS
// and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
// lbNeedAddress is not deterministic
serviceConfigUpdated = true;
@@ -1814,12 +1821,13 @@ public Status onResult2(final ResolutionResult resolutionResult) {
}
Attributes attributes = attrBuilder.build();
- return helper.lb.tryAcceptResolvedAddresses(
- ResolvedAddresses.newBuilder()
- .setAddresses(servers)
- .setAttributes(attributes)
- .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
- .build());
+ ResolvedAddresses.Builder resolvedAddresses = ResolvedAddresses.newBuilder()
+ .setAddresses(serversOrError.getValue())
+ .setAttributes(attributes)
+ .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig());
+ Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
+ resolvedAddresses.build());
+ return addressAcceptanceStatus;
}
return Status.OK;
}
@@ -1915,7 +1923,7 @@ void onNotInUse(InternalSubchannel is) {
}
final InternalSubchannel internalSubchannel = new InternalSubchannel(
- args.getAddresses(),
+ args,
authority(),
userAgent,
backoffPolicyProvider,
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
index 7da9125087e..48a255472e1 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java
@@ -45,6 +45,7 @@
import io.grpc.NameResolverProvider;
import io.grpc.NameResolverRegistry;
import io.grpc.ProxyDetector;
+import io.grpc.StatusOr;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketAddress;
@@ -877,9 +878,11 @@ public String getServiceAuthority() {
@Override
public void start(Listener2 listener) {
- listener.onResult(
+ listener.onResult2(
ResolutionResult.newBuilder()
- .setAddresses(Collections.singletonList(new EquivalentAddressGroup(address)))
+ .setAddressesOrError(
+ StatusOr.fromValue(
+ Collections.singletonList(new EquivalentAddressGroup(address))))
.setAttributes(Attributes.EMPTY)
.build());
}
diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java
index c8b250c2143..13a01efec0a 100644
--- a/core/src/main/java/io/grpc/internal/MessageDeframer.java
+++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java
@@ -406,7 +406,8 @@ private void processBody() {
// There is no reliable way to get the uncompressed size per message when it's compressed,
// because the uncompressed bytes are provided through an InputStream whose total size is
// unknown until all bytes are read, and we don't know when it happens.
- statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, -1);
+ statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize,
+ (compressedFlag || fullStreamDecompressor != null) ? -1 : inboundBodyWireSize);
inboundBodyWireSize = 0;
InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
nextFrame.touch();
diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
index bfa462e16e1..6f4794fdd46 100644
--- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
+++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
@@ -64,17 +64,26 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private int numTf = 0;
private boolean firstPass = true;
@Nullable
- private ScheduledHandle scheduleConnectionTask;
+ private ScheduledHandle scheduleConnectionTask = null;
private ConnectivityState rawConnectivityState = IDLE;
private ConnectivityState concludedState = IDLE;
- private final boolean enableHappyEyeballs =
- PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
+ private final boolean enableHappyEyeballs = !isSerializingRetries()
+ && PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private boolean notAPetiolePolicy = true; // means not under a petiole policy
+ private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
+ private BackoffPolicy reconnectPolicy;
+ @Nullable
+ private ScheduledHandle reconnectTask = null;
+ private final boolean serializingRetries = isSerializingRetries();
PickFirstLeafLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
}
+ static boolean isSerializingRetries() {
+ return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
+ }
+
@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (rawConnectivityState == SHUTDOWN) {
@@ -225,9 +234,10 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
return;
}
- if (newState == IDLE) {
+ if (newState == IDLE && subchannelData.state == READY) {
helper.refreshNameResolution();
}
+
// If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
// transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
// transient failure". Only a subchannel state change to READY will get the LB out of
@@ -277,6 +287,8 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
if (addressIndex.increment()) {
cancelScheduleTask();
requestConnection(); // is recursive so might hit the end of the addresses
+ } else {
+ scheduleBackoff();
}
}
@@ -304,6 +316,39 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
}
}
+ /**
+ * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
+ */
+ private void scheduleBackoff() {
+ if (!serializingRetries) {
+ return;
+ }
+
+ class EndOfCurrentBackoff implements Runnable {
+ @Override
+ public void run() {
+ reconnectTask = null;
+ addressIndex.reset();
+ requestConnection();
+ }
+ }
+
+ // Just allow the previous one to trigger when ready if we're already in backoff
+ if (reconnectTask != null) {
+ return;
+ }
+
+ if (reconnectPolicy == null) {
+ reconnectPolicy = bkoffPolProvider.get();
+ }
+ long delayNanos = reconnectPolicy.nextBackoffNanos();
+ reconnectTask = helper.getSynchronizationContext().schedule(
+ new EndOfCurrentBackoff(),
+ delayNanos,
+ TimeUnit.NANOSECONDS,
+ helper.getScheduledExecutorService());
+ }
+
private void updateHealthCheckedState(SubchannelData subchannelData) {
if (subchannelData.state != READY) {
return;
@@ -337,6 +382,11 @@ public void shutdown() {
rawConnectivityState = SHUTDOWN;
concludedState = SHUTDOWN;
cancelScheduleTask();
+ if (reconnectTask != null) {
+ reconnectTask.cancel();
+ reconnectTask = null;
+ }
+ reconnectPolicy = null;
for (SubchannelData subchannelData : subchannels.values()) {
subchannelData.getSubchannel().shutdown();
@@ -350,6 +400,12 @@ public void shutdown() {
* that all other subchannels must be shutdown.
*/
private void shutdownRemaining(SubchannelData activeSubchannelData) {
+ if (reconnectTask != null) {
+ reconnectTask.cancel();
+ reconnectTask = null;
+ }
+ reconnectPolicy = null;
+
cancelScheduleTask();
for (SubchannelData subchannelData : subchannels.values()) {
if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
@@ -391,8 +447,17 @@ public void requestConnection() {
scheduleNextConnection();
break;
case TRANSIENT_FAILURE:
- addressIndex.increment();
- requestConnection();
+ if (!serializingRetries) {
+ addressIndex.increment();
+ requestConnection();
+ } else {
+ if (!addressIndex.isValid()) {
+ scheduleBackoff();
+ } else {
+ subchannelData.subchannel.requestConnection();
+ subchannelData.updateState(CONNECTING);
+ }
+ }
break;
default:
// Wait for current subchannel to change state
@@ -438,9 +503,10 @@ private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs)
HealthListener hcListener = new HealthListener();
final Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
- .setAddresses(Lists.newArrayList(
- new EquivalentAddressGroup(addr, attrs)))
- .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
+ .setAddresses(Lists.newArrayList(
+ new EquivalentAddressGroup(addr, attrs)))
+ .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
+ .addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, serializingRetries)
.build());
if (subchannel == null) {
log.warning("Was not able to create subchannel for " + addr);
@@ -458,7 +524,7 @@ private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs)
}
private boolean isPassComplete() {
- if (addressIndex.isValid() || subchannels.size() < addressIndex.size()) {
+ if (subchannels.size() < addressIndex.size()) {
return false;
}
for (SubchannelData sc : subchannels.values()) {
@@ -646,6 +712,16 @@ public int size() {
}
}
+ @VisibleForTesting
+ int getGroupIndex() {
+ return addressIndex.groupIndex;
+ }
+
+ @VisibleForTesting
+ boolean isIndexValid() {
+ return addressIndex.isValid();
+ }
+
private static final class SubchannelData {
private final Subchannel subchannel;
private ConnectivityState state;
diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
index 0512171f4e7..be304ad326b 100644
--- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
+++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
@@ -24,6 +24,7 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
@@ -152,12 +153,11 @@ public void close(Executor instance) {}
private NameResolver.Listener2 mockListener;
@Captor
private ArgumentCaptor resultCaptor;
- @Captor
- private ArgumentCaptor errorCaptor;
@Nullable
private String networkaddressCacheTtlPropertyValue;
@Mock
private RecordFetcher recordFetcher;
+ @Mock private ProxyDetector mockProxyDetector;
private RetryingNameResolver newResolver(String name, int defaultPort) {
return newResolver(
@@ -570,7 +570,7 @@ public List resolveAddress(String host) throws Exception {
ArgumentCaptor ac = ArgumentCaptor.forClass(ResolutionResult.class);
verify(mockListener).onResult2(ac.capture());
verifyNoMoreInteractions(mockListener);
- assertThat(ac.getValue().getAddresses()).isEmpty();
+ assertThat(ac.getValue().getAddressesOrError().getValue()).isEmpty();
assertThat(ac.getValue().getServiceConfig()).isNull();
verify(mockResourceResolver, never()).resolveSrv(anyString());
@@ -578,6 +578,39 @@ public List resolveAddress(String host) throws Exception {
assertEquals(0, fakeExecutor.numPendingTasks());
}
+ @Test
+ public void resolve_addressResolutionError() throws Exception {
+ DnsNameResolver.enableTxt = true;
+ when(mockProxyDetector.proxyFor(any(SocketAddress.class))).thenThrow(new IOException());
+ RetryingNameResolver resolver = newResolver(
+ "addr.fake:1234", 443, mockProxyDetector, Stopwatch.createUnstarted());
+ DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
+ dnsResolver.setAddressResolver(new AddressResolver() {
+ @Override
+ public List resolveAddress(String host) throws Exception {
+ return Collections.emptyList();
+ }
+ });
+ ResourceResolver mockResourceResolver = mock(ResourceResolver.class);
+ when(mockResourceResolver.resolveTxt(anyString()))
+ .thenReturn(Collections.emptyList());
+
+ dnsResolver.setResourceResolver(mockResourceResolver);
+
+ resolver.start(mockListener);
+ assertThat(fakeExecutor.runDueTasks()).isEqualTo(1);
+
+ ArgumentCaptor ac = ArgumentCaptor.forClass(ResolutionResult.class);
+ verify(mockListener).onResult2(ac.capture());
+ verifyNoMoreInteractions(mockListener);
+ assertThat(ac.getValue().getAddressesOrError().getStatus().getCode()).isEqualTo(
+ Status.UNAVAILABLE.getCode());
+ assertThat(ac.getValue().getAddressesOrError().getStatus().getDescription()).isEqualTo(
+ "Unable to resolve host addr.fake");
+ assertThat(ac.getValue().getAddressesOrError().getStatus().getCause())
+ .isInstanceOf(IOException.class);
+ }
+
// Load balancer rejects the empty addresses.
@Test
public void resolve_emptyResult_notAccepted() throws Exception {
@@ -604,7 +637,7 @@ public List resolveAddress(String host) throws Exception {
ArgumentCaptor ac = ArgumentCaptor.forClass(ResolutionResult.class);
verify(mockListener).onResult2(ac.capture());
verifyNoMoreInteractions(mockListener);
- assertThat(ac.getValue().getAddresses()).isEmpty();
+ assertThat(ac.getValue().getAddressesOrError().getValue()).isEmpty();
assertThat(ac.getValue().getServiceConfig()).isNull();
verify(mockResourceResolver, never()).resolveSrv(anyString());
@@ -632,7 +665,7 @@ public void resolve_nullResourceResolver() throws Exception {
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
- Iterables.getOnlyElement(result.getAddresses()).getAddresses());
+ Iterables.getOnlyElement(result.getAddressesOrError().getValue()).getAddresses());
assertThat(resolvedBackendAddr.getAddress()).isEqualTo(backendAddr);
verify(mockAddressResolver).resolveAddress(name);
assertThat(result.getServiceConfig()).isNull();
@@ -647,6 +680,7 @@ public void resolve_nullResourceResolver_addressFailure() throws Exception {
AddressResolver mockAddressResolver = mock(AddressResolver.class);
when(mockAddressResolver.resolveAddress(anyString()))
.thenThrow(new IOException("no addr"));
+ when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.UNAVAILABLE);
String name = "foo.googleapis.com";
RetryingNameResolver resolver = newResolver(name, 81);
@@ -655,8 +689,8 @@ public void resolve_nullResourceResolver_addressFailure() throws Exception {
dnsResolver.setResourceResolver(null);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockListener).onError(errorCaptor.capture());
- Status errorStatus = errorCaptor.getValue();
+ verify(mockListener).onResult2(resultCaptor.capture());
+ Status errorStatus = resultCaptor.getValue().getAddressesOrError().getStatus();
assertThat(errorStatus.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(errorStatus.getCause()).hasMessageThat().contains("no addr");
@@ -704,7 +738,7 @@ public ConfigOrError parseServiceConfig(Map rawServiceConfig) {
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
- Iterables.getOnlyElement(result.getAddresses()).getAddresses());
+ Iterables.getOnlyElement(result.getAddressesOrError().getValue()).getAddresses());
assertThat(resolvedBackendAddr.getAddress()).isEqualTo(backendAddr);
assertThat(result.getServiceConfig().getConfig()).isNotNull();
verify(mockAddressResolver).resolveAddress(name);
@@ -720,6 +754,7 @@ public void resolve_addressFailure_neverLookUpServiceConfig() throws Exception {
AddressResolver mockAddressResolver = mock(AddressResolver.class);
when(mockAddressResolver.resolveAddress(anyString()))
.thenThrow(new IOException("no addr"));
+ when(mockListener.onResult2(isA(ResolutionResult.class))).thenReturn(Status.UNAVAILABLE);
String name = "foo.googleapis.com";
ResourceResolver mockResourceResolver = mock(ResourceResolver.class);
@@ -729,8 +764,8 @@ public void resolve_addressFailure_neverLookUpServiceConfig() throws Exception {
dnsResolver.setResourceResolver(mockResourceResolver);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockListener).onError(errorCaptor.capture());
- Status errorStatus = errorCaptor.getValue();
+ verify(mockListener).onResult2(resultCaptor.capture());
+ Status errorStatus = resultCaptor.getValue().getAddressesOrError().getStatus();
assertThat(errorStatus.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(errorStatus.getCause()).hasMessageThat().contains("no addr");
verify(mockResourceResolver, never()).resolveTxt(anyString());
@@ -762,7 +797,7 @@ public void resolve_serviceConfigLookupFails_nullServiceConfig() throws Exceptio
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
- Iterables.getOnlyElement(result.getAddresses()).getAddresses());
+ Iterables.getOnlyElement(result.getAddressesOrError().getValue()).getAddresses());
assertThat(resolvedBackendAddr.getAddress()).isEqualTo(backendAddr);
verify(mockAddressResolver).resolveAddress(name);
assertThat(result.getServiceConfig()).isNull();
@@ -794,7 +829,7 @@ public void resolve_serviceConfigMalformed_serviceConfigError() throws Exception
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
- Iterables.getOnlyElement(result.getAddresses()).getAddresses());
+ Iterables.getOnlyElement(result.getAddressesOrError().getValue()).getAddresses());
assertThat(resolvedBackendAddr.getAddress()).isEqualTo(backendAddr);
verify(mockAddressResolver).resolveAddress(name);
assertThat(result.getServiceConfig()).isNotNull();
@@ -859,7 +894,7 @@ public HttpConnectProxiedSocketAddress proxyFor(SocketAddress targetAddress) {
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult2(resultCaptor.capture());
- List result = resultCaptor.getValue().getAddresses();
+ List result = resultCaptor.getValue().getAddressesOrError().getValue();
assertThat(result).hasSize(1);
EquivalentAddressGroup eag = result.get(0);
assertThat(eag.getAddresses()).hasSize(1);
@@ -1299,9 +1334,9 @@ private List createAddressList(int n) throws UnknownHostException {
private static void assertAnswerMatches(
List addrs, int port, ResolutionResult resolutionResult) {
- assertThat(resolutionResult.getAddresses()).hasSize(addrs.size());
+ assertThat(resolutionResult.getAddressesOrError().getValue()).hasSize(addrs.size());
for (int i = 0; i < addrs.size(); i++) {
- EquivalentAddressGroup addrGroup = resolutionResult.getAddresses().get(i);
+ EquivalentAddressGroup addrGroup = resolutionResult.getAddressesOrError().getValue().get(i);
InetSocketAddress socketAddr =
(InetSocketAddress) Iterables.getOnlyElement(addrGroup.getAddresses());
assertEquals("Addr " + i, port, socketAddr.getPort());
diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
index e4d9f27ed46..b75fd43a743 100644
--- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
+++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
@@ -46,6 +46,7 @@
import io.grpc.InternalChannelz;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
+import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.InternalSubchannel.CallTracingTransport;
@@ -309,10 +310,57 @@ public void constructor_eagListWithNull_throws() {
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos();
}
+ @Test public void twoAddressesReconnectDisabled() {
+ SocketAddress addr1 = mock(SocketAddress.class);
+ SocketAddress addr2 = mock(SocketAddress.class);
+ createInternalSubchannel(true,
+ new EquivalentAddressGroup(Arrays.asList(addr1, addr2)));
+ assertEquals(IDLE, internalSubchannel.getState());
+
+ assertNull(internalSubchannel.obtainActiveTransport());
+ assertExactCallbackInvokes("onStateChange:CONNECTING");
+ assertEquals(CONNECTING, internalSubchannel.getState());
+ verify(mockTransportFactory).newClientTransport(eq(addr1), any(), any());
+ // Let this one fail without success
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
+ // Still in CONNECTING
+ assertNull(internalSubchannel.obtainActiveTransport());
+ assertNoCallbackInvoke();
+ assertEquals(CONNECTING, internalSubchannel.getState());
+
+ // Second attempt will start immediately. Still no back-off policy.
+ verify(mockBackoffPolicyProvider, times(0)).get();
+ verify(mockTransportFactory, times(1))
+ .newClientTransport(
+ eq(addr2),
+ eq(createClientTransportOptions()),
+ isA(TransportLogger.class));
+ assertNull(internalSubchannel.obtainActiveTransport());
+ // Fail this one too
+ assertNoCallbackInvoke();
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
+ // All addresses have failed, but we aren't controlling retries.
+ assertEquals(IDLE, internalSubchannel.getState());
+ assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
+ // Backoff reset and first back-off interval begins
+ verify(mockBackoffPolicy1, never()).nextBackoffNanos();
+ verify(mockBackoffPolicyProvider, never()).get();
+ assertTrue("Nothing should have been scheduled", fakeClock.getPendingTasks().isEmpty());
+
+ // Should follow orders and create an active transport.
+ internalSubchannel.obtainActiveTransport();
+ assertExactCallbackInvokes("onStateChange:CONNECTING");
+ assertEquals(CONNECTING, internalSubchannel.getState());
+
+ // Shouldn't have anything scheduled, so shouldn't do anything
+ assertTrue("Nothing should have been scheduled 2", fakeClock.getPendingTasks().isEmpty());
+ }
+
@Test public void twoAddressesReconnect() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
- createInternalSubchannel(addr1, addr2);
+ createInternalSubchannel(false,
+ new EquivalentAddressGroup(Arrays.asList(addr1, addr2)));
assertEquals(IDLE, internalSubchannel.getState());
// Invocation counters
int transportsAddr1 = 0;
@@ -1377,11 +1425,24 @@ private void createInternalSubchannel(SocketAddress ... addrs) {
}
private void createInternalSubchannel(EquivalentAddressGroup ... addrs) {
+ createInternalSubchannel(false, addrs);
+ }
+
+ private void createInternalSubchannel(boolean reconnectDisabled,
+ EquivalentAddressGroup ... addrs) {
List addressGroups = Arrays.asList(addrs);
InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY);
ChannelTracer subchannelTracer = new ChannelTracer(logId, 10,
fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel");
- internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT,
+ LoadBalancer.CreateSubchannelArgs.Builder argBuilder =
+ LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups);
+ if (reconnectDisabled) {
+ argBuilder.addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, reconnectDisabled);
+ }
+ LoadBalancer.CreateSubchannelArgs createSubchannelArgs = argBuilder.build();
+ internalSubchannel = new InternalSubchannel(
+ createSubchannelArgs,
+ AUTHORITY, USER_AGENT,
mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback,
channelz, CallTracer.getDefaultFactory().create(),
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
index 90008c1be30..293d0e70961 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java
@@ -63,6 +63,7 @@
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.NameResolverProvider;
import io.grpc.Status;
+import io.grpc.StatusOr;
import io.grpc.StringMarshaller;
import io.grpc.internal.FakeClock.ScheduledTask;
import io.grpc.internal.ManagedChannelImplBuilder.UnsupportedClientTransportFactoryBuilder;
@@ -615,7 +616,7 @@ private void deliverResolutionResult() {
// the NameResolver.
ResolutionResult resolutionResult =
ResolutionResult.newBuilder()
- .setAddresses(servers)
+ .setAddressesOrError(StatusOr.fromValue(servers))
.setAttributes(Attributes.EMPTY)
.build();
nameResolverListenerCaptor.getValue().onResult(resolutionResult);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index bea14bcef47..16700096827 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -116,6 +116,7 @@
import io.grpc.ServerMethodDefinition;
import io.grpc.Status;
import io.grpc.Status.Code;
+import io.grpc.StatusOr;
import io.grpc.StringMarshaller;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
@@ -1056,7 +1057,7 @@ public void noMoreCallbackAfterLoadBalancerShutdown() {
verifyNoMoreInteractions(mockLoadBalancer);
}
- @Test
+ @Test
public void noMoreCallbackAfterLoadBalancerShutdown_configError() throws InterruptedException {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri)
@@ -1095,7 +1096,10 @@ public void noMoreCallbackAfterLoadBalancerShutdown_configError() throws Interru
verify(stateListener2).onSubchannelState(stateInfoCaptor.capture());
assertSame(CONNECTING, stateInfoCaptor.getValue().getState());
- resolver.listener.onError(resolutionError);
+ channel.syncContext.execute(() ->
+ resolver.listener.onResult2(
+ ResolutionResult.newBuilder()
+ .setAddressesOrError(StatusOr.fromStatus(resolutionError)).build()));
verify(mockLoadBalancer).handleNameResolutionError(resolutionError);
verifyNoMoreInteractions(mockLoadBalancer);
@@ -1117,13 +1121,10 @@ public void noMoreCallbackAfterLoadBalancerShutdown_configError() throws Interru
verifyNoMoreInteractions(stateListener1, stateListener2);
// No more callback should be delivered to LoadBalancer after it's shut down
- resolver.listener.onResult(
- ResolutionResult.newBuilder()
- .setAddresses(new ArrayList<>())
- .setServiceConfig(
- ConfigOrError.fromError(Status.UNAVAILABLE.withDescription("Resolution failed")))
- .build());
- Thread.sleep(1100);
+ channel.syncContext.execute(() ->
+ resolver.listener.onResult2(
+ ResolutionResult.newBuilder()
+ .setAddressesOrError(StatusOr.fromStatus(resolutionError)).build()));
assertThat(timer.getPendingTasks()).isEmpty();
resolver.resolved();
verifyNoMoreInteractions(mockLoadBalancer);
@@ -3286,11 +3287,19 @@ public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends_usesListener
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
prevSize = getStats(channel).channelTrace.events.size();
- nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
+ channel.syncContext.execute(() ->
+ nameResolverFactory.resolvers.get(0).listener.onResult2(
+ ResolutionResult.newBuilder()
+ .setAddressesOrError(
+ StatusOr.fromStatus(Status.INTERNAL)).build()));
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
prevSize = getStats(channel).channelTrace.events.size();
- nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
+ channel.syncContext.execute(() ->
+ nameResolverFactory.resolvers.get(0).listener.onResult2(
+ ResolutionResult.newBuilder()
+ .setAddressesOrError(
+ StatusOr.fromStatus(Status.INTERNAL)).build()));
assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
prevSize = getStats(channel).channelTrace.events.size();
@@ -4919,7 +4928,10 @@ final class FakeNameResolver extends NameResolver {
void resolved() {
if (error != null) {
- listener.onError(error);
+ syncContext.execute(() ->
+ listener.onResult2(
+ ResolutionResult.newBuilder()
+ .setAddressesOrError(StatusOr.fromStatus(error)).build()));
return;
}
ResolutionResult.Builder builder =
diff --git a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java
index 1ec1ccb2082..8f1b908e999 100644
--- a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java
+++ b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java
@@ -133,7 +133,7 @@ public void simplePayload() {
assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
- checkStats(tracer, transportTracer.getStats(), fakeClock, 2, 2);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 2, 2);
}
@Test
@@ -148,7 +148,7 @@ public void smallCombinedPayloads() {
verify(listener, atLeastOnce()).bytesRead(anyInt());
assertEquals(Bytes.asList(new byte[]{14, 15}), bytes(streams.get(1).next()));
verifyNoMoreInteractions(listener);
- checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1, 2, 2);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1, 2, 2);
}
@Test
@@ -162,7 +162,7 @@ public void endOfStreamWithPayloadShouldNotifyEndOfStream() {
verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
- checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1);
}
@Test
@@ -177,7 +177,7 @@ public void endOfStreamShouldNotifyEndOfStream() {
}
verify(listener).deframerClosed(false);
verifyNoMoreInteractions(listener);
- checkStats(tracer, transportTracer.getStats(), fakeClock);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, false);
}
@Test
@@ -189,7 +189,7 @@ public void endOfStreamWithPartialMessageShouldNotifyDeframerClosedWithPartialMe
verify(listener, atLeastOnce()).bytesRead(anyInt());
verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener);
- checkStats(tracer, transportTracer.getStats(), fakeClock);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, false);
}
@Test
@@ -206,7 +206,7 @@ public void endOfStreamWithInvalidGzipBlockShouldNotifyDeframerClosedWithPartial
deframer.closeWhenComplete();
verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener);
- checkStats(tracer, transportTracer.getStats(), fakeClock);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, false);
}
@Test
@@ -228,10 +228,11 @@ public void payloadSplitBetweenBuffers() {
tracer,
transportTracer.getStats(),
fakeClock,
+ true,
7 /* msg size */ + 2 /* second buffer adds two bytes of overhead in deflate block */,
7);
} else {
- checkStats(tracer, transportTracer.getStats(), fakeClock, 7, 7);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, false, 7, 7);
}
}
@@ -248,7 +249,7 @@ public void frameHeaderSplitBetweenBuffers() {
assertEquals(Bytes.asList(new byte[]{3}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
- checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1);
}
@Test
@@ -259,7 +260,7 @@ public void emptyPayload() {
assertEquals(Bytes.asList(), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
- checkStats(tracer, transportTracer.getStats(), fakeClock, 0, 0);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 0, 0);
}
@Test
@@ -273,9 +274,10 @@ public void largerFrameSize() {
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
if (useGzipInflatingBuffer) {
- checkStats(tracer, transportTracer.getStats(), fakeClock, 8 /* compressed size */, 1000);
+ checkStats(tracer, transportTracer.getStats(), fakeClock,true,
+ 8 /* compressed size */, 1000);
} else {
- checkStats(tracer, transportTracer.getStats(), fakeClock, 1000, 1000);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, false, 1000, 1000);
}
}
@@ -292,7 +294,7 @@ public void endOfStreamCallbackShouldWaitForMessageDelivery() {
verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
- checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1);
+ checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1);
}
@Test
@@ -308,6 +310,7 @@ public void compressed() {
verify(listener).messagesAvailable(producer.capture());
assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
+ checkStats(tracer, transportTracer.getStats(), fakeClock, true, 29, 1000);
verifyNoMoreInteractions(listener);
}
@@ -502,7 +505,8 @@ public void sizeEnforcingInputStream_markReset() throws IOException {
* @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...}
*/
private static void checkStats(
- TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock, long... sizes) {
+ TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock,
+ boolean compressed, long... sizes) {
assertEquals(0, sizes.length % 2);
int count = sizes.length / 2;
long expectedWireSize = 0;
@@ -510,7 +514,8 @@ private static void checkStats(
for (int i = 0; i < count; i++) {
assertEquals("inboundMessage(" + i + ")", tracer.nextInboundEvent());
assertEquals(
- String.format(Locale.US, "inboundMessageRead(%d, %d, -1)", i, sizes[i * 2]),
+ String.format(Locale.US, "inboundMessageRead(%d, %d, %d)", i, sizes[i * 2],
+ compressed ? -1 : sizes[i * 2 + 1]),
tracer.nextInboundEvent());
expectedWireSize += sizes[i * 2];
expectedUncompressedSize += sizes[i * 2 + 1];
diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java
index 63915bddc99..f0031a6ae62 100644
--- a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java
+++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java
@@ -27,10 +27,12 @@
import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY;
import static io.grpc.LoadBalancer.IS_PETIOLE_POLICY;
import static io.grpc.internal.PickFirstLeafLoadBalancer.CONNECTION_DELAY_INTERVAL_MS;
+import static io.grpc.internal.PickFirstLeafLoadBalancer.isSerializingRetries;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assume.assumeTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -73,7 +75,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.After;
-import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -92,14 +93,22 @@
public class PickFirstLeafLoadBalancerTest {
public static final Status CONNECTION_ERROR =
Status.UNAVAILABLE.withDescription("Simulated connection error");
-
- @Parameterized.Parameters(name = "{0}")
- public static List enableHappyEyeballs() {
- return Arrays.asList(true, false);
+ public static final String GRPC_SERIALIZE_RETRIES = "GRPC_SERIALIZE_RETRIES";
+
+ @Parameterized.Parameters(name = "{0}-{1}")
+ public static List