Skip to content

Commit

Permalink
Add authentication handler to ClusterPubSub connections
Browse files Browse the repository at this point in the history
  • Loading branch information
ggivo committed Dec 18, 2024
1 parent 746dd82 commit 31341f1
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ public static <K, V> RedisAuthenticationHandler<K, V> createHandler(StatefulRedi
RedisCredentialsProvider credentialsProvider, Boolean isPubSubConnection, ClientOptions options) {

if (isSupported(options)) {

if (isPubSubConnection && options.getConfiguredProtocolVersion() == ProtocolVersion.RESP2) {
throw new RedisConnectionException(
"Renewable credentials are not supported with RESP2 protocol on a pub/sub connection.");
}

return new RedisAuthenticationHandler<>(connection, credentialsProvider, isPubSubConnection);
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,8 @@ private <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> con

clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider);
connection.setPartitions(partitions);
connection.setAuthenticationHandler(
createHandler(connection, getFirstUri().getCredentialsProvider(), true, getOptions()));

Supplier<CommandHandler> commandHandlerSupplier = () -> new PubSubCommandHandler<>(getClusterClientOptions(),
getResources(), codec, endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import javax.inject.Inject;

import io.lettuce.core.protocol.ProtocolVersion;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -219,6 +220,23 @@ void connectPubSubCodecSentinelMissingHostAndSocketUri() {
assertThatThrownBy(() -> client.connectPubSub(UTF8, invalidSentinel())).isInstanceOf(IllegalArgumentException.class);
}

@Test
void connectPubSubAsyncReauthNotSupportedWithRESP2() {
ClientOptions.ReauthenticateBehavior reauth = client.getOptions().getReauthenticateBehaviour();
ProtocolVersion protocolVersion = client.getOptions().getConfiguredProtocolVersion();
try {
client.setOptions(client.getOptions().mutate().protocolVersion(ProtocolVersion.RESP2)
.reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build());

RedisURI redisURI = redis(host, port).build();
assertThatThrownBy(() -> client.connectPubSubAsync(UTF8, redisURI)).isInstanceOf(RedisConnectionException.class);

} finally {
client.setOptions(
client.getOptions().mutate().protocolVersion(protocolVersion).reauthenticateBehavior(reauth).build());
}
}

/*
* Sentinel Stateful
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package io.lettuce.core.cluster;

import static io.lettuce.TestTags.INTEGRATION_TEST;
import static io.lettuce.core.codec.StringCodec.UTF8;
import static org.assertj.core.api.Assertions.*;

import java.time.Duration;
import java.util.concurrent.ExecutionException;

import javax.inject.Inject;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.protocol.ProtocolVersion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -80,4 +84,23 @@ void shouldApplyTimeoutOptionsToPubSubClusterConnection() throws InterruptedExce
Thread.sleep(300);
}

@Test
void connectPubSubAsyncReauthNotSupportedWithRESP2() {

ClientOptions.ReauthenticateBehavior reauth = clusterClient.getClusterClientOptions().getReauthenticateBehaviour();
ProtocolVersion protocolVersion = clusterClient.getClusterClientOptions().getConfiguredProtocolVersion();

try {
clusterClient.setOptions(clusterClient.getClusterClientOptions().mutate().protocolVersion(ProtocolVersion.RESP2)
.reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build());
assertThatThrownBy(() -> clusterClient.connectPubSub(UTF8)).isInstanceOf(RedisConnectionException.class);

} finally {

clusterClient.setOptions(clusterClient.getClusterClientOptions().mutate().protocolVersion(protocolVersion)
.reauthenticateBehavior(reauth).build());
}

}

}

0 comments on commit 31341f1

Please sign in to comment.