From 6f460224a21bcd49521783ec85555847ec96f679 Mon Sep 17 00:00:00 2001 From: ggivo Date: Tue, 10 Dec 2024 07:13:12 +0200 Subject: [PATCH] dispath using connection handler --- .../io/lettuce/core/ConnectionBuilder.java | 4 +-- .../core/RedisAuthenticationHandler.java | 18 +++++----- .../core/StatefulRedisConnectionImpl.java | 1 - .../core/RedisAuthenticationHandlerTest.java | 36 +++++++++---------- 4 files changed, 29 insertions(+), 30 deletions(-) diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java index 444c13f90..2a397b6e1 100644 --- a/src/main/java/io/lettuce/core/ConnectionBuilder.java +++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java @@ -119,8 +119,8 @@ public void registerAuthenticationHandler(RedisCredentialsProvider credentialsPr LettuceAssert.assertState(connection != null, "Connection must be set"); LettuceAssert.assertState(clientResources != null, "ClientResources must be set"); - RedisAuthenticationHandler authenticationHandler = new RedisAuthenticationHandler(connection.getChannelWriter(), - credentialsProvider, state, clientResources.eventBus(), isPubSubConnection); + RedisAuthenticationHandler authenticationHandler = new RedisAuthenticationHandler(connection, credentialsProvider, + state, clientResources.eventBus(), isPubSubConnection); endpoint.registerAuthenticationHandler(authenticationHandler); } diff --git a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java index e8cb1d5a0..cd3cf6239 100644 --- a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java +++ b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java @@ -46,7 +46,7 @@ public class RedisAuthenticationHandler { private static final InternalLogger log = InternalLoggerFactory.getInstance(RedisAuthenticationHandler.class); - private final RedisChannelWriter writer; + private final RedisChannelHandler connection; private final ConnectionState state; @@ -60,9 +60,9 @@ public class RedisAuthenticationHandler { private final Boolean isPubSubConnection; - public RedisAuthenticationHandler(RedisChannelWriter writer, RedisCredentialsProvider credentialsProvider, + public RedisAuthenticationHandler(RedisChannelHandler connection, RedisCredentialsProvider credentialsProvider, ConnectionState state, EventBus eventBus, Boolean isPubSubConnection) { - this.writer = writer; + this.connection = connection; this.state = state; this.credentialsProvider = credentialsProvider; this.eventBus = eventBus; @@ -144,11 +144,11 @@ protected void reauthenticate(RedisCredentials credentials) { }); } - private AsyncCommand dispatchAuth(RedisCommand authCommand) { - AsyncCommand asyncCommand = new AsyncCommand<>(authCommand); - RedisCommand dispatched = writer.write(asyncCommand); + private AsyncCommand dispatchAuth(RedisCommand authCommand) { + AsyncCommand asyncCommand = new AsyncCommand<>(authCommand); + RedisCommand dispatched = connection.dispatch(asyncCommand); if (dispatched instanceof AsyncCommand) { - return (AsyncCommand) dispatched; + return (AsyncCommand) dispatched; } return asyncCommand; } @@ -170,8 +170,8 @@ protected boolean isSupportedConnection() { } private String getEpid() { - if (writer instanceof Endpoint) { - return ((Endpoint) writer).getId(); + if (connection.getChannelWriter() instanceof Endpoint) { + return ((Endpoint) connection.getChannelWriter()).getId(); } return "unknown"; } diff --git a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java index 3304cfdd4..853f9ee24 100644 --- a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java +++ b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; diff --git a/src/test/java/io/lettuce/core/RedisAuthenticationHandlerTest.java b/src/test/java/io/lettuce/core/RedisAuthenticationHandlerTest.java index 28b7cedaf..98b5b0102 100644 --- a/src/test/java/io/lettuce/core/RedisAuthenticationHandlerTest.java +++ b/src/test/java/io/lettuce/core/RedisAuthenticationHandlerTest.java @@ -25,7 +25,7 @@ public class RedisAuthenticationHandlerTest { - private RedisChannelWriter channelWriter; + private RedisChannelHandler channelHandler; EventBus eventBus; @@ -34,7 +34,7 @@ public class RedisAuthenticationHandlerTest { @BeforeEach void setUp() { eventBus = new DefaultEventBus(Schedulers.immediate()); - channelWriter = mock(RedisChannelWriter.class); + channelHandler = mock(RedisChannelHandler.class); connectionState = mock(ConnectionState.class); } @@ -43,17 +43,17 @@ void setUp() { void subscribeWithStreamingCredentialsProviderInvokesReauth() { MyStreamingRedisCredentialsProvider credentialsProvider = new MyStreamingRedisCredentialsProvider(); - RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelWriter, credentialsProvider, connectionState, - eventBus, false); + RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelHandler, credentialsProvider, + connectionState, eventBus, false); // Subscribe to the provider handler.subscribe(); credentialsProvider.emitCredentials("newuser", "newpassword".toCharArray()); - ArgumentCaptor> captor = ArgumentCaptor.forClass(RedisCommand.class); - verify(channelWriter).write(captor.capture()); + ArgumentCaptor> captor = ArgumentCaptor.forClass(RedisCommand.class); + verify(channelHandler).dispatch(captor.capture()); - RedisCommand capturedCommand = captor.getValue(); + RedisCommand capturedCommand = captor.getValue(); assertThat(capturedCommand.getType()).isEqualTo(CommandType.AUTH); assertThat(capturedCommand.getArgs().toCommandString()).contains("newuser"); assertThat(capturedCommand.getArgs().toCommandString()).contains("newpassword"); @@ -65,10 +65,10 @@ void subscribeWithStreamingCredentialsProviderInvokesReauth() { void shouldHandleErrorInCredentialsStream() { MyStreamingRedisCredentialsProvider credentialsProvider = new MyStreamingRedisCredentialsProvider(); - RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelWriter, credentialsProvider, connectionState, - eventBus, false); + RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelHandler, credentialsProvider, + connectionState, eventBus, false); - verify(channelWriter, times(0)).write(any(RedisCommand.class)); // No command should be sent + verify(channelHandler, times(0)).dispatch(any(RedisCommand.class)); // No command should be sent // Verify the event was published StepVerifier.create(eventBus.get()).then(() -> { @@ -84,8 +84,8 @@ void shouldNotSubscribeIfConnectionIsNotSupported() { StreamingCredentialsProvider credentialsProvider = mock(StreamingCredentialsProvider.class); when(connectionState.getNegotiatedProtocolVersion()).thenReturn(ProtocolVersion.RESP2); - RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelWriter, credentialsProvider, connectionState, - eventBus, true); + RedisAuthenticationHandler handler = new RedisAuthenticationHandler(channelHandler, credentialsProvider, + connectionState, eventBus, true); // Subscribe to the provider (it should not subscribe due to unsupported connection) handler.subscribe(); @@ -96,12 +96,12 @@ void shouldNotSubscribeIfConnectionIsNotSupported() { @Test void testIsSupportedConnectionWithRESP2ProtocolOnPubSubConnection() { - RedisChannelWriter writer = mock(RedisChannelWriter.class); + RedisChannelHandler connection = mock(RedisChannelHandler.class); ConnectionState connectionState = mock(ConnectionState.class); when(connectionState.getNegotiatedProtocolVersion()).thenReturn(ProtocolVersion.RESP2); - RedisAuthenticationHandler handler = new RedisAuthenticationHandler(writer, mock(RedisCredentialsProvider.class), + RedisAuthenticationHandler handler = new RedisAuthenticationHandler(connection, mock(RedisCredentialsProvider.class), connectionState, mock(EventBus.class), true); assertFalse(handler.isSupportedConnection()); @@ -109,11 +109,11 @@ void testIsSupportedConnectionWithRESP2ProtocolOnPubSubConnection() { @Test void testIsSupportedConnectionWithNonPubSubConnection() { - RedisChannelWriter writer = mock(RedisChannelWriter.class); + RedisChannelHandler connection = mock(RedisChannelHandler.class); ConnectionState connectionState = mock(ConnectionState.class); when(connectionState.getNegotiatedProtocolVersion()).thenReturn(ProtocolVersion.RESP2); - RedisAuthenticationHandler handler = new RedisAuthenticationHandler(writer, mock(RedisCredentialsProvider.class), + RedisAuthenticationHandler handler = new RedisAuthenticationHandler(connection, mock(RedisCredentialsProvider.class), connectionState, mock(EventBus.class), false); assertTrue(handler.isSupportedConnection()); @@ -122,11 +122,11 @@ void testIsSupportedConnectionWithNonPubSubConnection() { @Test void testIsSupportedConnectionWithRESP3ProtocolOnPubSubConnection() { - RedisChannelWriter writer = mock(RedisChannelWriter.class); + RedisChannelHandler connection = mock(RedisChannelHandler.class); ConnectionState connectionState = mock(ConnectionState.class); when(connectionState.getNegotiatedProtocolVersion()).thenReturn(ProtocolVersion.RESP3); - RedisAuthenticationHandler handler = new RedisAuthenticationHandler(writer, mock(RedisCredentialsProvider.class), + RedisAuthenticationHandler handler = new RedisAuthenticationHandler(connection, mock(RedisCredentialsProvider.class), connectionState, mock(EventBus.class), true); assertTrue(handler.isSupportedConnection());