diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java index e3cfb190dd..a7eb2a3ca7 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveZSetCommands.java @@ -25,14 +25,17 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.reactivestreams.Publisher; import org.springframework.data.domain.Range; import org.springframework.data.domain.Sort.Direction; +import org.springframework.data.redis.connection.RedisZSetCommands.ZAddArgs.Flag; import org.springframework.data.redis.connection.zset.Aggregate; import org.springframework.data.redis.connection.zset.DefaultTuple; import org.springframework.data.redis.connection.zset.Tuple; @@ -61,23 +64,16 @@ public interface ReactiveZSetCommands { class ZAddCommand extends KeyCommand { private final List tuples; - private final boolean upsert; - private final boolean returnTotalChanged; + private final Set flags; private final boolean incr; - private final boolean gt; - private final boolean lt; - private ZAddCommand(@Nullable ByteBuffer key, List tuples, boolean upsert, boolean returnTotalChanged, - boolean incr, boolean gt, boolean lt) { + private ZAddCommand(@Nullable ByteBuffer key, List tuples, Set flags, boolean incr) { super(key); this.tuples = tuples; - this.upsert = upsert; - this.returnTotalChanged = returnTotalChanged; + this.flags = flags; this.incr = incr; - this.gt = gt; - this.lt = lt; } /** @@ -103,7 +99,7 @@ public static ZAddCommand tuples(Collection tuples) { Assert.notNull(tuples, "Tuples must not be null"); - return new ZAddCommand(null, new ArrayList<>(tuples), false, false, false, false, false); + return new ZAddCommand(null, new ArrayList<>(tuples), EnumSet.noneOf(Flag.class), false); } /** @@ -116,7 +112,7 @@ public ZAddCommand to(ByteBuffer key) { Assert.notNull(key, "Key must not be null"); - return new ZAddCommand(key, tuples, upsert, returnTotalChanged, incr, gt, lt); + return new ZAddCommand(key, tuples, flags, incr); } /** @@ -126,7 +122,11 @@ public ZAddCommand to(ByteBuffer key) { * @return a new {@link ZAddCommand} with {@literal xx} applied. */ public ZAddCommand xx() { - return new ZAddCommand(getKey(), tuples, false, returnTotalChanged, incr, gt, lt); + + EnumSet flags = EnumSet.copyOf(this.flags); + flags.remove(Flag.NX); + flags.add(Flag.XX); + return new ZAddCommand(getKey(), tuples, flags, incr); } /** @@ -136,7 +136,11 @@ public ZAddCommand xx() { * @return a new {@link ZAddCommand} with {@literal nx} applied. */ public ZAddCommand nx() { - return new ZAddCommand(getKey(), tuples, true, returnTotalChanged, incr, gt, lt); + + EnumSet flags = EnumSet.copyOf(this.flags); + flags.remove(Flag.XX); + flags.add(Flag.NX); + return new ZAddCommand(getKey(), tuples, flags, incr); } /** @@ -146,17 +150,20 @@ public ZAddCommand nx() { * @return a new {@link ZAddCommand} with {@literal ch} applied. */ public ZAddCommand ch() { - return new ZAddCommand(getKey(), tuples, upsert, true, incr, gt, lt); + + EnumSet flags = EnumSet.copyOf(this.flags); + flags.add(Flag.CH); + return new ZAddCommand(getKey(), tuples, flags, incr); } /** * Applies {@literal incr} mode (When this option is specified ZADD acts like ZINCRBY). Constructs a new command - * instance with all previously configured properties. + * instance with all previously configured properties. Note that the command result returns the score of the member. * * @return a new {@link ZAddCommand} with {@literal incr} applied. */ public ZAddCommand incr() { - return new ZAddCommand(getKey(), tuples, upsert, upsert, true, gt, lt); + return new ZAddCommand(getKey(), tuples, flags, true); } /** @@ -166,7 +173,11 @@ public ZAddCommand incr() { * @since 2.5 */ public ZAddCommand gt() { - return new ZAddCommand(getKey(), tuples, upsert, upsert, incr, true, lt); + + EnumSet flags = EnumSet.copyOf(this.flags); + flags.remove(Flag.LT); + flags.add(Flag.GT); + return new ZAddCommand(getKey(), tuples, flags, incr); } /** @@ -176,7 +187,11 @@ public ZAddCommand gt() { * @since 2.5 */ public ZAddCommand lt() { - return new ZAddCommand(getKey(), tuples, upsert, upsert, incr, gt, true); + + EnumSet flags = EnumSet.copyOf(this.flags); + flags.remove(Flag.GT); + flags.add(Flag.LT); + return new ZAddCommand(getKey(), tuples, flags, incr); } /** @@ -187,10 +202,26 @@ public List getTuples() { } /** - * @return + * @return {@code true} if the command does not contain NX or XX flags. */ public boolean isUpsert() { - return upsert; + return !flags.contains(Flag.NX) && !flags.contains(Flag.XX); + } + + /** + * @return {@code true} if the command contains the XX flag. + * @since 3.0.11 + */ + public boolean isIfExists() { + return flags.contains(Flag.XX); + } + + /** + * @return {@code true} if the command contains the NX flag. + * @since 3.0.11 + */ + public boolean isIfNotExists() { + return flags.contains(Flag.NX); } /** @@ -205,7 +236,7 @@ public boolean isIncr() { * @since 2.5 */ public boolean isGt() { - return gt; + return flags.contains(Flag.GT); } /** @@ -213,14 +244,14 @@ public boolean isGt() { * @since 2.5 */ public boolean isLt() { - return lt; + return flags.contains(Flag.LT); } /** * @return */ public boolean isReturnTotalChanged() { - return returnTotalChanged; + return flags.contains(Flag.CH); } } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java index 62ce81dbeb..73b3f27ed6 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommands.java @@ -23,7 +23,6 @@ import io.lettuce.core.Value; import io.lettuce.core.ZAddArgs; import io.lettuce.core.ZStoreArgs; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -32,7 +31,6 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; - import org.springframework.data.domain.Sort.Direction; import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; @@ -86,36 +84,32 @@ public Flux> zAdd(Publisher co ZAddArgs args = null; - if (command.isIncr() || command.isUpsert() || command.isReturnTotalChanged()) { - - if (command.isIncr()) { + if (command.isIncr()) { - if (command.getTuples().size() > 1) { - throw new IllegalArgumentException("ZADD INCR must not contain more than one tuple"); - } + if (command.getTuples().size() > 1) { + throw new IllegalArgumentException("ZADD INCR must not contain more than one tuple"); + } - Tuple tuple = command.getTuples().iterator().next(); + Tuple tuple = command.getTuples().iterator().next(); - return reactiveCommands.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue())) - .map(value -> new NumericResponse<>(command, value)); - } + return reactiveCommands.zaddincr(command.getKey(), tuple.getScore(), ByteBuffer.wrap(tuple.getValue())) + .map(value -> new NumericResponse<>(command, value)); + } - if (command.isReturnTotalChanged()) { - args = ZAddArgs.Builder.ch(); - } + if (command.isReturnTotalChanged()) { + args = ZAddArgs.Builder.ch(); + } - if (command.isUpsert()) { - args = args == null ? ZAddArgs.Builder.nx() : args.nx(); - } else { - args = args == null ? ZAddArgs.Builder.xx() : args.xx(); - } + if (command.isIfNotExists()) { + args = args == null ? ZAddArgs.Builder.nx() : args.nx(); + } else if (command.isIfExists()) { + args = args == null ? ZAddArgs.Builder.xx() : args.xx(); + } - if (command.isGt()) { - args = args == null ? ZAddArgs.Builder.gt() : args.gt(); - } - if (command.isLt()) { - args = args == null ? ZAddArgs.Builder.lt() : args.lt(); - } + if (command.isGt()) { + args = args == null ? ZAddArgs.Builder.gt() : args.gt(); + } else if (command.isLt()) { + args = args == null ? ZAddArgs.Builder.lt() : args.lt(); } ScoredValue[] values = (ScoredValue[]) command.getTuples().stream() @@ -139,8 +133,7 @@ public Flux> zRem(Publisher comm ByteBuffer[] values = command.getValues().toArray(ByteBuffer[]::new); - return reactiveCommands.zrem(command.getKey(), values) - .map(value -> new NumericResponse<>(command, value)); + return reactiveCommands.zrem(command.getKey(), values).map(value -> new NumericResponse<>(command, value)); })); } @@ -178,8 +171,8 @@ public Flux>> zRandMemberWithSco Assert.notNull(command.getKey(), "Key must not be null"); - Flux> result = - reactiveCommands.zrandmemberWithScores(command.getKey(), command.getCount()); + Flux> result = reactiveCommands.zrandmemberWithScores(command.getKey(), + command.getCount()); return new CommandResponse<>(command, result.map(this::toTuple)); })); @@ -299,7 +292,8 @@ public Flux>> zRangeByScore( result = reactiveCommands.zrangebyscore(command.getKey(), range).map(value -> toTuple(value, Double.NaN)); } else { - result = reactiveCommands.zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) + result = reactiveCommands + .zrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) .map(value -> toTuple(value, Double.NaN)); } } @@ -319,7 +313,8 @@ public Flux>> zRangeByScore( result = reactiveCommands.zrevrangebyscore(command.getKey(), range) .map(value -> toTuple(value, Double.NaN)); } else { - result = reactiveCommands.zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) + result = reactiveCommands + .zrevrangebyscore(command.getKey(), range, LettuceConverters.toLimit(command.getLimit().get())) .map(value -> toTuple(value, Double.NaN)); } } @@ -337,7 +332,8 @@ public Flux>> zScan(Publisher result = ScanStream.zscan(reactiveCommands, command.getKey(), LettuceConverters.toScanArgs(command.getOptions())) + Flux result = ScanStream + .zscan(reactiveCommands, command.getKey(), LettuceConverters.toScanArgs(command.getOptions())) .map(this::toTuple); return Mono.just(new CommandResponse<>(command, result)); @@ -387,8 +383,7 @@ public Flux>> zPop(Publisher>> bZPop(Publisher> result = commandResult.filter(Value::hasValue).map(Value::getValue); return new CommandResponse<>(command, result.filter(Value::hasValue).map(this::toTuple).flux()); - } - else { + } else { long timeout = command.getTimeUnit().toSeconds(command.getTimeout()); Mono>> commandResult = command.getDirection() == PopDirection.MIN - ? reactiveCommands.bzpopmin(timeout, command.getKey()) - : reactiveCommands.bzpopmax(timeout, command.getKey()); + ? reactiveCommands.bzpopmin(timeout, command.getKey()) + : reactiveCommands.bzpopmax(timeout, command.getKey()); Mono> result = commandResult.filter(Value::hasValue).map(Value::getValue); @@ -438,8 +432,7 @@ public Flux> zCard(Publisher comma Assert.notNull(command.getKey(), "Key must not be null"); - return reactiveCommands.zcard(command.getKey()) - .map(value -> new NumericResponse<>(command, value)); + return reactiveCommands.zcard(command.getKey()).map(value -> new NumericResponse<>(command, value)); })); } @@ -574,8 +567,7 @@ public Flux>> zInter( ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]); - Flux result = args != null - ? reactiveCommands.zinter(args, sourceKeys) + Flux result = args != null ? reactiveCommands.zinter(args, sourceKeys) : reactiveCommands.zinter(sourceKeys); return new CommandResponse<>(command, result); @@ -599,8 +591,7 @@ public Flux>> zInterWithScores( ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]); - Flux> result = args != null - ? reactiveCommands.zinterWithScores(args, sourceKeys) + Flux> result = args != null ? reactiveCommands.zinterWithScores(args, sourceKeys) : reactiveCommands.zinterWithScores(sourceKeys); return new CommandResponse<>(command, result.map(this::toTuple)); @@ -625,8 +616,7 @@ public Flux> zInterStore( ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(new ByteBuffer[0]); - Mono result = args != null - ? reactiveCommands.zinterstore(command.getKey(), args, sourceKeys) + Mono result = args != null ? reactiveCommands.zinterstore(command.getKey(), args, sourceKeys) : reactiveCommands.zinterstore(command.getKey(), sourceKeys); return result.map(value -> new NumericResponse<>(command, value)); @@ -650,8 +640,7 @@ public Flux>> zUnion( ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new); - Flux result = args != null - ? reactiveCommands.zunion(args, sourceKeys) + Flux result = args != null ? reactiveCommands.zunion(args, sourceKeys) : reactiveCommands.zunion(sourceKeys); return new CommandResponse<>(command, result); @@ -675,8 +664,7 @@ public Flux>> zUnionWithScores( ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new); - Flux> result = args != null - ? reactiveCommands.zunionWithScores(args, sourceKeys) + Flux> result = args != null ? reactiveCommands.zunionWithScores(args, sourceKeys) : reactiveCommands.zunionWithScores(sourceKeys); return new CommandResponse<>(command, result.map(this::toTuple)); @@ -701,8 +689,7 @@ public Flux> zUnionStore( ByteBuffer[] sourceKeys = command.getSourceKeys().toArray(ByteBuffer[]::new); - Mono result = args != null - ? reactiveCommands.zunionstore(command.getKey(), args, sourceKeys) + Mono result = args != null ? reactiveCommands.zunionstore(command.getKey(), args, sourceKeys) : reactiveCommands.zunionstore(command.getKey(), sourceKeys); return result.map(value -> new NumericResponse<>(command, value)); diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommandsIntegrationTests.java index 040216b0d5..fce69f8dae 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveZSetCommandsIntegrationTests.java @@ -19,14 +19,20 @@ import static org.assertj.core.api.Assumptions.*; import static org.springframework.data.domain.Range.Bound.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.nio.ByteBuffer; import java.time.Duration; import java.util.Arrays; +import java.util.function.Function; import org.springframework.data.domain.Range; +import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; +import org.springframework.data.redis.connection.ReactiveZSetCommands.ZAddCommand; import org.springframework.data.redis.connection.zset.DefaultTuple; +import org.springframework.data.redis.connection.zset.Tuple; import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.test.condition.EnabledOnCommand; import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest; @@ -55,6 +61,149 @@ void zAddShouldAddValuesWithScores() { assertThat(connection.zSetCommands().zAdd(KEY_1_BBUFFER, 3.5D, VALUE_1_BBUFFER).block()).isEqualTo(1L); } + @ParameterizedRedisTest // GH-2731 + void zAddShouldConsiderAbsentPresentUpsertFlags() { + + Tuple tuple = Tuple.of(VALUE_1_BYTES, 3.5D); + + zAdd(KEY_1_BBUFFER, tuple, Function.identity()).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(1) // + .verifyComplete(); + + // NX + zAdd(KEY_1_BBUFFER, tuple, ZAddCommand::nx).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(0) // + .verifyComplete(); + + zAdd(KEY_2_BBUFFER, tuple, ZAddCommand::nx).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(1) // + .verifyComplete(); + + // XX + zAdd(KEY_1_BBUFFER, Tuple.of(VALUE_1_BYTES, 3.0D), ZAddCommand::xx).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(0) // + .verifyComplete(); + + connection.zSetCommands().zScore(KEY_1_BBUFFER, VALUE_1_BBUFFER).map(Number::doubleValue) // + .as(StepVerifier::create) // + .expectNext(3.0) // + .verifyComplete(); + + zAdd(KEY_3_BBUFFER, tuple, ZAddCommand::xx).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(0) // + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2731 + void zAddShouldConsiderLessThan() { + + Tuple tuple = Tuple.of(VALUE_1_BYTES, 3.5D); + + zAdd(KEY_1_BBUFFER, tuple, Function.identity()).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(1) // + .verifyComplete(); + + zAdd(KEY_1_BBUFFER, Tuple.of(VALUE_1_BYTES, 6D), ZAddCommand::lt).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(0) // + .verifyComplete(); + + connection.zSetCommands().zScore(KEY_1_BBUFFER, VALUE_1_BBUFFER).map(Number::doubleValue) // + .as(StepVerifier::create) // + .expectNext(3.5) // + .verifyComplete(); + + zAdd(KEY_1_BBUFFER, Tuple.of(VALUE_1_BYTES, 1D), ZAddCommand::lt).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(0) // + .verifyComplete(); + + connection.zSetCommands().zScore(KEY_1_BBUFFER, VALUE_1_BBUFFER).map(Number::doubleValue) // + .as(StepVerifier::create) // + .expectNext(1.0) // + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2731 + void zAddShouldConsiderGreaterThan() { + + Tuple tuple = Tuple.of(VALUE_1_BYTES, 3.5D); + + zAdd(KEY_1_BBUFFER, tuple, Function.identity()).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(1) // + .verifyComplete(); + + zAdd(KEY_1_BBUFFER, Tuple.of(VALUE_1_BYTES, 1D), ZAddCommand::gt).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(0) // + .verifyComplete(); + + connection.zSetCommands().zScore(KEY_1_BBUFFER, VALUE_1_BBUFFER).map(Number::doubleValue) // + .as(StepVerifier::create) // + .expectNext(3.5) // + .verifyComplete(); + + zAdd(KEY_1_BBUFFER, Tuple.of(VALUE_1_BYTES, 6D), ZAddCommand::gt).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(0) // + .verifyComplete(); + + connection.zSetCommands().zScore(KEY_1_BBUFFER, VALUE_1_BBUFFER).map(Number::doubleValue) // + .as(StepVerifier::create) // + .expectNext(6.0) // + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2731 + void zAddShouldConsiderIncrFlag() { + + Tuple tuple = Tuple.of(VALUE_1_BYTES, 3.5D); + + zAdd(KEY_1_BBUFFER, tuple, Function.identity()).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(1) // + .verifyComplete(); + + zAdd(KEY_1_BBUFFER, tuple, ZAddCommand::incr).map(Number::intValue) // + + .as(StepVerifier::create) // + .expectNext(7) // + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2731 + void zAddShouldConsiderChFlag() { + + Tuple tuple = Tuple.of(VALUE_1_BYTES, 3.5D); + + zAdd(KEY_1_BBUFFER, tuple, Function.identity()).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(1) // + .verifyComplete(); + + zAdd(KEY_1_BBUFFER, tuple, ZAddCommand::ch).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(0) // + .verifyComplete(); + + zAdd(KEY_1_BBUFFER, Tuple.of(VALUE_1_BYTES, 3.0D), ZAddCommand::ch).map(Number::intValue) // + .as(StepVerifier::create) // + .expectNext(1) // + .verifyComplete(); + } + + private Flux zAdd(ByteBuffer key, Tuple tuple, Function commandCustomizer) { + return connection.zSetCommands().zAdd(Mono.just(commandCustomizer.apply(ZAddCommand.tuple(tuple).to(key)))) + .map(NumericResponse::getOutput); + } + @ParameterizedRedisTest // DATAREDIS-525 void zRemShouldRemoveValuesFromSet() {