From 0be0055e913cdd1b878ed5962def07580833f12b Mon Sep 17 00:00:00 2001 From: manish-manghwani Date: Sun, 24 Mar 2024 20:16:03 +0530 Subject: [PATCH] Add support for SPUBLISH #2757 --- .../io/lettuce/core/AbstractRedisAsyncCommands.java | 6 ++++++ .../lettuce/core/AbstractRedisReactiveCommands.java | 6 ++++++ .../java/io/lettuce/core/RedisCommandBuilder.java | 8 ++++++++ .../core/api/async/BaseRedisAsyncCommands.java | 11 +++++++++++ .../core/api/reactive/BaseRedisReactiveCommands.java | 11 +++++++++++ .../io/lettuce/core/api/sync/BaseRedisCommands.java | 11 +++++++++++ .../cluster/api/sync/BaseNodeSelectionCommands.java | 11 +++++++++++ .../java/io/lettuce/core/protocol/CommandType.java | 3 ++- .../io/lettuce/core/pubsub/PubSubCommandBuilder.java | 6 ++++++ .../core/pubsub/RedisPubSubAsyncCommandsImpl.java | 6 ++++++ .../core/pubsub/RedisPubSubReactiveCommandsImpl.java | 6 ++++++ .../api/coroutines/BaseRedisCoroutinesCommands.kt | 11 +++++++++++ .../api/coroutines/BaseRedisCoroutinesCommandsImpl.kt | 3 +++ 13 files changed, 98 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index 8924c27274..160eb990c7 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -56,6 +56,7 @@ * @author Tugdual Grall * @author dengliming * @author Andrey Shlykov + * @author Manish Manghwani */ @SuppressWarnings("unchecked") public abstract class AbstractRedisAsyncCommands implements RedisAclAsyncCommands, RedisHashAsyncCommands, @@ -1536,6 +1537,11 @@ public RedisFuture publish(K channel, V message) { return dispatch(commandBuilder.publish(channel, message)); } + @Override + public RedisFuture spublish(K shardChannel, V message) { + return dispatch(commandBuilder.spublish(shardChannel, message)); + } + @Override public RedisFuture> pubsubChannels() { return dispatch(commandBuilder.pubsubChannels()); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index 310fe03185..d0f95aaefe 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -65,6 +65,7 @@ * @author Tugdual Grall * @author dengliming * @author Andrey Shlykov + * @author Manish Manghwani * @since 4.0 */ public abstract class AbstractRedisReactiveCommands @@ -1615,6 +1616,11 @@ public Mono publish(K channel, V message) { return createMono(() -> commandBuilder.publish(channel, message)); } + @Override + public Mono spublish(K channel, V message) { + return createMono(() -> commandBuilder.spublish(channel, message)); + } + @Override public Flux pubsubChannels() { return createDissolvingFlux(commandBuilder::pubsubChannels); diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index e6c9581aaf..9f00fe4134 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -53,6 +53,7 @@ * @author dengliming * @author Mikhael Sokolov * @author Tihomir Mateev + * @author Manish Manghwani */ @SuppressWarnings({ "unchecked", "varargs" }) class RedisCommandBuilder extends BaseRedisCommandBuilder { @@ -2094,6 +2095,13 @@ Command> pubsubChannels(K pattern) { return createCommand(PUBSUB, new KeyListOutput<>(codec), args); } + Command spublish(K shardChannel, V message) { + LettuceAssert.notNull(shardChannel, "Shard Channel " + MUST_NOT_BE_NULL); + + CommandArgs args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message); + return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); + } + Command pubsubNumpat() { CommandArgs args = new CommandArgs<>(codec).add(NUMPAT); return createCommand(PUBSUB, new IntegerOutput<>(codec), args); diff --git a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java index 8c8b381303..c9fbc68d07 100644 --- a/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/BaseRedisAsyncCommands.java @@ -29,6 +29,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Manish Manghwani * @since 4.0 * @generated by io.lettuce.apigenerator.CreateAsyncApi */ @@ -43,6 +44,16 @@ public interface BaseRedisAsyncCommands { */ RedisFuture publish(K channel, V message); + /** + * Post a message to a Shard channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are + * connected to the same node as the publishing client are included in the count. + */ + RedisFuture spublish(K shardChannel, V message); + /** * Lists the currently *active channels*. * diff --git a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java index 27eb0672b9..5219a6753e 100644 --- a/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/BaseRedisReactiveCommands.java @@ -29,6 +29,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Manish Manghwani * @since 4.0 * @generated by io.lettuce.apigenerator.CreateReactiveApi */ @@ -43,6 +44,16 @@ public interface BaseRedisReactiveCommands { */ Mono publish(K channel, V message); + /** + * Post a message to a shard channel. + * + * @param shardChannel the channel type: key. + * @param message the message type: value. + * @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are + * connected to the same node as the publishing client are included in the count. + */ + Mono spublish(K shardChannel, V message); + /** * Lists the currently *active channels*. * diff --git a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java index c0295bfc2d..294d5f5b8f 100644 --- a/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java @@ -28,6 +28,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Manish Manghwani * @since 4.0 * @generated by io.lettuce.apigenerator.CreateSyncApi */ @@ -42,6 +43,16 @@ public interface BaseRedisCommands { */ Long publish(K channel, V message); + /** + * Post a message to a channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are + * connected to the same node as the publishing client are included in the count. + */ + Long spublish(K shardChannel, V message); + /** * Lists the currently *active channels*. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java index 9318dbd0d0..3251e8c9e7 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/BaseNodeSelectionCommands.java @@ -24,6 +24,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Manish Manghwani * @since 4.0 * @generated by io.lettuce.apigenerator.CreateSyncNodeSelectionClusterApi */ @@ -38,6 +39,16 @@ public interface BaseNodeSelectionCommands { */ Executions publish(K channel, V message); + /** + * Post a message to a channel. + * + * @param shardChannel the channel type: key. + * @param message the message type: value. + * @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are + * connected to the same node as the publishing client are included in the count. + */ + Executions spublish(K shardChannel, V message); + /** * Lists the currently *active channels*. * diff --git a/src/main/java/io/lettuce/core/protocol/CommandType.java b/src/main/java/io/lettuce/core/protocol/CommandType.java index cdfe8d0dd5..a9b98a1e83 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandType.java +++ b/src/main/java/io/lettuce/core/protocol/CommandType.java @@ -25,6 +25,7 @@ * @author Zhang Jessey * @author dengliming * @author Mikhael Sokolov + * @author Manish Manghwani */ public enum CommandType implements ProtocolKeyword { @@ -70,7 +71,7 @@ public enum CommandType implements ProtocolKeyword { // Pub/Sub - PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, + PSUBSCRIBE, PUBLISH, SPUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, // Sets diff --git a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java index db22a9dcd5..1fd6aaddb0 100644 --- a/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java +++ b/src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java @@ -37,6 +37,7 @@ * * @author Mark Paluch * @author Tihomir Mateev + * @author Manish Manghwani * @since 4.2 */ @SuppressWarnings("varargs") @@ -53,6 +54,11 @@ Command publish(K channel, V message) { return createCommand(PUBLISH, new IntegerOutput<>(codec), args); } + Command spublish(K shardChannel, V message) { + CommandArgs args = new PubSubCommandArgs<>(codec).addKey(shardChannel).addValue(message); + return createCommand(SPUBLISH, new IntegerOutput<>(codec), args); + } + Command> pubsubChannels(K pattern) { CommandArgs args = new PubSubCommandArgs<>(codec).add(CHANNELS).addKey(pattern); return createCommand(PUBSUB, new KeyListOutput<>(codec), args); diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java index 595c92c5c3..330d09b7d0 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java @@ -30,6 +30,7 @@ * @param Value type. * @author Will Glozer * @author Mark Paluch + * @author Manish Manghwani */ public class RedisPubSubAsyncCommandsImpl extends RedisAsyncCommandsImpl implements RedisPubSubAsyncCommands { @@ -75,6 +76,11 @@ public RedisFuture publish(K channel, V message) { return dispatch(commandBuilder.publish(channel, message)); } + @Override + public RedisFuture spublish(K shardChannel, V message) { + return dispatch(commandBuilder.spublish(shardChannel, message)); + } + @Override public RedisFuture> pubsubChannels(K channel) { return dispatch(commandBuilder.pubsubChannels(channel)); diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index dba1a4b258..97eaecc742 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -32,6 +32,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch + * @author Manish Manghwani * @since 5.0 */ public class RedisPubSubReactiveCommandsImpl extends RedisReactiveCommandsImpl @@ -133,6 +134,11 @@ public Mono publish(K channel, V message) { return createMono(() -> commandBuilder.publish(channel, message)); } + @Override + public Mono spublish(K shardChannel, V message) { + return createMono(() -> commandBuilder.spublish(shardChannel, message)); + } + @Override public Flux pubsubChannels(K channel) { return createDissolvingFlux(() -> commandBuilder.pubsubChannels(channel)); diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt index f8c7639252..571203c823 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt @@ -28,6 +28,7 @@ import kotlinx.coroutines.flow.Flow * @param Key type. * @param Value type. * @author Mikhael Sokolov + * @author Manish Manghwani * @since 6.0 * @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesApi */ @@ -43,6 +44,16 @@ interface BaseRedisCoroutinesCommands { */ suspend fun publish(channel: K, message: V): Long? + /** + * Post a message to a channel. + * + * @param shardChannel the shard channel type: key. + * @param message the message type: value. + * @return Long the number of clients that received the message. Note that in a Redis Cluster, only clients that are + * connected to the same node as the publishing client are included in the count. + */ + suspend fun spublish(shardChannel: K, message: V): Long? + /** * Lists the currently *active channels*. * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt index 4d4e1ff6da..fac49d6642 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt @@ -34,6 +34,7 @@ import kotlinx.coroutines.reactive.awaitSingle * @param Key type. * @param Value type. * @author Mikhael Sokolov + * @author Manish Manghwani * @since 6.0 */ @ExperimentalLettuceCoroutinesApi @@ -41,6 +42,8 @@ internal class BaseRedisCoroutinesCommandsImpl(internal val op override suspend fun publish(channel: K, message: V): Long? = ops.publish(channel, message).awaitFirstOrNull() + override suspend fun spublish(shardChannel: K, message: V): Long? = ops.spublish(shardChannel, message).awaitFirstOrNull() + override suspend fun pubsubChannels(): List = ops.pubsubChannels().asFlow().toList() override suspend fun pubsubChannels(channel: K): List = ops.pubsubChannels(channel).asFlow().toList()