diff --git a/src/main/antora/modules/ROOT/pages/redis/pubsub.adoc b/src/main/antora/modules/ROOT/pages/redis/pubsub.adoc
index 0fdc427c58..8e34998bc4 100644
--- a/src/main/antora/modules/ROOT/pages/redis/pubsub.adoc
+++ b/src/main/antora/modules/ROOT/pages/redis/pubsub.adoc
@@ -12,6 +12,8 @@ This is an example of the pattern often called Publish/Subscribe (Pub/Sub for sh
The `org.springframework.data.redis.connection` and `org.springframework.data.redis.listener` packages provide the core functionality for Redis messaging.
+For static Master/Replica configuration, Pub/Sub operations always use the first node because messages are not replicated between nodes.
+
[[redis:pubsub:publish]]
== Publishing (Sending Messages)
diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStaticMasterReplicaConfiguration.java b/src/main/java/org/springframework/data/redis/connection/RedisStaticMasterReplicaConfiguration.java
index da8cb7eeb0..eff7d1cd01 100644
--- a/src/main/java/org/springframework/data/redis/connection/RedisStaticMasterReplicaConfiguration.java
+++ b/src/main/java/org/springframework/data/redis/connection/RedisStaticMasterReplicaConfiguration.java
@@ -28,11 +28,12 @@
* Configuration class used for setting up {@link RedisConnection} via {@link RedisConnectionFactory} using the provided
* Master / Replica configuration to nodes know to not change address. Eg. when connecting to
* AWS ElastiCache with Read Replicas.
- * Please also note that a Master/Replica connection cannot be used for Pub/Sub operations.
+ * Please also note that Pub/Sub operations for Master/Replica always use the first node.
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Tamer Soliman
+ * @author Krzysztof Debski
* @since 2.1
*/
public class RedisStaticMasterReplicaConfiguration implements RedisConfiguration, StaticMasterReplicaConfiguration {
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/StaticMasterReplicaConnectionProvider.java b/src/main/java/org/springframework/data/redis/connection/lettuce/StaticMasterReplicaConnectionProvider.java
index 51f3d7958b..c1009383e6 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/StaticMasterReplicaConnectionProvider.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/StaticMasterReplicaConnectionProvider.java
@@ -19,6 +19,7 @@
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
+import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
@@ -38,6 +39,7 @@
*
* @author Mark Paluch
* @author Christoph Strobl
+ * @author Krzysztof Debski
* @since 2.1
*/
class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider {
@@ -68,7 +70,8 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider
public > T getConnection(Class connectionType) {
if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
- throw new UnsupportedOperationException("Pub/Sub connections not supported with Master/Replica configurations");
+
+ return connectionType.cast(client.connectPubSub(codec, getPubSubUri()));
}
if (StatefulConnection.class.isAssignableFrom(connectionType)) {
@@ -85,6 +88,12 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider
@Override
public > CompletionStage getConnectionAsync(Class connectionType) {
+ if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
+
+ return client.connectPubSubAsync(codec, getPubSubUri())
+ .thenApply(connectionType::cast);
+ }
+
if (StatefulConnection.class.isAssignableFrom(connectionType)) {
CompletableFuture extends StatefulRedisMasterReplicaConnection, ?>> connection = MasterReplica
@@ -99,4 +108,8 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider
throw new UnsupportedOperationException(String.format("Connection type %s not supported", connectionType));
}
+
+ private RedisURI getPubSubUri() {
+ return nodes.iterator().next();
+ }
}
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java
index 82eeda5e0a..1abf5e9d96 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java
@@ -59,6 +59,7 @@
* @author Thomas Darimont
* @author Christoph Strobl
* @author Mark Paluch
+ * @author Krzysztof Debski
*/
@ExtendWith(LettuceConnectionFactoryExtension.class)
class LettuceConnectionFactoryTests {
@@ -427,7 +428,7 @@ void factoryUsesElastiCacheMasterReplicaConnections() {
}
@Test // DATAREDIS-1093
- void pubSubDoesNotSupportMasterReplicaConnections() {
+ void factoryConnectionSupportsSubscriptionForMasterReplicaConnections() {
assumeTrue(String.format("No replicas connected to %s:%s.", SettingsUtils.getHost(), SettingsUtils.getPort()),
connection.info("replication").getProperty("connected_slaves", "0").compareTo("0") > 0);
@@ -441,10 +442,13 @@ void pubSubDoesNotSupportMasterReplicaConnections() {
RedisConnection connection = factory.getConnection();
- assertThatThrownBy(() -> connection.pSubscribe((message, pattern) -> {}, "foo".getBytes()))
- .isInstanceOf(RedisConnectionFailureException.class).hasCauseInstanceOf(UnsupportedOperationException.class);
+ try {
+ connection.pSubscribe((message, pattern) -> {}, "foo".getBytes());
+ assertThat(connection.isSubscribed()).isTrue();
+ } finally {
+ connection.close();
+ }
- connection.close();
factory.destroy();
}