From fa182b7216d330eceae0c3a769f518a782860cfa Mon Sep 17 00:00:00 2001 From: atakavci <58048133+atakavci@users.noreply.github.com> Date: Tue, 26 Mar 2024 17:48:54 +0300 Subject: [PATCH] implementation of PUBSUB SHARDCHANNELS (#2793) * implementation of PUBSUB SHARDCHANNELS / issue #2756 * Polishing #2756 --- .../core/AbstractRedisAsyncCommands.java | 11 +++++++ .../core/AbstractRedisReactiveCommands.java | 11 +++++++ .../io/lettuce/core/RedisCommandBuilder.java | 13 ++++++++ .../api/async/BaseRedisAsyncCommands.java | 16 ++++++++++ .../reactive/BaseRedisReactiveCommands.java | 16 ++++++++++ .../core/api/sync/BaseRedisCommands.java | 16 ++++++++++ .../async/BaseNodeSelectionAsyncCommands.java | 16 ++++++++++ .../api/sync/BaseNodeSelectionCommands.java | 16 ++++++++++ .../lettuce/core/protocol/CommandKeyword.java | 3 +- .../core/pubsub/PubSubCommandBuilder.java | 6 ++++ .../pubsub/RedisPubSubAsyncCommandsImpl.java | 6 ++++ .../RedisPubSubReactiveCommandsImpl.java | 6 ++++ .../coroutines/BaseRedisCoroutinesCommands.kt | 16 ++++++++++ .../BaseRedisCoroutinesCommandsImpl.kt | 5 +++ .../lettuce/core/api/BaseRedisCommands.java | 16 ++++++++++ ...usterPubSubConnectionIntegrationTests.java | 16 ++++++++++ .../core/pubsub/PubSubCommandTest.java | 29 +++++++++++++++++ .../core/pubsub/PubSubReactiveTest.java | 31 +++++++++++++++++++ 18 files changed, 248 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index 8924c27274..ca112e9b57 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -56,6 +56,7 @@ * @author Tugdual Grall * @author dengliming * @author Andrey Shlykov + * @author Ali Takavci */ @SuppressWarnings("unchecked") public abstract class AbstractRedisAsyncCommands implements RedisAclAsyncCommands, RedisHashAsyncCommands, @@ -1556,6 +1557,16 @@ public RedisFuture> pubsubNumsub(K... channels) { return dispatch(commandBuilder.pubsubNumsub(channels)); } + @Override + public RedisFuture> pubsubShardChannels() { + return dispatch(commandBuilder.pubsubShardChannels()); + } + + @Override + public RedisFuture> pubsubShardChannels(K pattern) { + return dispatch(commandBuilder.pubsubShardChannels(pattern)); + } + @Override public RedisFuture> pubsubShardNumsub(K... shardChannels) { return dispatch(commandBuilder.pubsubShardNumsub(shardChannels)); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index 310fe03185..3ebc4a9844 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -65,6 +65,7 @@ * @author Tugdual Grall * @author dengliming * @author Andrey Shlykov + * @author Ali Takavci * @since 4.0 */ public abstract class AbstractRedisReactiveCommands @@ -1635,6 +1636,16 @@ public Mono> pubsubNumsub(K... channels) { return createMono(() -> commandBuilder.pubsubNumsub(channels)); } + @Override + public Flux pubsubShardChannels() { + return createDissolvingFlux(commandBuilder::pubsubShardChannels); + } + + @Override + public Flux pubsubShardChannels(K pattern) { + return createDissolvingFlux(() -> commandBuilder.pubsubShardChannels(pattern)); + } + @Override public Mono> pubsubShardNumsub(K... shardChannels) { return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels)); diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index e6c9581aaf..1e85e37f9b 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -53,6 +53,7 @@ * @author dengliming * @author Mikhael Sokolov * @author Tihomir Mateev + * @author Ali Takavci */ @SuppressWarnings({ "unchecked", "varargs" }) class RedisCommandBuilder extends BaseRedisCommandBuilder { @@ -2108,6 +2109,18 @@ Command> pubsubNumsub(K... channels) { return createCommand(PUBSUB, (MapOutput) new MapOutput((RedisCodec) codec), args); } + Command> pubsubShardChannels() { + CommandArgs args = new CommandArgs<>(codec).add(SHARDCHANNELS); + return createCommand(PUBSUB, new KeyListOutput<>(codec), args); + } + + Command> pubsubShardChannels(K pattern) { + LettuceAssert.notNull(pattern, "Pattern " + MUST_NOT_BE_NULL); + + CommandArgs args = new CommandArgs<>(codec).add(SHARDCHANNELS).addKey(pattern); + return createCommand(PUBSUB, new KeyListOutput<>(codec), args); + } + Command> pubsubShardNumsub(K... shardChannels) { LettuceAssert.notNull(shardChannels, "ShardChannels " + MUST_NOT_BE_NULL); LettuceAssert.notEmpty(shardChannels, "ShardChannels " + MUST_NOT_BE_EMPTY); diff --git a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java index 8c8b381303..6875c6cb17 100644 --- a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java @@ -29,6 +29,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Ali Takavci * @since 4.0 * @generated by io.lettuce.apigenerator.CreateAsyncApi */ @@ -66,6 +67,21 @@ public interface BaseRedisAsyncCommands { */ RedisFuture> pubsubNumsub(K... channels); + /** + * Lists the currently *active shard channels*. + * + * @return List<K> array-reply a list of active channels. + */ + RedisFuture> pubsubShardChannels(); + + /** + * Lists the currently *active shard channels*. + * + * @param pattern the pattern type: patternkey (pattern). + * @return List<K> array-reply a list of active channels, optionally matching the specified pattern. + */ + RedisFuture> pubsubShardChannels(K pattern); + /** * Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels. * diff --git a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java index 27eb0672b9..2c48fcbc44 100644 --- a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java @@ -29,6 +29,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Ali Takavci * @since 4.0 * @generated by io.lettuce.apigenerator.CreateReactiveApi */ @@ -66,6 +67,21 @@ public interface BaseRedisReactiveCommands { */ Mono> pubsubNumsub(K... channels); + /** + * Lists the currently *active shard channels*. + * + * @return K array-reply a list of active channels. + */ + Flux pubsubShardChannels(); + + /** + * Lists the currently *active shard channels*. + * + * @param pattern the pattern type: patternkey (pattern). + * @return K array-reply a list of active channels, optionally matching the specified pattern. + */ + Flux pubsubShardChannels(K pattern); + /** * Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels. * diff --git a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java index c0295bfc2d..64d80a658b 100644 --- a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java @@ -28,6 +28,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Ali Takavci * @since 4.0 * @generated by io.lettuce.apigenerator.CreateSyncApi */ @@ -65,6 +66,21 @@ public interface BaseRedisCommands { */ Map pubsubNumsub(K... channels); + /** + * Lists the currently *active shard channels*. + * + * @return List<K> array-reply a list of active channels. + */ + List pubsubShardChannels(); + + /** + * Lists the currently *active shard channels*. + * + * @param pattern the pattern type: patternkey (pattern). + * @return List<K> array-reply a list of active channels, optionally matching the specified pattern. + */ + List pubsubShardChannels(K pattern); + /** * Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels. * diff --git a/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java index b08959f2e9..4a225638a2 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/BaseNodeSelectionAsyncCommands.java @@ -29,6 +29,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Ali Takavci * @since 4.0 * @generated by io.lettuce.apigenerator.CreateAsyncNodeSelectionClusterApi */ @@ -66,6 +67,21 @@ public interface BaseNodeSelectionAsyncCommands { */ AsyncExecutions> pubsubNumsub(K... channels); + /** + * Lists the currently *active shard channels*. + * + * @return List<K> array-reply a list of active channels. + */ + AsyncExecutions> pubsubShardChannels(); + + /** + * Lists the currently *active shard channels*. + * + * @param pattern the pattern type: patternkey (pattern). + * @return List<K> array-reply a list of active channels, optionally matching the specified pattern. + */ + AsyncExecutions> pubsubShardChannels(K pattern); + /** * Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java index 9318dbd0d0..8bf58de2ce 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java @@ -24,6 +24,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Ali Takavci * @since 4.0 * @generated by io.lettuce.apigenerator.CreateSyncNodeSelectionClusterApi */ @@ -61,6 +62,21 @@ public interface BaseNodeSelectionCommands { */ Executions> pubsubNumsub(K... channels); + /** + * Lists the currently *active shard channels*. + * + * @return List<K> array-reply a list of active channels. + */ + Executions> pubsubShardChannels(); + + /** + * Lists the currently *active shard channels*. + * + * @param pattern the pattern type: patternkey (pattern). + * @return List<K> array-reply a list of active channels, optionally matching the specified pattern. + */ + Executions> pubsubShardChannels(K pattern); + /** * Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels. * diff --git a/src/main/java/io/lettuce/core/protocol/CommandKeyword.java b/src/main/java/io/lettuce/core/protocol/CommandKeyword.java index 8c53f0f817..c32d67f56b 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandKeyword.java +++ b/src/main/java/io/lettuce/core/protocol/CommandKeyword.java @@ -25,6 +25,7 @@ * @author Zhang Jessey * @author dengliming * @author Tihomir Mateev + * @author Ali Takavci */ public enum CommandKeyword implements ProtocolKeyword { @@ -36,7 +37,7 @@ public enum CommandKeyword implements ProtocolKeyword { IDLETIME, JUSTID, KILL, KEYSLOT, LEFT, LEN, LIMIT, LIST, LOAD, LOG, MATCH, - MAX, MAXLEN, MEET, MIN, MINID, MOVED, NO, NOACK, NOCOMMANDS, NODE, NODES, NOMKSTREAM, NOPASS, NOSAVE, NOT, NUMSUB, SHARDNUMSUB, NUMPAT, NX, OFF, ON, ONE, OR, PAUSE, + MAX, MAXLEN, MEET, MIN, MINID, MOVED, NO, NOACK, NOCOMMANDS, NODE, NODES, NOMKSTREAM, NOPASS, NOSAVE, NOT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB, NUMPAT, NX, OFF, ON, ONE, OR, PAUSE, REFCOUNT, REMOVE, RELOAD, REPLACE, REPLICATE, REPLICAS, REV, RESET, RESETCHANNELS, RESETKEYS, RESETPASS, diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java index db22a9dcd5..3573b4d894 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java @@ -37,6 +37,7 @@ * * @author Mark Paluch * @author Tihomir Mateev + * @author Ali Takavci * @since 4.2 */ @SuppressWarnings("varargs") @@ -67,6 +68,11 @@ final Command> pubsubNumsub(K... channels) { return createCommand(PUBSUB, new MapOutput<>((RedisCodec) codec), args); } + Command> pubsubShardChannels(K pattern) { + CommandArgs args = new PubSubCommandArgs<>(codec).add(SHARDCHANNELS).addKey(pattern); + return createCommand(PUBSUB, new KeyListOutput<>(codec), args); + } + @SafeVarargs @SuppressWarnings({ "unchecked", "rawtypes" }) final Command> pubsubShardNumsub(K... shardChannels) { diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java index 595c92c5c3..8a9d30b242 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java @@ -30,6 +30,7 @@ * @param Value type. * @author Will Glozer * @author Mark Paluch + * @author Ali Takavci */ public class RedisPubSubAsyncCommandsImpl extends RedisAsyncCommandsImpl implements RedisPubSubAsyncCommands { @@ -85,6 +86,11 @@ public RedisFuture> pubsubNumsub(K... channels) { return dispatch(commandBuilder.pubsubNumsub(channels)); } + @Override + public RedisFuture> pubsubShardChannels(K pattern) { + return dispatch(commandBuilder.pubsubShardChannels(pattern)); + } + @Override public RedisFuture> pubsubShardNumsub(K... shardChannels) { return dispatch(commandBuilder.pubsubShardNumsub(shardChannels)); diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index dba1a4b258..af812fde0f 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -32,6 +32,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Ali Takavci * @since 5.0 */ public class RedisPubSubReactiveCommandsImpl extends RedisReactiveCommandsImpl @@ -143,6 +144,11 @@ public Mono> pubsubNumsub(K... channels) { return createMono(() -> commandBuilder.pubsubNumsub(channels)); } + @Override + public Flux pubsubShardChannels(K channel) { + return createDissolvingFlux(() -> commandBuilder.pubsubShardChannels(channel)); + } + @Override public Mono> pubsubShardNumsub(K... shardChannels) { return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels)); diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt index f8c7639252..aa02565f40 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt @@ -28,6 +28,7 @@ import kotlinx.coroutines.flow.Flow * @param Key type. * @param Value type. * @author Mikhael Sokolov + * @author Ali Takavci * @since 6.0 * @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesApi */ @@ -65,6 +66,21 @@ interface BaseRedisCoroutinesCommands { * @return array-reply a list of channels and number of subscribers for every channel. */ suspend fun pubsubNumsub(vararg channels: K): Map + + /** + * Lists the currently *active shard channels*. + * + * @return List array-reply a list of active channels. + */ + suspend fun pubsubShardChannels(): List + + /** + * Lists the currently *active shard channels*. + * + * @param pattern the pattern type: patternkey (pattern). + * @return List array-reply a list of active channels, optionally matching the specified pattern. + */ + suspend fun pubsubShardChannels(pattern: K): List /** * Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels. diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt index 4d4e1ff6da..5aa125429d 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt @@ -34,6 +34,7 @@ import kotlinx.coroutines.reactive.awaitSingle * @param Key type. * @param Value type. * @author Mikhael Sokolov + * @author Ali Takavci * @since 6.0 */ @ExperimentalLettuceCoroutinesApi @@ -47,6 +48,10 @@ internal class BaseRedisCoroutinesCommandsImpl(internal val op override suspend fun pubsubNumsub(vararg channels: K): Map = ops.pubsubNumsub(*channels).awaitSingle() + override suspend fun pubsubShardChannels(): List = ops.pubsubShardChannels().asFlow().toList() + + override suspend fun pubsubShardChannels(pattern: K): List = ops.pubsubShardChannels(pattern).asFlow().toList() + override suspend fun pubsubShardNumsub(vararg shardChannels: K): Map = ops.pubsubShardNumsub(*shardChannels).awaitSingle() override suspend fun pubsubNumpat(): Long = ops.pubsubNumpat().awaitSingle() diff --git a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java index e9486a0b9b..13b998a71f 100644 --- a/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java +++ b/src/main/templates/io/lettuce/core/api/BaseRedisCommands.java @@ -30,6 +30,7 @@ * @param Value type. * @author Mark Paluch * @author Tihomir Mateev + * @author Ali Takavci * @since 4.0 */ public interface BaseRedisCommands { @@ -66,6 +67,21 @@ public interface BaseRedisCommands { */ Map pubsubNumsub(K... channels); + /** + * Lists the currently *active shard channels*. + * + * @return List<K> array-reply a list of active channels. + */ + List pubsubShardChannels(); + + /** + * Lists the currently *active shard channels*. + * + * @param pattern the pattern type: patternkey (pattern). + * @return List<K> array-reply a list of active channels, optionally matching the specified pattern. + */ + List pubsubShardChannels(K pattern); + /** * Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels. * diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 91c91c3a4c..60aa2ee907 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -100,6 +100,22 @@ void testRegularClientPubSubChannels() { assertThat(channelsOnOtherNode).isEmpty(); } + + @Test + void testRegularClientPubSubShardChannels() { + + String nodeId = pubSubConnection.sync().clusterMyId(); + RedisClusterNode otherNode = getOtherThan(nodeId); + /// TODO : uncomment after SSUBSCRIBE is implemented + // pubSubConnection.sync().ssubscribe(key); + + List channelsOnSubscribedNode = connection.getConnection(nodeId).sync().pubsubShardChannels(); + // assertThat(channelsOnSubscribedNode).hasSize(1); + + List channelsOnOtherNode = connection.getConnection(otherNode.getNodeId()).sync().pubsubShardChannels(); + assertThat(channelsOnOtherNode).isEmpty(); + } + @Test void myIdWorksAfterDisconnect() throws InterruptedException { diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java index ddc07ed421..aaf15cfb5c 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java @@ -54,6 +54,7 @@ * @author Tugdual Grall * @author dengliming * @author Tihomir Mateev + * @author Ali Takavci */ class PubSubCommandTest extends AbstractRedisClientTest implements RedisPubSubListener { @@ -348,6 +349,34 @@ void pubsubNumsub() { assertThat(result).containsKeys(channel); } + @Test + @EnabledOnCommand("SPUBLISH") + void pubsubShardChannels() { + /// TODO : uncomment after SSUBSCRIBE is implemented + // TestFutures.awaitOrTimeout(pubsub.ssubscribe(channel)); + List result = redis.pubsubShardChannels(); + // assertThat(result).contains(channel); + } + + @Test + @EnabledOnCommand("SPUBLISH") + void pubsubMultipleShardChannels() { + /// TODO : uncomment after SSUBSCRIBE is implemented + // TestFutures.awaitOrTimeout(pubsub.ssubscribe(channel, "channel1", "channel3")); + List result = redis.pubsubShardChannels(); + // assertThat(result).contains(channel, "channel1", "channel3"); + + } + + @Test + @EnabledOnCommand("SPUBLISH") + void pubsubShardChannelsWithArg() { + /// TODO : uncomment after SSUBSCRIBE is implemented + // TestFutures.awaitOrTimeout(pubsub.ssubscribe(channel)); + List result = redis.pubsubShardChannels(pattern); + // assertThat(result).contains(channel); + } + @Test @EnabledOnCommand("SPUBLISH") void pubsubShardNumsub() { diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java index b65f34c667..095661b8bd 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubReactiveTest.java @@ -47,6 +47,7 @@ /** * @author Mark Paluch + * @author Ali Takavci */ class PubSubReactiveTest extends AbstractRedisClientTest implements RedisPubSubListener { @@ -252,6 +253,36 @@ void pubsubNumsub() { assertThat(result).containsKeys(channel); } + @Test + @EnabledOnCommand("SPUBLISH") + void pubsubShardChannels() { + /// TODO : uncomment after SSUBSCRIBE is implemented + // block(pubsub.ssubscribe(channel)); + List result = block(pubsub2.pubsubShardChannels().collectList()); + // assertThat(result).contains(channel); + } + + @Test + @EnabledOnCommand("SPUBLISH") + void pubsubShardMultipleChannels() { + /// TODO : uncomment after SSUBSCRIBE is implemented + // StepVerifier.create(pubsub.ssubscribe(channel, "channel1", "channel3")).verifyComplete(); + + // StepVerifier.create(pubsub2.pubsubShardChannels().collectList()) + // .consumeNextWith(actual -> assertThat(actual).contains(channel, "channel1", "channel3")).verifyComplete(); + } + + @Test + @EnabledOnCommand("SPUBLISH") + void pubsubShardChannelsWithArg() { + /// TODO : uncomment after SSUBSCRIBE is implemented + // StepVerifier.create(pubsub.ssubscribe(channel)).verifyComplete(); + // Wait.untilTrue(() -> mono(pubsub2.pubsubShardChannels(pattern).filter(s -> channel.equals(s))) != null).waitOrTimeout(); + + String result = mono(pubsub2.pubsubShardChannels(pattern).filter(s -> channel.equals(s))); + // assertThat(result).isEqualToIgnoringCase(channel); + } + @Test @EnabledOnCommand("SPUBLISH") void pubsubShardNumsub() {