diff --git a/pom.xml b/pom.xml index c3b30527e5..1c8f1f3c8b 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 3.2.0-SNAPSHOT + 3.2.0-GH-2743-SNAPSHOT Spring Data Redis Spring Data module for Redis diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index d80fd6335f..4e16712655 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -17,6 +17,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -24,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Function; import org.springframework.dao.PessimisticLockingFailureException; @@ -38,7 +40,7 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; -import org.springframework.util.ObjectUtils; + /** * {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone} @@ -47,11 +49,11 @@ *

* {@link DefaultRedisCacheWriter} can be used in * {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or - * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While - * {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for - * operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents - * command overlap by setting an explicit lock key and checking against presence of this key which leads to additional - * requests and potential command wait times. + * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking} + * aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning + * multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap + * by setting an explicit lock key and checking against presence of this key which leads to additional requests + * and potential command wait times. * * @author Christoph Strobl * @author Mark Paluch @@ -61,8 +63,12 @@ */ class DefaultRedisCacheWriter implements RedisCacheWriter { - private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils - .isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null); + public static final boolean FLUX_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux", null); + + private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = + ClassUtils.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null); + + private final AsyncCacheWriter asyncCacheWriter; private final BatchStrategy batchStrategy; @@ -74,8 +80,6 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { private final TtlFunction lockTtl; - private final AsyncCacheWriter asyncCacheWriter; - /** * @param connectionFactory must not be {@literal null}. * @param batchStrategy must not be {@literal null}. @@ -86,8 +90,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { /** * @param connectionFactory must not be {@literal null}. - * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO} - * to disable locking. + * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. + * Use {@link Duration#ZERO} to disable locking. * @param batchStrategy must not be {@literal null}. */ DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) { @@ -96,8 +100,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { /** * @param connectionFactory must not be {@literal null}. - * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO} - * to disable locking. + * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. + * Use {@link Duration#ZERO} to disable locking. * @param lockTtl Lock TTL function must not be {@literal null}. * @param cacheStatisticsCollector must not be {@literal null}. * @param batchStrategy must not be {@literal null}. @@ -116,12 +120,13 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { this.lockTtl = lockTtl; this.statistics = cacheStatisticsCollector; this.batchStrategy = batchStrategy; + this.asyncCacheWriter = isAsyncCacheSupportEnabled() ? new AsynchronousCacheWriterDelegate() + : UnsupportedAsyncCacheWriter.INSTANCE; + } - if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) { - asyncCacheWriter = new AsynchronousCacheWriterDelegate(); - } else { - asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE; - } + private boolean isAsyncCacheSupportEnabled() { + return REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && FLUX_PRESENT + && this.connectionFactory instanceof ReactiveRedisConnectionFactory; } @Override @@ -168,7 +173,8 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur if (cachedValue != null) { statistics.incHits(name); - } else { + } + else { statistics.incMisses(name); } @@ -186,8 +192,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) { execute(name, connection -> { if (shouldExpireWithin(ttl)) { - connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), - SetOption.upsert()); + connection.stringCommands().set(key, value, toExpiration(ttl), SetOption.upsert()); } else { connection.stringCommands().set(key, value); } @@ -224,16 +229,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat try { - boolean put; - - if (shouldExpireWithin(ttl)) { - put = ObjectUtils.nullSafeEquals( - connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true); - } else { - put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true); - } + Boolean wasSet = shouldExpireWithin(ttl) + ? connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()) + : connection.stringCommands().setNX(key, value); - if (put) { + if (Boolean.TRUE.equals(wasSet)) { statistics.incPuts(name); return null; } @@ -322,9 +322,11 @@ void lock(String name) { private Boolean doLock(String name, Object contextualKey, @Nullable Object contextualValue, RedisConnection connection) { - Expiration expiration = Expiration.from(this.lockTtl.getTimeToLive(contextualKey, contextualValue)); + byte[] cacheLockKey = createCacheLockKey(name); - return connection.stringCommands().set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT); + Expiration expiration = toExpiration(contextualKey, contextualValue); + + return connection.stringCommands().set(cacheLockKey, new byte[0], expiration, SetOption.SET_IF_ABSENT); } /** @@ -378,29 +380,40 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c Thread.sleep(this.sleepTime.toMillis()); } } catch (InterruptedException cause) { - // Re-interrupt current Thread to allow other participants to react. Thread.currentThread().interrupt(); - - throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name), - cause); + String message = "Interrupted while waiting to unlock cache %s".formatted(name); + throw new PessimisticLockingFailureException(message, cause); } finally { this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs); } } boolean doCheckLock(String name, RedisConnection connection) { - return ObjectUtils.nullSafeEquals(connection.keyCommands().exists(createCacheLockKey(name)), true); + Boolean cacheLockExists = connection.keyCommands().exists(createCacheLockKey(name)); + return Boolean.TRUE.equals(cacheLockExists); } byte[] createCacheLockKey(String name) { return (name + "~lock").getBytes(StandardCharsets.UTF_8); } + private ReactiveRedisConnectionFactory getReactiveConnectionFactory() { + return (ReactiveRedisConnectionFactory) this.connectionFactory; + } + private static boolean shouldExpireWithin(@Nullable Duration ttl) { return ttl != null && !ttl.isZero() && !ttl.isNegative(); } + private Expiration toExpiration(Duration ttl) { + return Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS); + } + + private Expiration toExpiration(Object key, @Nullable Object value) { + return Expiration.from(this.lockTtl.getTimeToLive(key, value)); + } + /** * Interface for asynchronous cache retrieval. * @@ -419,8 +432,8 @@ interface AsyncCacheWriter { * @param name the cache name from which to retrieve the cache entry. * @param key the cache entry key. * @param ttl optional TTL to set for Time-to-Idle eviction. - * @return a future that completes either with a value if the value exists or completing with {@code null} if the - * cache does not contain an entry. + * @return a future that completes either with a value if the value exists or completing with {@code null} + * if the cache does not contain an entry. */ CompletableFuture retrieve(String name, byte[] key, @Nullable Duration ttl); @@ -463,8 +476,8 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul } /** - * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using - * {@link ReactiveRedisConnectionFactory}. + * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations + * using {@link ReactiveRedisConnectionFactory}. * * @since 3.2 */ @@ -481,11 +494,13 @@ public CompletableFuture retrieve(String name, byte[] key, @Nullable Dur return doWithConnection(connection -> { ByteBuffer wrappedKey = ByteBuffer.wrap(key); + Mono cacheLockCheck = isLockingCacheWriter() ? waitForLock(connection, name) : Mono.empty(); + ReactiveStringCommands stringCommands = connection.stringCommands(); Mono get = shouldExpireWithin(ttl) - ? stringCommands.getEx(wrappedKey, Expiration.from(ttl)) + ? stringCommands.getEx(wrappedKey, toExpiration(ttl)) : stringCommands.get(wrappedKey); return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture(); @@ -498,41 +513,44 @@ public CompletableFuture store(String name, byte[] key, byte[] value, @Nul return doWithConnection(connection -> { Mono mono = isLockingCacheWriter() - ? doStoreWithLocking(name, key, value, ttl, connection) + ? doLockStoreUnlock(name, key, value, ttl, connection) : doStore(key, value, ttl, connection); return mono.then().toFuture(); }); } - private Mono doStoreWithLocking(String name, byte[] key, byte[] value, @Nullable Duration ttl, - ReactiveRedisConnection connection) { - - return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection), - unused -> doUnlock(name, connection)); - } - private Mono doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl, ReactiveRedisConnection connection) { ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey); ByteBuffer wrappedValue = ByteBuffer.wrap(value); - if (shouldExpireWithin(ttl)) { - return connection.stringCommands().set(wrappedKey, wrappedValue, - Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert()); - } else { - return connection.stringCommands().set(wrappedKey, wrappedValue); - } + ReactiveStringCommands stringCommands = connection.stringCommands(); + + return shouldExpireWithin(ttl) + ? stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert()) + : stringCommands.set(wrappedKey, wrappedValue); } + private Mono doLockStoreUnlock(String name, byte[] key, byte[] value, @Nullable Duration ttl, + ReactiveRedisConnection connection) { + + Mono lock = doLock(name, key, value, connection); + + Function> store = unused -> doStore(key, value, ttl, connection); + Function> unlock = unused -> doUnlock(name, connection); + + return Mono.usingWhen(lock, store, unlock); + } private Mono doLock(String name, Object contextualKey, @Nullable Object contextualValue, ReactiveRedisConnection connection) { - ByteBuffer key = ByteBuffer.wrap(createCacheLockKey(name)); + ByteBuffer key = toCacheLockKey(name); ByteBuffer value = ByteBuffer.wrap(new byte[0]); - Expiration expiration = Expiration.from(lockTtl.getTimeToLive(contextualKey, contextualValue)); + + Expiration expiration = toExpiration(contextualKey, contextualValue); return connection.stringCommands().set(key, value, expiration, SetOption.SET_IF_ABSENT) // // Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function. @@ -540,33 +558,52 @@ private Mono doLock(String name, Object contextualKey, @Nullable Object } private Mono doUnlock(String name, ReactiveRedisConnection connection) { - return connection.keyCommands().del(ByteBuffer.wrap(createCacheLockKey(name))).then(); + return connection.keyCommands().del(toCacheLockKey(name)).then(); } private Mono waitForLock(ReactiveRedisConnection connection, String cacheName) { - AtomicLong lockWaitTimeNs = new AtomicLong(); - byte[] cacheLockKey = createCacheLockKey(cacheName); + AtomicLong lockWaitNanoTime = new AtomicLong(); + + Consumer setNanoTimeOnLockWait = subscription -> + lockWaitNanoTime.set(System.nanoTime()); - Flux wait = Flux.interval(Duration.ZERO, sleepTime); - Mono exists = connection.keyCommands().exists(ByteBuffer.wrap(cacheLockKey)).filter(it -> !it); + Consumer recordStatistics = signalType -> + statistics.incLockTime(cacheName, System.nanoTime() - lockWaitNanoTime.get()); - return wait.doOnSubscribe(subscription -> lockWaitTimeNs.set(System.nanoTime())) // - .flatMap(it -> exists) // - .doFinally(signalType -> statistics.incLockTime(cacheName, System.nanoTime() - lockWaitTimeNs.get())) // + Function> doWhileCacheLockExists = lockWaitTime -> connection.keyCommands() + .exists(toCacheLockKey(cacheName)).filter(cacheLockKeyExists -> !cacheLockKeyExists); + + return waitInterval(sleepTime) // + .doOnSubscribe(setNanoTimeOnLockWait) // + .flatMap(doWhileCacheLockExists) // + .doFinally(recordStatistics) // .next() // .then(); } + private Flux waitInterval(Duration period) { + return Flux.interval(Duration.ZERO, period); + } + + private ByteBuffer toCacheLockKey(String cacheName) { + return ByteBuffer.wrap(createCacheLockKey(cacheName)); + } + private CompletableFuture doWithConnection( Function> callback) { - ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory; + Mono reactiveConnection = + Mono.fromSupplier(getReactiveConnectionFactory()::getReactiveConnection); + + Function> commandExecution = connection -> + Mono.fromCompletionStage(callback.apply(connection)); + + Function> connectionClose = ReactiveRedisConnection::closeLater; + + Mono result = Mono.usingWhen(reactiveConnection, commandExecution, connectionClose); - return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), // - it -> Mono.fromCompletionStage(callback.apply(it)), // - ReactiveRedisConnection::closeLater) // - .toFuture(); + return result.toFuture(); } } }