From 54c23a6f660a220bfc1a5844a7dc668c9e1015d8 Mon Sep 17 00:00:00 2001 From: ggivo Date: Wed, 30 Oct 2024 18:59:50 +0200 Subject: [PATCH] io.lettuce.core.RedisCommandExecutionException: NOAUTH Authentication required on CLIENT and READONLY command Using custom credentials provider can delay providing of credentials. In this case applyPostHandshake and applyConnectionMetadata got executed before handshake and lead to NOAUTH error in the log for CLIENT command. --- pom.xml | 14 ++++- .../java/io/lettuce/core/RedisHandshake.java | 32 ++++++----- .../lettuce/core/RedisHandshakeUnitTests.java | 55 +++++++++++++++++++ 3 files changed, 85 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index da7a50f5c5..ddfc4877b2 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ + 4.2.2 3.25.3 2.0.SP1 5.13.11 @@ -103,7 +104,12 @@ - + + org.awaitility + awaitility + ${awaitility.version} + test + io.netty netty-bom @@ -327,6 +333,12 @@ test + + org.awaitility + awaitility + test + + org.hdrhistogram HdrHistogram diff --git a/src/main/java/io/lettuce/core/RedisHandshake.java b/src/main/java/io/lettuce/core/RedisHandshake.java index 21fe829bb4..80dc62096c 100644 --- a/src/main/java/io/lettuce/core/RedisHandshake.java +++ b/src/main/java/io/lettuce/core/RedisHandshake.java @@ -101,21 +101,14 @@ public CompletionStage initialize(Channel channel) { new RedisConnectionException("Protocol version" + this.requestedProtocolVersion + " not supported")); } - // post-handshake commands, whose execution failures would cause the connection to be considered - // unsuccessfully established - CompletableFuture postHandshake = applyPostHandshake(channel); - - // post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different - // implementations or versions of the server runtime, and whose execution result (whether a success or a - // failure ) should not alter the outcome of the connection attempt - CompletableFuture connectionMetadata = applyConnectionMetadata(channel).handle((result, error) -> { - if (error != null) { - LOG.debug("Error applying connection metadata", error); - } - return null; - }); - - return handshake.thenCompose(ignore -> postHandshake).thenCompose(ignore -> connectionMetadata); + return handshake + // post-handshake commands, whose execution failures would cause the connection to be considered + // unsuccessfully established + .thenCompose(ignore -> applyPostHandshake(channel)) + // post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different + // implementations or versions of the server runtime, and whose execution result (whether a success or a + // failure ) should not alter the outcome of the connection attempt + .thenCompose(ignore -> applyConnectionMetadataSafely(channel)); } private CompletionStage tryHandshakeResp3(Channel channel) { @@ -271,6 +264,15 @@ private CompletableFuture applyPostHandshake(Channel channel) { return dispatch(channel, postHandshake); } + private CompletionStage applyConnectionMetadataSafely(Channel channel) { + return applyConnectionMetadata(channel).handle((result, error) -> { + if (error != null) { + LOG.debug("Error applying connection metadata", error); + } + return null; + }); + } + private CompletableFuture applyConnectionMetadata(Channel channel) { List> postHandshake = new ArrayList<>(); diff --git a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java index d4b968c4f3..b01fd810fa 100644 --- a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java +++ b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java @@ -1,6 +1,7 @@ package io.lettuce.core; import static io.lettuce.TestTags.UNIT_TEST; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.*; import java.nio.ByteBuffer; @@ -8,6 +9,7 @@ import java.util.Map; import java.util.concurrent.CompletionStage; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -15,6 +17,8 @@ import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.ProtocolVersion; import io.netty.channel.embedded.EmbeddedChannel; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; /** * Unit tests for {@link RedisHandshake}. @@ -106,6 +110,42 @@ void handshakeFireAndForgetPostHandshake() { assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse(); } + @Test + void handshakeDelayedCredentialProvider() { + + DelayedRedisCredentialsProvider cp = new DelayedRedisCredentialsProvider(); + // RedisCredentialsProvider cp = () -> Mono.just(RedisCredentials.just("foo", + // "bar")).delayElement(Duration.ofMillis(3)); + EmbeddedChannel channel = new EmbeddedChannel(true, false); + + ConnectionMetadata connectionMetdata = new ConnectionMetadata(); + connectionMetdata.setLibraryName("library-name"); + connectionMetdata.setLibraryVersion("library-version"); + + ConnectionState state = new ConnectionState(); + state.setCredentialsProvider(cp); + state.apply(connectionMetdata); + RedisHandshake handshake = new RedisHandshake(null, false, state); + CompletionStage handshakeInit = handshake.initialize(channel); + cp.completeCredentials(RedisCredentials.just("foo", "bar")); + + Awaitility.await().atMost(50, MILLISECONDS) // Wait up to 5 seconds + .pollInterval(5, MILLISECONDS) // Poll every 50 milliseconds + .until(() -> !channel.outboundMessages().isEmpty()); + + AsyncCommand> hello = channel.readOutbound(); + helloResponse(hello.getOutput()); + hello.complete(); + + List>> postHandshake = channel.readOutbound(); + postHandshake.get(0).getOutput().setError(ERR_UNKNOWN_COMMAND); + postHandshake.get(0).completeExceptionally(new RedisException(ERR_UNKNOWN_COMMAND)); + postHandshake.get(0).complete(); + + assertThat(postHandshake.size()).isEqualTo(2); + assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse(); + } + @Test void shouldParseVersionWithCharacters() { @@ -136,4 +176,19 @@ private static void helloResponse(CommandOutput credentialsSink = Sinks.one(); + + @Override + public Mono resolveCredentials() { + return credentialsSink.asMono(); + } + + public void completeCredentials(RedisCredentials credentials) { + credentialsSink.tryEmitValue(credentials); + } + + } + }