-
Notifications
You must be signed in to change notification settings - Fork 986
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for StreamingCredentials #3068
Closed
Closed
Changes from 16 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
91871b6
Support for StreamingCredentials
ggivo e9d4d63
Tests & publish ReauthEvent
ggivo 820ffff
Clean up & Format & Add ReauthenticateEvent test
ggivo 5858286
Conditionally enable connection reauthentication based on client setting
ggivo 779edca
Client setting for enabling reauthentication
ggivo 21bf696
formating
ggivo c8ca829
Merge branch 'main' into streaming-auth
ggivo f3aef04
resolve conflict with main
ggivo 631d420
format
ggivo 6f46022
dispath using connection handler
ggivo b32f84c
Support multi with re-auth
ggivo 7eaaf6b
Fix EndpointId missing in events
ggivo 086ccf3
format
ggivo 61158f2
Add unit tests for setCredenatials
ggivo 9a0e513
Skip preProcessing of auth command to avoid replacing the credential …
ggivo 6ec2846
clean up - remove dead code
ggivo 110eb1a
Moved almost all code inside the new handler
tishun 8e9ab48
fix inTransaction lock with dispatch command batch
ggivo 746dd82
Remove StreamingCredentialsProvider interface.
ggivo 31341f1
Add authentication handler to ClusterPubSub connections
ggivo 6891281
Merge branch 'main' into streaming-auth
ggivo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
149 changes: 149 additions & 0 deletions
149
src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
/* | ||
* Copyright 2019-Present, Redis Ltd. and Contributors | ||
* All rights reserved. | ||
* | ||
* Licensed under the MIT License. | ||
* | ||
* This file contains contributions from third-party contributors | ||
* licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.lettuce.core; | ||
|
||
import io.lettuce.core.codec.StringCodec; | ||
import io.lettuce.core.event.EventBus; | ||
import io.lettuce.core.event.connection.ReauthenticateEvent; | ||
import io.lettuce.core.event.connection.ReauthenticateFailedEvent; | ||
import io.lettuce.core.internal.LettuceAssert; | ||
import io.lettuce.core.protocol.AsyncCommand; | ||
import io.lettuce.core.protocol.Endpoint; | ||
import io.lettuce.core.protocol.ProtocolVersion; | ||
import io.lettuce.core.protocol.RedisCommand; | ||
import io.netty.util.internal.logging.InternalLogger; | ||
import io.netty.util.internal.logging.InternalLoggerFactory; | ||
import reactor.core.Disposable; | ||
import reactor.core.publisher.Flux; | ||
|
||
import java.nio.CharBuffer; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
/** | ||
* Redis authentication handler. Internally used to authenticate a Redis connection. This class is part of the internal API. | ||
* | ||
* @author Ivo Gaydazhiev | ||
* @since 6.6.0 | ||
*/ | ||
public class RedisAuthenticationHandler { | ||
|
||
private static final InternalLogger log = InternalLoggerFactory.getInstance(RedisAuthenticationHandler.class); | ||
|
||
private final StatefulRedisConnectionImpl<?, ?> connection; | ||
|
||
private final RedisCredentialsProvider credentialsProvider; | ||
|
||
private final AtomicReference<Disposable> credentialsSubscription = new AtomicReference<>(); | ||
|
||
private final Boolean isPubSubConnection; | ||
|
||
public RedisAuthenticationHandler(StatefulRedisConnectionImpl<?, ?> connection, | ||
RedisCredentialsProvider credentialsProvider, Boolean isPubSubConnection) { | ||
this.connection = connection; | ||
this.credentialsProvider = credentialsProvider; | ||
this.isPubSubConnection = isPubSubConnection; | ||
} | ||
|
||
/** | ||
* This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`. | ||
* <p> | ||
* Each time new credentials are received, the client is re-authenticated. The previous subscription, if any, is disposed of | ||
* before setting the new subscription. | ||
*/ | ||
public void subscribe() { | ||
if (credentialsProvider == null) { | ||
return; | ||
} | ||
|
||
if (credentialsProvider instanceof StreamingCredentialsProvider) { | ||
if (!isSupportedConnection()) { | ||
return; | ||
} | ||
|
||
Flux<RedisCredentials> credentialsFlux = ((StreamingCredentialsProvider) credentialsProvider).credentials(); | ||
|
||
Disposable subscription = credentialsFlux.subscribe(this::onNext, this::onError, this::complete); | ||
|
||
Disposable oldSubscription = credentialsSubscription.getAndSet(subscription); | ||
if (oldSubscription != null && !oldSubscription.isDisposed()) { | ||
oldSubscription.dispose(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Unsubscribes from the current credentials stream. | ||
*/ | ||
public void unsubscribe() { | ||
Disposable subscription = credentialsSubscription.getAndSet(null); | ||
if (subscription != null && !subscription.isDisposed()) { | ||
subscription.dispose(); | ||
} | ||
} | ||
|
||
protected void complete() { | ||
log.debug("Credentials stream completed"); | ||
} | ||
|
||
protected void onNext(RedisCredentials credentials) { | ||
reauthenticate(credentials); | ||
} | ||
|
||
protected void onError(Throwable e) { | ||
log.error("Credentials renew failed.", e); | ||
publishReauthFailedEvent(e); | ||
} | ||
|
||
/** | ||
* Performs re-authentication with the provided credentials. | ||
* | ||
* @param credentials the new credentials | ||
*/ | ||
protected void reauthenticate(RedisCredentials credentials) { | ||
connection.setCredentials(credentials); | ||
} | ||
|
||
protected boolean isSupportedConnection() { | ||
if (isPubSubConnection && ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) { | ||
log.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection."); | ||
return false; | ||
} | ||
return true; | ||
} | ||
|
||
private void publishReauthFailedEvent(Throwable throwable) { | ||
connection.getResources().eventBus().publish(new ReauthenticateFailedEvent(throwable)); | ||
} | ||
|
||
public static boolean isSupported(ClientOptions clientOptions) { | ||
ggivo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
LettuceAssert.notNull(clientOptions, "ClientOptions must not be null"); | ||
switch (clientOptions.getReauthenticateBehaviour()) { | ||
case ON_NEW_CREDENTIALS: | ||
return true; | ||
|
||
case DEFAULT: | ||
return false; | ||
|
||
default: | ||
return false; | ||
} | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we good with the behavior when its configured with StreamingCredentials but doesnt work that way. It looks like it happens silently and nobody knows about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed that one. WIll add a warning to the log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On second thought, this is expected. We can have a streaming-enabled credentials provider and disabled the reauthentication