Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SSUBSCRIBE #2813

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.RedisClusterShardedPubSubListener;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.RedisPubSubListener;
Expand All @@ -28,7 +28,7 @@ class ClusterPubSubConnectionProvider<K, V> extends PooledClusterConnectionProvi

private final RedisCodec<K, V> redisCodec;

private final RedisClusterPubSubListener<K, V> notifications;
private final RedisClusterShardedPubSubListener<K, V> notifications;
atakavci marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a new {@link ClusterPubSubConnectionProvider}.
Expand All @@ -40,7 +40,7 @@ class ClusterPubSubConnectionProvider<K, V> extends PooledClusterConnectionProvi
* @param clusterEventListener must not be {@code null}.
*/
ClusterPubSubConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter,
RedisCodec<K, V> redisCodec, RedisClusterPubSubListener<K, V> notificationTarget,
RedisCodec<K, V> redisCodec, RedisClusterShardedPubSubListener<K, V> notificationTarget,
ClusterEventListener clusterEventListener) {

super(redisClusterClient, clusterWriter, redisCodec, clusterEventListener);
Expand Down Expand Up @@ -167,6 +167,11 @@ public void punsubscribed(K pattern, long count) {
notifications.punsubscribed(getNode(), pattern, count);
}

@Override
public void ssubscribed(K channel, long count) {
notifications.ssubscribed(getNode(), channel, count);
}

private RedisClusterNode getNode() {
return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartitions().getPartition(host, port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static ReadOnlyCommands.ReadOnlyPredicate asPredicate() {
enum CommandName {

// Pub/Sub commands are no key-space commands so they are safe to execute on replica nodes
PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE
PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, SSUBSCRIBE
}

}
23 changes: 22 additions & 1 deletion src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.RedisClusterShardedPubSubListener;
import io.lettuce.core.pubsub.PubSubEndpoint;
import io.lettuce.core.pubsub.PubSubMessage;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.RedisShardedPubSubListener;
import io.lettuce.core.resource.ClientResources;

/**
Expand Down Expand Up @@ -45,7 +49,7 @@
clusterListeners.add(listener);
}

public RedisClusterPubSubListener<K, V> getUpstreamListener() {
public RedisClusterShardedPubSubListener<K, V> getUpstreamListener() {
return upstream;
}

Expand Down Expand Up @@ -88,6 +92,9 @@
case unsubscribe:
multicast.unsubscribed(clusterNode, output.channel(), output.count());
break;
case ssubscribe:
multicast.ssubscribed(clusterNode, output.channel(), output.count());
break;

Check warning on line 97 in src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java#L96-L97

Added lines #L96 - L97 were not covered by tests
default:
throw new UnsupportedOperationException("Operation " + output.type() + " not supported");
}
Expand Down Expand Up @@ -189,6 +196,20 @@
clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count));
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
getListeners().forEach(listener -> {
if (listener instanceof RedisShardedPubSubListener) {
atakavci marked this conversation as resolved.
Show resolved Hide resolved
((RedisShardedPubSubListener<K, V>) listener).ssubscribed(channel, count);

Check warning on line 203 in src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java#L203

Added line #L203 was not covered by tests
}
});

Check warning on line 205 in src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java#L205

Added line #L205 was not covered by tests
clusterListeners.forEach(listener -> {
if (listener instanceof RedisClusterShardedPubSubListener) {
((RedisClusterShardedPubSubListener<K, V>) listener).ssubscribed(node, channel, count);
}
});
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* @author Mark Paluch
* @since 4.4
*/
public class RedisClusterPubSubAdapter<K, V> implements RedisClusterPubSubListener<K, V> {
public class RedisClusterPubSubAdapter<K, V> implements RedisClusterShardedPubSubListener<K, V> {

@Override
public void message(RedisClusterNode node, K channel, V message) {
Expand Down Expand Up @@ -42,4 +42,9 @@
// empty adapter method
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
// empty adapter method
}

Check warning on line 48 in src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java#L48

Added line #L48 was not covered by tests

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.lettuce.core.cluster.pubsub;

import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

/**
* Interface for Redis Cluster Pub/Sub listeners.
atakavci marked this conversation as resolved.
Show resolved Hide resolved
*
* @param <K> Key type.
* @param <V> Value type.
* @author Ali Takavci
* @since 4.4
*/
public interface RedisClusterShardedPubSubListener<K, V> extends RedisClusterPubSubListener<K, V>{

/**
* Subscribed to a shard channel.
*
* @param node the {@link RedisClusterNode} from which the {@code message} originates.
* @param shardChannel Shard channel
* @param count Subscription count.
*/
void ssubscribed(RedisClusterNode node, K shardChannel, long count);

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,11 @@ public interface NodeSelectionPubSubAsyncCommands<K, V> {
*/
AsyncExecutions<Void> unsubscribe(K... channels);

/**
* Listen for messages published to the given shard channels.
*
* @param shardChannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
*/
AsyncExecutions<Void> ssubscribe(K... shardChannels);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ public interface NodeSelectionPubSubReactiveCommands<K, V> {
*/
ReactiveExecutions<Void> unsubscribe(K... channels);

/**
atakavci marked this conversation as resolved.
Show resolved Hide resolved
* Listen for messages published to the given shard channels.
*
* @param shardCchannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
*/
ReactiveExecutions<Void> ssubscribe(K... shardCchannels);

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ public interface NodeSelectionPubSubCommands<K, V> {
*/
Executions<Void> unsubscribe(K... channels);

/**
atakavci marked this conversation as resolved.
Show resolved Hide resolved
* Listen for messages published to the given shard channels.
*
* @param shardChannels the channels
* @return Executions Future to synchronize {@code subscribe} completion
*/
Executions<Void> ssubscribe(K... shardChannels);

}
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandType.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword {

// Pub/Sub

PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB,
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE,

// Sets

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ final Command<K, V, V> unsubscribe(K... channels) {
return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels);
}

@SafeVarargs
final Command<K, V, V> ssubscribe(K... shardChannels) {
LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKeys(shardChannels);
return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args);
}

@SafeVarargs
final <T> Command<K, V, T> pubSubCommand(CommandType type, CommandOutput<K, V, T> output, K... keys) {
return new Command<>(type, output, new PubSubCommandArgs<>(codec).addKeys(keys));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ private boolean shouldCompleteCommand(PubSubOutput.Type type, RedisCommand<?, ?,
case subscribe:
return commandType.equalsIgnoreCase("SUBSCRIBE");

case ssubscribe:
return commandType.equalsIgnoreCase("SSUBSCRIBE");

case psubscribe:
return commandType.equalsIgnoreCase("PSUBSCRIBE");

Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionState;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class PubSubEndpoint<K, V> extends DefaultEndpoint {

private final Set<Wrapper<K>> channels;

private final Set<Wrapper<K>> shardChannels;

private final Set<Wrapper<K>> patterns;

private volatile boolean subscribeWritten = false;
Expand All @@ -70,13 +73,15 @@ public class PubSubEndpoint<K, V> extends DefaultEndpoint {
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.UNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PUNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.SSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.QUIT.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PING.name());

SUBSCRIBE_COMMANDS = new HashSet<>(2, 1);

SUBSCRIBE_COMMANDS.add(CommandType.SUBSCRIBE.name());
SUBSCRIBE_COMMANDS.add(CommandType.PSUBSCRIBE.name());
SUBSCRIBE_COMMANDS.add(CommandType.SSUBSCRIBE.name());
}

/**
Expand All @@ -91,6 +96,7 @@ public PubSubEndpoint(ClientOptions clientOptions, ClientResources clientResourc

this.channels = ConcurrentHashMap.newKeySet();
this.patterns = ConcurrentHashMap.newKeySet();
this.shardChannels = ConcurrentHashMap.newKeySet();
}

/**
Expand Down Expand Up @@ -257,12 +263,21 @@ protected void notifyListeners(PubSubMessage<K, V> message) {
case unsubscribe:
listener.unsubscribed(message.channel(), message.count());
break;
case ssubscribe:
shardNotify(listener, (l) -> l.ssubscribed(message.channel(), message.count()));
break;
default:
throw new UnsupportedOperationException("Operation " + message.type() + " not supported");
}
}
}

private void shardNotify(RedisPubSubListener<K, V> listener, Consumer<RedisShardedPubSubListener<K, V>> c) {
if (listener instanceof RedisShardedPubSubListener) {
atakavci marked this conversation as resolved.
Show resolved Hide resolved
c.accept((RedisShardedPubSubListener<K, V>) listener);
}
}

private void updateInternalState(PubSubMessage<K, V> message) {
// update internal state
switch (message.type()) {
Expand All @@ -278,6 +293,9 @@ private void updateInternalState(PubSubMessage<K, V> message) {
case unsubscribe:
channels.remove(new Wrapper<>(message.channel()));
break;
case ssubscribe:
shardChannels.add(new Wrapper<>(message.channel()));
break;
default:
break;
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/pubsub/PubSubOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> implements PubSub

public enum Type {

message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe;
message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe;

private final static Set<String> names = new HashSet<>();

Expand Down Expand Up @@ -122,6 +122,7 @@ private void handleOutput(ByteBuffer bytes) {
break;
case subscribe:
case unsubscribe:
case ssubscribe:
channel = codec.decodeKey(bytes);
break;
default:
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* @author Will Glozer
*/
public class RedisPubSubAdapter<K, V> implements RedisPubSubListener<K, V> {
public class RedisPubSubAdapter<K, V> implements RedisShardedPubSubListener<K, V> {

@Override
public void message(K channel, V message) {
Expand Down Expand Up @@ -59,4 +59,9 @@
// empty adapter method
}

@Override
public void ssubscribed(K shardChannel, long count) {
// empty adapter method
}

Check warning on line 65 in src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java#L65

Added line #L65 was not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public RedisFuture<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return dispatch(commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
@SuppressWarnings("unchecked")
public RedisFuture<Void> ssubscribe(K... channels) {
return (RedisFuture<Void>) dispatch(commandBuilder.ssubscribe(channels));
}

@Override
@SuppressWarnings("unchecked")
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import reactor.core.publisher.Mono;
import io.lettuce.core.RedisReactiveCommandsImpl;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.pubsub.api.reactive.ChannelMessage;
import io.lettuce.core.pubsub.api.reactive.PatternMessage;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
Expand Down Expand Up @@ -158,6 +159,15 @@ public Mono<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
public Mono<Void> ssubscribe(K... shardChannels) {
return createFlux(() -> {
Command<K, V, V> c = commandBuilder.ssubscribe(shardChannels);
System.out.println("");
atakavci marked this conversation as resolved.
Show resolved Hide resolved
return c;
}).then();
}

@Override
@SuppressWarnings("unchecked")
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.lettuce.core.pubsub;

/**
* Interface for Redis Pub/Sub listeners.
*
* @param <K> Key type.
* @param <V> Value type.
* @author Ali Takavci
*/
public interface RedisShardedPubSubListener<K, V> extends RedisPubSubListener<K, V>{

/**
* Subscribed to a Shard channel.
*
* @param shardChannel Shard channel
* @param count Subscription count.
*/
void ssubscribed(K chashardChannelnnel, long count);

}
Loading
Loading