diff --git a/pom.xml b/pom.xml index da7a50f5c..ddfc4877b 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ + 4.2.2 3.25.3 2.0.SP1 5.13.11 @@ -103,7 +104,12 @@ - + + org.awaitility + awaitility + ${awaitility.version} + test + io.netty netty-bom @@ -327,6 +333,12 @@ test + + org.awaitility + awaitility + test + + org.hdrhistogram HdrHistogram diff --git a/src/main/java/io/lettuce/core/RedisHandshake.java b/src/main/java/io/lettuce/core/RedisHandshake.java index 21fe829bb..80dc62096 100644 --- a/src/main/java/io/lettuce/core/RedisHandshake.java +++ b/src/main/java/io/lettuce/core/RedisHandshake.java @@ -101,21 +101,14 @@ public CompletionStage initialize(Channel channel) { new RedisConnectionException("Protocol version" + this.requestedProtocolVersion + " not supported")); } - // post-handshake commands, whose execution failures would cause the connection to be considered - // unsuccessfully established - CompletableFuture postHandshake = applyPostHandshake(channel); - - // post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different - // implementations or versions of the server runtime, and whose execution result (whether a success or a - // failure ) should not alter the outcome of the connection attempt - CompletableFuture connectionMetadata = applyConnectionMetadata(channel).handle((result, error) -> { - if (error != null) { - LOG.debug("Error applying connection metadata", error); - } - return null; - }); - - return handshake.thenCompose(ignore -> postHandshake).thenCompose(ignore -> connectionMetadata); + return handshake + // post-handshake commands, whose execution failures would cause the connection to be considered + // unsuccessfully established + .thenCompose(ignore -> applyPostHandshake(channel)) + // post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different + // implementations or versions of the server runtime, and whose execution result (whether a success or a + // failure ) should not alter the outcome of the connection attempt + .thenCompose(ignore -> applyConnectionMetadataSafely(channel)); } private CompletionStage tryHandshakeResp3(Channel channel) { @@ -271,6 +264,15 @@ private CompletableFuture applyPostHandshake(Channel channel) { return dispatch(channel, postHandshake); } + private CompletionStage applyConnectionMetadataSafely(Channel channel) { + return applyConnectionMetadata(channel).handle((result, error) -> { + if (error != null) { + LOG.debug("Error applying connection metadata", error); + } + return null; + }); + } + private CompletableFuture applyConnectionMetadata(Channel channel) { List> postHandshake = new ArrayList<>(); diff --git a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java index d4b968c4f..b01fd810f 100644 --- a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java +++ b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java @@ -1,6 +1,7 @@ package io.lettuce.core; import static io.lettuce.TestTags.UNIT_TEST; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.*; import java.nio.ByteBuffer; @@ -8,6 +9,7 @@ import java.util.Map; import java.util.concurrent.CompletionStage; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -15,6 +17,8 @@ import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.ProtocolVersion; import io.netty.channel.embedded.EmbeddedChannel; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; /** * Unit tests for {@link RedisHandshake}. @@ -106,6 +110,42 @@ void handshakeFireAndForgetPostHandshake() { assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse(); } + @Test + void handshakeDelayedCredentialProvider() { + + DelayedRedisCredentialsProvider cp = new DelayedRedisCredentialsProvider(); + // RedisCredentialsProvider cp = () -> Mono.just(RedisCredentials.just("foo", + // "bar")).delayElement(Duration.ofMillis(3)); + EmbeddedChannel channel = new EmbeddedChannel(true, false); + + ConnectionMetadata connectionMetdata = new ConnectionMetadata(); + connectionMetdata.setLibraryName("library-name"); + connectionMetdata.setLibraryVersion("library-version"); + + ConnectionState state = new ConnectionState(); + state.setCredentialsProvider(cp); + state.apply(connectionMetdata); + RedisHandshake handshake = new RedisHandshake(null, false, state); + CompletionStage handshakeInit = handshake.initialize(channel); + cp.completeCredentials(RedisCredentials.just("foo", "bar")); + + Awaitility.await().atMost(50, MILLISECONDS) // Wait up to 5 seconds + .pollInterval(5, MILLISECONDS) // Poll every 50 milliseconds + .until(() -> !channel.outboundMessages().isEmpty()); + + AsyncCommand> hello = channel.readOutbound(); + helloResponse(hello.getOutput()); + hello.complete(); + + List>> postHandshake = channel.readOutbound(); + postHandshake.get(0).getOutput().setError(ERR_UNKNOWN_COMMAND); + postHandshake.get(0).completeExceptionally(new RedisException(ERR_UNKNOWN_COMMAND)); + postHandshake.get(0).complete(); + + assertThat(postHandshake.size()).isEqualTo(2); + assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse(); + } + @Test void shouldParseVersionWithCharacters() { @@ -136,4 +176,19 @@ private static void helloResponse(CommandOutput credentialsSink = Sinks.one(); + + @Override + public Mono resolveCredentials() { + return credentialsSink.asMono(); + } + + public void completeCredentials(RedisCredentials credentials) { + credentialsSink.tryEmitValue(credentials); + } + + } + }