diff --git a/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java b/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java index 4ffb80203..01ef7b853 100644 --- a/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java +++ b/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java @@ -112,6 +112,7 @@ void streamingCredentialProvider(RedisClient client) { // verify that the connection is re-authenticated with the new user credentials assertThat(connection.sync().aclWhoami()).isEqualTo("steave"); + credentialsProvider.shutdown(); connection.close(); client.removeListener(listener); } diff --git a/src/test/java/io/lettuce/core/BaseRedisAuthenticationHandlerTest.java b/src/test/java/io/lettuce/core/BaseRedisAuthenticationHandlerTest.java index 9ac572177..0b1a0e2f5 100644 --- a/src/test/java/io/lettuce/core/BaseRedisAuthenticationHandlerTest.java +++ b/src/test/java/io/lettuce/core/BaseRedisAuthenticationHandlerTest.java @@ -53,20 +53,21 @@ void subscribeWithStreamingCredentialsProviderInvokesReauth() { assertThat(capturedCommand.getType()).isEqualTo(CommandType.AUTH); assertThat(capturedCommand.getArgs().toCommandString()).contains("newuser"); assertThat(capturedCommand.getArgs().toCommandString()).contains("newpassword"); + + credentialsProvider.shutdown(); } @Test void shouldHandleErrorInCredentialsStream() { - Sinks.Many sink = Sinks.many().replay().latest(); - Flux credentialsFlux = sink.asFlux(); - StreamingCredentialsProvider credentialsProvider = mock(StreamingCredentialsProvider.class); - when(credentialsProvider.credentials()).thenReturn(credentialsFlux); + MyStreamingRedisCredentialsProvider credentialsProvider = new MyStreamingRedisCredentialsProvider(); // Subscribe to the provider and simulate an error handler.subscribe(credentialsProvider); - sink.tryEmitError(new RuntimeException("Test error")); + credentialsProvider.tryEmitError(new RuntimeException("Test error")); verify(connection.getChannelWriter(), times(0)).write(any(RedisCommand.class)); // No command should be sent + + credentialsProvider.shutdown(); } @Test diff --git a/src/test/java/io/lettuce/core/MyStreamingRedisCredentialsProvider.java b/src/test/java/io/lettuce/core/MyStreamingRedisCredentialsProvider.java index 6d03bff63..34f60035e 100644 --- a/src/test/java/io/lettuce/core/MyStreamingRedisCredentialsProvider.java +++ b/src/test/java/io/lettuce/core/MyStreamingRedisCredentialsProvider.java @@ -31,4 +31,8 @@ public void emitCredentials(String username, char[] password) { credentialsSink.tryEmitNext(new StaticRedisCredentials(username, password)); } + public void tryEmitError(RuntimeException testError) { + credentialsSink.tryEmitError(testError); + } + } diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java index 2d91b6b0f..d69882567 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java @@ -10,16 +10,13 @@ import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.Executions; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; -import io.lettuce.core.protocol.Command; import io.lettuce.test.CanConnect; -import io.lettuce.test.CliParser; import io.lettuce.test.resource.FastShutdown; import io.lettuce.test.resource.TestClientResources; import io.lettuce.test.settings.TestSettings; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -72,6 +69,7 @@ static void beforeClass() { @AfterAll static void afterClass() { + credentialsProvider.shutdown(); FastShutdown.shutdown(redisClient); }