Skip to content

Commit

Permalink
OpsForGeo producing "READONLY You can't write against a read only rep…
Browse files Browse the repository at this point in the history
…lica " on READS... (#3032)

* OpsForGeo producing "READONLY You can't write against a read only replica" on READS... only if master & replica configured #1813

Divert pure read intentions of georadius and georadiusbymember commands (variants that do not use STORE/STOREDIST) to GEORADIUS_RO/GEORADIUSBYMEMBER_RO
This will unify the behaviour between Cluster and Redis Standalone/Replica arrangements

Relates to  issues #1481 #2568 #2871

Closes #1813

* Fix tests

* Remove unused methods

* Fix tests and add tests  withArgs
  • Loading branch information
ggivo authored and tishun committed Dec 1, 2024
1 parent 1cd0ecf commit d1b853b
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 84 deletions.
10 changes: 4 additions & 6 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import java.util.Set;

import static io.lettuce.core.protocol.CommandType.EXEC;
import static io.lettuce.core.protocol.CommandType.GEORADIUS;
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER;
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO;
import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO;

Expand Down Expand Up @@ -1140,13 +1138,13 @@ public RedisFuture<List<GeoCoordinates>> geopos(K key, V... members) {

@Override
public RedisFuture<Set<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name()));
return georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadius(K key, double longitude, double latitude, double distance,
GeoArgs.Unit unit, GeoArgs geoArgs) {
return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs));
return georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
Expand All @@ -1166,13 +1164,13 @@ protected RedisFuture<List<GeoWithin<V>>> georadius_ro(K key, double longitude,

@Override
public RedisFuture<Set<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name()));
return georadiusbymember_ro(key, member, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs));
return georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
Expand Down
22 changes: 9 additions & 13 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@
import java.util.function.Supplier;

import static io.lettuce.core.protocol.CommandType.EXEC;
import static io.lettuce.core.protocol.CommandType.GEORADIUS;
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER;
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO;
import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO;

Expand Down Expand Up @@ -1203,14 +1201,14 @@ public Flux<Value<GeoCoordinates>> geopos(K key, V... members) {
}

@Override
public Flux<V> georadius(K key, double longitude, double latitude, double distance, Unit unit) {
return createDissolvingFlux(() -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name()));
public Flux<V> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, Unit unit, GeoArgs geoArgs) {
return createDissolvingFlux(
() -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs));
public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
Expand All @@ -1231,15 +1229,13 @@ protected Flux<GeoWithin<V>> georadius_ro(K key, double longitude, double latitu
}

@Override
public Flux<V> georadiusbymember(K key, V member, double distance, Unit unit) {
return createDissolvingFlux(
() -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name()));
public Flux<V> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return georadiusbymember_ro(key, member, distance, unit);
}

@Override
public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, Unit unit, GeoArgs geoArgs) {
return createDissolvingFlux(
() -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs));
public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
return georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,28 +249,6 @@ public RedisFuture<String> flushdb(FlushMode flushMode) {
.firstOfAsync(executeOnUpstream(kvRedisClusterAsyncCommands -> kvRedisClusterAsyncCommands.flushdb(flushMode)));
}

@Override
public RedisFuture<Set<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return super.georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadius(K key, double longitude, double latitude, double distance,
GeoArgs.Unit unit, GeoArgs geoArgs) {
return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
public RedisFuture<Set<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return super.georadiusbymember_ro(key, member, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
public RedisFuture<List<K>> keys(K pattern) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,27 +235,6 @@ public Mono<String> flushdb(FlushMode flushMode) {
return Flux.merge(publishers.values()).last();
}

@Override
public Flux<V> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return super.georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
public Flux<V> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return super.georadiusbymember_ro(key, member, distance, unit);
}

@Override
public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
public Flux<K> keys(K pattern) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,6 @@ public RedisClusterPubSubAsyncCommandsImpl(StatefulRedisPubSubConnection<K, V> c
super(connection, codec);
}

@Override
public RedisFuture<Set<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return super.georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadius(K key, double longitude, double latitude, double distance,
GeoArgs.Unit unit, GeoArgs geoArgs) {
return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
public RedisFuture<Set<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return super.georadiusbymember_ro(key, member, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
public StatefulRedisClusterPubSubConnectionImpl<K, V> getStatefulConnection() {
return (StatefulRedisClusterPubSubConnectionImpl<K, V>) super.getStatefulConnection();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package io.lettuce.core.commands;

import io.lettuce.core.*;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
import io.lettuce.core.models.role.RedisInstance;
import io.lettuce.core.models.role.RoleParser;
import io.lettuce.test.LettuceExtension;
import io.lettuce.test.condition.EnabledOnCommand;
import io.lettuce.test.settings.TestSettings;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Arrays;
import java.util.List;
import java.util.Set;

import static io.lettuce.TestTags.INTEGRATION_TEST;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

/**
* @author Mark Paluch
*/
@Tag(INTEGRATION_TEST)
@ExtendWith(LettuceExtension.class)
@EnabledOnCommand("GEOADD")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class GeoMasterReplicaIntegrationTests extends AbstractRedisClientTest {

private StatefulRedisMasterReplicaConnection<String, String> masterReplica;

private RedisCommands<String, String> upstream;

private RedisCommands<String, String> connection1;

private RedisCommands<String, String> connection2;

@BeforeEach
void before() {

RedisURI node1 = RedisURI.Builder.redis(host, TestSettings.port(3)).withDatabase(2).build();
RedisURI node2 = RedisURI.Builder.redis(host, TestSettings.port(4)).withDatabase(2).build();

connection1 = client.connect(node1).sync();
connection2 = client.connect(node2).sync();

RedisInstance node1Instance = RoleParser.parse(this.connection1.role());
RedisInstance node2Instance = RoleParser.parse(this.connection2.role());

if (node1Instance.getRole().isUpstream() && node2Instance.getRole().isReplica()) {
upstream = connection1;
} else if (node2Instance.getRole().isUpstream() && node1Instance.getRole().isReplica()) {
upstream = connection2;
} else {
assumeTrue(false,
String.format("Cannot run the test because I don't have a distinct master and replica but %s and %s",
node1Instance, node2Instance));
}
upstream.flushall();

masterReplica = MasterReplica.connect(client, StringCodec.UTF8, Arrays.asList(node1, node2));
masterReplica.setReadFrom(ReadFrom.REPLICA);

}

@AfterEach
void after() {

if (connection1 != null) {
connection1.getStatefulConnection().close();
}

if (connection2 != null) {
connection2.getStatefulConnection().close();
}

if (masterReplica != null) {
masterReplica.close();
}
}

@BeforeEach
void setUp() {
this.redis.flushall();
}

@Test
void georadiusReadFromReplica() {

prepareGeo(upstream);

upstream.waitForReplication(1, 1000);

Set<String> georadius = masterReplica.sync().georadius(key, 8.6582861, 49.5285695, 1, GeoArgs.Unit.km);
assertThat(georadius).hasSize(1).contains("Weinheim");
}

@Test
void georadiusWithArgsReadFromReplica() {

prepareGeo(upstream);

upstream.waitForReplication(1, 1000);

GeoArgs geoArgs = new GeoArgs().withHash().withCoordinates().withDistance().withCount(1).desc();

List<GeoWithin<String>> result = masterReplica.sync().georadius(key, 8.665351, 49.553302, 5, GeoArgs.Unit.km, geoArgs);
assertThat(result).hasSize(1);
}

@Test
void georadiusbymemberReadFromReplica() {

prepareGeo(upstream);
upstream.waitForReplication(1, 100);

Set<String> empty = masterReplica.sync().georadiusbymember(key, "Bahn", 1, GeoArgs.Unit.km);
assertThat(empty).hasSize(1).contains("Bahn");
}

@Test
void georadiusbymemberWithArgsReadFromReplica() {

prepareGeo(upstream);
upstream.waitForReplication(1, 100);

List<GeoWithin<String>> empty = masterReplica.sync().georadiusbymember(key, "Bahn", 1, GeoArgs.Unit.km,
new GeoArgs().withHash().withCoordinates().withDistance().desc());
assertThat(empty).isNotEmpty();
}

protected void prepareGeo(RedisCommands<String, String> redis) {
redis.geoadd(key, 8.6638775, 49.5282537, "Weinheim");
redis.geoadd(key, 8.3796281, 48.9978127, "EFS9", 8.665351, 49.553302, "Bahn");
}

}

0 comments on commit d1b853b

Please sign in to comment.