From cfd82d53b0f8c94c18685e4702297de4e284ea31 Mon Sep 17 00:00:00 2001 From: ggivo Date: Wed, 30 Oct 2024 09:07:01 +0200 Subject: [PATCH 1/4] OpsForGeo producing "READONLY You can't write against a read only replica" on READS... only if master & replica configured #1813 Divert pure read intentions of georadius and georadiusbymember commands (variants that do not use STORE/STOREDIST) to GEORADIUS_RO/GEORADIUSBYMEMBER_RO This will unify the behaviour between Cluster and Redis Standalone/Replica arrangements Relates to issues #1481 #2568 #2871 Closes #1813 --- .../core/AbstractRedisAsyncCommands.java | 10 +- .../core/AbstractRedisReactiveCommands.java | 22 ++-- ...RedisAdvancedClusterAsyncCommandsImpl.java | 22 ---- ...isAdvancedClusterReactiveCommandsImpl.java | 21 --- .../RedisClusterPubSubAsyncCommandsImpl.java | 22 ---- .../GeoMasterReplicaIntegrationTests.java | 122 ++++++++++++++++++ 6 files changed, 135 insertions(+), 84 deletions(-) create mode 100644 src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index 9c8b52849b..1c2ae525be 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -58,8 +58,6 @@ import java.util.Set; import static io.lettuce.core.protocol.CommandType.EXEC; -import static io.lettuce.core.protocol.CommandType.GEORADIUS; -import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER; import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO; import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO; @@ -1140,13 +1138,13 @@ public RedisFuture> geopos(K key, V... members) { @Override public RedisFuture> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name())); + return georadius_ro(key, longitude, latitude, distance, unit); } @Override public RedisFuture>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs)); + return georadius_ro(key, longitude, latitude, distance, unit, geoArgs); } @Override @@ -1166,13 +1164,13 @@ protected RedisFuture>> georadius_ro(K key, double longitude, @Override public RedisFuture> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name())); + return georadiusbymember_ro(key, member, distance, unit); } @Override public RedisFuture>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs)); + return georadiusbymember_ro(key, member, distance, unit, geoArgs); } @Override diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index eb7e911ca4..6d6c82e1ae 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -66,8 +66,6 @@ import java.util.function.Supplier; import static io.lettuce.core.protocol.CommandType.EXEC; -import static io.lettuce.core.protocol.CommandType.GEORADIUS; -import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER; import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO; import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO; @@ -1203,14 +1201,14 @@ public Flux> geopos(K key, V... members) { } @Override - public Flux georadius(K key, double longitude, double latitude, double distance, Unit unit) { - return createDissolvingFlux(() -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name())); + public Flux georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { + return georadius_ro(key, longitude, latitude, distance, unit); } @Override - public Flux> georadius(K key, double longitude, double latitude, double distance, Unit unit, GeoArgs geoArgs) { - return createDissolvingFlux( - () -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs)); + public Flux> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, + GeoArgs geoArgs) { + return georadius_ro(key, longitude, latitude, distance, unit, geoArgs); } @Override @@ -1231,15 +1229,13 @@ protected Flux> georadius_ro(K key, double longitude, double latitu } @Override - public Flux georadiusbymember(K key, V member, double distance, Unit unit) { - return createDissolvingFlux( - () -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name())); + public Flux georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { + return georadiusbymember_ro(key, member, distance, unit); } @Override - public Flux> georadiusbymember(K key, V member, double distance, Unit unit, GeoArgs geoArgs) { - return createDissolvingFlux( - () -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs)); + public Flux> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { + return georadiusbymember_ro(key, member, distance, unit, geoArgs); } @Override diff --git a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java index 14295db788..06c9a3ee7d 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java @@ -249,28 +249,6 @@ public RedisFuture flushdb(FlushMode flushMode) { .firstOfAsync(executeOnUpstream(kvRedisClusterAsyncCommands -> kvRedisClusterAsyncCommands.flushdb(flushMode))); } - @Override - public RedisFuture> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - return super.georadius_ro(key, longitude, latitude, distance, unit); - } - - @Override - public RedisFuture>> georadius(K key, double longitude, double latitude, double distance, - GeoArgs.Unit unit, GeoArgs geoArgs) { - return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs); - } - - @Override - public RedisFuture> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - return super.georadiusbymember_ro(key, member, distance, unit); - } - - @Override - public RedisFuture>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, - GeoArgs geoArgs) { - return super.georadiusbymember_ro(key, member, distance, unit, geoArgs); - } - @Override public RedisFuture> keys(K pattern) { diff --git a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java index 0517feb4c4..0ebb1475e5 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java @@ -235,27 +235,6 @@ public Mono flushdb(FlushMode flushMode) { return Flux.merge(publishers.values()).last(); } - @Override - public Flux georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - return super.georadius_ro(key, longitude, latitude, distance, unit); - } - - @Override - public Flux> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, - GeoArgs geoArgs) { - return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs); - } - - @Override - public Flux georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - return super.georadiusbymember_ro(key, member, distance, unit); - } - - @Override - public Flux> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - return super.georadiusbymember_ro(key, member, distance, unit, geoArgs); - } - @Override public Flux keys(K pattern) { diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubAsyncCommandsImpl.java index 28a2f972ef..a9163f947c 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubAsyncCommandsImpl.java @@ -65,28 +65,6 @@ public RedisClusterPubSubAsyncCommandsImpl(StatefulRedisPubSubConnection c super(connection, codec); } - @Override - public RedisFuture> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - return super.georadius_ro(key, longitude, latitude, distance, unit); - } - - @Override - public RedisFuture>> georadius(K key, double longitude, double latitude, double distance, - GeoArgs.Unit unit, GeoArgs geoArgs) { - return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs); - } - - @Override - public RedisFuture> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - return super.georadiusbymember_ro(key, member, distance, unit); - } - - @Override - public RedisFuture>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, - GeoArgs geoArgs) { - return super.georadiusbymember_ro(key, member, distance, unit, geoArgs); - } - @Override public StatefulRedisClusterPubSubConnectionImpl getStatefulConnection() { return (StatefulRedisClusterPubSubConnectionImpl) super.getStatefulConnection(); diff --git a/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java new file mode 100644 index 0000000000..8eae34796b --- /dev/null +++ b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java @@ -0,0 +1,122 @@ +package io.lettuce.core.commands; + +import io.lettuce.core.*; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.masterreplica.MasterReplica; +import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection; +import io.lettuce.core.models.role.RedisInstance; +import io.lettuce.core.models.role.RoleParser; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.condition.EnabledOnCommand; +import io.lettuce.test.settings.TestSettings; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.*; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * @author Mark Paluch + */ +@Tag(INTEGRATION_TEST) +@ExtendWith(LettuceExtension.class) +@EnabledOnCommand("GEOADD") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class GeoMasterReplicaIntegrationTests extends AbstractRedisClientTest { + + private StatefulRedisMasterReplicaConnection masterReplica; + + private RedisCommands upstream; + + private RedisCommands connection1; + + private RedisCommands connection2; + + @BeforeEach + void before() { + + RedisURI node1 = RedisURI.Builder.redis(host, TestSettings.port(3)).withDatabase(2).build(); + RedisURI node2 = RedisURI.Builder.redis(host, TestSettings.port(4)).withDatabase(2).build(); + + connection1 = client.connect(node1).sync(); + connection2 = client.connect(node2).sync(); + + RedisInstance node1Instance = RoleParser.parse(this.connection1.role()); + RedisInstance node2Instance = RoleParser.parse(this.connection2.role()); + + if (node1Instance.getRole().isUpstream() && node2Instance.getRole().isReplica()) { + upstream = connection1; + } else if (node2Instance.getRole().isUpstream() && node1Instance.getRole().isReplica()) { + upstream = connection2; + } else { + assumeTrue(false, + String.format("Cannot run the test because I don't have a distinct master and replica but %s and %s", + node1Instance, node2Instance)); + } + + masterReplica = MasterReplica.connect(client, StringCodec.UTF8, Arrays.asList(node1, node2)); + masterReplica.setReadFrom(ReadFrom.REPLICA); + } + + @AfterEach + void after() { + + if (connection1 != null) { + connection1.getStatefulConnection().close(); + } + + if (connection2 != null) { + connection2.getStatefulConnection().close(); + } + + if (masterReplica != null) { + masterReplica.close(); + } + } + + @BeforeEach + void setUp() { + this.redis.flushall(); + } + + @Test + void georadiusReadFromReplica() { + + prepareGeo(upstream); + + upstream.waitForReplication(1, 1000); + + Set georadius = masterReplica.sync().georadius(key, 8.6582861, 49.5285695, 1, GeoArgs.Unit.km); + assertThat(georadius).hasSize(1).contains("Weinheim"); + } + + @Test + void georadiusbymemberReadFromReplica() { + + prepareGeo(upstream); + upstream.waitForReplication(1, 100); + + Set empty = masterReplica.sync().georadiusbymember(key, "Bahn", 1, GeoArgs.Unit.km); + assertThat(empty).hasSize(1).contains("Bahn"); + } + + protected void prepareGeo(RedisCommands redis) { + redis.geoadd(key, 8.6638775, 49.5282537, "Weinheim"); + redis.geoadd(key, 8.3796281, 48.9978127, "EFS9", 8.665351, 49.553302, "Bahn"); + } + + private static double getY(List> georadius, int i) { + return georadius.get(i).getCoordinates().getY().doubleValue(); + } + + private static double getX(List> georadius, int i) { + return georadius.get(i).getCoordinates().getX().doubleValue(); + } + +} From fedefdc00c6a18cf26c15f37b40b8f857cb34d28 Mon Sep 17 00:00:00 2001 From: ggivo Date: Wed, 30 Oct 2024 10:43:57 +0200 Subject: [PATCH 2/4] Fix tests --- .../core/commands/GeoMasterReplicaIntegrationTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java index 8eae34796b..f9d704c864 100644 --- a/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java @@ -47,6 +47,9 @@ void before() { connection1 = client.connect(node1).sync(); connection2 = client.connect(node2).sync(); + connection1.flushall(); + connection2.flushall(); + RedisInstance node1Instance = RoleParser.parse(this.connection1.role()); RedisInstance node2Instance = RoleParser.parse(this.connection2.role()); From 7a9ca22ea1e3461af21bdc34e1afbf7379d99903 Mon Sep 17 00:00:00 2001 From: ggivo Date: Wed, 30 Oct 2024 10:44:34 +0200 Subject: [PATCH 3/4] Remove unused methods --- .../core/commands/GeoMasterReplicaIntegrationTests.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java index f9d704c864..96a0baaa72 100644 --- a/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java @@ -114,12 +114,4 @@ protected void prepareGeo(RedisCommands redis) { redis.geoadd(key, 8.3796281, 48.9978127, "EFS9", 8.665351, 49.553302, "Bahn"); } - private static double getY(List> georadius, int i) { - return georadius.get(i).getCoordinates().getY().doubleValue(); - } - - private static double getX(List> georadius, int i) { - return georadius.get(i).getCoordinates().getX().doubleValue(); - } - } From d67c63fbdaaba433f4521465f651fc9541379684 Mon Sep 17 00:00:00 2001 From: ggivo Date: Wed, 30 Oct 2024 11:02:04 +0200 Subject: [PATCH 4/4] Fix tests and add tests withArgs --- .../GeoMasterReplicaIntegrationTests.java | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java index 96a0baaa72..3a176e475c 100644 --- a/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java @@ -47,9 +47,6 @@ void before() { connection1 = client.connect(node1).sync(); connection2 = client.connect(node2).sync(); - connection1.flushall(); - connection2.flushall(); - RedisInstance node1Instance = RoleParser.parse(this.connection1.role()); RedisInstance node2Instance = RoleParser.parse(this.connection2.role()); @@ -62,9 +59,11 @@ void before() { String.format("Cannot run the test because I don't have a distinct master and replica but %s and %s", node1Instance, node2Instance)); } + upstream.flushall(); masterReplica = MasterReplica.connect(client, StringCodec.UTF8, Arrays.asList(node1, node2)); masterReplica.setReadFrom(ReadFrom.REPLICA); + } @AfterEach @@ -99,6 +98,19 @@ void georadiusReadFromReplica() { assertThat(georadius).hasSize(1).contains("Weinheim"); } + @Test + void georadiusWithArgsReadFromReplica() { + + prepareGeo(upstream); + + upstream.waitForReplication(1, 1000); + + GeoArgs geoArgs = new GeoArgs().withHash().withCoordinates().withDistance().withCount(1).desc(); + + List> result = masterReplica.sync().georadius(key, 8.665351, 49.553302, 5, GeoArgs.Unit.km, geoArgs); + assertThat(result).hasSize(1); + } + @Test void georadiusbymemberReadFromReplica() { @@ -109,6 +121,17 @@ void georadiusbymemberReadFromReplica() { assertThat(empty).hasSize(1).contains("Bahn"); } + @Test + void georadiusbymemberWithArgsReadFromReplica() { + + prepareGeo(upstream); + upstream.waitForReplication(1, 100); + + List> empty = masterReplica.sync().georadiusbymember(key, "Bahn", 1, GeoArgs.Unit.km, + new GeoArgs().withHash().withCoordinates().withDistance().desc()); + assertThat(empty).isNotEmpty(); + } + protected void prepareGeo(RedisCommands redis) { redis.geoadd(key, 8.6638775, 49.5282537, "Weinheim"); redis.geoadd(key, 8.3796281, 48.9978127, "EFS9", 8.665351, 49.553302, "Bahn");