Skip to content

Commit

Permalink
pub/sub test basic functionality with entraid auth
Browse files Browse the repository at this point in the history
  • Loading branch information
ggivo committed Dec 19, 2024
1 parent 8c760f1 commit c4dafe4
Showing 1 changed file with 25 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import io.lettuce.core.RedisCredentials;
import io.lettuce.core.RedisCredentialsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
Expand All @@ -18,50 +20,40 @@

public class TokenBasedRedisCredentialsProvider implements RedisCredentialsProvider, AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(TokenBasedRedisCredentialsProvider.class);

private final TokenManager tokenManager;

private final Sinks.Many<RedisCredentials> credentialsSink = Sinks.many().replay().latest();

public TokenBasedRedisCredentialsProvider(TokenAuthConfig tokenAuthConfig) {
this(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(),
tokenAuthConfig.getTokenManagerConfig()));

}

public TokenBasedRedisCredentialsProvider(TokenManager tokenManager) {
private TokenBasedRedisCredentialsProvider(TokenManager tokenManager) {
this.tokenManager = tokenManager;
initializeTokenManager();
}

/**
* Initialize the TokenManager and subscribe to token renewal events.
*/
private void initializeTokenManager() {
private void init(){

TokenListener listener = new TokenListener() {

@Override
public void onTokenRenewed(Token token) {
try {
String username = token.getUser();
char[] pass = token.getValue().toCharArray();
RedisCredentials credentials = RedisCredentials.just(username, pass);
credentialsSink.tryEmitNext(credentials);
} catch (Exception e) {
credentialsSink.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST);
}
String username = token.getUser();
char[] pass = token.getValue().toCharArray();
RedisCredentials credentials = RedisCredentials.just(username, pass);
credentialsSink.tryEmitNext(credentials);
}

@Override
public void onError(Exception exception) {
credentialsSink.tryEmitError(exception);
log.error("Token renew failed!", exception);
}

};

try {
tokenManager.start(listener, false);
} catch (Exception e) {
credentialsSink.tryEmitError(e);
tokenManager.stop();
throw new RuntimeException("Failed to start TokenManager", e);
}
}

Expand Down Expand Up @@ -111,4 +103,15 @@ public void close() {
tokenManager.stop();
}

public TokenBasedRedisCredentialsProvider create(TokenAuthConfig tokenAuthConfig) {
return create(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(),
tokenAuthConfig.getTokenManagerConfig()));
}

public static TokenBasedRedisCredentialsProvider create(TokenManager tokenManager) {
TokenBasedRedisCredentialsProvider credentialManager = new TokenBasedRedisCredentialsProvider(tokenManager);
credentialManager.init();
return credentialManager;
}

}

0 comments on commit c4dafe4

Please sign in to comment.