diff --git a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java index 9d79ae0cbe..ce5abb67e0 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java @@ -167,6 +167,11 @@ public void punsubscribed(K pattern, long count) { notifications.punsubscribed(getNode(), pattern, count); } + @Override + public void ssubscribed(K channel, long count) { + notifications.ssubscribed(getNode(), channel, count); + } + private RedisClusterNode getNode() { return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartitions().getPartition(host, port); } diff --git a/src/main/java/io/lettuce/core/cluster/ClusterReadOnlyCommands.java b/src/main/java/io/lettuce/core/cluster/ClusterReadOnlyCommands.java index 5a9e7d17b3..77bcecade3 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterReadOnlyCommands.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterReadOnlyCommands.java @@ -58,7 +58,7 @@ public static ReadOnlyCommands.ReadOnlyPredicate asPredicate() { enum CommandName { // Pub/Sub commands are no key-space commands so they are safe to execute on replica nodes - PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE + PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, SSUBSCRIBE } } diff --git a/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java b/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java index 2074867f6d..5679e04c8c 100644 --- a/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java +++ b/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java @@ -88,6 +88,9 @@ protected void notifyListeners(PubSubMessage output) { case unsubscribe: multicast.unsubscribed(clusterNode, output.channel(), output.count()); break; + case ssubscribe: + multicast.ssubscribed(clusterNode, output.channel(), output.count()); + break; default: throw new UnsupportedOperationException("Operation " + output.type() + " not supported"); } @@ -189,6 +192,12 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) { clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count)); } + @Override + public void ssubscribed(RedisClusterNode node, K channel, long count) { + getListeners().forEach(listener -> listener.ssubscribed(channel, count)); + clusterListeners.forEach(listener -> listener.ssubscribed(node, channel, count)); + } + } } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java index fc57e5cc2e..c38e672ee3 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java @@ -42,4 +42,9 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) { // empty adapter method } + @Override + public void ssubscribed(RedisClusterNode node, K channel, long count) { + // empty adapter method + } + } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java index be12794b12..650dc233ca 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java @@ -67,4 +67,16 @@ public interface RedisClusterPubSubListener { */ void punsubscribed(RedisClusterNode node, K pattern, long count); + /** + * Subscribed to a shard channel. + * + * @param node the {@link RedisClusterNode} from which the {@code message} originates. + * @param shardChannel Shard channel + * @param count Subscription count. + * @since 7.0 + */ + default void ssubscribed(RedisClusterNode node, K shardChannel, long count) { + subscribed(node, shardChannel, count); + } + } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java index 68e1c4b9a7..ee8630378e 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/api/async/NodeSelectionPubSubAsyncCommands.java @@ -42,4 +42,12 @@ public interface NodeSelectionPubSubAsyncCommands { */ AsyncExecutions unsubscribe(K... channels); + /** + * Listen for messages published to the given shard channels. + * + * @param shardChannels the channels + * @return RedisFuture<Void> Future to synchronize {@code subscribe} completion + * @since 7.0 + */ + AsyncExecutions ssubscribe(K... shardChannels); } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java b/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java index fa53bbfe23..b61395cf2e 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/api/reactive/NodeSelectionPubSubReactiveCommands.java @@ -42,4 +42,13 @@ public interface NodeSelectionPubSubReactiveCommands { */ ReactiveExecutions unsubscribe(K... channels); + /** + * Listen for messages published to the given shard channels. + * + * @param shardCchannels the channels + * @return RedisFuture<Void> Future to synchronize {@code subscribe} completion + * @since 7.0 + */ + ReactiveExecutions ssubscribe(K... shardCchannels); + } diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java b/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java index 1f1fd1b592..ba7fe76af7 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/api/sync/NodeSelectionPubSubCommands.java @@ -42,4 +42,13 @@ public interface NodeSelectionPubSubCommands { */ Executions unsubscribe(K... channels); + /** + * Listen for messages published to the given shard channels. + * + * @param shardChannels the channels + * @return Executions Future to synchronize {@code subscribe} completion + * @since 7.0 + */ + Executions ssubscribe(K... shardChannels); + } diff --git a/src/main/java/io/lettuce/core/protocol/CommandType.java b/src/main/java/io/lettuce/core/protocol/CommandType.java index 446007b17d..7a9d87524d 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandType.java +++ b/src/main/java/io/lettuce/core/protocol/CommandType.java @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword { // Pub/Sub - PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, + PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, // Sets diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java index 9c1e74870b..c733435cbc 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java @@ -110,6 +110,14 @@ final Command unsubscribe(K... channels) { return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels); } + @SafeVarargs + final Command ssubscribe(K... shardChannels) { + LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY); + + CommandArgs args = new CommandArgs<>(codec).addKeys(shardChannels); + return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args); + } + @SafeVarargs final Command pubSubCommand(CommandType type, CommandOutput output, K... keys) { return new Command<>(type, output, new PubSubCommandArgs<>(codec).addKeys(keys)); diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubCommandHandler.java b/src/main/java/io/lettuce/core/pubsub/PubSubCommandHandler.java index 8b43d602e2..0cb9bd1a55 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubCommandHandler.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubCommandHandler.java @@ -218,6 +218,9 @@ private boolean shouldCompleteCommand(PubSubOutput.Type type, RedisCommand extends DefaultEndpoint { private final Set> channels; + private final Set> shardChannels; + private final Set> patterns; private volatile boolean subscribeWritten = false; @@ -70,6 +73,7 @@ public class PubSubEndpoint extends DefaultEndpoint { ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PSUBSCRIBE.name()); ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.UNSUBSCRIBE.name()); ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PUNSUBSCRIBE.name()); + ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.SSUBSCRIBE.name()); ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.QUIT.name()); ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PING.name()); @@ -77,6 +81,7 @@ public class PubSubEndpoint extends DefaultEndpoint { SUBSCRIBE_COMMANDS.add(CommandType.SUBSCRIBE.name()); SUBSCRIBE_COMMANDS.add(CommandType.PSUBSCRIBE.name()); + SUBSCRIBE_COMMANDS.add(CommandType.SSUBSCRIBE.name()); } /** @@ -91,6 +96,7 @@ public PubSubEndpoint(ClientOptions clientOptions, ClientResources clientResourc this.channels = ConcurrentHashMap.newKeySet(); this.patterns = ConcurrentHashMap.newKeySet(); + this.shardChannels = ConcurrentHashMap.newKeySet(); } /** @@ -257,6 +263,9 @@ protected void notifyListeners(PubSubMessage message) { case unsubscribe: listener.unsubscribed(message.channel(), message.count()); break; + case ssubscribe: + listener.ssubscribed(message.channel(), message.count()); + break; default: throw new UnsupportedOperationException("Operation " + message.type() + " not supported"); } @@ -278,6 +287,9 @@ private void updateInternalState(PubSubMessage message) { case unsubscribe: channels.remove(new Wrapper<>(message.channel())); break; + case ssubscribe: + shardChannels.add(new Wrapper<>(message.channel())); + break; default: break; } diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java b/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java index 16309ca69b..1bcb61b7a2 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java @@ -38,7 +38,7 @@ public class PubSubOutput extends CommandOutput implements PubSub public enum Type { - message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe; + message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe; private final static Set names = new HashSet<>(); @@ -122,6 +122,7 @@ private void handleOutput(ByteBuffer bytes) { break; case subscribe: case unsubscribe: + case ssubscribe: channel = codec.decodeKey(bytes); break; default: diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java index e63dc28796..ef16fed6fb 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java @@ -59,4 +59,9 @@ public void punsubscribed(K pattern, long count) { // empty adapter method } + @Override + public void ssubscribed(K shardChannel, long count) { + // empty adapter method + } + } diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java index 9e6752816a..f69f24fbba 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java @@ -100,6 +100,12 @@ public RedisFuture> pubsubShardNumsub(K... shardChannels) { return dispatch(commandBuilder.pubsubShardNumsub(shardChannels)); } + @Override + @SuppressWarnings("unchecked") + public RedisFuture ssubscribe(K... channels) { + return (RedisFuture) dispatch(commandBuilder.ssubscribe(channels)); + } + @Override @SuppressWarnings("unchecked") public StatefulRedisPubSubConnection getStatefulConnection() { diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java index 2099197bb9..71b5a6ad13 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java @@ -77,4 +77,15 @@ public interface RedisPubSubListener { */ void punsubscribed(K pattern, long count); + /** + * Subscribed to a Shard channel. + * + * @param shardChannel Shard channel + * @param count Subscription count. + * @since 7.0 + */ + default void ssubscribed(K shardChannel, long count) { + subscribed(shardChannel, count); + } + } diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index 6120f16657..218fbe9c77 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -26,6 +26,7 @@ import reactor.core.publisher.Mono; import io.lettuce.core.RedisReactiveCommandsImpl; import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.protocol.Command; import io.lettuce.core.pubsub.api.reactive.ChannelMessage; import io.lettuce.core.pubsub.api.reactive.PatternMessage; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; @@ -158,6 +159,11 @@ public Mono> pubsubShardNumsub(K... shardChannels) { return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels)); } + @Override + public Mono ssubscribe(K... shardChannels) { + return createFlux(() -> commandBuilder.ssubscribe(shardChannels)).then(); + } + @Override @SuppressWarnings("unchecked") public StatefulRedisPubSubConnection getStatefulConnection() { diff --git a/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java b/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java index 1689639a65..90c301a58e 100644 --- a/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java +++ b/src/main/java/io/lettuce/core/pubsub/api/async/RedisPubSubAsyncCommands.java @@ -51,4 +51,13 @@ public interface RedisPubSubAsyncCommands extends RedisAsyncCommands */ StatefulRedisPubSubConnection getStatefulConnection(); + /** + * Listen for messages published to the given shard channels. + * + * @param shardChannels the shard channels + * @return RedisFuture<Void> Future to synchronize {@code subscribe} completion + * @since 7.0 + */ + RedisFuture ssubscribe(K... shardChannels); + } diff --git a/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java b/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java index ae5bd54a0e..d6f0476d67 100644 --- a/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java +++ b/src/main/java/io/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands.java @@ -97,6 +97,16 @@ public interface RedisPubSubReactiveCommands extends RedisReactiveCommands */ Mono unsubscribe(K... channels); + /** + * Listen for messages published to the given shard channels. The {@link Mono} completes without a result as soon as the * + * subscription is registered. + * + * @param shardChannels the channels. + * @return Mono<Void> Mono for {@code subscribe} command. + * @since 7.0 + */ + Mono ssubscribe(K... shardChannels); + /** * @return the underlying connection. * @since 6.2, will be removed with Lettuce 7 to avoid exposing the underlying connection. diff --git a/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java b/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java index 93d44b35c1..44a59b6c6d 100644 --- a/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java +++ b/src/main/java/io/lettuce/core/pubsub/api/sync/RedisPubSubCommands.java @@ -41,6 +41,14 @@ public interface RedisPubSubCommands extends RedisCommands { */ void unsubscribe(K... channels); + /** + * Listen for messages published to the given shard channels. + * + * @param shardChannels the channels + * @since 7.0 + */ + void ssubscribe(K... shardChannels); + /** * @return the underlying connection. */ diff --git a/src/test/java/io/lettuce/core/cluster/ClusterReadOnlyCommandsUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterReadOnlyCommandsUnitTests.java index 52da2acbd3..b1f6cad9ff 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterReadOnlyCommandsUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterReadOnlyCommandsUnitTests.java @@ -16,7 +16,7 @@ class ClusterReadOnlyCommandsUnitTests { @Test void testCount() { - assertThat(ClusterReadOnlyCommands.getReadOnlyCommands()).hasSize(85); + assertThat(ClusterReadOnlyCommands.getReadOnlyCommands()).hasSize(86); } @Test @@ -26,4 +26,5 @@ void testResolvableCommandNames() { assertThat(readOnlyCommand.name()).isEqualTo(CommandType.valueOf(readOnlyCommand.name()).name()); } } + } diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 4dee286e7f..3636522574 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.inject.Inject; @@ -21,17 +22,21 @@ import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection; +import io.lettuce.core.cluster.pubsub.api.async.RedisClusterPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.reactive.NodeSelectionPubSubReactiveCommands; import io.lettuce.core.cluster.pubsub.api.reactive.PubSubReactiveNodeSelection; import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands; import io.lettuce.core.cluster.pubsub.api.sync.PubSubNodeSelection; import io.lettuce.core.event.command.CommandFailedEvent; import io.lettuce.core.event.command.CommandListener; +import io.lettuce.core.internal.LettuceFactories; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.support.PubSubTestListener; import io.lettuce.test.LettuceExtension; import io.lettuce.test.TestFutures; import io.lettuce.test.Wait; +import io.lettuce.test.condition.EnabledOnCommand; /** * @author Mark Paluch @@ -51,9 +56,17 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport { private StatefulRedisClusterPubSubConnection pubSubConnection2; + BlockingQueue shardChannels; + + String shardChannel = "shard-channel"; + + String shardTestChannel = "shard-test-channel"; + @Inject RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient) { this.clusterClient = clusterClient; + shardChannels = LettuceFactories.newBlockingQueue(); + } @BeforeEach @@ -85,7 +98,6 @@ void testRegularClientPubSubChannels() { assertThat(channelsOnOtherNode).isEmpty(); } - @Test void testRegularClientPubSubShardChannels() { @@ -101,6 +113,26 @@ void testRegularClientPubSubShardChannels() { assertThat(channelsOnOtherNode).isEmpty(); } + @Test + @EnabledOnCommand("SSUBSCRIBE") + void subscribeToShardChannel() throws Exception { + pubSubConnection.addListener(connectionListener); + pubSubConnection.async().ssubscribe(shardChannel); + assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); + } + + @Test + @EnabledOnCommand("SSUBSCRIBE") + void subscribeToShardChannelViaOtherEndpoint() throws Exception { + pubSubConnection.addListener(connectionListener); + RedisClusterPubSubAsyncCommands pubSub = pubSubConnection.async(); + String nodeId = pubSub.clusterMyId().get(); + RedisPubSubAsyncCommands other = pubSub + .nodes(node -> node.getRole().isUpstream() && !node.getNodeId().equals(nodeId)).commands(0); + other.ssubscribe(shardChannel); + assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); + } + @Test void myIdWorksAfterDisconnect() throws InterruptedException { diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandBuilderUnitTests.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandBuilderUnitTests.java index 59ec37bf42..4d4e798187 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandBuilderUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandBuilderUnitTests.java @@ -6,6 +6,8 @@ import io.lettuce.core.output.KeyListOutput; import io.lettuce.core.output.MapOutput; import io.lettuce.core.protocol.Command; +import io.lettuce.core.protocol.CommandArgs; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -13,11 +15,12 @@ import java.util.Map; import static io.lettuce.core.protocol.CommandType.*; -import static org.junit.jupiter.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; class PubSubCommandBuilderUnitTests { private PubSubCommandBuilder commandBuilder; + private final RedisCodec codec = StringCodec.UTF8; @BeforeEach @@ -31,10 +34,10 @@ void publish() { String message = "message payload"; Command command = this.commandBuilder.publish(channel, message); - assertEquals( PUBLISH, command.getType()); - assertInstanceOf(PubSubCommandArgs.class, command.getArgs()); - assertEquals( "key value", command.getArgs().toCommandString()); - assertInstanceOf(IntegerOutput.class, command.getOutput()); + assertThat(command.getType()).isEqualTo(PUBLISH); + assertThat(command.getArgs()).isInstanceOf(PubSubCommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("key value"); + assertThat(command.getOutput()).isInstanceOf(IntegerOutput.class); } @Test @@ -42,10 +45,10 @@ void pubsubChannels() { String pattern = "channelPattern"; Command> command = this.commandBuilder.pubsubChannels(pattern); - assertEquals( PUBSUB, command.getType()); - assertInstanceOf(PubSubCommandArgs.class, command.getArgs()); - assertEquals( "CHANNELS key", command.getArgs().toCommandString()); - assertInstanceOf(KeyListOutput.class, command.getOutput()); + assertThat(command.getType()).isEqualTo(PUBSUB); + assertThat(command.getArgs()).isInstanceOf(PubSubCommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("CHANNELS key"); + assertThat(command.getOutput()).isInstanceOf(KeyListOutput.class); } @Test @@ -53,10 +56,10 @@ void pubsubNumsub() { String pattern = "channelPattern"; Command> command = this.commandBuilder.pubsubNumsub(pattern); - assertEquals( PUBSUB, command.getType()); - assertInstanceOf(PubSubCommandArgs.class, command.getArgs()); - assertEquals( "NUMSUB key", command.getArgs().toCommandString()); - assertInstanceOf(MapOutput.class, command.getOutput()); + assertThat(command.getType()).isEqualTo(PUBSUB); + assertThat(command.getArgs()).isInstanceOf(PubSubCommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("NUMSUB key"); + assertThat(command.getOutput()).isInstanceOf(MapOutput.class); } @Test @@ -64,10 +67,10 @@ void pubsubShardChannels() { String pattern = "channelPattern"; Command> command = this.commandBuilder.pubsubShardChannels(pattern); - assertEquals( PUBSUB, command.getType()); - assertInstanceOf(PubSubCommandArgs.class, command.getArgs()); - assertEquals( "SHARDCHANNELS key", command.getArgs().toCommandString()); - assertInstanceOf(KeyListOutput.class, command.getOutput()); + assertThat(command.getType()).isEqualTo(PUBSUB); + assertThat(command.getArgs()).isInstanceOf(PubSubCommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("SHARDCHANNELS key"); + assertThat(command.getOutput()).isInstanceOf(KeyListOutput.class); } @Test @@ -75,10 +78,10 @@ void pubsubShardNumsub() { String pattern = "channelPattern"; Command> command = this.commandBuilder.pubsubShardNumsub(pattern); - assertEquals( PUBSUB, command.getType()); - assertInstanceOf(PubSubCommandArgs.class, command.getArgs()); - assertEquals( "SHARDNUMSUB key", command.getArgs().toCommandString()); - assertInstanceOf(MapOutput.class, command.getOutput()); + assertThat(command.getType()).isEqualTo(PUBSUB); + assertThat(command.getArgs()).isInstanceOf(PubSubCommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("SHARDNUMSUB key"); + assertThat(command.getOutput()).isInstanceOf(MapOutput.class); } @Test @@ -86,10 +89,10 @@ void psubscribe() { String pattern = "channelPattern"; Command command = this.commandBuilder.psubscribe(pattern); - assertEquals( PSUBSCRIBE, command.getType()); - assertInstanceOf(PubSubCommandArgs.class, command.getArgs()); - assertEquals( "key", command.getArgs().toCommandString()); - assertInstanceOf(PubSubOutput.class, command.getOutput()); + assertThat(command.getType()).isEqualTo(PSUBSCRIBE); + assertThat(command.getArgs()).isInstanceOf(PubSubCommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("key"); + assertThat(command.getOutput()).isInstanceOf(PubSubOutput.class); } @Test @@ -97,10 +100,10 @@ void punsubscribe() { String pattern = "channelPattern"; Command command = this.commandBuilder.punsubscribe(pattern); - assertEquals( PUNSUBSCRIBE, command.getType()); - assertInstanceOf(PubSubCommandArgs.class, command.getArgs()); - assertEquals( "key", command.getArgs().toCommandString()); - assertInstanceOf(PubSubOutput.class, command.getOutput()); + assertThat(command.getType()).isEqualTo(PUNSUBSCRIBE); + assertThat(command.getArgs()).isInstanceOf(PubSubCommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("key"); + assertThat(command.getOutput()).isInstanceOf(PubSubOutput.class); } @Test @@ -108,10 +111,10 @@ void subscribe() { String pattern = "channelPattern"; Command command = this.commandBuilder.subscribe(pattern); - assertEquals( SUBSCRIBE, command.getType()); - assertInstanceOf(PubSubCommandArgs.class, command.getArgs()); - assertEquals( "key", command.getArgs().toCommandString()); - assertInstanceOf(PubSubOutput.class, command.getOutput()); + assertThat(command.getType()).isEqualTo(SUBSCRIBE); + assertThat(command.getArgs()).isInstanceOf(PubSubCommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("key"); + assertThat(command.getOutput()).isInstanceOf(PubSubOutput.class); } @Test @@ -119,9 +122,21 @@ void unsubscribe() { String pattern = "channelPattern"; Command command = this.commandBuilder.unsubscribe(pattern); - assertEquals( UNSUBSCRIBE, command.getType()); - assertInstanceOf(PubSubCommandArgs.class, command.getArgs()); - assertEquals( "key", command.getArgs().toCommandString()); - assertInstanceOf(PubSubOutput.class, command.getOutput()); + assertThat(command.getType()).isEqualTo(UNSUBSCRIBE); + assertThat(command.getArgs()).isInstanceOf(PubSubCommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("key"); + assertThat(command.getOutput()).isInstanceOf(PubSubOutput.class); + } + + @Test + void ssubscribe() { + String channel = "channelPattern"; + Command command = this.commandBuilder.ssubscribe(channel); + + assertThat(command.getType()).isEqualTo(SSUBSCRIBE); + assertThat(command.getArgs()).isInstanceOf(CommandArgs.class); + assertThat(command.getArgs().toCommandString()).isEqualTo("key"); + assertThat(command.getOutput()).isInstanceOf(PubSubOutput.class); } + } diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java index 8c934d1c3e..62b9662fd0 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java @@ -42,6 +42,7 @@ import io.lettuce.core.internal.LettuceFactories; import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; +import io.lettuce.core.support.PubSubTestListener; import io.lettuce.test.Delay; import io.lettuce.test.TestFutures; import io.lettuce.test.Wait; @@ -60,21 +61,24 @@ * @author Tihomir Mateev * @author Ali Takavci */ -class PubSubCommandTest extends AbstractRedisClientTest implements RedisPubSubListener { +class PubSubCommandTest extends AbstractRedisClientTest { RedisPubSubAsyncCommands pubsub; - BlockingQueue channels; + PubSubTestListener listener = new PubSubTestListener(); - BlockingQueue patterns; + BlockingQueue channels = listener.getChannels(); - BlockingQueue messages; + BlockingQueue patterns = listener.getPatterns(); - BlockingQueue counts; + BlockingQueue messages = listener.getMessages(); + + BlockingQueue counts = listener.getCounts(); String channel = "channel0"; String shardChannel = "shard-channel"; + private String pattern = "channel*"; String message = "msg!"; @@ -85,12 +89,12 @@ void openPubSubConnection() { client.setOptions(getOptions()); pubsub = client.connectPubSub().async(); - pubsub.getStatefulConnection().addListener(this); + pubsub.getStatefulConnection().addListener(listener); } finally { - channels = LettuceFactories.newBlockingQueue(); - patterns = LettuceFactories.newBlockingQueue(); - messages = LettuceFactories.newBlockingQueue(); - counts = LettuceFactories.newBlockingQueue(); + channels.clear(); + patterns.clear(); + messages.clear(); + counts.clear(); } } @@ -112,7 +116,7 @@ void auth() { client.setOptions( ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(PubSubCommandTest.this); + connection.getStatefulConnection().addListener(listener); connection.auth(passwd); connection.subscribe(channel); @@ -128,7 +132,7 @@ void authWithUsername() { client.setOptions( ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(PubSubCommandTest.this); + connection.getStatefulConnection().addListener(listener); connection.auth(username, passwd); connection.subscribe(channel); @@ -145,7 +149,7 @@ void authWithReconnect() { ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(PubSubCommandTest.this); + connection.getStatefulConnection().addListener(listener); connection.auth(passwd); connection.clientSetname("authWithReconnect"); @@ -174,7 +178,7 @@ void authWithUsernameAndReconnect() { ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); RedisPubSubAsyncCommands connection = client.connectPubSub().async(); - connection.getStatefulConnection().addListener(PubSubCommandTest.this); + connection.getStatefulConnection().addListener(listener); connection.auth(username, passwd); connection.clientSetname("authWithReconnect"); connection.subscribe(channel).get(); @@ -533,7 +537,7 @@ void removeListener() throws Exception { assertThat(channels.take()).isEqualTo(channel); assertThat(messages.take()).isEqualTo(message); - pubsub.getStatefulConnection().removeListener(this); + pubsub.getStatefulConnection().removeListener(listener); redis.publish(channel, message); assertThat(channels.poll(10, TimeUnit.MILLISECONDS)).isNull(); @@ -549,43 +553,4 @@ void echoAllowedInSubscriptionState() { pubsub.unsubscribe(channel); } - // RedisPubSubListener implementation - - @Override - public void message(String channel, String message) { - channels.add(channel); - messages.add(message); - } - - @Override - public void message(String pattern, String channel, String message) { - patterns.add(pattern); - channels.add(channel); - messages.add(message); - } - - @Override - public void subscribed(String channel, long count) { - channels.add(channel); - counts.add(count); - } - - @Override - public void psubscribed(String pattern, long count) { - patterns.add(pattern); - counts.add(count); - } - - @Override - public void unsubscribed(String channel, long count) { - channels.add(channel); - counts.add(count); - } - - @Override - public void punsubscribed(String pattern, long count) { - patterns.add(pattern); - counts.add(count); - } - } diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java index 0e47031e5e..86a96a2562 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java @@ -59,6 +59,7 @@ class PubSubReactiveTest extends AbstractRedisClientTest implements RedisPubSubL private RedisPubSubReactiveCommands pubsub2; private BlockingQueue channels; + private BlockingQueue shardChannels; private BlockingQueue patterns; private BlockingQueue messages; private BlockingQueue counts; @@ -75,6 +76,7 @@ void openPubSubConnection() { pubsub2 = client.connectPubSub().reactive(); pubsub.getStatefulConnection().addListener(this); channels = LettuceFactories.newBlockingQueue(); + shardChannels = LettuceFactories.newBlockingQueue(); patterns = LettuceFactories.newBlockingQueue(); messages = LettuceFactories.newBlockingQueue(); counts = LettuceFactories.newBlockingQueue(); @@ -343,6 +345,13 @@ void unsubscribe() throws Exception { } + @Test + void ssubscribe() throws Exception { + StepVerifier.create(pubsub.ssubscribe(channel)).verifyComplete(); + assertThat(shardChannels.take()).isEqualTo(channel); + assertThat((long) counts.take()).isGreaterThan(0); + } + @Test void pubsubCloseOnClientShutdown() { @@ -493,6 +502,12 @@ public void punsubscribed(String pattern, long count) { counts.add(count); } + @Override + public void ssubscribed(String shardChannel, long count) { + shardChannels.add(shardChannel); + counts.add(count); + } + T block(Mono mono) { return mono.block(); } diff --git a/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java b/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java index a95a73fc4e..8eebf685fa 100644 --- a/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImplUnitTests.java @@ -7,7 +7,6 @@ import io.lettuce.core.output.MapOutput; import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.RedisCommand; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -15,8 +14,10 @@ import java.util.concurrent.ExecutionException; import static io.lettuce.core.protocol.CommandType.*; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.*; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; class RedisPubSubAsyncCommandsImplUnitTests { @@ -42,9 +43,9 @@ void psubscribe() throws ExecutionException, InterruptedException { ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( PSUBSCRIBE, capturedCommand.getValue().getType()); + assertThat(capturedCommand.getValue().getType()).isEqualTo(PSUBSCRIBE); assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "key", capturedCommand.getValue().getArgs().toCommandString()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); assertNotEquals(capturedCommand.getValue(), dispachedMock); } @@ -60,9 +61,9 @@ void punsubscribe() throws ExecutionException, InterruptedException { ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( PUNSUBSCRIBE, capturedCommand.getValue().getType()); + assertThat(capturedCommand.getValue().getType()).isEqualTo(PUNSUBSCRIBE); assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "key", capturedCommand.getValue().getArgs().toCommandString()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); assertNotEquals(capturedCommand.getValue(), dispachedMock); } @@ -78,9 +79,9 @@ void subscribe() throws ExecutionException, InterruptedException { ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( SUBSCRIBE, capturedCommand.getValue().getType()); + assertThat(capturedCommand.getValue().getType()).isEqualTo(SUBSCRIBE); assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "key", capturedCommand.getValue().getArgs().toCommandString()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); assertNotEquals(capturedCommand.getValue(), dispachedMock); } @@ -96,9 +97,9 @@ void unsubscribe() throws ExecutionException, InterruptedException { ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( UNSUBSCRIBE, capturedCommand.getValue().getType()); + assertThat(capturedCommand.getValue().getType()).isEqualTo(UNSUBSCRIBE); assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "key", capturedCommand.getValue().getArgs().toCommandString()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); assertNotEquals(capturedCommand.getValue(), dispachedMock); } @@ -116,9 +117,9 @@ void publish() throws ExecutionException, InterruptedException { ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( PUBLISH, capturedCommand.getValue().getType()); + assertThat(capturedCommand.getValue().getType()).isEqualTo(PUBLISH); assertInstanceOf(IntegerOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "key value", capturedCommand.getValue().getArgs().toCommandString()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key value"); assertNotEquals(capturedCommand.getValue(), dispachedMock); } @@ -135,9 +136,9 @@ void pubsubChannels() throws ExecutionException, InterruptedException { ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( PUBSUB, capturedCommand.getValue().getType()); + assertThat(capturedCommand.getValue().getType()).isEqualTo(PUBSUB); assertInstanceOf(KeyListOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "CHANNELS key", capturedCommand.getValue().getArgs().toCommandString()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("CHANNELS key"); assertNotEquals(capturedCommand.getValue(), dispachedMock); } @@ -154,9 +155,9 @@ void pubsubNumsub() throws ExecutionException, InterruptedException { ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( PUBSUB, capturedCommand.getValue().getType()); + assertThat(capturedCommand.getValue().getType()).isEqualTo(PUBSUB); assertInstanceOf(MapOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "NUMSUB key", capturedCommand.getValue().getArgs().toCommandString()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("NUMSUB key"); assertNotEquals(capturedCommand.getValue(), dispachedMock); } @@ -173,9 +174,9 @@ void pubsubShardChannels() throws ExecutionException, InterruptedException { ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( PUBSUB, capturedCommand.getValue().getType()); + assertThat(capturedCommand.getValue().getType()).isEqualTo(PUBSUB); assertInstanceOf(KeyListOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "SHARDCHANNELS key", capturedCommand.getValue().getArgs().toCommandString()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("SHARDCHANNELS key"); assertNotEquals(capturedCommand.getValue(), dispachedMock); } @@ -192,9 +193,28 @@ void pubsubShardNumsub() throws ExecutionException, InterruptedException { ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);; verify(mockedConnection).dispatch(capturedCommand.capture()); - Assertions.assertEquals( PUBSUB, capturedCommand.getValue().getType()); + assertThat(capturedCommand.getValue().getType()).isEqualTo(PUBSUB); assertInstanceOf(MapOutput.class, capturedCommand.getValue().getOutput()); - Assertions.assertEquals( "SHARDNUMSUB key", capturedCommand.getValue().getArgs().toCommandString()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("SHARDNUMSUB key"); + + assertNotEquals(capturedCommand.getValue(), dispachedMock); + } + + @Test + void ssubscribe() throws ExecutionException, InterruptedException { + String pattern = "channelPattern"; + AsyncCommand dispachedMock = mock(AsyncCommand.class); + when(mockedConnection.dispatch((RedisCommand) any())).thenReturn(dispachedMock); + + commands.ssubscribe(pattern).get(); + + ArgumentCaptor capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class); + + verify(mockedConnection).dispatch(capturedCommand.capture()); + + assertThat(capturedCommand.getValue().getType()).isEqualTo(SSUBSCRIBE); + assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput()); + assertThat(capturedCommand.getValue().getArgs().toCommandString()).isEqualTo("key"); assertNotEquals(capturedCommand.getValue(), dispachedMock); }