diff --git a/src/main/java/org/springframework/data/redis/aot/RedisRuntimeHints.java b/src/main/java/org/springframework/data/redis/aot/RedisRuntimeHints.java
index 127b0d76e7..639d1906ba 100644
--- a/src/main/java/org/springframework/data/redis/aot/RedisRuntimeHints.java
+++ b/src/main/java/org/springframework/data/redis/aot/RedisRuntimeHints.java
@@ -106,6 +106,7 @@ public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader)
TypeReference.of(ReactiveClusterScriptingCommands.class),
TypeReference.of(ReactiveClusterGeoCommands.class),
TypeReference.of(ReactiveClusterHyperLogLogCommands.class), TypeReference.of(ReactiveRedisOperations.class),
+ TypeReference.of(ReactiveRedisConnectionFactory.class),
TypeReference.of(ReactiveRedisTemplate.class), TypeReference.of(RedisOperations.class),
TypeReference.of(RedisTemplate.class), TypeReference.of(StringRedisTemplate.class),
TypeReference.of(KeyspaceConfiguration.class), TypeReference.of(MappingConfiguration.class),
diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
index 101f4864b2..37195ef05d 100644
--- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
+++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java
@@ -15,14 +15,16 @@
*/
package org.springframework.data.redis.cache;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
-import java.util.function.Supplier;
import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
@@ -34,9 +36,8 @@
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+import org.springframework.util.ClassUtils;
+import org.springframework.util.ObjectUtils;
/**
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
@@ -44,8 +45,8 @@
* {@link RedisConnection}.
*
* {@link DefaultRedisCacheWriter} can be used in
- * {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking}
- * or {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
+ * {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
+ * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
* {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
* operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
* command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
@@ -59,6 +60,9 @@
*/
class DefaultRedisCacheWriter implements RedisCacheWriter {
+ private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
+ .isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
+
private final BatchStrategy batchStrategy;
private final CacheStatisticsCollector statistics;
@@ -69,6 +73,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
private final TtlFunction lockTtl;
+ private final AsyncCacheWriter asyncCacheWriter;
+
/**
* @param connectionFactory must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
@@ -109,6 +115,12 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
this.lockTtl = lockTtl;
this.statistics = cacheStatisticsCollector;
this.batchStrategy = batchStrategy;
+
+ if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) {
+ asyncCacheWriter = new AsynchronousCacheWriterDelegate();
+ } else {
+ asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE;
+ }
}
@Override
@@ -138,8 +150,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
}
@Override
- public boolean isRetrieveSupported() {
- return isReactive();
+ public boolean supportsAsyncRetrieve() {
+ return asyncCacheWriter.isSupported();
}
@Override
@@ -148,68 +160,19 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur
Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");
- CompletableFuture result = nonBlockingRetrieveFunction(name).apply(key, ttl);
-
- result = result.thenApply(cachedValue -> {
-
- statistics.incGets(name);
-
- if (cachedValue != null) {
- statistics.incHits(name);
- } else {
- statistics.incMisses(name);
- }
-
- return cachedValue;
- });
-
- return result;
- }
-
- private BiFunction> nonBlockingRetrieveFunction(String cacheName) {
- return isReactive() ? reactiveRetrieveFunction(cacheName) : asyncRetrieveFunction(cacheName);
- }
-
- // TODO: Possibly remove if we rely on the default Cache.retrieve(..) behavior
- // after assessing RedisCacheWriter.isRetrieveSupported().
- // Function applied for Cache.retrieve(key) when a non-reactive Redis driver is used, such as Jedis.
- private BiFunction> asyncRetrieveFunction(String cacheName) {
-
- return (key, ttl) -> {
-
- Supplier getKey = () -> execute(cacheName, connection -> connection.stringCommands().get(key));
-
- Supplier getKeyWithExpiration = () -> execute(cacheName, connection ->
- connection.stringCommands().getEx(key, Expiration.from(ttl)));
-
- return shouldExpireWithin(ttl)
- ? CompletableFuture.supplyAsync(getKeyWithExpiration)
- : CompletableFuture.supplyAsync(getKey);
-
- };
- }
-
- // Function applied for Cache.retrieve(key) when a reactive Redis driver is used, such as Lettuce.
- private BiFunction> reactiveRetrieveFunction(String cacheName) {
-
- return (key, ttl) -> {
-
- ByteBuffer wrappedKey = ByteBuffer.wrap(key);
+ return asyncCacheWriter.retrieve(name, key, ttl) //
+ .thenApply(cachedValue -> {
- Flux> cacheLockCheckFlux = Flux.interval(Duration.ZERO, this.sleepTime).takeUntil(count ->
- executeLockFree(connection -> !doCheckLock(cacheName, connection)));
+ statistics.incGets(name);
- Mono getMono = shouldExpireWithin(ttl)
- ? executeReactively(connection -> connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl)))
- : executeReactively(connection -> connection.stringCommands().get(wrappedKey));
+ if (cachedValue != null) {
+ statistics.incHits(name);
+ } else {
+ statistics.incMisses(name);
+ }
- Mono result = cacheLockCheckFlux.then(getMono);
-
- @SuppressWarnings("all")
- Mono byteArrayResult = result.map(DefaultRedisCacheWriter::nullSafeGetBytes);
-
- return byteArrayResult.toFuture();
- };
+ return cachedValue;
+ });
}
@Override
@@ -222,8 +185,8 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
execute(name, connection -> {
if (shouldExpireWithin(ttl)) {
- connection.stringCommands()
- .set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
+ connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
+ SetOption.upsert());
} else {
connection.stringCommands().set(key, value);
}
@@ -234,6 +197,17 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
statistics.incPuts(name);
}
+ @Override
+ public CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
+
+ Assert.notNull(name, "Name must not be null");
+ Assert.notNull(key, "Key must not be null");
+ Assert.notNull(value, "Value must not be null");
+
+ return asyncCacheWriter.store(name, key, value, ttl) //
+ .thenRun(() -> statistics.incPuts(name));
+ }
+
@Override
public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
@@ -252,9 +226,10 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
boolean put;
if (shouldExpireWithin(ttl)) {
- put = isTrue(connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()));
+ put = ObjectUtils.nullSafeEquals(
+ connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true);
} else {
- put = isTrue(connection.stringCommands().setNX(key, value));
+ put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true);
}
if (put) {
@@ -348,8 +323,7 @@ private Boolean doLock(String name, Object contextualKey, @Nullable Object conte
Expiration expiration = Expiration.from(this.lockTtl.getTimeToLive(contextualKey, contextualValue));
- return connection.stringCommands()
- .set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT);
+ return connection.stringCommands().set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT);
}
/**
@@ -381,18 +355,6 @@ private T executeLockFree(Function callback) {
}
}
- private T executeReactively(Function callback) {
-
- ReactiveRedisConnection connection = getReactiveRedisConnectionFactory().getReactiveConnection();
-
- try {
- return callback.apply(connection);
- }
- finally {
- connection.closeLater();
- }
- }
-
/**
* Determines whether this {@link RedisCacheWriter} uses locks during caching operations.
*
@@ -419,40 +381,192 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
// Re-interrupt current Thread to allow other participants to react.
Thread.currentThread().interrupt();
- String message = String.format("Interrupted while waiting to unlock cache %s", name);
-
- throw new PessimisticLockingFailureException(message, cause);
+ throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
+ cause);
} finally {
this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
}
}
boolean doCheckLock(String name, RedisConnection connection) {
- return isTrue(connection.keyCommands().exists(createCacheLockKey(name)));
+ return ObjectUtils.nullSafeEquals(connection.keyCommands().exists(createCacheLockKey(name)), true);
}
- private boolean isReactive() {
- return this.connectionFactory instanceof ReactiveRedisConnectionFactory;
+ byte[] createCacheLockKey(String name) {
+ return (name + "~lock").getBytes(StandardCharsets.UTF_8);
}
- private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory() {
- return (ReactiveRedisConnectionFactory) this.connectionFactory;
+ private static boolean shouldExpireWithin(@Nullable Duration ttl) {
+ return ttl != null && !ttl.isZero() && !ttl.isNegative();
}
- private static byte[] createCacheLockKey(String name) {
- return (name + "~lock").getBytes(StandardCharsets.UTF_8);
+ /**
+ * Interface for asynchronous cache retrieval.
+ *
+ * @since 3.2
+ */
+ interface AsyncCacheWriter {
+
+ /**
+ * @return {@code true} if async cache operations are supported; {@code false} otherwise.
+ */
+ boolean isSupported();
+
+ /**
+ * Retrieve a cache entry asynchronously.
+ *
+ * @param name the cache name from which to retrieve the cache entry.
+ * @param key the cache entry key.
+ * @param ttl optional TTL to set for Time-to-Idle eviction.
+ * @return a future that completes either with a value if the value exists or completing with {@code null} if the
+ * cache does not contain an entry.
+ */
+ CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl);
+
+ /**
+ * Store a cache entry asynchronously.
+ *
+ * @param name the cache name which to store the cache entry to.
+ * @param key the key for the cache entry. Must not be {@literal null}.
+ * @param value the value stored for the key. Must not be {@literal null}.
+ * @param ttl optional expiration time. Can be {@literal null}.
+ * @return a future that signals completion.
+ */
+ CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl);
}
- private static boolean isTrue(@Nullable Boolean value) {
- return Boolean.TRUE.equals(value);
- }
+ /**
+ * Unsupported variant of a {@link AsyncCacheWriter}.
+ *
+ * @since 3.2
+ */
+ enum UnsupportedAsyncCacheWriter implements AsyncCacheWriter {
+ INSTANCE;
- @Nullable
- private static byte[] nullSafeGetBytes(@Nullable ByteBuffer value) {
- return value != null ? ByteUtils.getBytes(value) : null;
+ @Override
+ public boolean isSupported() {
+ return false;
+ }
+
+ @Override
+ public CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl) {
+ throw new UnsupportedOperationException("async retrieve not supported");
+ }
+
+ @Override
+ public CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
+ throw new UnsupportedOperationException("async store not supported");
+ }
}
- private static boolean shouldExpireWithin(@Nullable Duration ttl) {
- return ttl != null && !ttl.isZero() && !ttl.isNegative();
+ /**
+ * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
+ * {@link ReactiveRedisConnectionFactory}.
+ *
+ * @since 3.2
+ */
+ class AsynchronousCacheWriterDelegate implements AsyncCacheWriter {
+
+ @Override
+ public boolean isSupported() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl) {
+
+ return doWithConnection(connection -> {
+
+ ByteBuffer wrappedKey = ByteBuffer.wrap(key);
+ Mono> cacheLockCheckFlux;
+
+ if (isLockingCacheWriter())
+ cacheLockCheckFlux = waitForLock(connection, name);
+ else {
+ cacheLockCheckFlux = Mono.empty();
+ }
+
+ Mono get = shouldExpireWithin(ttl)
+ ? connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl))
+ : connection.stringCommands().get(wrappedKey);
+
+ return cacheLockCheckFlux.then(get).map(ByteUtils::getBytes).toFuture();
+ });
+ }
+
+ @Override
+ public CompletableFuture store(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
+
+ return doWithConnection(connection -> {
+
+ Mono> mono;
+
+ if (isLockingCacheWriter()) {
+
+ mono = Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection),
+ unused -> doUnlock(name, connection));
+ } else {
+ mono = doStore(key, value, ttl, connection);
+ }
+
+ return mono.then().toFuture();
+ });
+ }
+
+ private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl,
+ ReactiveRedisConnection connection) {
+
+ ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey);
+ ByteBuffer wrappedValue = ByteBuffer.wrap(value);
+
+ if (shouldExpireWithin(ttl)) {
+ return connection.stringCommands().set(wrappedKey, wrappedValue,
+ Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
+ } else {
+ return connection.stringCommands().set(wrappedKey, wrappedValue);
+ }
+ }
+
+ private Mono