Skip to content

Commit

Permalink
io.lettuce.core.RedisCommandExecutionException: NOAUTH Authentication…
Browse files Browse the repository at this point in the history
… required on CLIENT and READONLY command (redis#3035)

Using custom credentials provider can delay providing of credentials. In this case applyPostHandshake and applyConnectionMetadata got executed before handshake and lead to NOAUTH error in the log for CLIENT command.
  • Loading branch information
ggivo authored Oct 31, 2024
1 parent 114755d commit 7f455ec
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 16 deletions.
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
</developers>

<properties>
<awaitility.version>4.2.2</awaitility.version>
<assertj-core.version>3.25.3</assertj-core.version>
<cdi-api.version>2.0.SP1</cdi-api.version>
<brave.version>5.13.11</brave.version>
Expand Down Expand Up @@ -103,7 +104,12 @@

<dependencyManagement>
<dependencies>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
Expand Down Expand Up @@ -327,6 +333,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
Expand Down
32 changes: 17 additions & 15 deletions src/main/java/io/lettuce/core/RedisHandshake.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,14 @@ public CompletionStage<Void> initialize(Channel channel) {
new RedisConnectionException("Protocol version" + this.requestedProtocolVersion + " not supported"));
}

// post-handshake commands, whose execution failures would cause the connection to be considered
// unsuccessfully established
CompletableFuture<Void> postHandshake = applyPostHandshake(channel);

// post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different
// implementations or versions of the server runtime, and whose execution result (whether a success or a
// failure ) should not alter the outcome of the connection attempt
CompletableFuture<Void> connectionMetadata = applyConnectionMetadata(channel).handle((result, error) -> {
if (error != null) {
LOG.debug("Error applying connection metadata", error);
}
return null;
});

return handshake.thenCompose(ignore -> postHandshake).thenCompose(ignore -> connectionMetadata);
return handshake
// post-handshake commands, whose execution failures would cause the connection to be considered
// unsuccessfully established
.thenCompose(ignore -> applyPostHandshake(channel))
// post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different
// implementations or versions of the server runtime, and whose execution result (whether a success or a
// failure ) should not alter the outcome of the connection attempt
.thenCompose(ignore -> applyConnectionMetadataSafely(channel));
}

private CompletionStage<?> tryHandshakeResp3(Channel channel) {
Expand Down Expand Up @@ -271,6 +264,15 @@ private CompletableFuture<Void> applyPostHandshake(Channel channel) {
return dispatch(channel, postHandshake);
}

private CompletionStage<Void> applyConnectionMetadataSafely(Channel channel) {
return applyConnectionMetadata(channel).handle((result, error) -> {
if (error != null) {
LOG.debug("Error applying connection metadata", error);
}
return null;
});
}

private CompletableFuture<Void> applyConnectionMetadata(Channel channel) {

List<AsyncCommand<?, ?, ?>> postHandshake = new ArrayList<>();
Expand Down
55 changes: 55 additions & 0 deletions src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package io.lettuce.core;

import static io.lettuce.TestTags.UNIT_TEST;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.*;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.ProtocolVersion;
import io.netty.channel.embedded.EmbeddedChannel;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
* Unit tests for {@link RedisHandshake}.
Expand Down Expand Up @@ -106,6 +110,42 @@ void handshakeFireAndForgetPostHandshake() {
assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse();
}

@Test
void handshakeDelayedCredentialProvider() {

DelayedRedisCredentialsProvider cp = new DelayedRedisCredentialsProvider();
// RedisCredentialsProvider cp = () -> Mono.just(RedisCredentials.just("foo",
// "bar")).delayElement(Duration.ofMillis(3));
EmbeddedChannel channel = new EmbeddedChannel(true, false);

ConnectionMetadata connectionMetdata = new ConnectionMetadata();
connectionMetdata.setLibraryName("library-name");
connectionMetdata.setLibraryVersion("library-version");

ConnectionState state = new ConnectionState();
state.setCredentialsProvider(cp);
state.apply(connectionMetdata);
RedisHandshake handshake = new RedisHandshake(null, false, state);
CompletionStage<Void> handshakeInit = handshake.initialize(channel);
cp.completeCredentials(RedisCredentials.just("foo", "bar"));

Awaitility.await().atMost(50, MILLISECONDS) // Wait up to 5 seconds
.pollInterval(5, MILLISECONDS) // Poll every 50 milliseconds
.until(() -> !channel.outboundMessages().isEmpty());

AsyncCommand<String, String, Map<String, String>> hello = channel.readOutbound();
helloResponse(hello.getOutput());
hello.complete();

List<AsyncCommand<String, String, Map<String, String>>> postHandshake = channel.readOutbound();
postHandshake.get(0).getOutput().setError(ERR_UNKNOWN_COMMAND);
postHandshake.get(0).completeExceptionally(new RedisException(ERR_UNKNOWN_COMMAND));
postHandshake.get(0).complete();

assertThat(postHandshake.size()).isEqualTo(2);
assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse();
}

@Test
void shouldParseVersionWithCharacters() {

Expand Down Expand Up @@ -136,4 +176,19 @@ private static void helloResponse(CommandOutput<String, String, Map<String, Stri
output.set(ByteBuffer.wrap("1.2.3".getBytes()));
}

static class DelayedRedisCredentialsProvider implements RedisCredentialsProvider {

private final Sinks.One<RedisCredentials> credentialsSink = Sinks.one();

@Override
public Mono<RedisCredentials> resolveCredentials() {
return credentialsSink.asMono();
}

public void completeCredentials(RedisCredentials credentials) {
credentialsSink.tryEmitValue(credentials);
}

}

}

0 comments on commit 7f455ec

Please sign in to comment.