Skip to content

Commit

Permalink
dispath using connection handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ggivo committed Dec 10, 2024
1 parent 631d420 commit 6f46022
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ public void registerAuthenticationHandler(RedisCredentialsProvider credentialsPr
LettuceAssert.assertState(connection != null, "Connection must be set");
LettuceAssert.assertState(clientResources != null, "ClientResources must be set");

RedisAuthenticationHandler authenticationHandler = new RedisAuthenticationHandler(connection.getChannelWriter(),
credentialsProvider, state, clientResources.eventBus(), isPubSubConnection);
RedisAuthenticationHandler authenticationHandler = new RedisAuthenticationHandler(connection, credentialsProvider,
state, clientResources.eventBus(), isPubSubConnection);
endpoint.registerAuthenticationHandler(authenticationHandler);
}

Expand Down
18 changes: 9 additions & 9 deletions src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class RedisAuthenticationHandler {

private static final InternalLogger log = InternalLoggerFactory.getInstance(RedisAuthenticationHandler.class);

private final RedisChannelWriter writer;
private final RedisChannelHandler<?, ?> connection;

private final ConnectionState state;

Expand All @@ -60,9 +60,9 @@ public class RedisAuthenticationHandler {

private final Boolean isPubSubConnection;

public RedisAuthenticationHandler(RedisChannelWriter writer, RedisCredentialsProvider credentialsProvider,
public RedisAuthenticationHandler(RedisChannelHandler<?, ?> connection, RedisCredentialsProvider credentialsProvider,
ConnectionState state, EventBus eventBus, Boolean isPubSubConnection) {
this.writer = writer;
this.connection = connection;
this.state = state;
this.credentialsProvider = credentialsProvider;
this.eventBus = eventBus;
Expand Down Expand Up @@ -144,11 +144,11 @@ protected void reauthenticate(RedisCredentials credentials) {
});
}

private AsyncCommand<String, String, String> dispatchAuth(RedisCommand<String, String, String> authCommand) {
AsyncCommand<String, String, String> asyncCommand = new AsyncCommand<>(authCommand);
RedisCommand<String, String, String> dispatched = writer.write(asyncCommand);
private AsyncCommand<?, ?, ?> dispatchAuth(RedisCommand<?, ?, ?> authCommand) {
AsyncCommand asyncCommand = new AsyncCommand<>(authCommand);
RedisCommand<?, ?, ?> dispatched = connection.dispatch(asyncCommand);
if (dispatched instanceof AsyncCommand) {
return (AsyncCommand<String, String, String>) dispatched;
return (AsyncCommand<?, ?, ?>) dispatched;
}
return asyncCommand;
}
Expand All @@ -170,8 +170,8 @@ protected boolean isSupportedConnection() {
}

private String getEpid() {
if (writer instanceof Endpoint) {
return ((Endpoint) writer).getId();
if (connection.getChannelWriter() instanceof Endpoint) {
return ((Endpoint) connection.getChannelWriter()).getId();
}
return "unknown";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down
36 changes: 18 additions & 18 deletions src/test/java/io/lettuce/core/RedisAuthenticationHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public class RedisAuthenticationHandlerTest {

private RedisChannelWriter channelWriter;
private RedisChannelHandler<String, String> channelHandler;

EventBus eventBus;

Expand All @@ -34,7 +34,7 @@ public class RedisAuthenticationHandlerTest {
@BeforeEach
void setUp() {
eventBus = new DefaultEventBus(Schedulers.immediate());
channelWriter = mock(RedisChannelWriter.class);
channelHandler = mock(RedisChannelHandler.class);
connectionState = mock(ConnectionState.class);
}

Expand All @@ -43,17 +43,17 @@ void setUp() {
void subscribeWithStreamingCredentialsProviderInvokesReauth() {
MyStreamingRedisCredentialsProvider credentialsProvider = new MyStreamingRedisCredentialsProvider();

RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelWriter, credentialsProvider, connectionState,
eventBus, false);
RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelHandler, credentialsProvider,
connectionState, eventBus, false);

// Subscribe to the provider
handler.subscribe();
credentialsProvider.emitCredentials("newuser", "newpassword".toCharArray());

ArgumentCaptor<RedisCommand<Object, Object, Object>> captor = ArgumentCaptor.forClass(RedisCommand.class);
verify(channelWriter).write(captor.capture());
ArgumentCaptor<RedisCommand<String, String, Object>> captor = ArgumentCaptor.forClass(RedisCommand.class);
verify(channelHandler).dispatch(captor.capture());

RedisCommand<Object, Object, Object> capturedCommand = captor.getValue();
RedisCommand<String, String, Object> capturedCommand = captor.getValue();
assertThat(capturedCommand.getType()).isEqualTo(CommandType.AUTH);
assertThat(capturedCommand.getArgs().toCommandString()).contains("newuser");
assertThat(capturedCommand.getArgs().toCommandString()).contains("newpassword");
Expand All @@ -65,10 +65,10 @@ void subscribeWithStreamingCredentialsProviderInvokesReauth() {
void shouldHandleErrorInCredentialsStream() {
MyStreamingRedisCredentialsProvider credentialsProvider = new MyStreamingRedisCredentialsProvider();

RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelWriter, credentialsProvider, connectionState,
eventBus, false);
RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelHandler, credentialsProvider,
connectionState, eventBus, false);

verify(channelWriter, times(0)).write(any(RedisCommand.class)); // No command should be sent
verify(channelHandler, times(0)).dispatch(any(RedisCommand.class)); // No command should be sent

// Verify the event was published
StepVerifier.create(eventBus.get()).then(() -> {
Expand All @@ -84,8 +84,8 @@ void shouldNotSubscribeIfConnectionIsNotSupported() {
StreamingCredentialsProvider credentialsProvider = mock(StreamingCredentialsProvider.class);

when(connectionState.getNegotiatedProtocolVersion()).thenReturn(ProtocolVersion.RESP2);
RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelWriter, credentialsProvider, connectionState,
eventBus, true);
RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelHandler, credentialsProvider,
connectionState, eventBus, true);

// Subscribe to the provider (it should not subscribe due to unsupported connection)
handler.subscribe();
Expand All @@ -96,24 +96,24 @@ void shouldNotSubscribeIfConnectionIsNotSupported() {

@Test
void testIsSupportedConnectionWithRESP2ProtocolOnPubSubConnection() {
RedisChannelWriter writer = mock(RedisChannelWriter.class);
RedisChannelHandler<?, ?> connection = mock(RedisChannelHandler.class);

ConnectionState connectionState = mock(ConnectionState.class);
when(connectionState.getNegotiatedProtocolVersion()).thenReturn(ProtocolVersion.RESP2);

RedisAuthenticationHandler handler = new RedisAuthenticationHandler(writer, mock(RedisCredentialsProvider.class),
RedisAuthenticationHandler handler = new RedisAuthenticationHandler(connection, mock(RedisCredentialsProvider.class),
connectionState, mock(EventBus.class), true);

assertFalse(handler.isSupportedConnection());
}

@Test
void testIsSupportedConnectionWithNonPubSubConnection() {
RedisChannelWriter writer = mock(RedisChannelWriter.class);
RedisChannelHandler<?, ?> connection = mock(RedisChannelHandler.class);
ConnectionState connectionState = mock(ConnectionState.class);
when(connectionState.getNegotiatedProtocolVersion()).thenReturn(ProtocolVersion.RESP2);

RedisAuthenticationHandler handler = new RedisAuthenticationHandler(writer, mock(RedisCredentialsProvider.class),
RedisAuthenticationHandler handler = new RedisAuthenticationHandler(connection, mock(RedisCredentialsProvider.class),
connectionState, mock(EventBus.class), false);

assertTrue(handler.isSupportedConnection());
Expand All @@ -122,11 +122,11 @@ void testIsSupportedConnectionWithNonPubSubConnection() {
@Test
void testIsSupportedConnectionWithRESP3ProtocolOnPubSubConnection() {

RedisChannelWriter writer = mock(RedisChannelWriter.class);
RedisChannelHandler<?, ?> connection = mock(RedisChannelHandler.class);
ConnectionState connectionState = mock(ConnectionState.class);
when(connectionState.getNegotiatedProtocolVersion()).thenReturn(ProtocolVersion.RESP3);

RedisAuthenticationHandler handler = new RedisAuthenticationHandler(writer, mock(RedisCredentialsProvider.class),
RedisAuthenticationHandler handler = new RedisAuthenticationHandler(connection, mock(RedisCredentialsProvider.class),
connectionState, mock(EventBus.class), true);

assertTrue(handler.isSupportedConnection());
Expand Down

0 comments on commit 6f46022

Please sign in to comment.