diff --git a/pom.xml b/pom.xml
index c3b30527e5..1c8f1f3c8b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-redis
- 3.2.0-SNAPSHOT
+ 3.2.0-GH-2743-SNAPSHOT
Spring Data Redis
Spring Data module for Redis
diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
index d80fd6335f..4e16712655 100644
--- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
+++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
@@ -17,6 +17,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -24,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.dao.PessimisticLockingFailureException;
@@ -38,7 +40,7 @@
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
-import org.springframework.util.ObjectUtils;
+
/**
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
@@ -47,11 +49,11 @@
*
* {@link DefaultRedisCacheWriter} can be used in
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
- * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
- * {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
- * operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
- * command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
- * requests and potential command wait times.
+ * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
+ * aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
+ * multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
+ * by setting an explicit lock key and checking against presence of this key which leads to additional requests
+ * and potential command wait times.
*
* @author Christoph Strobl
* @author Mark Paluch
@@ -61,8 +63,12 @@
*/
class DefaultRedisCacheWriter implements RedisCacheWriter {
- private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
- .isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
+ public static final boolean FLUX_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux", null);
+
+ private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT =
+ ClassUtils.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
+
+ private final AsyncCacheWriter asyncCacheWriter;
private final BatchStrategy batchStrategy;
@@ -74,8 +80,6 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
private final TtlFunction lockTtl;
- private final AsyncCacheWriter asyncCacheWriter;
-
/**
* @param connectionFactory must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
@@ -86,8 +90,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
/**
* @param connectionFactory must not be {@literal null}.
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
- * to disable locking.
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
+ * Use {@link Duration#ZERO} to disable locking.
* @param batchStrategy must not be {@literal null}.
*/
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) {
@@ -96,8 +100,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
/**
* @param connectionFactory must not be {@literal null}.
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
- * to disable locking.
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
+ * Use {@link Duration#ZERO} to disable locking.
* @param lockTtl Lock TTL function must not be {@literal null}.
* @param cacheStatisticsCollector must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
@@ -116,12 +120,13 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
this.lockTtl = lockTtl;
this.statistics = cacheStatisticsCollector;
this.batchStrategy = batchStrategy;
+ this.asyncCacheWriter = isAsyncCacheSupportEnabled() ? new AsynchronousCacheWriterDelegate()
+ : UnsupportedAsyncCacheWriter.INSTANCE;
+ }
- if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) {
- asyncCacheWriter = new AsynchronousCacheWriterDelegate();
- } else {
- asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE;
- }
+ private boolean isAsyncCacheSupportEnabled() {
+ return REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && FLUX_PRESENT
+ && this.connectionFactory instanceof ReactiveRedisConnectionFactory;
}
@Override
@@ -168,7 +173,8 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur
if (cachedValue != null) {
statistics.incHits(name);
- } else {
+ }
+ else {
statistics.incMisses(name);
}
@@ -186,8 +192,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
execute(name, connection -> {
if (shouldExpireWithin(ttl)) {
- connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
- SetOption.upsert());
+ connection.stringCommands().set(key, value, toExpiration(ttl), SetOption.upsert());
} else {
connection.stringCommands().set(key, value);
}
@@ -224,16 +229,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
try {
- boolean put;
-
- if (shouldExpireWithin(ttl)) {
- put = ObjectUtils.nullSafeEquals(
- connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true);
- } else {
- put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true);
- }
+ Boolean wasSet = shouldExpireWithin(ttl)
+ ? connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent())
+ : connection.stringCommands().setNX(key, value);
- if (put) {
+ if (Boolean.TRUE.equals(wasSet)) {
statistics.incPuts(name);
return null;
}
@@ -322,9 +322,11 @@ void lock(String name) {
private Boolean doLock(String name, Object contextualKey, @Nullable Object contextualValue,
RedisConnection connection) {
- Expiration expiration = Expiration.from(this.lockTtl.getTimeToLive(contextualKey, contextualValue));
+ byte[] cacheLockKey = createCacheLockKey(name);
- return connection.stringCommands().set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT);
+ Expiration expiration = toExpiration(contextualKey, contextualValue);
+
+ return connection.stringCommands().set(cacheLockKey, new byte[0], expiration, SetOption.SET_IF_ABSENT);
}
/**
@@ -378,29 +380,40 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
Thread.sleep(this.sleepTime.toMillis());
}
} catch (InterruptedException cause) {
-
// Re-interrupt current Thread to allow other participants to react.
Thread.currentThread().interrupt();
-
- throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
- cause);
+ String message = "Interrupted while waiting to unlock cache %s".formatted(name);
+ throw new PessimisticLockingFailureException(message, cause);
} finally {
this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
}
}
boolean doCheckLock(String name, RedisConnection connection) {
- return ObjectUtils.nullSafeEquals(connection.keyCommands().exists(createCacheLockKey(name)), true);
+ Boolean cacheLockExists = connection.keyCommands().exists(createCacheLockKey(name));
+ return Boolean.TRUE.equals(cacheLockExists);
}
byte[] createCacheLockKey(String name) {
return (name + "~lock").getBytes(StandardCharsets.UTF_8);
}
+ private ReactiveRedisConnectionFactory getReactiveConnectionFactory() {
+ return (ReactiveRedisConnectionFactory) this.connectionFactory;
+ }
+
private static boolean shouldExpireWithin(@Nullable Duration ttl) {
return ttl != null && !ttl.isZero() && !ttl.isNegative();
}
+ private Expiration toExpiration(Duration ttl) {
+ return Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ private Expiration toExpiration(Object key, @Nullable Object value) {
+ return Expiration.from(this.lockTtl.getTimeToLive(key, value));
+ }
+
/**
* Interface for asynchronous cache retrieval.
*
@@ -419,8 +432,8 @@ interface AsyncCacheWriter {
* @param name the cache name from which to retrieve the cache entry.
* @param key the cache entry key.
* @param ttl optional TTL to set for Time-to-Idle eviction.
- * @return a future that completes either with a value if the value exists or completing with {@code null} if the
- * cache does not contain an entry.
+ * @return a future that completes either with a value if the value exists or completing with {@code null}
+ * if the cache does not contain an entry.
*/
CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl);
@@ -463,8 +476,8 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul
}
/**
- * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
- * {@link ReactiveRedisConnectionFactory}.
+ * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
+ * using {@link ReactiveRedisConnectionFactory}.
*
* @since 3.2
*/
@@ -481,11 +494,13 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur
return doWithConnection(connection -> {
ByteBuffer wrappedKey = ByteBuffer.wrap(key);
+
Mono> cacheLockCheck = isLockingCacheWriter() ? waitForLock(connection, name) : Mono.empty();
+
ReactiveStringCommands stringCommands = connection.stringCommands();
Mono get = shouldExpireWithin(ttl)
- ? stringCommands.getEx(wrappedKey, Expiration.from(ttl))
+ ? stringCommands.getEx(wrappedKey, toExpiration(ttl))
: stringCommands.get(wrappedKey);
return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture();
@@ -498,41 +513,44 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul
return doWithConnection(connection -> {
Mono> mono = isLockingCacheWriter()
- ? doStoreWithLocking(name, key, value, ttl, connection)
+ ? doLockStoreUnlock(name, key, value, ttl, connection)
: doStore(key, value, ttl, connection);
return mono.then().toFuture();
});
}
- private Mono doStoreWithLocking(String name, byte[] key, byte[] value, @Nullable Duration ttl,
- ReactiveRedisConnection connection) {
-
- return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection),
- unused -> doUnlock(name, connection));
- }
-
private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl,
ReactiveRedisConnection connection) {
ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey);
ByteBuffer wrappedValue = ByteBuffer.wrap(value);
- if (shouldExpireWithin(ttl)) {
- return connection.stringCommands().set(wrappedKey, wrappedValue,
- Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
- } else {
- return connection.stringCommands().set(wrappedKey, wrappedValue);
- }
+ ReactiveStringCommands stringCommands = connection.stringCommands();
+
+ return shouldExpireWithin(ttl)
+ ? stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert())
+ : stringCommands.set(wrappedKey, wrappedValue);
}
+ private Mono doLockStoreUnlock(String name, byte[] key, byte[] value, @Nullable Duration ttl,
+ ReactiveRedisConnection connection) {
+
+ Mono