diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
index 37195ef05d..0137cfc9b0 100644
--- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
+++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
@@ -15,20 +15,19 @@
*/
package org.springframework.data.redis.cache;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
+import org.springframework.data.redis.connection.ReactiveStringCommands;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
@@ -39,6 +38,10 @@
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
+
/**
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
* and {@literal cluster} environments, and uses a given {@link RedisConnectionFactory} to obtain the actual
@@ -46,11 +49,11 @@
*
* {@link DefaultRedisCacheWriter} can be used in
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
- * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
- * {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
- * operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
- * command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
- * requests and potential command wait times.
+ * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
+ * aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
+ * multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
+ * by setting an explicit lock key and checking against presence of this key which leads to additional requests
+ * and potential command wait times.
*
* @author Christoph Strobl
* @author Mark Paluch
@@ -60,8 +63,8 @@
*/
class DefaultRedisCacheWriter implements RedisCacheWriter {
- private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
- .isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
+ private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT =
+ ClassUtils.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
private final BatchStrategy batchStrategy;
@@ -75,31 +78,21 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
private final AsyncCacheWriter asyncCacheWriter;
- /**
- * @param connectionFactory must not be {@literal null}.
- * @param batchStrategy must not be {@literal null}.
- */
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy) {
this(connectionFactory, Duration.ZERO, batchStrategy);
}
/**
- * @param connectionFactory must not be {@literal null}.
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
- * to disable locking.
- * @param batchStrategy must not be {@literal null}.
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
+ * Use {@link Duration#ZERO} to disable locking.
*/
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) {
this(connectionFactory, sleepTime, TtlFunction.persistent(), CacheStatisticsCollector.none(), batchStrategy);
}
/**
- * @param connectionFactory must not be {@literal null}.
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
- * to disable locking.
- * @param lockTtl Lock TTL function must not be {@literal null}.
- * @param cacheStatisticsCollector must not be {@literal null}.
- * @param batchStrategy must not be {@literal null}.
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
+ * Use {@link Duration#ZERO} to disable locking.
*/
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, TtlFunction lockTtl,
CacheStatisticsCollector cacheStatisticsCollector, BatchStrategy batchStrategy) {
@@ -160,19 +153,19 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur
Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");
- return asyncCacheWriter.retrieve(name, key, ttl) //
- .thenApply(cachedValue -> {
+ return asyncCacheWriter.retrieve(name, key, ttl).thenApply(cachedValue -> {
- statistics.incGets(name);
+ statistics.incGets(name);
- if (cachedValue != null) {
- statistics.incHits(name);
- } else {
- statistics.incMisses(name);
- }
+ if (cachedValue != null) {
+ statistics.incHits(name);
+ }
+ else {
+ statistics.incMisses(name);
+ }
- return cachedValue;
- });
+ return cachedValue;
+ });
}
@Override
@@ -185,8 +178,8 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
execute(name, connection -> {
if (shouldExpireWithin(ttl)) {
- connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
- SetOption.upsert());
+ connection.stringCommands()
+ .set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
} else {
connection.stringCommands().set(key, value);
}
@@ -204,8 +197,7 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul
Assert.notNull(key, "Key must not be null");
Assert.notNull(value, "Value must not be null");
- return asyncCacheWriter.store(name, key, value, ttl) //
- .thenRun(() -> statistics.incPuts(name));
+ return asyncCacheWriter.store(name, key, value, ttl).thenRun(() -> statistics.incPuts(name));
}
@Override
@@ -226,8 +218,8 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
boolean put;
if (shouldExpireWithin(ttl)) {
- put = ObjectUtils.nullSafeEquals(
- connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true);
+ put = ObjectUtils.nullSafeEquals(connection.stringCommands()
+ .set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true);
} else {
put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true);
}
@@ -377,12 +369,10 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
Thread.sleep(this.sleepTime.toMillis());
}
} catch (InterruptedException cause) {
-
// Re-interrupt current Thread to allow other participants to react.
Thread.currentThread().interrupt();
-
- throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
- cause);
+ String message = String.format("Interrupted while waiting to unlock cache %s", name);
+ throw new PessimisticLockingFailureException(message, cause);
} finally {
this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
}
@@ -418,8 +408,8 @@ interface AsyncCacheWriter {
* @param name the cache name from which to retrieve the cache entry.
* @param key the cache entry key.
* @param ttl optional TTL to set for Time-to-Idle eviction.
- * @return a future that completes either with a value if the value exists or completing with {@code null} if the
- * cache does not contain an entry.
+ * @return a future that completes either with a value if the value exists or completing with {@code null}
+ * if the cache does not contain an entry.
*/
CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl);
@@ -433,6 +423,7 @@ interface AsyncCacheWriter {
* @return a future that signals completion.
*/
CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl);
+
}
/**
@@ -441,6 +432,7 @@ interface AsyncCacheWriter {
* @since 3.2
*/
enum UnsupportedAsyncCacheWriter implements AsyncCacheWriter {
+
INSTANCE;
@Override
@@ -460,8 +452,8 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul
}
/**
- * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
- * {@link ReactiveRedisConnectionFactory}.
+ * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
+ * using {@link ReactiveRedisConnectionFactory}.
*
* @since 3.2
*/
@@ -478,19 +470,16 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur
return doWithConnection(connection -> {
ByteBuffer wrappedKey = ByteBuffer.wrap(key);
- Mono> cacheLockCheckFlux;
- if (isLockingCacheWriter())
- cacheLockCheckFlux = waitForLock(connection, name);
- else {
- cacheLockCheckFlux = Mono.empty();
- }
+ Mono> cacheLockCheck = isLockingCacheWriter() ? waitForLock(connection, name) : Mono.empty();
+
+ ReactiveStringCommands stringCommands = connection.stringCommands();
Mono get = shouldExpireWithin(ttl)
- ? connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl))
- : connection.stringCommands().get(wrappedKey);
+ ? stringCommands.getEx(wrappedKey, toExpiration(ttl))
+ : stringCommands.get(wrappedKey);
- return cacheLockCheckFlux.then(get).map(ByteUtils::getBytes).toFuture();
+ return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture();
});
}
@@ -499,15 +488,9 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul
return doWithConnection(connection -> {
- Mono> mono;
-
- if (isLockingCacheWriter()) {
-
- mono = Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection),
- unused -> doUnlock(name, connection));
- } else {
- mono = doStore(key, value, ttl, connection);
- }
+ Mono> mono = isLockingCacheWriter()
+ ? doLockStoreUnlock(name, key, value, ttl, connection)
+ : doStore(key, value, ttl, connection);
return mono.then().toFuture();
});
@@ -519,24 +502,31 @@ private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration
ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey);
ByteBuffer wrappedValue = ByteBuffer.wrap(value);
- if (shouldExpireWithin(ttl)) {
- return connection.stringCommands().set(wrappedKey, wrappedValue,
- Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
- } else {
- return connection.stringCommands().set(wrappedKey, wrappedValue);
- }
+ ReactiveStringCommands stringCommands = connection.stringCommands();
+
+ return shouldExpireWithin(ttl)
+ ? stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert())
+ : stringCommands.set(wrappedKey, wrappedValue);
+ }
+
+ private Mono doLockStoreUnlock(String name, byte[] key, byte[] value, @Nullable Duration ttl,
+ ReactiveRedisConnection connection) {
+
+ return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection),
+ unused -> doUnlock(name, connection));
}
private Mono