Skip to content

Commit

Permalink
- RedisPubSubListener.ssubscribed and RedisClusterPubSubListener.ssub…
Browse files Browse the repository at this point in the history
…scribed delegates to subscribe

- use of pubsubtestlistener instead of class level listener implementation
- some format fixing
  • Loading branch information
atakavci committed Apr 5, 2024
1 parent e671633 commit b26e244
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ public interface RedisClusterPubSubListener<K, V> {
* @param node the {@link RedisClusterNode} from which the {@code message} originates.
* @param shardChannel Shard channel
* @param count Subscription count.
* @since 7.0
*/
default void ssubscribed(RedisClusterNode node, K shardChannel, long count){
throw new UnsupportedOperationException(
"This operation is not available by default! Use one of available pub/sub adapters or override this method in your class.");
default void ssubscribed(RedisClusterNode node, K shardChannel, long count) {
subscribed(node, shardChannel, count);
}

Check warning on line 80 in src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java#L79-L80

Added lines #L79 - L80 were not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public interface NodeSelectionPubSubAsyncCommands<K, V> {
*
* @param shardChannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
* @since 7.0
*/
AsyncExecutions<Void> ssubscribe(K... shardChannels);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ public interface NodeSelectionPubSubReactiveCommands<K, V> {
*/
ReactiveExecutions<Void> unsubscribe(K... channels);

/**
/**
* Listen for messages published to the given shard channels.
*
* @param shardCchannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
* @since 7.0
*/
ReactiveExecutions<Void> ssubscribe(K... shardCchannels);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ public interface NodeSelectionPubSubCommands<K, V> {
*/
Executions<Void> unsubscribe(K... channels);

/**
/**
* Listen for messages published to the given shard channels.
*
* @param shardChannels the channels
* @return Executions Future to synchronize {@code subscribe} completion
* @since 7.0
*/
Executions<Void> ssubscribe(K... shardChannels);

Expand Down
7 changes: 4 additions & 3 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ public interface RedisPubSubListener<K, V> {
*
* @param shardChannel Shard channel
* @param count Subscription count.
* @since 7.0
*/
default void ssubscribed(K chashardChannelnnel, long count) {
throw new UnsupportedOperationException(
"This operation is not available by default. Use one of available pub/sub adapters or override this method in your class!");
default void ssubscribed(K shardChannel, long count) {
subscribed(shardChannel, count);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,7 @@ public Mono<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {

@Override
public Mono<Void> ssubscribe(K... shardChannels) {
return createFlux(() -> {
Command<K, V, V> c = commandBuilder.ssubscribe(shardChannels);
System.out.println("");
return c;
}).then();
return createFlux(() -> commandBuilder.ssubscribe(shardChannels)).then();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public interface RedisPubSubAsyncCommands<K, V> extends RedisAsyncCommands<K, V>
*
* @param shardChannels the shard channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
* @since 7.0
*/
RedisFuture<Void> ssubscribe(K... shardChannels);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public interface RedisPubSubReactiveCommands<K, V> extends RedisReactiveCommands
*
* @param shardChannels the channels.
* @return Mono&lt;Void&gt; Mono for {@code subscribe} command.
* @since 7.0
*/
Mono<Void> ssubscribe(K... shardChannels);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public interface RedisPubSubCommands<K, V> extends RedisCommands<K, V> {
* Listen for messages published to the given shard channels.
*
* @param shardChannels the channels
* @since 7.0
*/
void ssubscribe(K... shardChannels);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@
import io.lettuce.test.LettuceExtension;
import io.lettuce.test.TestFutures;
import io.lettuce.test.Wait;
import io.lettuce.test.condition.EnabledOnCommand;

/**
* @author Mark Paluch
*/
@ExtendWith(LettuceExtension.class)
class RedisClusterPubSubConnectionIntegrationTests extends TestSupport implements RedisClusterPubSubListener<String, String> {
class RedisClusterPubSubConnectionIntegrationTests extends TestSupport {

private final RedisClusterClient clusterClient;

Expand Down Expand Up @@ -113,29 +114,23 @@ void testRegularClientPubSubShardChannels() {
}

@Test
@EnabledOnCommand("SSUBSCRIBE")
void subscribeToShardChannel() throws Exception {
pubSubConnection.addListener(this);
pubSubConnection.addListener(connectionListener);
pubSubConnection.async().ssubscribe(shardChannel);
assertThat(shardChannels.poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel);
assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel);
}

@Test
@EnabledOnCommand("SSUBSCRIBE")
void subscribeToShardChannelViaOtherEndpoint() throws Exception {
pubSubConnection.addListener(this);
pubSubConnection.addListener(connectionListener);
RedisClusterPubSubAsyncCommands<String, String> pubSub = pubSubConnection.async();
String nodeId = getNodeId(pubSub);
String nodeId = pubSub.clusterMyId().get();
RedisPubSubAsyncCommands<String, String> other = pubSub
.nodes(node -> node.getRole().isUpstream() && !node.getNodeId().equals(nodeId)).commands(0);
other.ssubscribe(shardChannel);
assertThat(shardChannels.poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel);
}

private String getNodeId(RedisClusterPubSubAsyncCommands<String, String> clusterPubsub2) {
try {
return clusterPubsub2.clusterMyId().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel);
}

@Test
Expand Down Expand Up @@ -391,35 +386,4 @@ private RedisClusterNode getOtherThan(String nodeId) {
throw new IllegalStateException("No other nodes than " + nodeId + " available");
}

// RedisClusterShardedPubSubListener implementation

@Override
public void message(RedisClusterNode node, String channel, String message) {
}

@Override
public void message(RedisClusterNode node, String pattern, String channel, String message) {
}

@Override
public void subscribed(RedisClusterNode node, String channel, long count) {
}

@Override
public void psubscribed(RedisClusterNode node, String pattern, long count) {
}

@Override
public void unsubscribed(RedisClusterNode node, String channel, long count) {
}

@Override
public void punsubscribed(RedisClusterNode node, String pattern, long count) {
}

@Override
public void ssubscribed(RedisClusterNode node, String shardChannel, long count) {
shardChannels.add(shardChannel);
}

}
85 changes: 18 additions & 67 deletions src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,15 @@
import io.lettuce.core.*;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.push.PushMessage;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.async.RedisClusterPubSubAsyncCommands;
import io.lettuce.core.internal.LettuceFactories;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.support.PubSubTestListener;
import io.lettuce.test.Delay;
import io.lettuce.test.TestFutures;
import io.lettuce.test.Wait;
import io.lettuce.test.WithPassword;
import io.lettuce.test.condition.EnabledOnCommand;
import io.lettuce.test.resource.DefaultRedisClusterClient;
import io.lettuce.test.resource.FastShutdown;
import io.lettuce.test.resource.TestClientResources;

Expand All @@ -65,18 +61,19 @@
* @author Tihomir Mateev
* @author Ali Takavci
*/
class PubSubCommandTest extends AbstractRedisClientTest
implements RedisPubSubListener<String, String> {
class PubSubCommandTest extends AbstractRedisClientTest {

RedisPubSubAsyncCommands<String, String> pubsub;

BlockingQueue<String> channels;
PubSubTestListener listener = new PubSubTestListener();

BlockingQueue<String> patterns;
BlockingQueue<String> channels = listener.getChannels();

BlockingQueue<String> messages;
BlockingQueue<String> patterns = listener.getPatterns();

BlockingQueue<Long> counts;
BlockingQueue<String> messages = listener.getMessages();

BlockingQueue<Long> counts = listener.getCounts();

String channel = "channel0";

Expand All @@ -92,12 +89,12 @@ void openPubSubConnection() {

client.setOptions(getOptions());
pubsub = client.connectPubSub().async();
pubsub.getStatefulConnection().addListener(this);
pubsub.getStatefulConnection().addListener(listener);
} finally {
channels = LettuceFactories.newBlockingQueue();
patterns = LettuceFactories.newBlockingQueue();
messages = LettuceFactories.newBlockingQueue();
counts = LettuceFactories.newBlockingQueue();
channels.clear();
patterns.clear();
messages.clear();
counts.clear();
}
}

Expand All @@ -119,7 +116,7 @@ void auth() {
client.setOptions(
ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build());
RedisPubSubAsyncCommands<String, String> connection = client.connectPubSub().async();
connection.getStatefulConnection().addListener(PubSubCommandTest.this);
connection.getStatefulConnection().addListener(listener);
connection.auth(passwd);

connection.subscribe(channel);
Expand All @@ -135,7 +132,7 @@ void authWithUsername() {
client.setOptions(
ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build());
RedisPubSubAsyncCommands<String, String> connection = client.connectPubSub().async();
connection.getStatefulConnection().addListener(PubSubCommandTest.this);
connection.getStatefulConnection().addListener(listener);
connection.auth(username, passwd);

connection.subscribe(channel);
Expand All @@ -152,7 +149,7 @@ void authWithReconnect() {
ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build());

RedisPubSubAsyncCommands<String, String> connection = client.connectPubSub().async();
connection.getStatefulConnection().addListener(PubSubCommandTest.this);
connection.getStatefulConnection().addListener(listener);
connection.auth(passwd);

connection.clientSetname("authWithReconnect");
Expand Down Expand Up @@ -181,7 +178,7 @@ void authWithUsernameAndReconnect() {
ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build());

RedisPubSubAsyncCommands<String, String> connection = client.connectPubSub().async();
connection.getStatefulConnection().addListener(PubSubCommandTest.this);
connection.getStatefulConnection().addListener(listener);
connection.auth(username, passwd);
connection.clientSetname("authWithReconnect");
connection.subscribe(channel).get();
Expand Down Expand Up @@ -222,7 +219,6 @@ void message() throws Exception {
assertThat(messages.take()).isEqualTo(message);
}


@Test
@EnabledOnCommand("ACL")
void messageAsPushMessage() throws Exception {
Expand Down Expand Up @@ -541,7 +537,7 @@ void removeListener() throws Exception {
assertThat(channels.take()).isEqualTo(channel);
assertThat(messages.take()).isEqualTo(message);

pubsub.getStatefulConnection().removeListener(this);
pubsub.getStatefulConnection().removeListener(listener);

redis.publish(channel, message);
assertThat(channels.poll(10, TimeUnit.MILLISECONDS)).isNull();
Expand All @@ -557,49 +553,4 @@ void echoAllowedInSubscriptionState() {
pubsub.unsubscribe(channel);
}

// RedisPubSubListener implementation

@Override
public void message(String channel, String message) {
channels.add(channel);
messages.add(message);
}

@Override
public void message(String pattern, String channel, String message) {
patterns.add(pattern);
channels.add(channel);
messages.add(message);
}

@Override
public void subscribed(String channel, long count) {
channels.add(channel);
counts.add(count);
}

@Override
public void psubscribed(String pattern, long count) {
patterns.add(pattern);
counts.add(count);
}

@Override
public void unsubscribed(String channel, long count) {
channels.add(channel);
counts.add(count);
}

@Override
public void punsubscribed(String pattern, long count) {
patterns.add(pattern);
counts.add(count);
}

@Override
public void ssubscribed(String shardChannel, long count) {
channels.add(shardChannel);
counts.add(count);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,18 @@ void pubsubShardNumsub() throws ExecutionException, InterruptedException {
@Test
void ssubscribe() throws ExecutionException, InterruptedException {
String pattern = "channelPattern";
AsyncCommand dispachedMock = mock (AsyncCommand.class);
AsyncCommand dispachedMock = mock(AsyncCommand.class);
when(mockedConnection.dispatch((RedisCommand<String, String, Object>) any())).thenReturn(dispachedMock);

commands.ssubscribe(pattern).get();

ArgumentCaptor<AsyncCommand> capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);;
ArgumentCaptor<AsyncCommand> capturedCommand = ArgumentCaptor.forClass(AsyncCommand.class);

verify(mockedConnection).dispatch(capturedCommand.capture());

Assertions.assertEquals( SSUBSCRIBE, capturedCommand.getValue().getType());
Assertions.assertEquals(SSUBSCRIBE, capturedCommand.getValue().getType());
assertInstanceOf(PubSubOutput.class, capturedCommand.getValue().getOutput());
Assertions.assertEquals( "key<channelPattern>", capturedCommand.getValue().getArgs().toCommandString());
Assertions.assertEquals("key<channelPattern>", capturedCommand.getValue().getArgs().toCommandString());

assertNotEquals(capturedCommand.getValue(), dispachedMock);
}
Expand Down

0 comments on commit b26e244

Please sign in to comment.