Skip to content

Commit

Permalink
Support for StreamingCredentials
Browse files Browse the repository at this point in the history
     This enables use cases like credential rotation and token based auth without client disconnect. Especially with Pub/Sub clients will reduce the chnance of missing events.
  • Loading branch information
ggivo committed Dec 2, 2024
1 parent 28a4154 commit 91871b6
Show file tree
Hide file tree
Showing 9 changed files with 534 additions and 0 deletions.
117 changes: 117 additions & 0 deletions src/main/java/io/lettuce/core/BaseRedisAuthenticationHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.lettuce.core;

import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.RedisCommand;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.nio.CharBuffer;
import java.util.concurrent.atomic.AtomicReference;

public abstract class BaseRedisAuthenticationHandler<T extends RedisChannelHandler<?, ?>> {

private static final InternalLogger log = InternalLoggerFactory.getInstance(BaseRedisAuthenticationHandler.class);

protected final T connection;

private final RedisCommandBuilder<String, String> commandBuilder = new RedisCommandBuilder<>(StringCodec.UTF8);

private final AtomicReference<Disposable> credentialsSubscription = new AtomicReference<>();

public BaseRedisAuthenticationHandler(T connection) {
this.connection = connection;
}

/**
* Subscribes to the provided `Flux` of credentials if the given `RedisCredentialsProvider` supports streaming credentials.
* <p>
* This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`. Each time new
* credentials are received, the client is reauthenticated. If the connection is not supported, the method returns without
* subscribing.
* <p>
* The previous subscription, if any, is disposed of before setting the new subscription.
*
* @param credentialsProvider the credentials provider to subscribe to
*/
public void subscribe(RedisCredentialsProvider credentialsProvider) {
if (credentialsProvider == null) {
return;
}

if (credentialsProvider instanceof StreamingCredentialsProvider) {
if (!isSupportedConnection()) {
return;
}

Flux<RedisCredentials> credentialsFlux = ((StreamingCredentialsProvider) credentialsProvider).credentials();

Disposable subscription = credentialsFlux.subscribe(this::onNext, this::onError, this::complete);

Disposable oldSubscription = credentialsSubscription.getAndSet(subscription);
if (oldSubscription != null && !oldSubscription.isDisposed()) {
oldSubscription.dispose();
}
}
}

/**
* Unsubscribes from the current credentials stream.
*/
public void unsubscribe() {
Disposable subscription = credentialsSubscription.getAndSet(null);
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
}

protected void complete() {
log.debug("Credentials stream completed");
}

protected void onNext(RedisCredentials credentials) {
reauthenticate(credentials);
}

protected void onError(Throwable e) {
log.error("Credentials renew failed.", e);
}

/**
* Performs re-authentication with the provided credentials.
*
* @param credentials the new credentials
*/
private void reauthenticate(RedisCredentials credentials) {
CharSequence password = CharBuffer.wrap(credentials.getPassword());

AsyncCommand<String, String, String> authCmd;
if (credentials.hasUsername()) {
authCmd = new AsyncCommand<>(commandBuilder.auth(credentials.getUsername(), password));
} else {
authCmd = new AsyncCommand<>(commandBuilder.auth(password));
}

dispatchAuth(authCmd).exceptionally(throwable -> {
log.error("Re-authentication {} failed.", credentials.hasUsername() ? "with username" : "without username",
throwable);
return null;
});
}

protected boolean isSupportedConnection() {
return true;
}

private AsyncCommand<String, String, String> dispatchAuth(RedisCommand<String, String, String> authCommand) {
AsyncCommand<String, String, String> asyncCommand = new AsyncCommand<>(authCommand);
RedisCommand<String, String, String> dispatched = connection.getChannelWriter().write(asyncCommand);
if (dispatched instanceof AsyncCommand) {
return (AsyncCommand<String, String, String>) dispatched;
}
return asyncCommand;
}

}
44 changes: 44 additions & 0 deletions src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2019-Present, Redis Ltd. and Contributors
* All rights reserved.
*
* Licensed under the MIT License.
*
* This file contains contributions from third-party contributors
* licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core;

import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

class RedisAuthenticationHandler extends BaseRedisAuthenticationHandler<StatefulRedisConnectionImpl<?, ?>> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisAuthenticationHandler.class);

public RedisAuthenticationHandler(StatefulRedisConnectionImpl<?, ?> connection) {
super(connection);
}

protected boolean isSupportedConnection() {
if (connection instanceof StatefulRedisPubSubConnection
&& ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) {
logger.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection.");
return false;
}
return true;
}

}
16 changes: 16 additions & 0 deletions src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class StatefulRedisConnectionImpl<K, V> extends RedisChannelHandler<K, V>

private final PushHandler pushHandler;

private final RedisAuthenticationHandler authHandler;

private final Mono<JsonParser> parser;

protected MultiOutput<K, V> multi;
Expand Down Expand Up @@ -104,6 +106,8 @@ public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHa
this.async = newRedisAsyncCommandsImpl();
this.sync = newRedisSyncCommandsImpl();
this.reactive = newRedisReactiveCommandsImpl();

this.authHandler = new RedisAuthenticationHandler(this);
}

public RedisCodec<K, V> getCodec() {
Expand Down Expand Up @@ -315,4 +319,16 @@ public ConnectionState getConnectionState() {
return state;
}

@Override
public void activated() {
super.activated();
authHandler.subscribe(state.getCredentialsProvider());
}

@Override
public void deactivated() {
authHandler.unsubscribe();
super.deactivated();
}

}
15 changes: 15 additions & 0 deletions src/main/java/io/lettuce/core/StreamingCredentialsProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.lettuce.core;

import reactor.core.publisher.Flux;

public interface StreamingCredentialsProvider extends RedisCredentialsProvider {

/**
* Returns a {@link Flux} emitting {@link RedisCredentials} that can be used to authorize a Redis connection. This
* credential provider supports streaming credentials, meaning that it can emit multiple credentials over time.
*
* @return
*/
Flux<RedisCredentials> credentials();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2019-Present, Redis Ltd. and Contributors
* All rights reserved.
*
* Licensed under the MIT License.
*
* This file contains contributions from third-party contributors
* licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.cluster;

import io.lettuce.core.BaseRedisAuthenticationHandler;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.protocol.ProtocolVersion;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

class RedisClusterAuthenticationHandler extends BaseRedisAuthenticationHandler<StatefulRedisClusterConnectionImpl<?, ?>> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClusterAuthenticationHandler.class);

public RedisClusterAuthenticationHandler(StatefulRedisClusterConnectionImpl<?, ?> connection) {
super(connection);
}

protected boolean isSupportedConnection() {
if (connection instanceof StatefulRedisClusterPubSubConnection
&& ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) {
logger.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection.");
return false;
}
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class StatefulRedisClusterConnectionImpl<K, V> extends RedisChannelHandle

private volatile Partitions partitions;

private final RedisClusterAuthenticationHandler authHandler;

/**
* Initialize a new connection.
*
Expand Down Expand Up @@ -123,6 +125,8 @@ public StatefulRedisClusterConnectionImpl(RedisChannelWriter writer, ClusterPush
this.async = newRedisAdvancedClusterAsyncCommandsImpl();
this.sync = newRedisAdvancedClusterCommandsImpl();
this.reactive = newRedisAdvancedClusterReactiveCommandsImpl();

this.authHandler = new RedisClusterAuthenticationHandler(this);
}

protected RedisAdvancedClusterReactiveCommandsImpl<K, V> newRedisAdvancedClusterReactiveCommandsImpl() {
Expand Down Expand Up @@ -230,6 +234,12 @@ public void activated() {
super.activated();

async.clusterMyId().thenAccept(connectionState::setNodeId);
authHandler.subscribe(connectionState.getCredentialsProvider());
}

@Override
public void deactivated() {
authHandler.unsubscribe();
}

ClusterDistributionChannelWriter getClusterDistributionChannelWriter() {
Expand Down
Loading

0 comments on commit 91871b6

Please sign in to comment.