From 7ea1e27c48380bcddab69f24678152145c2df954 Mon Sep 17 00:00:00 2001 From: ggivo Date: Thu, 24 Oct 2024 14:28:43 +0300 Subject: [PATCH] Closes #2940 Sharded PubSub subscriptions not recovered after disconnection and re-connection. (#3026) Backport to 6.4.x branch --- .../lettuce/core/pubsub/PubSubEndpoint.java | 8 +++++ .../StatefulRedisPubSubConnectionImpl.java | 4 +++ .../core/pubsub/PubSubCommandTest.java | 20 +++++++++++++ ...fulRedisPubSubConnectionImplUnitTests.java | 29 +++++++++++++++---- 4 files changed, 56 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java b/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java index 6da336c4d0..bee692eab8 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java @@ -129,6 +129,14 @@ public Set getChannels() { return unwrap(this.channels); } + public boolean hasShardChannelSubscriptions() { + return !shardChannels.isEmpty(); + } + + public Set getShardChannels() { + return unwrap(this.shardChannels); + } + public boolean hasPatternSubscriptions() { return !patterns.isEmpty(); } diff --git a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java index 33b1f1412e..30b4840913 100644 --- a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java @@ -130,6 +130,10 @@ protected List> resubscribe() { result.add(async().subscribe(toArray(endpoint.getChannels()))); } + if (endpoint.hasShardChannelSubscriptions()) { + result.add(async().ssubscribe(toArray(endpoint.getShardChannels()))); + } + if (endpoint.hasPatternSubscriptions()) { result.add(async().psubscribe(toArray(endpoint.getPatterns()))); } diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java index c6fd838050..afcae01a0a 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java @@ -84,6 +84,8 @@ class PubSubCommandTest extends AbstractRedisClientTest { BlockingQueue counts = listener.getCounts(); + BlockingQueue shardCounts = listener.getShardCounts(); + String channel = "channel0"; String shardChannel = "shard-channel"; @@ -521,6 +523,24 @@ void resubscribePatternsOnReconnect() throws Exception { assertThat(messages.take()).isEqualTo(message); } + @Test + void resubscribeShardChannelsOnReconnect() throws Exception { + pubsub.ssubscribe(shardChannel); + assertThat(shardChannels.take()).isEqualTo(shardChannel); + assertThat((long) shardCounts.take()).isEqualTo(1); + + pubsub.quit(); + + assertThat(shardChannels.take()).isEqualTo(shardChannel); + assertThat((long) shardCounts.take()).isEqualTo(1); + + Wait.untilTrue(pubsub::isOpen).waitOrTimeout(); + + redis.spublish(shardChannel, shardMessage); + assertThat(shardChannels.take()).isEqualTo(shardChannel); + assertThat(messages.take()).isEqualTo(shardMessage); + } + @Test void adapter() throws Exception { final BlockingQueue localCounts = LettuceFactories.newBlockingQueue(); diff --git a/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java b/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java index 160a601125..d359c32c0c 100644 --- a/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java @@ -10,7 +10,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.mockito.Mockito.*; @@ -78,6 +78,7 @@ void resubscribeChannelSubscription() { when(mockedEndpoint.hasChannelSubscriptions()).thenReturn(true); when(mockedEndpoint.getChannels()).thenReturn(new HashSet<>(Arrays.asList(new String[] { "channel1", "channel2" }))); when(mockedEndpoint.hasPatternSubscriptions()).thenReturn(false); + when(mockedEndpoint.hasShardChannelSubscriptions()).thenReturn(false); List> subscriptions = connection.resubscribe(); RedisFuture commandFuture = subscriptions.get(0); @@ -87,17 +88,35 @@ void resubscribeChannelSubscription() { } @Test - void resubscribeChannelAndPatternSubscription() { + void resubscribeShardChannelSubscription() { + when(mockedEndpoint.hasShardChannelSubscriptions()).thenReturn(true); + when(mockedEndpoint.getShardChannels()) + .thenReturn(new HashSet<>(Arrays.asList(new String[] { "shard_channel1", "shard_channel2" }))); + when(mockedEndpoint.hasChannelSubscriptions()).thenReturn(false); + when(mockedEndpoint.hasPatternSubscriptions()).thenReturn(false); + + List> subscriptions = connection.resubscribe(); + RedisFuture commandFuture = subscriptions.get(0); + + assertEquals(1, subscriptions.size()); + assertInstanceOf(AsyncCommand.class, commandFuture); + } + + @Test + void resubscribeChannelAndPatternAndShardChanelSubscription() { when(mockedEndpoint.hasChannelSubscriptions()).thenReturn(true); - when(mockedEndpoint.getChannels()).thenReturn(new HashSet<>(Arrays.asList(new String[] { "channel1", "channel2" }))); when(mockedEndpoint.hasPatternSubscriptions()).thenReturn(true); + when(mockedEndpoint.hasShardChannelSubscriptions()).thenReturn(true); + when(mockedEndpoint.getChannels()).thenReturn(new HashSet<>(Arrays.asList(new String[] { "channel1", "channel2" }))); when(mockedEndpoint.getPatterns()).thenReturn(new HashSet<>(Arrays.asList(new String[] { "bcast*", "echo" }))); - + when(mockedEndpoint.getShardChannels()) + .thenReturn(new HashSet<>(Arrays.asList(new String[] { "shard_channel1", "shard_channel2" }))); List> subscriptions = connection.resubscribe(); - assertEquals(2, subscriptions.size()); + assertEquals(3, subscriptions.size()); assertInstanceOf(AsyncCommand.class, subscriptions.get(0)); assertInstanceOf(AsyncCommand.class, subscriptions.get(1)); + assertInstanceOf(AsyncCommand.class, subscriptions.get(1)); } }