diff --git a/src/main/antora/modules/ROOT/pages/redis/pubsub.adoc b/src/main/antora/modules/ROOT/pages/redis/pubsub.adoc index 0fdc427c58..8e34998bc4 100644 --- a/src/main/antora/modules/ROOT/pages/redis/pubsub.adoc +++ b/src/main/antora/modules/ROOT/pages/redis/pubsub.adoc @@ -12,6 +12,8 @@ This is an example of the pattern often called Publish/Subscribe (Pub/Sub for sh The `org.springframework.data.redis.connection` and `org.springframework.data.redis.listener` packages provide the core functionality for Redis messaging. +For static Master/Replica configuration, Pub/Sub operations always use the first node because messages are not replicated between nodes. + [[redis:pubsub:publish]] == Publishing (Sending Messages) diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStaticMasterReplicaConfiguration.java b/src/main/java/org/springframework/data/redis/connection/RedisStaticMasterReplicaConfiguration.java index da8cb7eeb0..eff7d1cd01 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStaticMasterReplicaConfiguration.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStaticMasterReplicaConfiguration.java @@ -28,11 +28,12 @@ * Configuration class used for setting up {@link RedisConnection} via {@link RedisConnectionFactory} using the provided * Master / Replica configuration to nodes know to not change address. Eg. when connecting to * AWS ElastiCache with Read Replicas.
- * Please also note that a Master/Replica connection cannot be used for Pub/Sub operations. + * Please also note that Pub/Sub operations for Master/Replica always use the first node. * * @author Mark Paluch * @author Christoph Strobl * @author Tamer Soliman + * @author Krzysztof Debski * @since 2.1 */ public class RedisStaticMasterReplicaConfiguration implements RedisConfiguration, StaticMasterReplicaConfiguration { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/StaticMasterReplicaConnectionProvider.java b/src/main/java/org/springframework/data/redis/connection/lettuce/StaticMasterReplicaConnectionProvider.java index 51f3d7958b..c1009383e6 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/StaticMasterReplicaConnectionProvider.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/StaticMasterReplicaConnectionProvider.java @@ -19,6 +19,7 @@ import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.masterreplica.MasterReplica; import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection; @@ -38,6 +39,7 @@ * * @author Mark Paluch * @author Christoph Strobl + * @author Krzysztof Debski * @since 2.1 */ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider { @@ -68,7 +70,8 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider public > T getConnection(Class connectionType) { if (connectionType.equals(StatefulRedisPubSubConnection.class)) { - throw new UnsupportedOperationException("Pub/Sub connections not supported with Master/Replica configurations"); + + return connectionType.cast(client.connectPubSub(codec, getPubSubUri())); } if (StatefulConnection.class.isAssignableFrom(connectionType)) { @@ -85,6 +88,12 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider @Override public > CompletionStage getConnectionAsync(Class connectionType) { + if (connectionType.equals(StatefulRedisPubSubConnection.class)) { + + return client.connectPubSubAsync(codec, getPubSubUri()) + .thenApply(connectionType::cast); + } + if (StatefulConnection.class.isAssignableFrom(connectionType)) { CompletableFuture> connection = MasterReplica @@ -99,4 +108,8 @@ class StaticMasterReplicaConnectionProvider implements LettuceConnectionProvider throw new UnsupportedOperationException(String.format("Connection type %s not supported", connectionType)); } + + private RedisURI getPubSubUri() { + return nodes.iterator().next(); + } } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java index 82eeda5e0a..1abf5e9d96 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java @@ -59,6 +59,7 @@ * @author Thomas Darimont * @author Christoph Strobl * @author Mark Paluch + * @author Krzysztof Debski */ @ExtendWith(LettuceConnectionFactoryExtension.class) class LettuceConnectionFactoryTests { @@ -427,7 +428,7 @@ void factoryUsesElastiCacheMasterReplicaConnections() { } @Test // DATAREDIS-1093 - void pubSubDoesNotSupportMasterReplicaConnections() { + void factoryConnectionSupportsSubscriptionForMasterReplicaConnections() { assumeTrue(String.format("No replicas connected to %s:%s.", SettingsUtils.getHost(), SettingsUtils.getPort()), connection.info("replication").getProperty("connected_slaves", "0").compareTo("0") > 0); @@ -441,10 +442,13 @@ void pubSubDoesNotSupportMasterReplicaConnections() { RedisConnection connection = factory.getConnection(); - assertThatThrownBy(() -> connection.pSubscribe((message, pattern) -> {}, "foo".getBytes())) - .isInstanceOf(RedisConnectionFailureException.class).hasCauseInstanceOf(UnsupportedOperationException.class); + try { + connection.pSubscribe((message, pattern) -> {}, "foo".getBytes()); + assertThat(connection.isSubscribed()).isTrue(); + } finally { + connection.close(); + } - connection.close(); factory.destroy(); }