diff --git a/pom.xml b/pom.xml index 9dede838e3..93cea9035c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 3.0.9-SNAPSHOT + 3.0.x-GH-2655-SNAPSHOT Spring Data Redis Spring Data module for Redis diff --git a/src/main/java/org/springframework/data/redis/connection/convert/Converters.java b/src/main/java/org/springframework/data/redis/connection/convert/Converters.java index 7a2698fdab..51f23e9cb2 100644 --- a/src/main/java/org/springframework/data/redis/connection/convert/Converters.java +++ b/src/main/java/org/springframework/data/redis/connection/convert/Converters.java @@ -465,7 +465,7 @@ public static Object parse(Object source, String sourcePath, Map Map.Entry entryOf(K key, V value) { + public static Map.Entry entryOf(@Nullable K key, @Nullable V value) { return new AbstractMap.SimpleImmutableEntry<>(key, value); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java index b14b5d58c0..6df8747542 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveGeoOperations.java @@ -26,7 +26,6 @@ import java.util.stream.Collectors; import org.reactivestreams.Publisher; - import org.springframework.data.geo.Circle; import org.springframework.data.geo.Distance; import org.springframework.data.geo.GeoResult; @@ -40,6 +39,7 @@ import org.springframework.data.redis.domain.geo.GeoReference.GeoMemberReference; import org.springframework.data.redis.domain.geo.GeoShape; import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -321,13 +321,14 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } private GeoResult> readGeoResult(GeoResult> source) { - return new GeoResult<>(new GeoLocation(readValue(source.getContent().getName()), source.getContent().getPoint()), + return new GeoResult<>(new GeoLocation<>(readValue(source.getContent().getName()), source.getContent().getPoint()), source.getDistance()); } } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java index 21cab4155a..518899d34e 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java @@ -26,10 +26,11 @@ import java.util.function.Function; import org.reactivestreams.Publisher; - +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ReactiveHashCommands; import org.springframework.data.redis.connection.convert.Converters; import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -127,7 +128,7 @@ public Mono randomKey(H key) { Assert.notNull(key, "Key must not be null"); return template.doCreateMono(connection -> connection // - .hashCommands().hRandField(rawKey(key))).map(this::readHashKey); + .hashCommands().hRandField(rawKey(key))).map(this::readRequiredHashKey); } @Override @@ -145,7 +146,7 @@ public Flux randomKeys(H key, long count) { Assert.notNull(key, "Key must not be null"); return template.doCreateFlux(connection -> connection // - .hashCommands().hRandField(rawKey(key), count)).map(this::readHashKey); + .hashCommands().hRandField(rawKey(key), count)).map(this::readRequiredHashKey); } @Override @@ -163,7 +164,7 @@ public Flux keys(H key) { Assert.notNull(key, "Key must not be null"); return createFlux(connection -> connection.hKeys(rawKey(key)) // - .map(this::readHashKey)); + .map(this::readRequiredHashKey)); } @Override @@ -211,7 +212,7 @@ public Flux values(H key) { Assert.notNull(key, "Key must not be null"); return createFlux(connection -> connection.hVals(rawKey(key)) // - .map(this::readHashValue)); + .map(this::readRequiredHashValue)); } @Override @@ -268,13 +269,37 @@ private ByteBuffer rawHashValue(HV key) { } @SuppressWarnings("unchecked") + @Nullable private HK readHashKey(ByteBuffer value) { return (HK) serializationContext.getHashKeySerializationPair().read(value); } + private HK readRequiredHashKey(ByteBuffer buffer) { + + HK hashKey = readHashKey(buffer); + + if (hashKey == null) { + throw new InvalidDataAccessApiUsageException("Deserialized hash key is null"); + } + + return hashKey; + } + @SuppressWarnings("unchecked") - private HV readHashValue(ByteBuffer value) { - return (HV) (value == null ? value : serializationContext.getHashValueSerializationPair().read(value)); + @Nullable + private HV readHashValue(@Nullable ByteBuffer value) { + return (HV) (value == null ? null : serializationContext.getHashValueSerializationPair().read(value)); + } + + private HV readRequiredHashValue(ByteBuffer buffer) { + + HV hashValue = readHashValue(buffer); + + if (hashValue == null) { + throw new InvalidDataAccessApiUsageException("Deserialized hash value is null"); + } + + return hashValue; } private Map.Entry deserializeHashEntry(Map.Entry source) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java index 20b7b08dfd..e230026462 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveListOperations.java @@ -27,11 +27,13 @@ import java.util.function.Function; import org.reactivestreams.Publisher; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ReactiveListCommands; import org.springframework.data.redis.connection.ReactiveListCommands.Direction; import org.springframework.data.redis.connection.ReactiveListCommands.LPosCommand; import org.springframework.data.redis.connection.RedisListCommands.Position; import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -58,7 +60,7 @@ public Flux range(K key, long start, long end) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.lRange(rawKey(key), start, end).map(this::readValue)); + return createFlux(connection -> connection.lRange(rawKey(key), start, end).map(this::readRequiredValue)); } @Override @@ -170,7 +172,8 @@ public Mono move(K sourceKey, Direction from, K destinationKey, Direction to) Assert.notNull(to, "To direction must not be null"); return createMono( - connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to).map(this::readValue)); + connection -> connection.lMove(rawKey(sourceKey), rawKey(destinationKey), from, to) + .map(this::readRequiredValue)); } @Override @@ -183,7 +186,7 @@ public Mono move(K sourceKey, Direction from, K destinationKey, Direction to, Assert.notNull(timeout, "Timeout must not be null"); return createMono(connection -> connection.bLMove(rawKey(sourceKey), rawKey(destinationKey), from, to, timeout) - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -208,7 +211,7 @@ public Mono index(K key, long index) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lIndex(rawKey(key), index).map(this::readValue)); + return createMono(connection -> connection.lIndex(rawKey(key), index).map(this::readRequiredValue)); } @Override @@ -232,7 +235,7 @@ public Mono leftPop(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.lPop(rawKey(key)).map(this::readValue)); + return createMono(connection -> connection.lPop(rawKey(key)).map(this::readRequiredValue)); } @@ -244,7 +247,7 @@ public Mono leftPop(K key, Duration timeout) { Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second"); return createMono(connection -> connection.blPop(Collections.singletonList(rawKey(key)), timeout) - .map(popResult -> readValue(popResult.getValue()))); + .mapNotNull(popResult -> readValue(popResult.getValue()))); } @Override @@ -252,7 +255,7 @@ public Mono rightPop(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.rPop(rawKey(key)).map(this::readValue)); + return createMono(connection -> connection.rPop(rawKey(key)).map(this::readRequiredValue)); } @Override @@ -263,7 +266,7 @@ public Mono rightPop(K key, Duration timeout) { Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second"); return createMono(connection -> connection.brPop(Collections.singletonList(rawKey(key)), timeout) - .map(popResult -> readValue(popResult.getValue()))); + .mapNotNull(popResult -> readValue(popResult.getValue()))); } @Override @@ -273,7 +276,7 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey) { Assert.notNull(destinationKey, "Destination key must not be null"); return createMono( - connection -> connection.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readValue)); + connection -> connection.rPopLPush(rawKey(sourceKey), rawKey(destinationKey)).map(this::readRequiredValue)); } @Override @@ -285,7 +288,8 @@ public Mono rightPopAndLeftPush(K sourceKey, K destinationKey, Duration timeo Assert.isTrue(isZeroOrGreater1Second(timeout), "Duration must be either zero or greater or equal to 1 second"); return createMono( - connection -> connection.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout).map(this::readValue)); + connection -> connection.bRPopLPush(rawKey(sourceKey), rawKey(destinationKey), timeout) + .map(this::readRequiredValue)); } @Override @@ -322,7 +326,19 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } + + private V readRequiredValue(ByteBuffer buffer) { + + V v = readValue(buffer); + + if (v == null) { + throw new InvalidDataAccessApiUsageException("Deserialized list value is null"); + } + + return v; + } } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java index d0bd9acc4b..7f4c179d82 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveSetOperations.java @@ -28,8 +28,10 @@ import java.util.function.Function; import org.reactivestreams.Publisher; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ReactiveSetCommands; import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -88,7 +90,7 @@ public Mono pop(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.sPop(rawKey(key)).map(this::readValue)); + return createMono(connection -> connection.sPop(rawKey(key)).map(this::readRequiredValue)); } @Override @@ -96,7 +98,7 @@ public Flux pop(K key, long count) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.sPop(rawKey(key), count).map(this::readValue)); + return createFlux(connection -> connection.sPop(rawKey(key), count).map(this::readRequiredValue)); } @Override @@ -176,7 +178,7 @@ public Flux intersect(Collection keys) { .map(this::rawKey) // .collectList() // .flatMapMany(connection::sInter) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -238,7 +240,7 @@ public Flux union(Collection keys) { .map(this::rawKey) // .collectList() // .flatMapMany(connection::sUnion) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -300,7 +302,7 @@ public Flux difference(Collection keys) { .map(this::rawKey) // .collectList() // .flatMapMany(connection::sDiff) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -340,7 +342,7 @@ public Flux members(K key) { Assert.notNull(key, "Key must not be null"); - return createFlux(connection -> connection.sMembers(rawKey(key)).map(this::readValue)); + return createFlux(connection -> connection.sMembers(rawKey(key)).map(this::readRequiredValue)); } @Override @@ -349,7 +351,7 @@ public Flux scan(K key, ScanOptions options) { Assert.notNull(key, "Key must not be null"); Assert.notNull(options, "ScanOptions must not be null"); - return createFlux(connection -> connection.sScan(rawKey(key), options).map(this::readValue)); + return createFlux(connection -> connection.sScan(rawKey(key), options).map(this::readRequiredValue)); } @Override @@ -357,7 +359,7 @@ public Mono randomMember(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.sRandMember(rawKey(key)).map(this::readValue)); + return createMono(connection -> connection.sRandMember(rawKey(key)).map(this::readRequiredValue)); } @Override @@ -365,7 +367,7 @@ public Flux distinctRandomMembers(K key, long count) { Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements"); - return createFlux(connection -> connection.sRandMember(rawKey(key), count).map(this::readValue)); + return createFlux(connection -> connection.sRandMember(rawKey(key), count).map(this::readRequiredValue)); } @Override @@ -373,7 +375,7 @@ public Flux randomMembers(K key, long count) { Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements"); - return createFlux(connection -> connection.sRandMember(rawKey(key), -count).map(this::readValue)); + return createFlux(connection -> connection.sRandMember(rawKey(key), -count).map(this::readRequiredValue)); } @Override @@ -416,7 +418,19 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } + + private V readRequiredValue(ByteBuffer buffer) { + + V v = readValue(buffer); + + if (v == null) { + throw new InvalidDataAccessApiUsageException("Deserialized set value is null"); + } + + return v; + } } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java index 4951e5dfaa..e26f9b4022 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveValueOperations.java @@ -27,12 +27,14 @@ import java.util.function.Function; import org.reactivestreams.Publisher; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.BitFieldSubCommands; import org.springframework.data.redis.connection.ReactiveStringCommands; import org.springframework.data.redis.connection.RedisStringCommands.SetOption; import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -146,7 +148,7 @@ public Mono get(Object key) { Assert.notNull(key, "Key must not be null"); return createMono(connection -> connection.get(rawKey((K) key)) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -155,7 +157,7 @@ public Mono getAndDelete(K key) { Assert.notNull(key, "Key must not be null"); return createMono(connection -> connection.getDel(rawKey(key)) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -165,7 +167,7 @@ public Mono getAndExpire(K key, Duration timeout) { Assert.notNull(timeout, "Timeout must not be null"); return createMono(connection -> connection.getEx(rawKey(key), Expiration.from(timeout)) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -174,7 +176,7 @@ public Mono getAndPersist(K key) { Assert.notNull(key, "Key must not be null"); return createMono(connection -> connection.getEx(rawKey(key), Expiration.persistent()) // - .map(this::readValue)); + .map(this::readRequiredValue)); } @Override @@ -182,7 +184,7 @@ public Mono getAndSet(K key, V value) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.getSet(rawKey(key), rawValue(value)).map(value()::read)); + return createMono(connection -> connection.getSet(rawKey(key), rawValue(value)).mapNotNull(value()::read)); } @Override @@ -250,7 +252,7 @@ public Mono get(K key, long start, long end) { Assert.notNull(key, "Key must not be null"); return createMono(connection -> connection.getRange(rawKey(key), start, end) // - .map(stringSerializationPair()::read)); + .mapNotNull(stringSerializationPair()::read)); } @Override @@ -317,10 +319,22 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } + private V readRequiredValue(ByteBuffer buffer) { + + V v = readValue(buffer); + + if (v == null) { + throw new InvalidDataAccessApiUsageException("Deserialized value is null"); + } + + return v; + } + private SerializationPair stringSerializationPair() { return serializationContext.getStringSerializationPair(); } @@ -348,4 +362,5 @@ private List deserializeValues(List source) { return result; } + } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java index 2ff8d6b831..ce2f40664d 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveZSetOperations.java @@ -27,7 +27,7 @@ import java.util.function.Function; import org.reactivestreams.Publisher; - +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.ReactiveZSetCommands; @@ -38,6 +38,7 @@ import org.springframework.data.redis.core.ZSetOperations.TypedTuple; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.util.ByteUtils; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -110,7 +111,7 @@ public Mono randomMember(K key) { Assert.notNull(key, "Key must not be null"); - return createMono(connection -> connection.zRandMember(rawKey(key))).map(this::readValue); + return createMono(connection -> connection.zRandMember(rawKey(key))).map(this::readRequiredValue); } @Override @@ -119,7 +120,7 @@ public Flux distinctRandomMembers(K key, long count) { Assert.notNull(key, "Key must not be null"); Assert.isTrue(count > 0, "Negative count not supported; Use randomMembers to allow duplicate elements"); - return createFlux(connection -> connection.zRandMember(rawKey(key), count)).map(this::readValue); + return createFlux(connection -> connection.zRandMember(rawKey(key), count)).map(this::readRequiredValue); } @Override @@ -128,7 +129,7 @@ public Flux randomMembers(K key, long count) { Assert.notNull(key, "Key must not be null"); Assert.isTrue(count > 0, "Use a positive number for count; This method is already allowing duplicate elements"); - return createFlux(connection -> connection.zRandMember(rawKey(key), -count)).map(this::readValue); + return createFlux(connection -> connection.zRandMember(rawKey(key), -count)).map(this::readRequiredValue); } @Override @@ -181,7 +182,7 @@ public Flux range(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRange(rawKey(key), range).map(this::readValue)); + return createFlux(connection -> connection.zRange(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -199,7 +200,7 @@ public Flux rangeByScore(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRangeByScore(rawKey(key), range).map(this::readValue)); + return createFlux(connection -> connection.zRangeByScore(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -217,7 +218,7 @@ public Flux rangeByScore(K key, Range range, Limit limit) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRangeByScore(rawKey(key), range, limit).map(this::readValue)); + return createFlux(connection -> connection.zRangeByScore(rawKey(key), range, limit).map(this::readRequiredValue)); } @Override @@ -237,7 +238,7 @@ public Flux reverseRange(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRevRange(rawKey(key), range).map(this::readValue)); + return createFlux(connection -> connection.zRevRange(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -255,7 +256,7 @@ public Flux reverseRangeByScore(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRevRangeByScore(rawKey(key), range).map(this::readValue)); + return createFlux(connection -> connection.zRevRangeByScore(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -274,7 +275,8 @@ public Flux reverseRangeByScore(K key, Range range, Limit limit) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRevRangeByScore(rawKey(key), range, limit).map(this::readValue)); + return createFlux( + connection -> connection.zRevRangeByScore(rawKey(key), range, limit).map(this::readRequiredValue)); } @Override @@ -474,7 +476,7 @@ public Flux difference(K key, Collection otherKeys) { return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::zDiff).map(this::readValue)); + .flatMapMany(connection::zDiff).map(this::readRequiredValue)); } @Override @@ -512,7 +514,7 @@ public Flux intersect(K key, Collection otherKeys) { return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::zInter).map(this::readValue)); + .flatMapMany(connection::zInter).map(this::readRequiredValue)); } @Override @@ -580,7 +582,7 @@ public Flux union(K key, Collection otherKeys) { return createFlux(connection -> Flux.fromIterable(getKeys(key, otherKeys)) // .map(this::rawKey) // .collectList() // - .flatMapMany(connection::zUnion).map(this::readValue)); + .flatMapMany(connection::zUnion).map(this::readRequiredValue)); } @Override @@ -653,7 +655,7 @@ public Flux rangeByLex(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRangeByLex(rawKey(key), range).map(this::readValue)); + return createFlux(connection -> connection.zRangeByLex(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -663,7 +665,7 @@ public Flux rangeByLex(K key, Range range, Limit limit) { Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux(connection -> connection.zRangeByLex(rawKey(key), range, limit).map(this::readValue)); + return createFlux(connection -> connection.zRangeByLex(rawKey(key), range, limit).map(this::readRequiredValue)); } @Override @@ -672,7 +674,7 @@ public Flux reverseRangeByLex(K key, Range range) { Assert.notNull(key, "Key must not be null"); Assert.notNull(range, "Range must not be null"); - return createFlux(connection -> connection.zRevRangeByLex(rawKey(key), range).map(this::readValue)); + return createFlux(connection -> connection.zRevRangeByLex(rawKey(key), range).map(this::readRequiredValue)); } @Override @@ -682,7 +684,7 @@ public Flux reverseRangeByLex(K key, Range range, Limit limit) { Assert.notNull(range, "Range must not be null"); Assert.notNull(limit, "Limit must not be null"); - return createFlux(connection -> connection.zRevRangeByLex(rawKey(key), range, limit).map(this::readValue)); + return createFlux(connection -> connection.zRevRangeByLex(rawKey(key), range, limit).map(this::readRequiredValue)); } @Override @@ -725,10 +727,22 @@ private ByteBuffer rawValue(V value) { return serializationContext.getValueSerializationPair().write(value); } + @Nullable private V readValue(ByteBuffer buffer) { return serializationContext.getValueSerializationPair().read(buffer); } + private V readRequiredValue(ByteBuffer buffer) { + + V v = readValue(buffer); + + if (v == null) { + throw new InvalidDataAccessApiUsageException("Deserialized sorted set value is null"); + } + + return v; + } + private TypedTuple readTypedTuple(Tuple raw) { return new DefaultTypedTuple<>(readValue(ByteBuffer.wrap(raw.getValue())), raw.getScore()); } diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveGeoOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveGeoOperations.java index c5eb41b970..99974cac70 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveGeoOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveGeoOperations.java @@ -35,7 +35,7 @@ import org.springframework.data.redis.domain.geo.GeoShape; /** - * Reactive Redis operations for geo commands. + * Reactive Redis operations for Geo Commands. * * @author Mark Paluch * @author Christoph Strobl @@ -289,8 +289,7 @@ default Flux>> search(K key, GeoReference reference, * @since 2.6 * @see Redis Documentation: GEOSEARCH */ - default Flux>> search(K key, GeoReference reference, - BoundingBox boundingBox) { + default Flux>> search(K key, GeoReference reference, BoundingBox boundingBox) { return search(key, reference, boundingBox, GeoSearchCommandArgs.newGeoSearchArgs()); } @@ -306,8 +305,8 @@ default Flux>> search(K key, GeoReference reference, * @since 2.6 * @see Redis Documentation: GEOSEARCH */ - default Flux>> search(K key, GeoReference reference, - BoundingBox boundingBox, GeoSearchCommandArgs args) { + default Flux>> search(K key, GeoReference reference, BoundingBox boundingBox, + GeoSearchCommandArgs args) { return search(key, reference, GeoShape.byBox(boundingBox), args); } @@ -383,8 +382,7 @@ default Mono searchAndStore(K key, K destKey, GeoReference reference, D * @since 2.6 * @see Redis Documentation: GEOSEARCHSTORE */ - default Mono searchAndStore(K key, K destKey, GeoReference reference, - BoundingBox boundingBox) { + default Mono searchAndStore(K key, K destKey, GeoReference reference, BoundingBox boundingBox) { return searchAndStore(key, destKey, reference, boundingBox, GeoSearchStoreCommandArgs.newGeoSearchStoreArgs()); } @@ -400,8 +398,8 @@ default Mono searchAndStore(K key, K destKey, GeoReference reference, * @since 2.6 * @see Redis Documentation: GEOSEARCHSTORE */ - default Mono searchAndStore(K key, K destKey, GeoReference reference, - BoundingBox boundingBox, GeoSearchStoreCommandArgs args) { + default Mono searchAndStore(K key, K destKey, GeoReference reference, BoundingBox boundingBox, + GeoSearchStoreCommandArgs args) { return searchAndStore(key, destKey, reference, GeoShape.byBox(boundingBox), args); } diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java index b0d8efddfc..9516bdc508 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java @@ -18,12 +18,18 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; /** - * Redis map specific operations working on a hash. + * Reactive Redis operations for Hash Commands. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java index 8dbe30ed92..d19da8fd66 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java @@ -18,7 +18,7 @@ import reactor.core.publisher.Mono; /** - * Redis cardinality specific operations working on a HyperLogLog multiset. + * Reactive Redis operations for working on a HyperLogLog multiset. * * @author Mark Paluch * @since 2.0 diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java index e77f413a21..edf0360ff2 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java @@ -20,6 +20,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; @@ -28,7 +29,12 @@ import org.springframework.util.Assert; /** - * Redis list specific operations. + * Reactive Redis operations for List Commands. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java index 19f02af13c..8424daf4aa 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; import java.util.Arrays; @@ -41,6 +42,11 @@ /** * Interface that specified a basic set of Redis operations, implemented by {@link ReactiveRedisTemplate}. Not often * used but a useful option for extensibility and testability (as it can be easily mocked or stubbed). + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java index da4c76825a..6745a47a5c 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java @@ -27,7 +27,7 @@ import java.util.stream.Collectors; import org.reactivestreams.Publisher; - +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; @@ -55,6 +55,11 @@ *

* Note that while the template is generified, it is up to the serializers/deserializers to properly convert the given * Objects to and from binary data. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl @@ -331,7 +336,7 @@ public Flux keys(K pattern) { return doCreateFlux(connection -> connection.keyCommands().keys(rawKey(pattern))) // .flatMap(Flux::fromIterable) // - .map(this::readKey); + .map(this::readRequiredKey); } @Override @@ -340,12 +345,12 @@ public Flux scan(ScanOptions options) { Assert.notNull(options, "ScanOptions must not be null"); return doCreateFlux(connection -> connection.keyCommands().scan(options)) // - .map(this::readKey); + .map(this::readRequiredKey); } @Override public Mono randomKey() { - return doCreateMono(connection -> connection.keyCommands().randomKey()).map(this::readKey); + return doCreateMono(connection -> connection.keyCommands().randomKey()).map(this::readRequiredKey); } @Override @@ -666,7 +671,19 @@ private ByteBuffer rawKey(K key) { return getSerializationContext().getKeySerializationPair().getWriter().write(key); } + @Nullable private K readKey(ByteBuffer buffer) { return getSerializationContext().getKeySerializationPair().getReader().read(buffer); } + + private K readRequiredKey(ByteBuffer buffer) { + + K key = readKey(buffer); + + if (key == null) { + throw new InvalidDataAccessApiUsageException("Deserialized key is null"); + } + + return key; + } } diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java index c8c34dd82d..f8d0feff61 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java @@ -18,11 +18,17 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; /** - * Redis set specific operations. + * Reactive Redis operations for Set Commands. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java index 1002a660b5..9314bc2383 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java @@ -23,32 +23,20 @@ import java.util.Map; import org.reactivestreams.Publisher; - import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; -import org.springframework.data.redis.connection.stream.ByteBufferRecord; -import org.springframework.data.redis.connection.stream.Consumer; -import org.springframework.data.redis.connection.stream.MapRecord; -import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.connection.stream.PendingMessage; -import org.springframework.data.redis.connection.stream.PendingMessages; -import org.springframework.data.redis.connection.stream.PendingMessagesSummary; -import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.connection.stream.Record; -import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup; import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.connection.stream.StreamReadOptions; -import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.hash.HashMapper; import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** - * Redis stream specific operations. + * Reactive Redis operations for Stream Commands. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java index 639c121f42..be8a0c0fbe 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java @@ -17,6 +17,7 @@ import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -26,6 +27,11 @@ /** * Reactive Redis operations for simple (or in Redis terminology 'string') values. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Jiahe Cai diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java index ecbffc9eab..1416b92c7e 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -32,7 +33,12 @@ import org.springframework.lang.Nullable; /** - * Redis ZSet/sorted set specific operations. + * Reactive Redis operations for Sorted (ZSet) Commands. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java b/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java index c3eabf4ac2..b85c13047a 100644 --- a/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java +++ b/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.stream.Stream; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; @@ -79,7 +80,6 @@ public Flux execute(RedisScript script, List keys, List args) { } @Override - @SuppressWarnings("unchecked") public Flux execute(RedisScript script, List keys, List args, RedisElementWriter argsWriter, RedisElementReader resultReader) { @@ -134,7 +134,16 @@ protected ByteBuffer scriptBytes(RedisScript script) { } protected Flux deserializeResult(RedisElementReader reader, Flux result) { - return result.map(it -> ScriptUtils.deserializeResult(reader, it)); + return result.map(it -> { + + T value = ScriptUtils.deserializeResult(reader, it); + + if (value == null) { + throw new InvalidDataAccessApiUsageException("Deserialized script result is null"); + } + + return value; + }); } protected SerializationPair keySerializer() { diff --git a/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java b/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java index f61a6cd735..20f8503fbc 100644 --- a/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java +++ b/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java @@ -17,6 +17,7 @@ import reactor.core.publisher.Flux; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -26,6 +27,11 @@ /** * Executes {@link RedisScript}s using reactive infrastructure. + *

+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with + * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when + * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a + * particular element as Reactive Streams prohibit the usage of {@code null} values. * * @author Mark Paluch * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java b/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java index f7fead21a8..8e89cc9190 100644 --- a/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java +++ b/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java @@ -22,6 +22,7 @@ import org.springframework.dao.NonTransientDataAccessException; import org.springframework.data.redis.serializer.RedisElementReader; import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.lang.Nullable; /** * Utilities for Lua script execution and result deserialization. @@ -71,6 +72,7 @@ static T deserializeResult(RedisSerializer resultSerializer, Object resul * @param result must not be {@literal null}. * @return the deserialized result. */ + @Nullable @SuppressWarnings({ "unchecked" }) static T deserializeResult(RedisElementReader reader, Object result) { diff --git a/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java b/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java index b0b75dfc37..23f54b5c35 100644 --- a/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java +++ b/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java @@ -17,6 +17,7 @@ import java.nio.ByteBuffer; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -269,8 +270,9 @@ static SerializationPair byteBuffer() { * Deserialize a {@link ByteBuffer} into the according type. * * @param buffer must not be {@literal null}. - * @return the deserialized value. + * @return the deserialized value. Can be {@literal null}. */ + @Nullable default T read(ByteBuffer buffer) { return getReader().read(buffer); } diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java index 3257300576..0b3c8db611 100644 --- a/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java @@ -20,9 +20,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; - +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension; +import org.springframework.data.redis.serializer.RedisElementReader; +import org.springframework.data.redis.serializer.RedisElementWriter; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.data.redis.serializer.StringRedisSerializer; /** * Integration tests for {@link ReactiveStringRedisTemplate}. @@ -55,4 +59,30 @@ void shouldSetAndGetKeys() { template.opsForValue().set("key", "value").as(StepVerifier::create).expectNext(true).verifyComplete(); template.opsForValue().get("key").as(StepVerifier::create).expectNext("value").verifyComplete(); } + + @Test // GH-2655 + void keysFailsOnNullElements() { + + template.opsForValue().set("a", "1").as(StepVerifier::create).expectNext(true).verifyComplete(); + template.opsForValue().set("b", "1").as(StepVerifier::create).expectNext(true).verifyComplete(); + + RedisElementWriter writer = RedisElementWriter.from(StringRedisSerializer.UTF_8); + RedisElementReader reader = RedisElementReader.from(StringRedisSerializer.UTF_8); + RedisSerializationContext nullReadingContext = RedisSerializationContext + . newSerializationContext(StringRedisSerializer.UTF_8).key(buffer -> { + + String read = reader.read(buffer); + if ("a".equals(read)) { + return null; + } + + return read; + }, writer).build(); + + ReactiveRedisTemplate customTemplate = new ReactiveRedisTemplate<>(template.getConnectionFactory(), + nullReadingContext); + + customTemplate.keys("b").as(StepVerifier::create).expectNext("b").verifyComplete(); + customTemplate.keys("a").as(StepVerifier::create).verifyError(InvalidDataAccessApiUsageException.class); + } }