reference, BoundingBox boundingBox,
+ GeoSearchStoreCommandArgs args) {
return searchAndStore(key, destKey, reference, GeoShape.byBox(boundingBox), args);
}
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java
index b0d8efddfc..9516bdc508 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java
@@ -18,12 +18,18 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
- * Redis map specific operations working on a hash.
+ * Reactive Redis operations for Hash Commands.
+ *
+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with
+ * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
+ * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
+ * particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java
index 8dbe30ed92..d19da8fd66 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveHyperLogLogOperations.java
@@ -18,7 +18,7 @@
import reactor.core.publisher.Mono;
/**
- * Redis cardinality specific operations working on a HyperLogLog multiset.
+ * Reactive Redis operations for working on a HyperLogLog multiset.
*
* @author Mark Paluch
* @since 2.0
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java
index e77f413a21..edf0360ff2 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveListOperations.java
@@ -20,6 +20,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
@@ -28,7 +29,12 @@
import org.springframework.util.Assert;
/**
- * Redis list specific operations.
+ * Reactive Redis operations for List Commands.
+ *
+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with
+ * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
+ * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
+ * particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java
index 19f02af13c..8424daf4aa 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java
@@ -18,6 +18,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
@@ -41,6 +42,11 @@
/**
* Interface that specified a basic set of Redis operations, implemented by {@link ReactiveRedisTemplate}. Not often
* used but a useful option for extensibility and testability (as it can be easily mocked or stubbed).
+ *
+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with
+ * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
+ * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
+ * particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java
index da4c76825a..6745a47a5c 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java
@@ -27,7 +27,7 @@
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
-
+import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
@@ -55,6 +55,11 @@
*
* Note that while the template is generified, it is up to the serializers/deserializers to properly convert the given
* Objects to and from binary data.
+ *
+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with
+ * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
+ * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
+ * particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl
@@ -331,7 +336,7 @@ public Flux keys(K pattern) {
return doCreateFlux(connection -> connection.keyCommands().keys(rawKey(pattern))) //
.flatMap(Flux::fromIterable) //
- .map(this::readKey);
+ .map(this::readRequiredKey);
}
@Override
@@ -340,12 +345,12 @@ public Flux scan(ScanOptions options) {
Assert.notNull(options, "ScanOptions must not be null");
return doCreateFlux(connection -> connection.keyCommands().scan(options)) //
- .map(this::readKey);
+ .map(this::readRequiredKey);
}
@Override
public Mono randomKey() {
- return doCreateMono(connection -> connection.keyCommands().randomKey()).map(this::readKey);
+ return doCreateMono(connection -> connection.keyCommands().randomKey()).map(this::readRequiredKey);
}
@Override
@@ -666,7 +671,19 @@ private ByteBuffer rawKey(K key) {
return getSerializationContext().getKeySerializationPair().getWriter().write(key);
}
+ @Nullable
private K readKey(ByteBuffer buffer) {
return getSerializationContext().getKeySerializationPair().getReader().read(buffer);
}
+
+ private K readRequiredKey(ByteBuffer buffer) {
+
+ K key = readKey(buffer);
+
+ if (key == null) {
+ throw new InvalidDataAccessApiUsageException("Deserialized key is null");
+ }
+
+ return key;
+ }
}
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java
index c8c34dd82d..f8d0feff61 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveSetOperations.java
@@ -18,11 +18,17 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
/**
- * Redis set specific operations.
+ * Reactive Redis operations for Set Commands.
+ *
+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with
+ * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
+ * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
+ * particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java
index 1002a660b5..9314bc2383 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java
@@ -23,32 +23,20 @@
import java.util.Map;
import org.reactivestreams.Publisher;
-
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
-import org.springframework.data.redis.connection.stream.ByteBufferRecord;
-import org.springframework.data.redis.connection.stream.Consumer;
-import org.springframework.data.redis.connection.stream.MapRecord;
-import org.springframework.data.redis.connection.stream.ObjectRecord;
-import org.springframework.data.redis.connection.stream.PendingMessage;
-import org.springframework.data.redis.connection.stream.PendingMessages;
-import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
-import org.springframework.data.redis.connection.stream.ReadOffset;
+import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.Record;
-import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
-import org.springframework.data.redis.connection.stream.StreamOffset;
-import org.springframework.data.redis.connection.stream.StreamReadOptions;
-import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
- * Redis stream specific operations.
+ * Reactive Redis operations for Stream Commands.
*
* @author Mark Paluch
* @author Christoph Strobl
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java
index 639c121f42..be8a0c0fbe 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveValueOperations.java
@@ -17,6 +17,7 @@
import reactor.core.publisher.Mono;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
@@ -26,6 +27,11 @@
/**
* Reactive Redis operations for simple (or in Redis terminology 'string') values.
+ *
+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with
+ * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
+ * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
+ * particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Jiahe Cai
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java
index ecbffc9eab..1416b92c7e 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveZSetOperations.java
@@ -18,6 +18,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@@ -32,7 +33,12 @@
import org.springframework.lang.Nullable;
/**
- * Redis ZSet/sorted set specific operations.
+ * Reactive Redis operations for Sorted (ZSet) Commands.
+ *
+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with
+ * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
+ * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
+ * particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl
diff --git a/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java b/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java
index c3eabf4ac2..b85c13047a 100644
--- a/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java
+++ b/src/main/java/org/springframework/data/redis/core/script/DefaultReactiveScriptExecutor.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.stream.Stream;
+import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
@@ -79,7 +80,6 @@ public Flux execute(RedisScript script, List keys, List> args) {
}
@Override
- @SuppressWarnings("unchecked")
public Flux execute(RedisScript script, List keys, List> args, RedisElementWriter> argsWriter,
RedisElementReader resultReader) {
@@ -134,7 +134,16 @@ protected ByteBuffer scriptBytes(RedisScript> script) {
}
protected Flux deserializeResult(RedisElementReader reader, Flux result) {
- return result.map(it -> ScriptUtils.deserializeResult(reader, it));
+ return result.map(it -> {
+
+ T value = ScriptUtils.deserializeResult(reader, it);
+
+ if (value == null) {
+ throw new InvalidDataAccessApiUsageException("Deserialized script result is null");
+ }
+
+ return value;
+ });
}
protected SerializationPair keySerializer() {
diff --git a/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java b/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java
index f61a6cd735..20f8503fbc 100644
--- a/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java
+++ b/src/main/java/org/springframework/data/redis/core/script/ReactiveScriptExecutor.java
@@ -17,6 +17,7 @@
import reactor.core.publisher.Flux;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -26,6 +27,11 @@
/**
* Executes {@link RedisScript}s using reactive infrastructure.
+ *
+ * Streams of methods returning {@code Mono} or {@code Flux} are terminated with
+ * {@link org.springframework.dao.InvalidDataAccessApiUsageException} when
+ * {@link org.springframework.data.redis.serializer.RedisElementReader#read(ByteBuffer)} returns {@code null} for a
+ * particular element as Reactive Streams prohibit the usage of {@code null} values.
*
* @author Mark Paluch
* @author Christoph Strobl
diff --git a/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java b/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java
index f7fead21a8..8e89cc9190 100644
--- a/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java
+++ b/src/main/java/org/springframework/data/redis/core/script/ScriptUtils.java
@@ -22,6 +22,7 @@
import org.springframework.dao.NonTransientDataAccessException;
import org.springframework.data.redis.serializer.RedisElementReader;
import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.lang.Nullable;
/**
* Utilities for Lua script execution and result deserialization.
@@ -71,6 +72,7 @@ static T deserializeResult(RedisSerializer resultSerializer, Object resul
* @param result must not be {@literal null}.
* @return the deserialized result.
*/
+ @Nullable
@SuppressWarnings({ "unchecked" })
static T deserializeResult(RedisElementReader reader, Object result) {
diff --git a/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java b/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java
index b0b75dfc37..23f54b5c35 100644
--- a/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java
+++ b/src/main/java/org/springframework/data/redis/serializer/RedisSerializationContext.java
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
+import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
@@ -269,8 +270,9 @@ static SerializationPair byteBuffer() {
* Deserialize a {@link ByteBuffer} into the according type.
*
* @param buffer must not be {@literal null}.
- * @return the deserialized value.
+ * @return the deserialized value. Can be {@literal null}.
*/
+ @Nullable
default T read(ByteBuffer buffer) {
return getReader().read(buffer);
}
diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java
index 3257300576..0b3c8db611 100644
--- a/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/core/ReactiveStringRedisTemplateIntegrationTests.java
@@ -20,9 +20,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-
+import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension;
+import org.springframework.data.redis.serializer.RedisElementReader;
+import org.springframework.data.redis.serializer.RedisElementWriter;
+import org.springframework.data.redis.serializer.RedisSerializationContext;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* Integration tests for {@link ReactiveStringRedisTemplate}.
@@ -55,4 +59,30 @@ void shouldSetAndGetKeys() {
template.opsForValue().set("key", "value").as(StepVerifier::create).expectNext(true).verifyComplete();
template.opsForValue().get("key").as(StepVerifier::create).expectNext("value").verifyComplete();
}
+
+ @Test // GH-2655
+ void keysFailsOnNullElements() {
+
+ template.opsForValue().set("a", "1").as(StepVerifier::create).expectNext(true).verifyComplete();
+ template.opsForValue().set("b", "1").as(StepVerifier::create).expectNext(true).verifyComplete();
+
+ RedisElementWriter writer = RedisElementWriter.from(StringRedisSerializer.UTF_8);
+ RedisElementReader reader = RedisElementReader.from(StringRedisSerializer.UTF_8);
+ RedisSerializationContext nullReadingContext = RedisSerializationContext
+ . newSerializationContext(StringRedisSerializer.UTF_8).key(buffer -> {
+
+ String read = reader.read(buffer);
+ if ("a".equals(read)) {
+ return null;
+ }
+
+ return read;
+ }, writer).build();
+
+ ReactiveRedisTemplate customTemplate = new ReactiveRedisTemplate<>(template.getConnectionFactory(),
+ nullReadingContext);
+
+ customTemplate.keys("b").as(StepVerifier::create).expectNext("b").verifyComplete();
+ customTemplate.keys("a").as(StepVerifier::create).verifyError(InvalidDataAccessApiUsageException.class);
+ }
}