From 24b901ad14d409663619a6c201235a3bfb12de4e Mon Sep 17 00:00:00 2001 From: ggivo Date: Fri, 29 Nov 2024 11:33:50 +0200 Subject: [PATCH] TBA auth support for cluster connections --- .../core/RedisAuthenticationHandler.java | 105 ++---------------- .../RenewableRedisCredentialsProvider.java | 9 -- .../core/StreamingCredentialsProvider.java | 15 +++ .../TokenBasedRedisCredentialsProvider.java | 6 +- .../RedisClusterAuthenticationHandler.java | 46 ++++++++ .../StatefulRedisClusterConnectionImpl.java | 54 +++++---- ...okenBasedRedisCredentialsProviderTest.java | 8 +- 7 files changed, 109 insertions(+), 134 deletions(-) delete mode 100644 src/main/java/io/lettuce/core/RenewableRedisCredentialsProvider.java create mode 100644 src/main/java/io/lettuce/core/StreamingCredentialsProvider.java create mode 100644 src/main/java/io/lettuce/core/cluster/RedisClusterAuthenticationHandler.java diff --git a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java index 96abaff08..b4e29f8c2 100644 --- a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java +++ b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java @@ -19,125 +19,34 @@ */ package io.lettuce.core; -import io.lettuce.core.codec.StringCodec; -import io.lettuce.core.protocol.AsyncCommand; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.protocol.ProtocolVersion; -import io.lettuce.core.protocol.RedisCommand; -import io.lettuce.core.pubsub.StatefulRedisPubSubConnectionImpl; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; - -import java.nio.CharBuffer; -import java.util.concurrent.atomic.AtomicReference; /** * Handles reauthentication of a connection each time a new authentication token is provided by * `RenewableRedisCredentialsProvider`. - * + *

* This class is part of the internal API. * * @author Ivo Gaydajiev */ -class RedisAuthenticationHandler { - - private static final InternalLogger log = InternalLoggerFactory.getInstance(RedisAuthenticationHandler.class); - - private final RedisCommandBuilder commandBuilder = new RedisCommandBuilder<>(StringCodec.UTF8); +class RedisAuthenticationHandler extends BaseRedisAuthenticationHandler> { - private final StatefulRedisConnectionImpl connection; - - private final AtomicReference credentialsSubscription = new AtomicReference<>(); + private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class); public RedisAuthenticationHandler(StatefulRedisConnectionImpl connection) { - this.connection = connection; - } - - /** - * Subscribes to the provided `Flux` of credentials if the given `RedisCredentialsProvider` supports streaming credentials. - * - * Each time new credentials are received, the client is reauthenticated. - * - * @param credentialsProvider the credentials provider to subscribe to - */ - public void subscribe(RedisCredentialsProvider credentialsProvider) { - if (credentialsProvider instanceof RenewableRedisCredentialsProvider) { - if (!isSupportedConnection()) { - return; - } - - Flux credentialsFlux = ((RenewableRedisCredentialsProvider) credentialsProvider) - .credentialsStream(); - - Disposable subscription = credentialsFlux.subscribe(this::onNext, this::onError, this::complete); - - Disposable oldSubscription = credentialsSubscription.getAndSet(subscription); - if (oldSubscription != null && !oldSubscription.isDisposed()) { - oldSubscription.dispose(); - } - } - } - - /** - * Unsubscribes from the current credentials stream. - */ - public void unsubscribe() { - Disposable subscription = credentialsSubscription.getAndSet(null); - if (subscription != null && !subscription.isDisposed()) { - subscription.dispose(); - } - } - - private void complete() { - log.debug("Credentials stream completed"); - } - - public void onNext(RedisCredentials credentials) { - reauthenticate(credentials); - } - - public void onError(Throwable e) { - log.error("Credentials renew failed.", e); - } - - /** - * Performs re-authentication with the provided credentials. - * - * @param credentials the new credentials - */ - private void reauthenticate(RedisCredentials credentials) { - CharSequence password = CharBuffer.wrap(credentials.getPassword()); - if (credentials.hasUsername()) { - dispatchAuth(new AsyncCommand<>(commandBuilder.auth(credentials.getUsername(), password))) - .exceptionally(throwable -> { - log.error("Re-authentication with username failed.", throwable); - return null; - }); - } else { - dispatchAuth(new AsyncCommand<>(commandBuilder.auth(password))).exceptionally(throwable -> { - log.error("Re-authentication without username failed.", throwable); - return null; - }); - } + super(connection); } protected boolean isSupportedConnection() { - if (connection instanceof StatefulRedisPubSubConnectionImpl + if (connection instanceof StatefulRedisClusterPubSubConnection && ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) { - log.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection."); + logger.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection."); return false; } return true; } - public AsyncCommand dispatchAuth(RedisCommand cmd) { - AsyncCommand asyncCommand = new AsyncCommand<>(cmd); - RedisCommand dispatched = connection.getChannelWriter().write(asyncCommand); - if (dispatched instanceof AsyncCommand) { - return (AsyncCommand) dispatched; - } - return asyncCommand; - } - } diff --git a/src/main/java/io/lettuce/core/RenewableRedisCredentialsProvider.java b/src/main/java/io/lettuce/core/RenewableRedisCredentialsProvider.java deleted file mode 100644 index 61340ea9c..000000000 --- a/src/main/java/io/lettuce/core/RenewableRedisCredentialsProvider.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.lettuce.core; - -import reactor.core.publisher.Flux; - -public interface RenewableRedisCredentialsProvider extends RedisCredentialsProvider { - - Flux credentialsStream(); - -} diff --git a/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java b/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java new file mode 100644 index 000000000..08ab89850 --- /dev/null +++ b/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java @@ -0,0 +1,15 @@ +package io.lettuce.core; + +import reactor.core.publisher.Flux; + +public interface StreamingCredentialsProvider extends RedisCredentialsProvider { + + /** + * Returns a {@link Flux} emitting {@link RedisCredentials} that can be used to authorize a Redis connection. This + * credential provider supports streaming credentials, meaning that it can emit multiple credentials over time. + * + * @return + */ + Flux credentials(); + +} diff --git a/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java b/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java index 875200e74..54d857b37 100644 --- a/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java +++ b/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java @@ -1,7 +1,5 @@ package io.lettuce.core; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -11,7 +9,7 @@ import redis.clients.authentication.core.TokenListener; import redis.clients.authentication.core.TokenManager; -public class TokenBasedRedisCredentialsProvider implements RenewableRedisCredentialsProvider { +public class TokenBasedRedisCredentialsProvider implements StreamingCredentialsProvider { private final TokenManager tokenManager; @@ -66,7 +64,7 @@ public Mono resolveCredentials() { * Expose the Flux for all credential updates. */ @Override - public Flux credentialsStream() { + public Flux credentials() { return credentialsSink.asFlux().onBackpressureLatest(); // Provide a continuous stream of credentials } diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterAuthenticationHandler.java b/src/main/java/io/lettuce/core/cluster/RedisClusterAuthenticationHandler.java new file mode 100644 index 000000000..29584c404 --- /dev/null +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterAuthenticationHandler.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.cluster; + +import io.lettuce.core.*; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; + +import io.lettuce.core.protocol.*; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +class RedisClusterAuthenticationHandler extends BaseRedisAuthenticationHandler> { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class); + + public RedisClusterAuthenticationHandler(StatefulRedisClusterConnectionImpl connection) { + super(connection); + } + + protected boolean isSupportedConnection() { + if (connection instanceof StatefulRedisClusterPubSubConnection + && ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) { + logger.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection."); + return false; + } + return true; + } + +} diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java index df30f4888..b7851a561 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java @@ -19,24 +19,13 @@ */ package io.lettuce.core.cluster; -import static io.lettuce.core.protocol.CommandType.*; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Proxy; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; -import java.util.stream.Collectors; - import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.ClientOptions; import io.lettuce.core.ConnectionState; import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisChannelHandler; import io.lettuce.core.RedisChannelWriter; +import io.lettuce.core.RedisCredentialsProvider; import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; @@ -60,9 +49,23 @@ import io.lettuce.core.protocol.RedisCommand; import reactor.core.publisher.Mono; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static io.lettuce.core.protocol.CommandType.AUTH; +import static io.lettuce.core.protocol.CommandType.READONLY; +import static io.lettuce.core.protocol.CommandType.READWRITE; + /** * A thread-safe connection to a Redis Cluster. Multiple threads may share one {@link StatefulRedisClusterConnectionImpl} - * + *

* A {@link ConnectionWatchdog} monitors each connection and reconnects automatically until {@link #close} is called. All * pending commands will be (re)sent after successful reconnection. * @@ -88,6 +91,8 @@ public class StatefulRedisClusterConnectionImpl extends RedisChannelHandle private volatile Partitions partitions; + private final RedisClusterAuthenticationHandler authHandler; + /** * Initialize a new connection. * @@ -107,6 +112,8 @@ public StatefulRedisClusterConnectionImpl(RedisChannelWriter writer, ClusterPush this.async = newRedisAdvancedClusterAsyncCommandsImpl(); this.sync = newRedisAdvancedClusterCommandsImpl(); this.reactive = newRedisAdvancedClusterReactiveCommandsImpl(); + + this.authHandler = new RedisClusterAuthenticationHandler(this); } protected RedisAdvancedClusterReactiveCommandsImpl newRedisAdvancedClusterReactiveCommandsImpl() { @@ -187,8 +194,7 @@ public CompletableFuture> getConnectionAsync(Strin throw new RedisException("NodeId " + nodeId + " does not belong to the cluster"); } - AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter() - .getClusterConnectionProvider(); + AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter().getClusterConnectionProvider(); return provider.getConnectionAsync(connectionIntent, nodeId); } @@ -203,8 +209,7 @@ public StatefulRedisConnection getConnection(String host, int port, Connec public CompletableFuture> getConnectionAsync(String host, int port, ConnectionIntent connectionIntent) { - AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter() - .getClusterConnectionProvider(); + AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter().getClusterConnectionProvider(); return provider.getConnectionAsync(connectionIntent, host, port); } @@ -213,6 +218,17 @@ public CompletableFuture> getConnectionAsync(Strin public void activated() { super.activated(); async.clusterMyId().thenAccept(connectionState::setNodeId); + RedisCredentialsProvider credentialsProvider = connectionState.getCredentialsProvider(); + if (credentialsProvider != null && authHandler != null) { + authHandler.subscribe(credentialsProvider); + } + } + + @Override + public void deactivated() { + if (authHandler != null) { + authHandler.unsubscribe(); + } } ClusterDistributionChannelWriter getClusterDistributionChannelWriter() { @@ -249,8 +265,8 @@ private RedisCommand preProcessCommand(RedisCommand comman } else { List stringArgs = CommandArgsAccessor.getStringArguments(command.getArgs()); - this.connectionState - .setUserNamePassword(stringArgs.stream().map(String::toCharArray).collect(Collectors.toList())); + this.connectionState.setUserNamePassword( + stringArgs.stream().map(String::toCharArray).collect(Collectors.toList())); } } }); diff --git a/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java b/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java index e336a6d9d..f36085edf 100644 --- a/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java +++ b/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java @@ -87,8 +87,8 @@ public void shouldWaitForAndReturnTokenWhenEmittedLater() { @Test public void shouldCompleteAllSubscribersOnStop() { - Flux credentialsFlux1 = credentialsProvider.credentialsStream(); - Flux credentialsFlux2 = credentialsProvider.credentialsStream(); + Flux credentialsFlux1 = credentialsProvider.credentials(); + Flux credentialsFlux2 = credentialsProvider.credentials(); Disposable subscription1 = credentialsFlux1.subscribe(); Disposable subscription2 = credentialsFlux2.subscribe(); @@ -116,7 +116,7 @@ public void shouldCompleteAllSubscribersOnStop() { @Test public void shouldPropagateMultipleTokensOnStream() { - Flux result = credentialsProvider.credentialsStream(); + Flux result = credentialsProvider.credentials(); StepVerifier.create(result).then(() -> tokenManager.emitToken(testToken("test-user", "token1"))) .then(() -> tokenManager.emitToken(testToken("test-user", "token2"))) .assertNext(credentials -> assertThat(String.valueOf(credentials.getPassword())).isEqualTo("token1")) @@ -129,7 +129,7 @@ public void shouldHandleTokenRequestErrorGracefully() { Exception simulatedError = new RuntimeException("Token request failed"); tokenManager.emitError(simulatedError); - Flux result = credentialsProvider.credentialsStream(); + Flux result = credentialsProvider.credentials(); StepVerifier.create(result).expectErrorMatches( throwable -> throwable instanceof RuntimeException && "Token request failed".equals(throwable.getMessage()))