Skip to content

Commit

Permalink
Addressing review comments from @tishun
Browse files Browse the repository at this point in the history
  • Loading branch information
ggivo committed Dec 19, 2024
1 parent 8c760f1 commit a2a8e10
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import io.lettuce.core.RedisCredentials;
import io.lettuce.core.RedisCredentialsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
Expand All @@ -18,42 +20,31 @@

public class TokenBasedRedisCredentialsProvider implements RedisCredentialsProvider, AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(TokenBasedRedisCredentialsProvider.class);

private final TokenManager tokenManager;

private final Sinks.Many<RedisCredentials> credentialsSink = Sinks.many().replay().latest();

public TokenBasedRedisCredentialsProvider(TokenAuthConfig tokenAuthConfig) {
this(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(),
tokenAuthConfig.getTokenManagerConfig()));

}

public TokenBasedRedisCredentialsProvider(TokenManager tokenManager) {
private TokenBasedRedisCredentialsProvider(TokenManager tokenManager) {
this.tokenManager = tokenManager;
initializeTokenManager();
}

/**
* Initialize the TokenManager and subscribe to token renewal events.
*/
private void initializeTokenManager() {
private void init() {

TokenListener listener = new TokenListener() {

@Override
public void onTokenRenewed(Token token) {
try {
String username = token.getUser();
char[] pass = token.getValue().toCharArray();
RedisCredentials credentials = RedisCredentials.just(username, pass);
credentialsSink.tryEmitNext(credentials);
} catch (Exception e) {
credentialsSink.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST);
}
String username = token.getUser();
char[] pass = token.getValue().toCharArray();
RedisCredentials credentials = RedisCredentials.just(username, pass);
credentialsSink.tryEmitNext(credentials);
}

@Override
public void onError(Exception exception) {
credentialsSink.tryEmitError(exception);
log.error("Token renew failed!", exception);
}

};
Expand All @@ -62,6 +53,8 @@ public void onError(Exception exception) {
tokenManager.start(listener, false);
} catch (Exception e) {
credentialsSink.tryEmitError(e);
tokenManager.stop();
throw new RuntimeException("Failed to start TokenManager", e);
}
}

Expand Down Expand Up @@ -111,4 +104,15 @@ public void close() {
tokenManager.stop();
}

public static TokenBasedRedisCredentialsProvider create(TokenAuthConfig tokenAuthConfig) {
return create(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(),
tokenAuthConfig.getTokenManagerConfig()));
}

public static TokenBasedRedisCredentialsProvider create(TokenManager tokenManager) {
TokenBasedRedisCredentialsProvider credentialManager = new TokenBasedRedisCredentialsProvider(tokenManager);
credentialManager.init();
return credentialManager;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static void setup() {
.secret(testCtx.getClientSecret()).authority(testCtx.getAuthority()).scopes(testCtx.getRedisScopes())
.expirationRefreshRatio(0.0000001F).build();

credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenAuthConfig);
credentialsProvider = TokenBasedRedisCredentialsProvider.create(tokenAuthConfig);

RedisURI uri = RedisURI.builder().withHost(testCtx.host()).withPort(testCtx.port())
.withAuthentication(credentialsProvider).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class TokenBasedRedisCredentialsProviderTest {
public void setUp() {
// Use TestToken manager to emit tokens/errors on request
tokenManager = new TestTokenManager(null, null);
credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenManager);
credentialsProvider = TokenBasedRedisCredentialsProvider.create(tokenManager);
}

@Test
Expand Down Expand Up @@ -128,13 +128,17 @@ public void shouldPropagateMultipleTokensOnStream() {
@Test
public void shouldHandleTokenRequestErrorGracefully() {
Exception simulatedError = new RuntimeException("Token request failed");
tokenManager.emitError(simulatedError);

Flux<RedisCredentials> result = credentialsProvider.credentials();

StepVerifier.create(result).expectErrorMatches(
throwable -> throwable instanceof RuntimeException && "Token request failed".equals(throwable.getMessage()))
.verify();
StepVerifier.create(result).then(() -> {
tokenManager.emitToken(testToken("test-user", "token1"));
tokenManager.emitError(simulatedError);
tokenManager.emitToken(testToken("test-user", "token2"));
}).assertNext(credentials -> assertThat(String.valueOf(credentials.getPassword())).isEqualTo("token1"))
.assertNext(credentials -> assertThat(String.valueOf(credentials.getPassword())).isEqualTo("token2"))
.thenCancel().verify(Duration.ofMillis(100));

}

private SimpleToken testToken(String username, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void tokenBasedCredentialProvider(RedisClient client) {
.reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build());

TestTokenManager tokenManager = new TestTokenManager(null, null);
TokenBasedRedisCredentialsProvider credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenManager);
TokenBasedRedisCredentialsProvider credentialsProvider = TokenBasedRedisCredentialsProvider.create(tokenManager);

// Build RedisURI with streaming credentials provider
RedisURI uri = RedisURI.builder().withHost(TestSettings.host()).withPort(TestSettings.port())
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/lettuce/examples/TokenBasedAuthExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static void main(String[] args) throws Exception {
TokenAuthConfig tokenAuthConfigUser1 = TokenAuthConfig.builder().tokenRequestExecTimeoutInMs(10000)
.expirationRefreshRatio(0.1f).identityProviderConfig(config1).build();
// Create credentials provider user1
TokenBasedRedisCredentialsProvider credentialsUser1 = new TokenBasedRedisCredentialsProvider(tokenAuthConfigUser1);
TokenBasedRedisCredentialsProvider credentialsUser1 = TokenBasedRedisCredentialsProvider.create(tokenAuthConfigUser1);

// User2
// from redis-authx-entraind
Expand All @@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception {
.expirationRefreshRatio(0.1f).identityProviderConfig(config2).build();
// Create credentials provider user2
// TODO: lettuce-autx-tba ( TokenBasedRedisCredentialsProvider & Example there)
TokenBasedRedisCredentialsProvider credentialsUser2 = new TokenBasedRedisCredentialsProvider(tokenAuthConfigUser2);
TokenBasedRedisCredentialsProvider credentialsUser2 = TokenBasedRedisCredentialsProvider.create(tokenAuthConfigUser2);

// lettuce-core
RedisURI redisURI1 = RedisURI.create(REDIS_URI);
Expand Down

0 comments on commit a2a8e10

Please sign in to comment.