From dd7be346e33167819b192dfdc3bec0052a1cb322 Mon Sep 17 00:00:00 2001 From: atakavci Date: Wed, 24 Apr 2024 09:36:17 +0300 Subject: [PATCH 1/4] implementation of SPUBLISH --- .../core/AbstractRedisAsyncCommands.java | 5 ++ .../core/AbstractRedisReactiveCommands.java | 5 ++ .../io/lettuce/core/RedisCommandBuilder.java | 7 +++ .../api/async/BaseRedisAsyncCommands.java | 11 ++++ .../reactive/BaseRedisReactiveCommands.java | 11 ++++ .../core/api/sync/BaseRedisCommands.java | 10 +++ .../ClusterPubSubConnectionProvider.java | 5 ++ .../core/cluster/PubSubClusterEndpoint.java | 9 +++ .../async/BaseNodeSelectionAsyncCommands.java | 11 ++++ .../api/sync/BaseNodeSelectionCommands.java | 11 ++++ .../pubsub/RedisClusterPubSubAdapter.java | 5 ++ .../pubsub/RedisClusterPubSubListener.java | 12 ++++ .../io/lettuce/core/protocol/CommandType.java | 2 +- .../core/pubsub/PubSubCommandBuilder.java | 5 ++ .../lettuce/core/pubsub/PubSubEndpoint.java | 3 + .../io/lettuce/core/pubsub/PubSubOutput.java | 3 +- .../core/pubsub/RedisPubSubAdapter.java | 5 ++ .../pubsub/RedisPubSubAsyncCommandsImpl.java | 5 ++ .../core/pubsub/RedisPubSubListener.java | 11 ++++ .../RedisPubSubReactiveCommandsImpl.java | 5 ++ .../coroutines/BaseRedisCoroutinesCommands.kt | 11 ++++ .../BaseRedisCoroutinesCommandsImpl.kt | 2 + .../lettuce/core/api/BaseRedisCommands.java | 10 +++ ...usterPubSubConnectionIntegrationTests.java | 63 ++++++++++++++++--- .../core/pubsub/PubSubCommandTest.java | 52 ++++++++++----- .../core/pubsub/PubSubReactiveTest.java | 41 +++++++----- .../core/support/PubSubTestListener.java | 12 ++++ 27 files changed, 292 insertions(+), 40 deletions(-) diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index f2c6c625ce..bce4f48930 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -1582,6 +1582,11 @@ public RedisFuture publish(K channel, V message) { return dispatch(commandBuilder.publish(channel, message)); } + @Override + public RedisFuture spublish(K shardChannel, V message) { + return dispatch(commandBuilder.spublish(shardChannel, message)); + } + @Override public RedisFuture> pubsubChannels() { return dispatch(commandBuilder.pubsubChannels()); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index 47b473495e..d502900e6f 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -1661,6 +1661,11 @@ public Mono publish(K channel, V message) { return createMono(() -> commandBuilder.publish(channel, message)); } + @Override + public Mono spublish(K shardChannel, V message) { + return createMono(() -> commandBuilder.spublish(shardChannel, message)); + } + @Override public Flux pubsubChannels() { return createDissolvingFlux(commandBuilder::pubsubChannels); diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index 3d3d2be0be..d074863e5e 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -2156,6 +2156,13 @@ Command publish(K channel, V message) { return createCommand(PUBLISH, new IntegerOutput<>(codec), args); } + Command spublish(K shardChannel, V message) { + LettuceAssert.notNull(shardChannel, "ShardChannel " + MUST_NOT_BE_NULL); + + CommandArgs args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message); + return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); + } + Command> pubsubChannels() { CommandArgs args = new CommandArgs<>(codec).add(CHANNELS); return createCommand(PUBSUB, new KeyListOutput<>(codec), args); diff --git a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java index c4eb965a80..7fd3d11796 100644 --- a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java @@ -48,6 +48,17 @@ public interface BaseRedisAsyncCommands { */ RedisFuture publish(K channel, V message); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + RedisFuture spublish(K shardChannel, V message); + + /** * Lists the currently *active channels*. * diff --git a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java index 051340ea99..f05b786044 100644 --- a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java @@ -48,6 +48,17 @@ public interface BaseRedisReactiveCommands { */ Mono publish(K channel, V message); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Mono spublish(K shardChannel, V message); + + /** * Lists the currently *active channels*. * diff --git a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java index b0ebe918be..5111d4f01d 100644 --- a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java @@ -47,6 +47,16 @@ public interface BaseRedisCommands { */ Long publish(K channel, V message); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Long spublish(K shardChannel, V message); + /** * Lists the currently *active channels*. * diff --git a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java index ce5abb67e0..747018ca60 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterPubSubConnectionProvider.java @@ -167,6 +167,11 @@ public void punsubscribed(K pattern, long count) { notifications.punsubscribed(getNode(), pattern, count); } + @Override + public void smessage(K shardChannel, V message) { + notifications.smessage(getNode(), shardChannel, message); + } + @Override public void ssubscribed(K channel, long count) { notifications.ssubscribed(getNode(), channel, count); diff --git a/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java b/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java index 5679e04c8c..9288dd8f85 100644 --- a/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java +++ b/src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java @@ -88,6 +88,9 @@ protected void notifyListeners(PubSubMessage output) { case unsubscribe: multicast.unsubscribed(clusterNode, output.channel(), output.count()); break; + case smessage: + multicast.smessage(clusterNode, output.channel(), output.body()); + break; case ssubscribe: multicast.ssubscribed(clusterNode, output.channel(), output.count()); break; @@ -192,6 +195,12 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) { clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count)); } + @Override + public void smessage(RedisClusterNode node, K shardChannel, V message) { + getListeners().forEach(listener -> listener.smessage(shardChannel, message)); + clusterListeners.forEach(listener -> listener.smessage(node, shardChannel, message)); + } + @Override public void ssubscribed(RedisClusterNode node, K channel, long count) { getListeners().forEach(listener -> listener.ssubscribed(channel, count)); diff --git a/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java index 059ddfba48..edf287c7f0 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java @@ -48,6 +48,17 @@ public interface BaseNodeSelectionAsyncCommands { */ AsyncExecutions publish(K channel, V message); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + AsyncExecutions spublish(K shardChannel, V message); + + /** * Lists the currently *active channels*. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java index e9072268d4..0a57158a58 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java @@ -43,6 +43,17 @@ public interface BaseNodeSelectionCommands { */ Executions publish(K channel, V message); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Executions spublish(K shardChannel, V message); + + /** * Lists the currently *active channels*. * diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java index c38e672ee3..554efa3009 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java @@ -42,6 +42,11 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) { // empty adapter method } + @Override + public void smessage(RedisClusterNode node, K shardChannel, V message) { + // empty adapter method + } + @Override public void ssubscribed(RedisClusterNode node, K channel, long count) { // empty adapter method diff --git a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java index 650dc233ca..482453df7c 100644 --- a/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java +++ b/src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java @@ -67,6 +67,18 @@ public interface RedisClusterPubSubListener { */ void punsubscribed(RedisClusterNode node, K pattern, long count); + /** + * Message received from a shard channel subscription. + * + * @param node the {@link RedisClusterNode} from which the {@code message} originates. + * @param shardChannel shard channel. + * @param message Message. + * @since 7.0 + */ + default void smessage(RedisClusterNode node, K shardChannel, V message){ + message(node, shardChannel, message); + } + /** * Subscribed to a shard channel. * diff --git a/src/main/java/io/lettuce/core/protocol/CommandType.java b/src/main/java/io/lettuce/core/protocol/CommandType.java index 7a9d87524d..b92eaf2519 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandType.java +++ b/src/main/java/io/lettuce/core/protocol/CommandType.java @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword { // Pub/Sub - PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, + PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, SPUBLISH, // Sets diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java index c733435cbc..391149bbb8 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java @@ -58,6 +58,11 @@ Command publish(K channel, V message) { return createCommand(PUBLISH, new IntegerOutput<>(codec), args); } + Command spublish(K shardChannel, V message) { + CommandArgs args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message); + return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); + } + Command> pubsubChannels(K pattern) { CommandArgs args = new PubSubCommandArgs<>(codec).add(CHANNELS).addKey(pattern); return createCommand(PUBSUB, new KeyListOutput<>(codec), args); diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java b/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java index febb31f2c0..6b6b267fe2 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java @@ -263,6 +263,9 @@ protected void notifyListeners(PubSubMessage message) { case unsubscribe: listener.unsubscribed(message.channel(), message.count()); break; + case smessage: + listener.smessage(message.channel(), message.body()); + break; case ssubscribe: listener.ssubscribed(message.channel(), message.count()); break; diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java b/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java index 1bcb61b7a2..18c19ccddb 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubOutput.java @@ -38,7 +38,7 @@ public class PubSubOutput extends CommandOutput implements PubSub public enum Type { - message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe; + message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe, smessage; private final static Set names = new HashSet<>(); @@ -108,6 +108,7 @@ private void handleOutput(ByteBuffer bytes) { pattern = codec.decodeKey(bytes); break; } + case smessage: case message: if (channel == null) { channel = codec.decodeKey(bytes); diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java index ef16fed6fb..828a85743f 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java @@ -59,6 +59,11 @@ public void punsubscribed(K pattern, long count) { // empty adapter method } + @Override + public void smessage(K shardChannel, V message) { + // empty adapter method + } + @Override public void ssubscribed(K shardChannel, long count) { // empty adapter method diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java index f69f24fbba..52c3f6e312 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java @@ -80,6 +80,11 @@ public RedisFuture publish(K channel, V message) { return dispatch(commandBuilder.publish(channel, message)); } + @Override + public RedisFuture spublish(K shardChannel, V message) { + return dispatch(commandBuilder.spublish(shardChannel, message)); + } + @Override public RedisFuture> pubsubChannels(K channel) { return dispatch(commandBuilder.pubsubChannels(channel)); diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java index 71b5a6ad13..ccc1f0e1e1 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java @@ -88,4 +88,15 @@ default void ssubscribed(K shardChannel, long count) { subscribed(shardChannel, count); } + /** + * Message received from a shard channel subscription. + * + * @param shardChannel shard channel. + * @param message Message. + * @since 7.0 + */ + default void smessage(K shardChannel, V message) { + message(shardChannel, message); + } + } diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index 218fbe9c77..46ff64b274 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -139,6 +139,11 @@ public Mono publish(K channel, V message) { return createMono(() -> commandBuilder.publish(channel, message)); } + @Override + public Mono spublish(K shardChannel, V message) { + return createMono(() -> commandBuilder.publish(shardChannel, message)); + } + @Override public Flux pubsubChannels(K channel) { return createDissolvingFlux(() -> commandBuilder.pubsubChannels(channel)); diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt index 37ba6c789f..163560a3a7 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt @@ -48,6 +48,17 @@ interface BaseRedisCoroutinesCommands { */ suspend fun publish(channel: K, message: V): Long? + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + suspend fun spublish(shardChannel: K, message: V): Long? + + /** * Lists the currently *active channels*. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt index 910fae9317..fd88019a5f 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt @@ -46,6 +46,8 @@ internal class BaseRedisCoroutinesCommandsImpl(internal val op override suspend fun publish(channel: K, message: V): Long? = ops.publish(channel, message).awaitFirstOrNull() + override suspend fun spublish(shardChannel: K, message: V): Long? = ops.spublish(shardChannel, message).awaitFirstOrNull() + override suspend fun pubsubChannels(): List = ops.pubsubChannels().asFlow().toList() override suspend fun pubsubChannels(channel: K): List = ops.pubsubChannels(channel).asFlow().toList() diff --git a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java index 40d5165f16..ae42efb578 100644 --- a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java +++ b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java @@ -48,6 +48,16 @@ public interface BaseRedisCommands { */ Long publish(K channel, V message); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Long spublish(K shardChannel, V message); + /** * Lists the currently *active channels*. * diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index c4a808fad9..def4550369 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import io.lettuce.core.RedisURI; +import io.lettuce.core.StatefulRedisConnectionImpl; import io.lettuce.core.TestSupport; import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.cluster.RedisClusterClient; @@ -34,6 +35,7 @@ import io.lettuce.test.TestFutures; import io.lettuce.test.Wait; import io.lettuce.test.condition.EnabledOnCommand; +import io.lettuce.test.resource.DefaultRedisClusterClient; /** * Integration tests for Cluster Pub/Sub. @@ -57,6 +59,10 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport { String shardChannel = "shard-channel"; + String shardMessage = "shard msg!"; + + String shardTestChannel = "shard-test-channel"; + @Inject RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient) { this.clusterClient = clusterClient; @@ -94,15 +100,16 @@ void testRegularClientPubSubChannels() { } @Test + @EnabledOnCommand("SSUBSCRIBE") void testRegularClientPubSubShardChannels() { pubSubConnection.sync().ssubscribe(shardChannel); Integer clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); - RedisCommands rightSlot = - connection.sync().nodes(node -> node.getSlots().contains(clusterKeyslot)).commands(0); - RedisCommands wrongSlot = - connection.sync().nodes(node -> !node.getSlots().contains(clusterKeyslot)).commands(0); + RedisCommands rightSlot = connection.sync().nodes(node -> node.getSlots().contains(clusterKeyslot)) + .commands(0); + RedisCommands wrongSlot = connection.sync().nodes(node -> !node.getSlots().contains(clusterKeyslot)) + .commands(0); List channelsOnSubscribedNode = rightSlot.pubsubShardChannels(); assertThat(channelsOnSubscribedNode).hasSize(1); @@ -113,7 +120,7 @@ void testRegularClientPubSubShardChannels() { @Test @EnabledOnCommand("SSUBSCRIBE") - void subscribeToShardChannel(){ + void subscribeToShardChannel() { pubSubConnection.sync().ssubscribe(shardChannel); Wait.untilEquals(1L, connectionListener.getShardCounts()::poll).waitOrTimeout(); @@ -126,13 +133,55 @@ void subscribeToShardChannelViaReplica() { int clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); String thisNode = connection.getPartitions().getPartitionBySlot(clusterKeyslot).getNodeId(); - RedisPubSubAsyncCommands replica = - pubSubConnection.async().nodes(node -> thisNode.equals(node.getSlaveOf())).commands(0); + RedisPubSubAsyncCommands replica = pubSubConnection.async() + .nodes(node -> thisNode.equals(node.getSlaveOf())).commands(0); replica.ssubscribe(shardChannel); Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); } + @Test + @EnabledOnCommand("SSUBSCRIBE") + void publishToShardChannel() throws Exception { + pubSubConnection.addListener(connectionListener); + pubSubConnection.async().ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + pubSubConnection.async().spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + } + + @Test + @EnabledOnCommand("SSUBSCRIBE") + void publishToShardChannelViaDifferentEndpoints() throws Exception { + pubSubConnection.addListener(connectionListener); + pubSubConnection.async().ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + pubSubConnection.async().ssubscribe(shardTestChannel); + Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + pubSubConnection.async().spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + + pubSubConnection.async().spublish(shardTestChannel, shardMessage); + Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + } + + @Test + @EnabledOnCommand("SSUBSCRIBE") + void publishToShardChannelViaNewClient() throws Exception { + pubSubConnection.addListener(connectionListener); + pubSubConnection.async().ssubscribe(shardChannel); + + DefaultRedisClusterClient.get().connectPubSub().async().spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + } + @Test void myIdWorksAfterDisconnect() throws InterruptedException { diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java index 5fe358def3..c6fd838050 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java @@ -32,6 +32,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.assertj.core.util.Arrays; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -53,6 +54,7 @@ import io.lettuce.test.Wait; import io.lettuce.test.WithPassword; import io.lettuce.test.condition.EnabledOnCommand; +import io.lettuce.test.resource.DefaultRedisClient; import io.lettuce.test.resource.FastShutdown; import io.lettuce.test.resource.TestClientResources; @@ -74,6 +76,8 @@ class PubSubCommandTest extends AbstractRedisClientTest { BlockingQueue channels = listener.getChannels(); + BlockingQueue shardChannels = listener.getShardChannels(); + BlockingQueue patterns = listener.getPatterns(); BlockingQueue messages = listener.getMessages(); @@ -88,6 +92,8 @@ class PubSubCommandTest extends AbstractRedisClientTest { String message = "msg!"; + String shardMessage = "shard msg!"; + @BeforeEach void openPubSubConnection() { try { @@ -221,6 +227,29 @@ void message() throws Exception { assertThat(messages.take()).isEqualTo(message); } + @Test + @EnabledOnCommand("SSUBSCRIBE") + void messageToShardChannel() throws Exception { + pubsub.ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + + redis.spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, messages::poll).waitOrTimeout(); + } + + @Test + @EnabledOnCommand("SSUBSCRIBE") + void messageToShardChannelViaNewClient() throws Exception { + pubsub.ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + + RedisPubSubAsyncCommands redis = DefaultRedisClient.get().connectPubSub().async(); + redis.spublish(shardChannel, shardMessage); + Wait.untilEquals(shardMessage, messages::poll).waitOrTimeout(); + Wait.untilEquals(shardChannel, shardChannels::poll).waitOrTimeout(); + } + @Test @EnabledOnCommand("ACL") void messageAsPushMessage() throws Exception { @@ -362,40 +391,35 @@ void pubsubNumsub() { @Test @EnabledOnCommand("SPUBLISH") void pubsubShardChannels() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // TestFutures.awaitOrTimeout(pubsub.ssubscribe(channel)); + TestFutures.awaitOrTimeout(pubsub.ssubscribe(shardChannel)); List result = redis.pubsubShardChannels(); - // assertThat(result).contains(channel); + assertThat(result).contains(shardChannel); } @Test @EnabledOnCommand("SPUBLISH") void pubsubMultipleShardChannels() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // TestFutures.awaitOrTimeout(pubsub.ssubscribe(channel, "channel1", "channel3")); + TestFutures.awaitOrTimeout(pubsub.ssubscribe(shardChannel, "channel1", "channel3")); List result = redis.pubsubShardChannels(); - // assertThat(result).contains(channel, "channel1", "channel3"); + assertThat(result).contains(shardChannel, "channel1", "channel3"); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardChannelsWithArg() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // TestFutures.awaitOrTimeout(pubsub.ssubscribe(channel)); - List result = redis.pubsubShardChannels(pattern); - // assertThat(result).contains(channel); + TestFutures.awaitOrTimeout(pubsub.ssubscribe(shardChannel)); + List result = redis.pubsubShardChannels(shardChannel); + assertThat(result).contains(shardChannel); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardNumsub() { - // TODO After we have SSUBSCRIBE implement a step to subscribe to a shard channel - // Depends on https://github.com/lettuce-io/lettuce-core/issues/2758 + TestFutures.awaitOrTimeout(pubsub.ssubscribe(shardChannel)); Map result = redis.pubsubShardNumsub(shardChannel); - assertThat(result.getOrDefault(shardChannel, 0L)).isEqualTo(0); - // TODO verify that the channel from step 1 is the one returned by the command + assertThat(result.keySet()).contains(shardChannel); } @Test diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java index 86a96a2562..2c93374fa9 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java @@ -56,17 +56,25 @@ class PubSubReactiveTest extends AbstractRedisClientTest implements RedisPubSubListener { private RedisPubSubReactiveCommands pubsub; + private RedisPubSubReactiveCommands pubsub2; private BlockingQueue channels; + private BlockingQueue shardChannels; + private BlockingQueue patterns; + private BlockingQueue messages; + private BlockingQueue counts; private String shardChannel = "shard-channel"; + private String channel = "channel0"; + private String pattern = "channel*"; + private String message = "msg!"; @BeforeEach @@ -262,45 +270,40 @@ void pubsubNumsub() { @Test @EnabledOnCommand("SPUBLISH") void pubsubShardChannels() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // block(pubsub.ssubscribe(channel)); + block(pubsub.ssubscribe(shardChannel)); List result = block(pubsub2.pubsubShardChannels().collectList()); - // assertThat(result).contains(channel); + assertThat(result).contains(shardChannel); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardMultipleChannels() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // StepVerifier.create(pubsub.ssubscribe(channel, "channel1", "channel3")).verifyComplete(); + StepVerifier.create(pubsub.ssubscribe(shardChannel, "channel1", "channel3")).verifyComplete(); - // StepVerifier.create(pubsub2.pubsubShardChannels().collectList()) - // .consumeNextWith(actual -> assertThat(actual).contains(channel, "channel1", "channel3")).verifyComplete(); + StepVerifier.create(pubsub2.pubsubShardChannels().collectList()) + .consumeNextWith(actual -> assertThat(actual).contains(shardChannel, "channel1", "channel3")).verifyComplete(); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardChannelsWithArg() { - /// TODO : uncomment after SSUBSCRIBE is implemented - // StepVerifier.create(pubsub.ssubscribe(channel)).verifyComplete(); - // Wait.untilTrue(() -> mono(pubsub2.pubsubShardChannels(pattern).filter(s -> channel.equals(s))) != null).waitOrTimeout(); + StepVerifier.create(pubsub.ssubscribe(shardChannel)).verifyComplete(); + Wait.untilTrue(() -> mono(pubsub2.pubsubShardChannels(shardChannel).filter(s -> shardChannel.equals(s))) != null) + .waitOrTimeout(); - String result = mono(pubsub2.pubsubShardChannels(pattern).filter(s -> channel.equals(s))); - // assertThat(result).isEqualToIgnoringCase(channel); + String result = mono(pubsub2.pubsubShardChannels(shardChannel).filter(s -> shardChannel.equals(s))); + assertThat(result).isEqualToIgnoringCase(shardChannel); } @Test @EnabledOnCommand("SPUBLISH") void pubsubShardNumsub() { - - // TODO After we have SSUBSCRIBE implement a step to subscribe to a shard channel - // Depends on https://github.com/lettuce-io/lettuce-core/issues/2758 + StepVerifier.create(pubsub.ssubscribe(shardChannel)).verifyComplete(); Wait.untilEquals(1, () -> block(pubsub2.pubsubShardNumsub(shardChannel)).size()).waitOrTimeout(); Map result = block(pubsub2.pubsubShardNumsub(shardChannel)); - assertThat(result.getOrDefault(shardChannel, 0L)).isEqualTo(0); - // TODO verify that the channel from step 1 is the one returned by the command + assertThat(result.getOrDefault(shardChannel, 0L)).isEqualTo(1); } @Test @@ -314,6 +317,7 @@ void pubsubNumpat() { Long result = block(pubsub2.pubsubNumpat()); assertThat(result.longValue()).isGreaterThan(0); } + @Test void punsubscribe() throws Exception { @@ -420,6 +424,7 @@ void adapter() throws Exception { final BlockingQueue localCounts = LettuceFactories.newBlockingQueue(); RedisPubSubAdapter adapter = new RedisPubSubAdapter() { + @Override public void subscribed(String channel, long count) { super.subscribed(channel, count); @@ -431,6 +436,7 @@ public void unsubscribed(String channel, long count) { super.unsubscribed(channel, count); localCounts.add(count); } + }; pubsub.getStatefulConnection().addListener(adapter); @@ -519,4 +525,5 @@ T mono(Flux flux) { List all(Flux flux) { return flux.collectList().block(); } + } diff --git a/src/test/java/io/lettuce/core/support/PubSubTestListener.java b/src/test/java/io/lettuce/core/support/PubSubTestListener.java index d89c9a7810..1afcb42766 100644 --- a/src/test/java/io/lettuce/core/support/PubSubTestListener.java +++ b/src/test/java/io/lettuce/core/support/PubSubTestListener.java @@ -30,10 +30,15 @@ public class PubSubTestListener implements RedisPubSubListener { private BlockingQueue channels = LettuceFactories.newBlockingQueue(); + private BlockingQueue patterns = LettuceFactories.newBlockingQueue(); + private BlockingQueue messages = LettuceFactories.newBlockingQueue(); + private BlockingQueue counts = LettuceFactories.newBlockingQueue(); + private BlockingQueue shardChannels = LettuceFactories.newBlockingQueue(); + private BlockingQueue shardCounts = LettuceFactories.newBlockingQueue(); // RedisPubSubListener implementation @@ -51,6 +56,12 @@ public void message(String pattern, String channel, String message) { messages.add(message); } + @Override + public void smessage(String channel, String message) { + shardChannels.add(channel); + messages.add(message); + } + @Override public void subscribed(String channel, long count) { channels.add(channel); @@ -116,4 +127,5 @@ public void clear() { counts.clear(); shardCounts.clear(); } + } From 247287a9906a41960413ca85208a37c65d790df9 Mon Sep 17 00:00:00 2001 From: atakavci Date: Wed, 24 Apr 2024 14:02:34 +0300 Subject: [PATCH 2/4] sort methods by name --- .../core/AbstractRedisAsyncCommands.java | 12 ++++----- .../core/AbstractRedisReactiveCommands.java | 10 +++---- .../io/lettuce/core/RedisCommandBuilder.java | 14 +++++----- .../api/async/BaseRedisAsyncCommands.java | 21 +++++++-------- .../reactive/BaseRedisReactiveCommands.java | 21 +++++++-------- .../core/api/sync/BaseRedisCommands.java | 20 +++++++------- .../async/BaseNodeSelectionAsyncCommands.java | 21 +++++++-------- .../api/sync/BaseNodeSelectionCommands.java | 21 +++++++-------- .../core/pubsub/PubSubCommandBuilder.java | 26 +++++++++---------- .../pubsub/RedisPubSubAsyncCommandsImpl.java | 10 +++---- .../RedisPubSubReactiveCommandsImpl.java | 10 +++---- .../coroutines/BaseRedisCoroutinesCommands.kt | 21 +++++++-------- .../BaseRedisCoroutinesCommandsImpl.kt | 4 +-- .../lettuce/core/api/BaseRedisCommands.java | 20 +++++++------- 14 files changed, 113 insertions(+), 118 deletions(-) diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index bce4f48930..2e80433a6a 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -1232,7 +1232,7 @@ public RedisFuture hscan(KeyValueStreamingChannel channe @Override public RedisFuture hscanNovalues(KeyStreamingChannel channel, K key, ScanCursor scanCursor, - ScanArgs scanArgs) { + ScanArgs scanArgs) { return dispatch(commandBuilder.hscanNoValuesStreaming(channel, key, scanCursor, scanArgs)); } @@ -1582,11 +1582,6 @@ public RedisFuture publish(K channel, V message) { return dispatch(commandBuilder.publish(channel, message)); } - @Override - public RedisFuture spublish(K shardChannel, V message) { - return dispatch(commandBuilder.spublish(shardChannel, message)); - } - @Override public RedisFuture> pubsubChannels() { return dispatch(commandBuilder.pubsubChannels()); @@ -2006,6 +2001,11 @@ public RedisFuture> spop(K key, long count) { return dispatch(commandBuilder.spop(key, count)); } + @Override + public RedisFuture spublish(K shardChannel, V message) { + return dispatch(commandBuilder.spublish(shardChannel, message)); + } + @Override public RedisFuture srandmember(K key) { return dispatch(commandBuilder.srandmember(key)); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index d502900e6f..734076fe36 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -1661,11 +1661,6 @@ public Mono publish(K channel, V message) { return createMono(() -> commandBuilder.publish(channel, message)); } - @Override - public Mono spublish(K shardChannel, V message) { - return createMono(() -> commandBuilder.spublish(shardChannel, message)); - } - @Override public Flux pubsubChannels() { return createDissolvingFlux(commandBuilder::pubsubChannels); @@ -2085,6 +2080,11 @@ public Flux spop(K key, long count) { return createDissolvingFlux(() -> commandBuilder.spop(key, count)); } + @Override + public Mono spublish(K shardChannel, V message) { + return createMono(() -> commandBuilder.spublish(shardChannel, message)); + } + @Override public Mono srandmember(K key) { return createMono(() -> commandBuilder.srandmember(key)); diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index d074863e5e..7880540cf7 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -2156,13 +2156,6 @@ Command publish(K channel, V message) { return createCommand(PUBLISH, new IntegerOutput<>(codec), args); } - Command spublish(K shardChannel, V message) { - LettuceAssert.notNull(shardChannel, "ShardChannel " + MUST_NOT_BE_NULL); - - CommandArgs args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message); - return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); - } - Command> pubsubChannels() { CommandArgs args = new CommandArgs<>(codec).add(CHANNELS); return createCommand(PUBSUB, new KeyListOutput<>(codec), args); @@ -2710,6 +2703,13 @@ Command> spop(K key, long count) { return createCommand(SPOP, new ValueSetOutput<>(codec), args); } + Command spublish(K shardChannel, V message) { + LettuceAssert.notNull(shardChannel, "ShardChannel " + MUST_NOT_BE_NULL); + + CommandArgs args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message); + return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); + } + Command srandmember(K key) { notNullKey(key); diff --git a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java index 7fd3d11796..76aac4f8ea 100644 --- a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java @@ -48,17 +48,6 @@ public interface BaseRedisAsyncCommands { */ RedisFuture publish(K channel, V message); - /** - * Post a message to a shard channel. - * - * @param shardChannel the shard channel type: key. - * @param message the message type: value. - * @return Long integer-reply the number of clients that received the message. - * @since 7.0 - */ - RedisFuture spublish(K shardChannel, V message); - - /** * Lists the currently *active channels*. * @@ -113,6 +102,16 @@ public interface BaseRedisAsyncCommands { */ RedisFuture pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + RedisFuture spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java index f05b786044..24d9989efa 100644 --- a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java @@ -48,17 +48,6 @@ public interface BaseRedisReactiveCommands { */ Mono publish(K channel, V message); - /** - * Post a message to a shard channel. - * - * @param shardChannel the shard channel type: key. - * @param message the message type: value. - * @return Long integer-reply the number of clients that received the message. - * @since 7.0 - */ - Mono spublish(K shardChannel, V message); - - /** * Lists the currently *active channels*. * @@ -113,6 +102,16 @@ public interface BaseRedisReactiveCommands { */ Mono pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Mono spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java index 5111d4f01d..ce942334ed 100644 --- a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java @@ -47,16 +47,6 @@ public interface BaseRedisCommands { */ Long publish(K channel, V message); - /** - * Post a message to a shard channel. - * - * @param shardChannel the shard channel type: key. - * @param message the message type: value. - * @return Long integer-reply the number of clients that received the message. - * @since 7.0 - */ - Long spublish(K shardChannel, V message); - /** * Lists the currently *active channels*. * @@ -111,6 +101,16 @@ public interface BaseRedisCommands { */ Long pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Long spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java index edf287c7f0..dbdc2744f4 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java @@ -48,17 +48,6 @@ public interface BaseNodeSelectionAsyncCommands { */ AsyncExecutions publish(K channel, V message); - /** - * Post a message to a shard channel. - * - * @param shardChannel the shard channel type: key. - * @param message the message type: value. - * @return Long integer-reply the number of clients that received the message. - * @since 7.0 - */ - AsyncExecutions spublish(K shardChannel, V message); - - /** * Lists the currently *active channels*. * @@ -113,6 +102,16 @@ public interface BaseNodeSelectionAsyncCommands { */ AsyncExecutions pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + AsyncExecutions spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java index 0a57158a58..a1b8922c8d 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java @@ -43,17 +43,6 @@ public interface BaseNodeSelectionCommands { */ Executions publish(K channel, V message); - /** - * Post a message to a shard channel. - * - * @param shardChannel the shard channel type: key. - * @param message the message type: value. - * @return Long integer-reply the number of clients that received the message. - * @since 7.0 - */ - Executions spublish(K shardChannel, V message); - - /** * Lists the currently *active channels*. * @@ -108,6 +97,16 @@ public interface BaseNodeSelectionCommands { */ Executions pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Executions spublish(K shardChannel, V message); + /** * Echo the given string. * diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java index 391149bbb8..8c486770ec 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java @@ -58,11 +58,6 @@ Command publish(K channel, V message) { return createCommand(PUBLISH, new IntegerOutput<>(codec), args); } - Command spublish(K shardChannel, V message) { - CommandArgs args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message); - return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); - } - Command> pubsubChannels(K pattern) { CommandArgs args = new PubSubCommandArgs<>(codec).add(CHANNELS).addKey(pattern); return createCommand(PUBSUB, new KeyListOutput<>(codec), args); @@ -103,6 +98,19 @@ final Command punsubscribe(K... patterns) { return pubSubCommand(PUNSUBSCRIBE, new PubSubOutput<>(codec), patterns); } + Command spublish(K shardChannel, V message) { + CommandArgs args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message); + return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); + } + + @SafeVarargs + final Command ssubscribe(K... shardChannels) { + LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY); + + CommandArgs args = new CommandArgs<>(codec).addKeys(shardChannels); + return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args); + } + @SafeVarargs final Command subscribe(K... channels) { LettuceAssert.notEmpty(channels, "Channels " + MUST_NOT_BE_EMPTY); @@ -115,14 +123,6 @@ final Command unsubscribe(K... channels) { return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels); } - @SafeVarargs - final Command ssubscribe(K... shardChannels) { - LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY); - - CommandArgs args = new CommandArgs<>(codec).addKeys(shardChannels); - return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args); - } - @SafeVarargs final Command pubSubCommand(CommandType type, CommandOutput output, K... keys) { return new Command<>(type, output, new PubSubCommandArgs<>(codec).addKeys(keys)); diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java index 52c3f6e312..20515e60fd 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java @@ -80,11 +80,6 @@ public RedisFuture publish(K channel, V message) { return dispatch(commandBuilder.publish(channel, message)); } - @Override - public RedisFuture spublish(K shardChannel, V message) { - return dispatch(commandBuilder.spublish(shardChannel, message)); - } - @Override public RedisFuture> pubsubChannels(K channel) { return dispatch(commandBuilder.pubsubChannels(channel)); @@ -105,6 +100,11 @@ public RedisFuture> pubsubShardNumsub(K... shardChannels) { return dispatch(commandBuilder.pubsubShardNumsub(shardChannels)); } + @Override + public RedisFuture spublish(K shardChannel, V message) { + return dispatch(commandBuilder.spublish(shardChannel, message)); + } + @Override @SuppressWarnings("unchecked") public RedisFuture ssubscribe(K... channels) { diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index 46ff64b274..adff1b58fa 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -139,11 +139,6 @@ public Mono publish(K channel, V message) { return createMono(() -> commandBuilder.publish(channel, message)); } - @Override - public Mono spublish(K shardChannel, V message) { - return createMono(() -> commandBuilder.publish(shardChannel, message)); - } - @Override public Flux pubsubChannels(K channel) { return createDissolvingFlux(() -> commandBuilder.pubsubChannels(channel)); @@ -164,6 +159,11 @@ public Mono> pubsubShardNumsub(K... shardChannels) { return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels)); } + @Override + public Mono spublish(K shardChannel, V message) { + return createMono(() -> commandBuilder.publish(shardChannel, message)); + } + @Override public Mono ssubscribe(K... shardChannels) { return createFlux(() -> commandBuilder.ssubscribe(shardChannels)).then(); diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt index 163560a3a7..df829da7b5 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt @@ -48,17 +48,6 @@ interface BaseRedisCoroutinesCommands { */ suspend fun publish(channel: K, message: V): Long? - /** - * Post a message to a shard channel. - * - * @param shardChannel the shard channel type: key. - * @param message the message type: value. - * @return Long integer-reply the number of clients that received the message. - * @since 7.0 - */ - suspend fun spublish(shardChannel: K, message: V): Long? - - /** * Lists the currently *active channels*. * @@ -113,6 +102,16 @@ interface BaseRedisCoroutinesCommands { */ suspend fun pubsubNumpat(): Long + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + suspend fun spublish(shardChannel: K, message: V): Long? + /** * Echo the given string. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt index fd88019a5f..6351169241 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt @@ -46,8 +46,6 @@ internal class BaseRedisCoroutinesCommandsImpl(internal val op override suspend fun publish(channel: K, message: V): Long? = ops.publish(channel, message).awaitFirstOrNull() - override suspend fun spublish(shardChannel: K, message: V): Long? = ops.spublish(shardChannel, message).awaitFirstOrNull() - override suspend fun pubsubChannels(): List = ops.pubsubChannels().asFlow().toList() override suspend fun pubsubChannels(channel: K): List = ops.pubsubChannels(channel).asFlow().toList() @@ -62,6 +60,8 @@ internal class BaseRedisCoroutinesCommandsImpl(internal val op override suspend fun pubsubNumpat(): Long = ops.pubsubNumpat().awaitSingle() + override suspend fun spublish(shardChannel: K, message: V): Long? = ops.spublish(shardChannel, message).awaitFirstOrNull() + override suspend fun echo(msg: V): V = ops.echo(msg).awaitSingle() override suspend fun role(): List = ops.role().asFlow().toList() diff --git a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java index ae42efb578..27c5723f0f 100644 --- a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java +++ b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java @@ -48,16 +48,6 @@ public interface BaseRedisCommands { */ Long publish(K channel, V message); - /** - * Post a message to a shard channel. - * - * @param shardChannel the shard channel type: key. - * @param message the message type: value. - * @return Long integer-reply the number of clients that received the message. - * @since 7.0 - */ - Long spublish(K shardChannel, V message); - /** * Lists the currently *active channels*. * @@ -112,6 +102,16 @@ public interface BaseRedisCommands { */ Long pubsubNumpat(); + /** + * Post a message to a shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long integer-reply the number of clients that received the message. + * @since 7.0 + */ + Long spublish(K shardChannel, V message); + /** * Echo the given string. * From b906da7413933545b2344f624483e72260862c5b Mon Sep 17 00:00:00 2001 From: atakavci Date: Wed, 24 Apr 2024 17:23:43 +0300 Subject: [PATCH 3/4] test cluster spublish with no redirects --- ...usterPubSubConnectionIntegrationTests.java | 26 ++++++++++++++++++- .../resource/DefaultRedisClusterClient.java | 12 +++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index def4550369..a762169e23 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -15,14 +15,15 @@ import org.junit.jupiter.api.extension.ExtendWith; import io.lettuce.core.RedisURI; -import io.lettuce.core.StatefulRedisConnectionImpl; import io.lettuce.core.TestSupport; import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection; +import io.lettuce.core.cluster.pubsub.api.async.RedisClusterPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.reactive.NodeSelectionPubSubReactiveCommands; import io.lettuce.core.cluster.pubsub.api.reactive.PubSubReactiveNodeSelection; import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands; @@ -182,6 +183,29 @@ void publishToShardChannelViaNewClient() throws Exception { Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); } + @Test + @EnabledOnCommand("SSUBSCRIBE") + void publishToShardChannelViaNewClientWithNoRedirects() throws Exception { + pubSubConnection.addListener(connectionListener); + pubSubConnection.async().ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + pubSubConnection.async().ssubscribe(shardTestChannel); + Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + + ClusterClientOptions.Builder builder = ClusterClientOptions.builder().maxRedirects(0); + RedisClusterPubSubAsyncCommands cmd = DefaultRedisClusterClient.get(builder.build()).connectPubSub() + .async(); + + cmd.spublish(shardChannel, shardMessage); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + + cmd.spublish(shardTestChannel, shardMessage); + Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); + Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + } + @Test void myIdWorksAfterDisconnect() throws InterruptedException { diff --git a/src/test/java/io/lettuce/test/resource/DefaultRedisClusterClient.java b/src/test/java/io/lettuce/test/resource/DefaultRedisClusterClient.java index 35a3b8301d..0a23cbfe0f 100644 --- a/src/test/java/io/lettuce/test/resource/DefaultRedisClusterClient.java +++ b/src/test/java/io/lettuce/test/resource/DefaultRedisClusterClient.java @@ -34,4 +34,16 @@ public static RedisClusterClient get() { instance.redisClient.setOptions(ClusterClientOptions.create()); return instance.redisClient; } + + + /** + * Do not close the client. + * + * @return redis client with given options for the tests. + */ + public static RedisClusterClient get(ClusterClientOptions options) { + DefaultRedisClusterClient client = new DefaultRedisClusterClient(); + client.redisClient.setOptions(options); + return instance.redisClient; + } } From 1f3d86dc943ea94bc10c80c5c8965ba4488c106e Mon Sep 17 00:00:00 2001 From: atakavci Date: Tue, 30 Apr 2024 10:14:57 +0300 Subject: [PATCH 4/4] use injected cluster client in RedisClusterPubSubConnectionIntegrationTests --- ...ClusterPubSubConnectionIntegrationTests.java | 17 +++++++++++------ .../resource/DefaultRedisClusterClient.java | 12 ------------ 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index a762169e23..772af41dab 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -36,7 +36,6 @@ import io.lettuce.test.TestFutures; import io.lettuce.test.Wait; import io.lettuce.test.condition.EnabledOnCommand; -import io.lettuce.test.resource.DefaultRedisClusterClient; /** * Integration tests for Cluster Pub/Sub. @@ -48,6 +47,8 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport { private final RedisClusterClient clusterClient; + private final RedisClusterClient clusterClientWithNoRedirects; + private final PubSubTestListener connectionListener = new PubSubTestListener(); private final PubSubTestListener nodeListener = new PubSubTestListener(); @@ -65,8 +66,11 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport { String shardTestChannel = "shard-test-channel"; @Inject - RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient) { + RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient, RedisClusterClient clusterClient2) { this.clusterClient = clusterClient; + ClusterClientOptions.Builder builder = ClusterClientOptions.builder().maxRedirects(0); + clusterClient2.setOptions(builder.build()); + this.clusterClientWithNoRedirects = clusterClient2; } @BeforeEach @@ -178,9 +182,11 @@ void publishToShardChannelViaNewClient() throws Exception { pubSubConnection.addListener(connectionListener); pubSubConnection.async().ssubscribe(shardChannel); - DefaultRedisClusterClient.get().connectPubSub().async().spublish(shardChannel, shardMessage); + StatefulRedisClusterPubSubConnection newPubsub = clusterClientWithNoRedirects.connectPubSub(); + newPubsub.async().spublish(shardChannel, shardMessage); Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + newPubsub.close(); } @Test @@ -193,9 +199,7 @@ void publishToShardChannelViaNewClientWithNoRedirects() throws Exception { pubSubConnection.async().ssubscribe(shardTestChannel); Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); - ClusterClientOptions.Builder builder = ClusterClientOptions.builder().maxRedirects(0); - RedisClusterPubSubAsyncCommands cmd = DefaultRedisClusterClient.get(builder.build()).connectPubSub() - .async(); + RedisClusterPubSubAsyncCommands cmd = clusterClientWithNoRedirects.connectPubSub().async(); cmd.spublish(shardChannel, shardMessage); Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); @@ -204,6 +208,7 @@ void publishToShardChannelViaNewClientWithNoRedirects() throws Exception { cmd.spublish(shardTestChannel, shardMessage); Wait.untilEquals(shardTestChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); Wait.untilEquals(shardMessage, connectionListener.getMessages()::poll).waitOrTimeout(); + cmd.getStatefulConnection().close(); } @Test diff --git a/src/test/java/io/lettuce/test/resource/DefaultRedisClusterClient.java b/src/test/java/io/lettuce/test/resource/DefaultRedisClusterClient.java index 0a23cbfe0f..35a3b8301d 100644 --- a/src/test/java/io/lettuce/test/resource/DefaultRedisClusterClient.java +++ b/src/test/java/io/lettuce/test/resource/DefaultRedisClusterClient.java @@ -34,16 +34,4 @@ public static RedisClusterClient get() { instance.redisClient.setOptions(ClusterClientOptions.create()); return instance.redisClient; } - - - /** - * Do not close the client. - * - * @return redis client with given options for the tests. - */ - public static RedisClusterClient get(ClusterClientOptions options) { - DefaultRedisClusterClient client = new DefaultRedisClusterClient(); - client.redisClient.setOptions(options); - return instance.redisClient; - } }