Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for StreamingCredentials #3068

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 65 additions & 5 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class ClientOptions implements Serializable {

public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT;

public static final ReauthenticateBehavior DEFAULT_REAUTHENTICATE_BEHAVIOUR = ReauthenticateBehavior.DEFAULT;

public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false;

public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true;
Expand Down Expand Up @@ -95,6 +97,8 @@ public class ClientOptions implements Serializable {

private final DisconnectedBehavior disconnectedBehavior;

private final ReauthenticateBehavior reauthenticateBehavior;

private final boolean publishOnScheduler;

private final boolean pingBeforeActivateConnection;
Expand Down Expand Up @@ -124,6 +128,7 @@ protected ClientOptions(Builder builder) {
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
this.disconnectedBehavior = builder.disconnectedBehavior;
this.reauthenticateBehavior = builder.reauthenticateBehavior;
this.publishOnScheduler = builder.publishOnScheduler;
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
this.protocolVersion = builder.protocolVersion;
Expand All @@ -143,6 +148,7 @@ protected ClientOptions(ClientOptions original) {
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
this.disconnectedBehavior = original.getDisconnectedBehavior();
this.reauthenticateBehavior = original.getReauthenticateBehaviour();
this.publishOnScheduler = original.isPublishOnScheduler();
this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection();
this.protocolVersion = original.getConfiguredProtocolVersion();
Expand Down Expand Up @@ -220,6 +226,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private ReauthenticateBehavior reauthenticateBehavior = DEFAULT_REAUTHENTICATE_BEHAVIOUR;

private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;

protected Builder() {
Expand Down Expand Up @@ -301,6 +309,13 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) {
return this;
}

public Builder reauthenticateBehavior(ReauthenticateBehavior reauthenticateBehavior) {

LettuceAssert.notNull(reauthenticateBehavior, "ReuthenticatBehavior must not be null");
this.reauthenticateBehavior = reauthenticateBehavior;
return this;
}

/**
* Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true}
* (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will
Expand Down Expand Up @@ -505,11 +520,12 @@ public ClientOptions.Builder mutate() {

builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
.sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
.timeoutOptions(getTimeoutOptions());

return builder;
}
Expand Down Expand Up @@ -573,6 +589,10 @@ public DisconnectedBehavior getDisconnectedBehavior() {
return disconnectedBehavior;
}

public ReauthenticateBehavior getReauthenticateBehaviour() {
return reauthenticateBehavior;
}

/**
* Predicate to identify commands as read-only. Defaults to {@link #DEFAULT_READ_ONLY_COMMANDS}.
*
Expand Down Expand Up @@ -704,6 +724,46 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Defines the re-authentication behavior of the Redis client in relation to the {@link CredentialsProvider}.
*/
public enum ReauthenticateBehavior {

/**
* This is the default behavior. The client will fetch current credentials from the underlying
* {@link RedisCredentialsProvider} only when required.
*
* <p>
* No re-authentication is performed automatically when new credentials are emitted by the
* {@link StreamingCredentialsProvider} .
* </p>
*
* <p>
* This behavior is suitable for use cases with static credentials or where explicit reconnection is required to change
* credentials.
* </p>
*/
DEFAULT,

/**
* Automatically triggers re-authentication whenever new credentials are emitted by the
* {@link StreamingCredentialsProvider} or any other credentials manager.
*
* <p>
* When enabled, the client subscribes to the credentials stream provided by the {@link StreamingCredentialsProvider}
* (or other compatible sources) and issues an {@code AUTH} command to the Redis server each time new credentials are
* received. This behavior supports dynamic credential scenarios, such as token-based authentication, or credential
* rotation where credentials are refreshed periodically to maintain access.
* </p>
*
* <p>
* Note: {@code AUTH} commands issued as part of this behavior may interleave with user-submitted commands, as the
* client performs re-authentication independently of user command flow.
* </p>
*/
ON_NEW_CREDENTIALS
}

/**
* Whether we should use hash indexed queue, which provides O(1) remove(Object)
*
Expand Down
149 changes: 149 additions & 0 deletions src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2019-Present, Redis Ltd. and Contributors
* All rights reserved.
*
* Licensed under the MIT License.
*
* This file contains contributions from third-party contributors
* licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core;

import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ReauthenticateEvent;
import io.lettuce.core.event.connection.ReauthenticateFailedEvent;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.Endpoint;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.protocol.RedisCommand;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.nio.CharBuffer;
import java.util.concurrent.atomic.AtomicReference;

/**
* Redis authentication handler. Internally used to authenticate a Redis connection. This class is part of the internal API.
*
* @author Ivo Gaydazhiev
* @since 6.6.0
*/
public class RedisAuthenticationHandler {

private static final InternalLogger log = InternalLoggerFactory.getInstance(RedisAuthenticationHandler.class);

private final StatefulRedisConnectionImpl<?, ?> connection;

private final RedisCredentialsProvider credentialsProvider;

private final AtomicReference<Disposable> credentialsSubscription = new AtomicReference<>();

private final Boolean isPubSubConnection;

public RedisAuthenticationHandler(StatefulRedisConnectionImpl<?, ?> connection,
RedisCredentialsProvider credentialsProvider, Boolean isPubSubConnection) {
this.connection = connection;
this.credentialsProvider = credentialsProvider;
this.isPubSubConnection = isPubSubConnection;
}

/**
* This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`.
* <p>
* Each time new credentials are received, the client is re-authenticated. The previous subscription, if any, is disposed of
* before setting the new subscription.
*/
public void subscribe() {
if (credentialsProvider == null) {
return;
}

if (credentialsProvider instanceof StreamingCredentialsProvider) {
if (!isSupportedConnection()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we good with the behavior when its configured with StreamingCredentials but doesnt work that way. It looks like it happens silently and nobody knows about it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed that one. WIll add a warning to the log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, this is expected. We can have a streaming-enabled credentials provider and disabled the reauthentication

return;
}

Flux<RedisCredentials> credentialsFlux = ((StreamingCredentialsProvider) credentialsProvider).credentials();

Disposable subscription = credentialsFlux.subscribe(this::onNext, this::onError, this::complete);

Disposable oldSubscription = credentialsSubscription.getAndSet(subscription);
if (oldSubscription != null && !oldSubscription.isDisposed()) {
oldSubscription.dispose();
}
}
}

/**
* Unsubscribes from the current credentials stream.
*/
public void unsubscribe() {
Disposable subscription = credentialsSubscription.getAndSet(null);
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
}

protected void complete() {
log.debug("Credentials stream completed");
}

protected void onNext(RedisCredentials credentials) {
reauthenticate(credentials);
}

protected void onError(Throwable e) {
log.error("Credentials renew failed.", e);
publishReauthFailedEvent(e);
}

/**
* Performs re-authentication with the provided credentials.
*
* @param credentials the new credentials
*/
protected void reauthenticate(RedisCredentials credentials) {
connection.setCredentials(credentials);
}

protected boolean isSupportedConnection() {
if (isPubSubConnection && ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) {
log.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection.");
return false;
}
return true;
}

private void publishReauthFailedEvent(Throwable throwable) {
connection.getResources().eventBus().publish(new ReauthenticateFailedEvent(throwable));
}

public static boolean isSupported(ClientOptions clientOptions) {
ggivo marked this conversation as resolved.
Show resolved Hide resolved
LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
switch (clientOptions.getReauthenticateBehaviour()) {
case ON_NEW_CREDENTIALS:
return true;

case DEFAULT:
return false;

default:
return false;
}
}

}
13 changes: 8 additions & 5 deletions src/main/java/io/lettuce/core/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.lettuce.core.internal.ExceptionFactory;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.json.JsonParser;
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.protocol.CommandExpiryWriter;
import io.lettuce.core.protocol.CommandHandler;
Expand Down Expand Up @@ -288,8 +287,9 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone
}

StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);

ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
() -> new CommandHandler(getOptions(), getResources(), endpoint));
() -> new CommandHandler(getOptions(), getResources(), endpoint), false);

future.whenComplete((channelHandler, throwable) -> {

Expand All @@ -303,7 +303,7 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone

@SuppressWarnings("unchecked")
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, Endpoint endpoint,
RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {

ConnectionBuilder connectionBuilder;
if (redisURI.isSsl()) {
Expand All @@ -317,7 +317,10 @@ private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnecti
ConnectionState state = connection.getConnectionState();
state.apply(redisURI);
state.setDb(redisURI.getDatabase());

if (RedisAuthenticationHandler.isSupported(getOptions())) {
connection.setAuthenticationHandler(
new RedisAuthenticationHandler(connection, redisURI.getCredentialsProvider(), isPubSub));
}
connectionBuilder.connection(connection);
connectionBuilder.clientOptions(getOptions());
connectionBuilder.clientResources(getResources());
Expand Down Expand Up @@ -421,7 +424,7 @@ private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubS
StatefulRedisPubSubConnectionImpl<K, V> connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout);

ConnectionFuture<StatefulRedisPubSubConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
() -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint));
() -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint), true);

return future.whenComplete((conn, throwable) -> {

Expand Down
Loading
Loading