Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SPUBLISH #2838

Merged
merged 4 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,11 @@ public RedisFuture<Long> publish(K channel, V message) {
return dispatch(commandBuilder.publish(channel, message));
}

@Override
atakavci marked this conversation as resolved.
Show resolved Hide resolved
public RedisFuture<Long> spublish(K shardChannel, V message) {
return dispatch(commandBuilder.spublish(shardChannel, message));
}

@Override
public RedisFuture<List<K>> pubsubChannels() {
return dispatch(commandBuilder.pubsubChannels());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,11 @@
return createMono(() -> commandBuilder.publish(channel, message));
}

@Override
public Mono<Long> spublish(K shardChannel, V message) {
return createMono(() -> commandBuilder.spublish(shardChannel, message));

Check warning on line 1666 in src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java#L1666

Added line #L1666 was not covered by tests
}

@Override
public Flux<K> pubsubChannels() {
return createDissolvingFlux(commandBuilder::pubsubChannels);
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2156,6 +2156,13 @@ Command<K, V, Long> publish(K channel, V message) {
return createCommand(PUBLISH, new IntegerOutput<>(codec), args);
}

Command<K, V, Long> spublish(K shardChannel, V message) {
LettuceAssert.notNull(shardChannel, "ShardChannel " + MUST_NOT_BE_NULL);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message);
return createCommand(SPUBLISH, new IntegerOutput<>(codec), args);
}

Command<K, V, List<K>> pubsubChannels() {
CommandArgs<K, V> args = new CommandArgs<>(codec).add(CHANNELS);
return createCommand(PUBSUB, new KeyListOutput<>(codec), args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ public interface BaseRedisAsyncCommands<K, V> {
*/
RedisFuture<Long> publish(K channel, V message);

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
RedisFuture<Long> spublish(K shardChannel, V message);


/**
* Lists the currently *active channels*.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ public interface BaseRedisReactiveCommands<K, V> {
*/
Mono<Long> publish(K channel, V message);

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Mono<Long> spublish(K shardChannel, V message);


/**
* Lists the currently *active channels*.
*
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public interface BaseRedisCommands<K, V> {
*/
Long publish(K channel, V message);

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Long spublish(K shardChannel, V message);

/**
* Lists the currently *active channels*.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public void punsubscribed(K pattern, long count) {
notifications.punsubscribed(getNode(), pattern, count);
}

@Override
public void smessage(K shardChannel, V message) {
notifications.smessage(getNode(), shardChannel, message);
}

@Override
public void ssubscribed(K channel, long count) {
notifications.ssubscribed(getNode(), channel, count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
case unsubscribe:
multicast.unsubscribed(clusterNode, output.channel(), output.count());
break;
case smessage:
multicast.smessage(clusterNode, output.channel(), output.body());
break;

Check warning on line 93 in src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java#L92-L93

Added lines #L92 - L93 were not covered by tests
case ssubscribe:
multicast.ssubscribed(clusterNode, output.channel(), output.count());
break;
Expand Down Expand Up @@ -192,6 +195,12 @@
clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count));
}

@Override
public void smessage(RedisClusterNode node, K shardChannel, V message) {
getListeners().forEach(listener -> listener.smessage(shardChannel, message));
clusterListeners.forEach(listener -> listener.smessage(node, shardChannel, message));
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
getListeners().forEach(listener -> listener.ssubscribed(channel, count));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ public interface BaseNodeSelectionAsyncCommands<K, V> {
*/
AsyncExecutions<Long> publish(K channel, V message);

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
AsyncExecutions<Long> spublish(K shardChannel, V message);


/**
* Lists the currently *active channels*.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ public interface BaseNodeSelectionCommands<K, V> {
*/
Executions<Long> publish(K channel, V message);

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Executions<Long> spublish(K shardChannel, V message);


/**
* Lists the currently *active channels*.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
// empty adapter method
}

@Override
public void smessage(RedisClusterNode node, K shardChannel, V message) {
// empty adapter method
}

Check warning on line 48 in src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java#L48

Added line #L48 was not covered by tests

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
// empty adapter method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@
*/
void punsubscribed(RedisClusterNode node, K pattern, long count);

/**
* Message received from a shard channel subscription.
*
* @param node the {@link RedisClusterNode} from which the {@code message} originates.
* @param shardChannel shard channel.
* @param message Message.
* @since 7.0
*/
default void smessage(RedisClusterNode node, K shardChannel, V message){
message(node, shardChannel, message);
}

Check warning on line 80 in src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java#L79-L80

Added lines #L79 - L80 were not covered by tests

/**
* Subscribed to a shard channel.
*
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandType.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword {

// Pub/Sub

PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE,
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, SPUBLISH,

// Sets

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ Command<K, V, Long> publish(K channel, V message) {
return createCommand(PUBLISH, new IntegerOutput<>(codec), args);
}

Command<K, V, Long> spublish(K shardChannel, V message) {
CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message);
return createCommand(SPUBLISH, new IntegerOutput<>(codec), args);
}

Command<K, V, List<K>> pubsubChannels(K pattern) {
CommandArgs<K, V> args = new PubSubCommandArgs<>(codec).add(CHANNELS).addKey(pattern);
return createCommand(PUBSUB, new KeyListOutput<>(codec), args);
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ protected void notifyListeners(PubSubMessage<K, V> message) {
case unsubscribe:
listener.unsubscribed(message.channel(), message.count());
break;
case smessage:
listener.smessage(message.channel(), message.body());
break;
case ssubscribe:
listener.ssubscribed(message.channel(), message.count());
break;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/pubsub/PubSubOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> implements PubSub

public enum Type {

message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe;
message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe, smessage;

private final static Set<String> names = new HashSet<>();

Expand Down Expand Up @@ -108,6 +108,7 @@ private void handleOutput(ByteBuffer bytes) {
pattern = codec.decodeKey(bytes);
break;
}
case smessage:
case message:
if (channel == null) {
channel = codec.decodeKey(bytes);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
// empty adapter method
}

@Override
public void smessage(K shardChannel, V message) {
// empty adapter method
}

Check warning on line 65 in src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java#L65

Added line #L65 was not covered by tests

@Override
public void ssubscribed(K shardChannel, long count) {
// empty adapter method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public RedisFuture<Long> publish(K channel, V message) {
return dispatch(commandBuilder.publish(channel, message));
}

@Override
public RedisFuture<Long> spublish(K shardChannel, V message) {
return dispatch(commandBuilder.spublish(shardChannel, message));
}

@Override
public RedisFuture<List<K>> pubsubChannels(K channel) {
return dispatch(commandBuilder.pubsubChannels(channel));
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,15 @@
subscribed(shardChannel, count);
}

/**
* Message received from a shard channel subscription.
*
* @param shardChannel shard channel.
* @param message Message.
* @since 7.0
*/
default void smessage(K shardChannel, V message) {
message(shardChannel, message);
}

Check warning on line 100 in src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java#L99-L100

Added lines #L99 - L100 were not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@
return createMono(() -> commandBuilder.publish(channel, message));
}

@Override
public Mono<Long> spublish(K shardChannel, V message) {
return createMono(() -> commandBuilder.publish(shardChannel, message));

Check warning on line 144 in src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java#L144

Added line #L144 was not covered by tests
}

@Override
public Flux<K> pubsubChannels(K channel) {
return createDissolvingFlux(() -> commandBuilder.pubsubChannels(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ interface BaseRedisCoroutinesCommands<K : Any, V : Any> {
*/
suspend fun publish(channel: K, message: V): Long?

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
suspend fun spublish(shardChannel: K, message: V): Long?


/**
* Lists the currently *active channels*.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@

override suspend fun publish(channel: K, message: V): Long? = ops.publish(channel, message).awaitFirstOrNull()

override suspend fun spublish(shardChannel: K, message: V): Long? = ops.spublish(shardChannel, message).awaitFirstOrNull()

Check warning on line 49 in src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt#L49

Added line #L49 was not covered by tests

override suspend fun pubsubChannels(): List<K> = ops.pubsubChannels().asFlow().toList()

override suspend fun pubsubChannels(channel: K): List<K> = ops.pubsubChannels(channel).asFlow().toList()
Expand Down
10 changes: 10 additions & 0 deletions src/main/templates/io/lettuce/core/api/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ public interface BaseRedisCommands<K, V> {
*/
Long publish(K channel, V message);

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Long spublish(K shardChannel, V message);

/**
* Lists the currently *active channels*.
*
Expand Down
Loading
Loading