diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java index 9bb5c48fcc..650dc233ca 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java @@ -73,10 +73,10 @@ public interface RedisClusterPubSubListener { * @param node the {@link RedisClusterNode} from which the {@code message} originates. * @param shardChannel Shard channel * @param count Subscription count. + * @since 7.0 */ - default void ssubscribed(RedisClusterNode node, K shardChannel, long count){ - throw new UnsupportedOperationException( - "This operation is not available by default! Use one of available pub/sub adapters or override this method in your class."); + default void ssubscribed(RedisClusterNode node, K shardChannel, long count) { + subscribed(node, shardChannel, count); } } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java index 8f3a3485a7..ee8630378e 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java @@ -47,6 +47,7 @@ public interface NodeSelectionPubSubAsyncCommands { * * @param shardChannels the channels * @return RedisFuture<Void> Future to synchronize {@code subscribe} completion + * @since 7.0 */ AsyncExecutions ssubscribe(K... shardChannels); } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java b/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java index 12261f57bb..b61395cf2e 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java @@ -42,11 +42,12 @@ public interface NodeSelectionPubSubReactiveCommands { */ ReactiveExecutions unsubscribe(K... channels); - /** + /** * Listen for messages published to the given shard channels. * * @param shardCchannels the channels * @return RedisFuture<Void> Future to synchronize {@code subscribe} completion + * @since 7.0 */ ReactiveExecutions ssubscribe(K... shardCchannels); diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java b/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java index 6e8e60a8e3..ba7fe76af7 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java @@ -42,11 +42,12 @@ public interface NodeSelectionPubSubCommands { */ Executions unsubscribe(K... channels); - /** + /** * Listen for messages published to the given shard channels. * * @param shardChannels the channels * @return Executions Future to synchronize {@code subscribe} completion + * @since 7.0 */ Executions ssubscribe(K... shardChannels); diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java index c563d7c03b..71b5a6ad13 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java @@ -82,9 +82,10 @@ public interface RedisPubSubListener { * * @param shardChannel Shard channel * @param count Subscription count. + * @since 7.0 */ - default void ssubscribed(K chashardChannelnnel, long count) { - throw new UnsupportedOperationException( - "This operation is not available by default. Use one of available pub/sub adapters or override this method in your class!"); + default void ssubscribed(K shardChannel, long count) { + subscribed(shardChannel, count); } + } diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index 56360f0c83..218fbe9c77 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -161,11 +161,7 @@ public Mono> pubsubShardNumsub(K... shardChannels) { @Override public Mono ssubscribe(K... shardChannels) { - return createFlux(() -> { - Command c = commandBuilder.ssubscribe(shardChannels); - System.out.println(""); - return c; - }).then(); + return createFlux(() -> commandBuilder.ssubscribe(shardChannels)).then(); } @Override diff --git a/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java b/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java index 4e03824b6a..90c301a58e 100644 --- a/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java +++ b/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java @@ -56,6 +56,7 @@ public interface RedisPubSubAsyncCommands extends RedisAsyncCommands * * @param shardChannels the shard channels * @return RedisFuture<Void> Future to synchronize {@code subscribe} completion + * @since 7.0 */ RedisFuture ssubscribe(K... shardChannels); diff --git a/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java b/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java index 96e8b22753..d6f0476d67 100644 --- a/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java +++ b/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java @@ -103,6 +103,7 @@ public interface RedisPubSubReactiveCommands extends RedisReactiveCommands * * @param shardChannels the channels. * @return Mono<Void> Mono for {@code subscribe} command. + * @since 7.0 */ Mono ssubscribe(K... shardChannels); diff --git a/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java b/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java index 7fbe5adb13..44a59b6c6d 100644 --- a/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java +++ b/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java @@ -45,6 +45,7 @@ public interface RedisPubSubCommands extends RedisCommands { * Listen for messages published to the given shard channels. * * @param shardChannels the channels + * @since 7.0 */ void ssubscribe(K... shardChannels); diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index e267e35cc2..3636522574 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -36,12 +36,13 @@ import io.lettuce.test.LettuceExtension; import io.lettuce.test.TestFutures; import io.lettuce.test.Wait; +import io.lettuce.test.condition.EnabledOnCommand; /** * @author Mark Paluch */ @ExtendWith(LettuceExtension.class) -class RedisClusterPubSubConnectionIntegrationTests extends TestSupport implements RedisClusterPubSubListener { +class RedisClusterPubSubConnectionIntegrationTests extends TestSupport { private final RedisClusterClient clusterClient; @@ -113,29 +114,23 @@ void testRegularClientPubSubShardChannels() { } @Test + @EnabledOnCommand("SSUBSCRIBE") void subscribeToShardChannel() throws Exception { - pubSubConnection.addListener(this); + pubSubConnection.addListener(connectionListener); pubSubConnection.async().ssubscribe(shardChannel); - assertThat(shardChannels.poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); + assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); } @Test + @EnabledOnCommand("SSUBSCRIBE") void subscribeToShardChannelViaOtherEndpoint() throws Exception { - pubSubConnection.addListener(this); + pubSubConnection.addListener(connectionListener); RedisClusterPubSubAsyncCommands pubSub = pubSubConnection.async(); - String nodeId = getNodeId(pubSub); + String nodeId = pubSub.clusterMyId().get(); RedisPubSubAsyncCommands other = pubSub .nodes(node -> node.getRole().isUpstream() && !node.getNodeId().equals(nodeId)).commands(0); other.ssubscribe(shardChannel); - assertThat(shardChannels.poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); - } - - private String getNodeId(RedisClusterPubSubAsyncCommands clusterPubsub2) { - try { - return clusterPubsub2.clusterMyId().get(); - } catch (Exception e) { - throw new RuntimeException(e); - } + assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); } @Test @@ -391,35 +386,4 @@ private RedisClusterNode getOtherThan(String nodeId) { throw new IllegalStateException("No other nodes than " + nodeId + " available"); } - // RedisClusterShardedPubSubListener implementation - - @Override - public void message(RedisClusterNode node, String channel, String message) { - } - - @Override - public void message(RedisClusterNode node, String pattern, String channel, String message) { - } - - @Override - public void subscribed(RedisClusterNode node, String channel, long count) { - } - - @Override - public void psubscribed(RedisClusterNode node, String pattern, long count) { - } - - @Override - public void unsubscribed(RedisClusterNode node, String channel, long count) { - } - - @Override - public void punsubscribed(RedisClusterNode node, String pattern, long count) { - } - - @Override - public void ssubscribed(RedisClusterNode node, String shardChannel, long count) { - shardChannels.add(shardChannel); - } - } diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java index 0574454a82..62b9662fd0 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java @@ -39,19 +39,15 @@ import io.lettuce.core.*; import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.api.push.PushMessage; -import io.lettuce.core.cluster.models.partitions.RedisClusterNode; -import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener; -import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; -import io.lettuce.core.cluster.pubsub.api.async.RedisClusterPubSubAsyncCommands; import io.lettuce.core.internal.LettuceFactories; import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; +import io.lettuce.core.support.PubSubTestListener; import io.lettuce.test.Delay; import io.lettuce.test.TestFutures; import io.lettuce.test.Wait; import io.lettuce.test.WithPassword; import io.lettuce.test.condition.EnabledOnCommand; -import io.lettuce.test.resource.DefaultRedisClusterClient; import io.lettuce.test.resource.FastShutdown; import io.lettuce.test.resource.TestClientResources; @@ -65,18 +61,19 @@ * @author Tihomir Mateev * @author Ali Takavci */ -class PubSubCommandTest extends AbstractRedisClientTest - implements RedisPubSubListener { +class PubSubCommandTest extends AbstractRedisClientTest { RedisPubSubAsyncCommands pubsub; - BlockingQueue channels; + PubSubTestListener listener = new PubSubTestListener(); - BlockingQueue patterns; + BlockingQueue channels = listener.getChannels(); - BlockingQueue messages; + BlockingQueue patterns = listener.getPatterns(); - BlockingQueue counts; + BlockingQueue messages = listener.getMessages(); + + BlockingQueue counts = listener.getCounts(); String channel = "channel0"; @@ -92,12 +89,12 @@ void openPubSubConnection() { client.setOptions(getOptions()); pubsub = client.connectPubSub().async(); - pubsub.getStatefulConnection().addListener(this); + pubsub.getStatefulConnection().addListener(listener); } finally { - channels = LettuceFactories.newBlockingQueue(); - patterns = LettuceFactories.newBlockingQueue(); - messages = LettuceFactories.newBlockingQueue(); - counts = LettuceFactories.newBlockingQueue(); + channels.clear(); + patterns.clear(); + messages.clear(); + counts.clear(); } } @@ -119,7 +116,7 @@ void auth() { client.setOptions( ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(PubSubCommandTest.this); + connection.getStatefulConnection().addListener(listener); connection.auth(passwd); connection.subscribe(channel); @@ -135,7 +132,7 @@ void authWithUsername() { client.setOptions( ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(PubSubCommandTest.this); + connection.getStatefulConnection().addListener(listener); connection.auth(username, passwd); connection.subscribe(channel); @@ -152,7 +149,7 @@ void authWithReconnect() { ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(PubSubCommandTest.this); + connection.getStatefulConnection().addListener(listener); connection.auth(passwd); connection.clientSetname("authWithReconnect"); @@ -181,7 +178,7 @@ void authWithUsernameAndReconnect() { ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(PubSubCommandTest.this); + connection.getStatefulConnection().addListener(listener); connection.auth(username, passwd); connection.clientSetname("authWithReconnect"); connection.subscribe(channel).get(); @@ -222,7 +219,6 @@ void message() throws Exception { assertThat(messages.take()).isEqualTo(message); } - @Test @EnabledOnCommand("ACL") void messageAsPushMessage() throws Exception { @@ -541,7 +537,7 @@ void removeListener() throws Exception { assertThat(channels.take()).isEqualTo(channel); assertThat(messages.take()).isEqualTo(message); - pubsub.getStatefulConnection().removeListener(this); + pubsub.getStatefulConnection().removeListener(listener); redis.publish(channel, message); assertThat(channels.poll(10, TimeUnit.MILLISECONDS)).isNull(); @@ -557,49 +553,4 @@ void echoAllowedInSubscriptionState() { pubsub.unsubscribe(channel); } - // RedisPubSubListener implementation - - @Override - public void message(String channel, String message) { - channels.add(channel); - messages.add(message); - } - - @Override - public void message(String pattern, String channel, String message) { - patterns.add(pattern); - channels.add(channel); - messages.add(message); - } - - @Override - public void subscribed(String channel, long count) { - channels.add(channel); - counts.add(count); - } - - @Override - public void psubscribed(String pattern, long count) { - patterns.add(pattern); - counts.add(count); - } - - @Override - public void unsubscribed(String channel, long count) { - channels.add(channel); - counts.add(count); - } - - @Override - public void punsubscribed(String pattern, long count) { - patterns.add(pattern); - counts.add(count); - } - - @Override - public void ssubscribed(String shardChannel, long count) { - channels.add(shardChannel); - counts.add(count); - } - } diff --git a/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java b/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java index 6ad35bfd57..17f159276e 100644 --- a/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java @@ -202,17 +202,18 @@ void pubsubShardNumsub() throws ExecutionException, InterruptedException { @Test void ssubscribe() throws ExecutionException, InterruptedException { String pattern = "channelPattern"; - AsyncCommand dispachedMock = mock (AsyncCommand.class); + AsyncCommand dispachedMock = mock(AsyncCommand.class); when(mockedConnection.dispatch((RedisCommand) any())).thenReturn(dispachedMock); commands.ssubscribe(pattern).get(); - ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; + ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class); + verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( SSUBSCRIBE, capturedCommand.getValue().getType()); + Assertions.assertEquals(SSUBSCRIBE, capturedCommand.getValue().getType()); assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "key", capturedCommand.getValue().getArgs().toCommandString()); + Assertions.assertEquals("key", capturedCommand.getValue().getArgs().toCommandString()); assertNotEquals(capturedCommand.getValue(), dispachedMock); }