Skip to content

Commit

Permalink
use injected cluster client in RedisClusterPubSubConnectionIntegratio…
Browse files Browse the repository at this point in the history
…nTests
  • Loading branch information
atakavci committed Apr 30, 2024
1 parent b906da7 commit 1f3d86d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.lettuce.test.TestFutures;
import io.lettuce.test.Wait;
import io.lettuce.test.condition.EnabledOnCommand;
import io.lettuce.test.resource.DefaultRedisClusterClient;

/**
* Integration tests for Cluster Pub/Sub.
Expand All @@ -48,6 +47,8 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport {

private final RedisClusterClient clusterClient;

private final RedisClusterClient clusterClientWithNoRedirects;

private final PubSubTestListener connectionListener = new PubSubTestListener();

private final PubSubTestListener nodeListener = new PubSubTestListener();
Expand All @@ -65,8 +66,11 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport {
String shardTestChannel = "shard-test-channel";

@Inject
RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient) {
RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient, RedisClusterClient clusterClient2) {
this.clusterClient = clusterClient;
ClusterClientOptions.Builder builder = ClusterClientOptions.builder().maxRedirects(0);
clusterClient2.setOptions(builder.build());
this.clusterClientWithNoRedirects = clusterClient2;
}

@BeforeEach
Expand Down Expand Up @@ -178,9 +182,11 @@ void publishToShardChannelViaNewClient() throws Exception {
pubSubConnection.addListener(connectionListener);
pubSubConnection.async().ssubscribe(shardChannel);

DefaultRedisClusterClient.get().connectPubSub().async().spublish(shardChannel, shardMessage);
StatefulRedisClusterPubSubConnection<String, String> newPubsub = clusterClientWithNoRedirects.connectPubSub();
newPubsub.async().spublish(shardChannel, shardMessage);
Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout();
Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout();
newPubsub.close();
}

@Test
Expand All @@ -193,9 +199,7 @@ void publishToShardChannelViaNewClientWithNoRedirects() throws Exception {
pubSubConnection.async().ssubscribe(shardTestChannel);
Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout();

ClusterClientOptions.Builder builder = ClusterClientOptions.builder().maxRedirects(0);
RedisClusterPubSubAsyncCommands<String, String> cmd = DefaultRedisClusterClient.get(builder.build()).connectPubSub()
.async();
RedisClusterPubSubAsyncCommands<String, String> cmd = clusterClientWithNoRedirects.connectPubSub().async();

cmd.spublish(shardChannel, shardMessage);
Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout();
Expand All @@ -204,6 +208,7 @@ void publishToShardChannelViaNewClientWithNoRedirects() throws Exception {
cmd.spublish(shardTestChannel, shardMessage);
Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout();
Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout();
cmd.getStatefulConnection().close();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,4 @@ public static RedisClusterClient get() {
instance.redisClient.setOptions(ClusterClientOptions.create());
return instance.redisClient;
}


/**
* Do not close the client.
*
* @return redis client with given options for the tests.
*/
public static RedisClusterClient get(ClusterClientOptions options) {
DefaultRedisClusterClient client = new DefaultRedisClusterClient();
client.redisClient.setOptions(options);
return instance.redisClient;
}
}

0 comments on commit 1f3d86d

Please sign in to comment.