Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Pub/Sub for static master/replica configuration. #2773

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/antora/modules/ROOT/pages/redis/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ This is an example of the pattern often called Publish/Subscribe (Pub/Sub for sh

The `org.springframework.data.redis.connection` and `org.springframework.data.redis.listener` packages provide the core functionality for Redis messaging.

For static Master/Replica configuration, Pub/Sub operations always use the first node because messages are not replicated between nodes.

[[redis:pubsub:publish]]
== Publishing (Sending Messages)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
* Configuration class used for setting up {@link RedisConnection} via {@link RedisConnectionFactory} using the provided
* Master / Replica configuration to nodes know to not change address. Eg. when connecting to
* <a href="https://aws.amazon.com/documentation/elasticache/">AWS ElastiCache with Read Replicas</a>. <br/>
* Please also note that a Master/Replica connection cannot be used for Pub/Sub operations.
* Please also note that Pub/Sub operations for Master/Replica always use the first node.
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Tamer Soliman
* @author Krzysztof Debski
* @since 2.1
*/
public class RedisStaticMasterReplicaConfiguration implements RedisConfiguration, StaticMasterReplicaConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
Expand All @@ -38,6 +39,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Krzysztof Debski
* @since 2.1
*/
class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider {
Expand Down Expand Up @@ -68,7 +70,8 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
throw new UnsupportedOperationException("Pub/Sub connections not supported with Master/Replica configurations");

return connectionType.cast(client.connectPubSub(codec, getPubSubUri()));
}

if (StatefulConnection.class.isAssignableFrom(connectionType)) {
Expand All @@ -85,6 +88,12 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider
@Override
public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {

if (connectionType.equals(StatefulRedisPubSubConnection.class)) {

return client.connectPubSubAsync(codec, getPubSubUri())
.thenApply(connectionType::cast);
}

if (StatefulConnection.class.isAssignableFrom(connectionType)) {

CompletableFuture<? extends StatefulRedisMasterReplicaConnection<?, ?>> connection = MasterReplica
Expand All @@ -99,4 +108,8 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider

throw new UnsupportedOperationException(String.format("Connection type %s not supported", connectionType));
}

private RedisURI getPubSubUri() {
return nodes.iterator().next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* @author Thomas Darimont
* @author Christoph Strobl
* @author Mark Paluch
* @author Krzysztof Debski
*/
@ExtendWith(LettuceConnectionFactoryExtension.class)
class LettuceConnectionFactoryTests {
Expand Down Expand Up @@ -427,7 +428,7 @@ void factoryUsesElastiCacheMasterReplicaConnections() {
}

@Test // DATAREDIS-1093
void pubSubDoesNotSupportMasterReplicaConnections() {
void factoryConnectionSupportsSubscriptionForMasterReplicaConnections() {

assumeTrue(String.format("No replicas connected to %s:%s.", SettingsUtils.getHost(), SettingsUtils.getPort()),
connection.info("replication").getProperty("connected_slaves", "0").compareTo("0") > 0);
Expand All @@ -441,10 +442,13 @@ void pubSubDoesNotSupportMasterReplicaConnections() {

RedisConnection connection = factory.getConnection();

assertThatThrownBy(() -> connection.pSubscribe((message, pattern) -> {}, "foo".getBytes()))
.isInstanceOf(RedisConnectionFailureException.class).hasCauseInstanceOf(UnsupportedOperationException.class);
try {
connection.pSubscribe((message, pattern) -> {}, "foo".getBytes());
assertThat(connection.isSubscribed()).isTrue();
} finally {
connection.close();
}

connection.close();
factory.destroy();
}

Expand Down
Loading