-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refine and clarify operations in the async cache implementation #2744
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,15 @@ | |
|
||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
import reactor.core.publisher.SignalType; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.time.Duration; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.function.Consumer; | ||
import java.util.function.Function; | ||
|
||
import org.springframework.dao.PessimisticLockingFailureException; | ||
|
@@ -38,7 +40,7 @@ | |
import org.springframework.lang.Nullable; | ||
import org.springframework.util.Assert; | ||
import org.springframework.util.ClassUtils; | ||
import org.springframework.util.ObjectUtils; | ||
|
||
|
||
/** | ||
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone} | ||
|
@@ -47,11 +49,11 @@ | |
* <p> | ||
* {@link DefaultRedisCacheWriter} can be used in | ||
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or | ||
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While | ||
* {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for | ||
* operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents | ||
* command overlap by setting an explicit lock key and checking against presence of this key which leads to additional | ||
* requests and potential command wait times. | ||
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking} | ||
* aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning | ||
* multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap | ||
* by setting an explicit lock key and checking against presence of this key which leads to additional requests | ||
* and potential command wait times. | ||
* | ||
* @author Christoph Strobl | ||
* @author Mark Paluch | ||
|
@@ -61,8 +63,12 @@ | |
*/ | ||
class DefaultRedisCacheWriter implements RedisCacheWriter { | ||
|
||
private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils | ||
.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null); | ||
public static final boolean FLUX_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux", null); | ||
|
||
private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = | ||
ClassUtils.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null); | ||
|
||
private final AsyncCacheWriter asyncCacheWriter; | ||
|
||
private final BatchStrategy batchStrategy; | ||
|
||
|
@@ -74,8 +80,6 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { | |
|
||
private final TtlFunction lockTtl; | ||
|
||
private final AsyncCacheWriter asyncCacheWriter; | ||
|
||
/** | ||
* @param connectionFactory must not be {@literal null}. | ||
* @param batchStrategy must not be {@literal null}. | ||
|
@@ -86,8 +90,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { | |
|
||
/** | ||
* @param connectionFactory must not be {@literal null}. | ||
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO} | ||
* to disable locking. | ||
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. | ||
* Use {@link Duration#ZERO} to disable locking. | ||
* @param batchStrategy must not be {@literal null}. | ||
*/ | ||
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) { | ||
|
@@ -96,8 +100,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { | |
|
||
/** | ||
* @param connectionFactory must not be {@literal null}. | ||
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO} | ||
* to disable locking. | ||
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. | ||
* Use {@link Duration#ZERO} to disable locking. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use the Spring Data formatter settings. There's no point in reformatting with a different formatter as the next committer is going to apply a different format again resulting in a lot of change noise. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I manually format Javadoc. The Spring Data source code formatter for Javadoc needs to be fixed. The excessive indentation is causing unnatural line breaks (such as sentences ending in |
||
* @param lockTtl Lock TTL function must not be {@literal null}. | ||
* @param cacheStatisticsCollector must not be {@literal null}. | ||
* @param batchStrategy must not be {@literal null}. | ||
|
@@ -116,12 +120,13 @@ class DefaultRedisCacheWriter implements RedisCacheWriter { | |
this.lockTtl = lockTtl; | ||
this.statistics = cacheStatisticsCollector; | ||
this.batchStrategy = batchStrategy; | ||
this.asyncCacheWriter = isAsyncCacheSupportEnabled() ? new AsynchronousCacheWriterDelegate() | ||
: UnsupportedAsyncCacheWriter.INSTANCE; | ||
} | ||
|
||
if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) { | ||
asyncCacheWriter = new AsynchronousCacheWriterDelegate(); | ||
} else { | ||
asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE; | ||
} | ||
private boolean isAsyncCacheSupportEnabled() { | ||
return REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && FLUX_PRESENT | ||
&& this.connectionFactory instanceof ReactiveRedisConnectionFactory; | ||
} | ||
|
||
@Override | ||
|
@@ -168,7 +173,8 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur | |
|
||
if (cachedValue != null) { | ||
statistics.incHits(name); | ||
} else { | ||
} | ||
else { | ||
statistics.incMisses(name); | ||
} | ||
|
||
|
@@ -186,8 +192,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) { | |
execute(name, connection -> { | ||
|
||
if (shouldExpireWithin(ttl)) { | ||
connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), | ||
SetOption.upsert()); | ||
connection.stringCommands().set(key, value, toExpiration(ttl), SetOption.upsert()); | ||
} else { | ||
connection.stringCommands().set(key, value); | ||
} | ||
|
@@ -224,16 +229,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat | |
|
||
try { | ||
|
||
boolean put; | ||
|
||
if (shouldExpireWithin(ttl)) { | ||
put = ObjectUtils.nullSafeEquals( | ||
connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true); | ||
} else { | ||
put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true); | ||
} | ||
Boolean wasSet = shouldExpireWithin(ttl) | ||
? connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()) | ||
: connection.stringCommands().setNX(key, value); | ||
|
||
if (put) { | ||
if (Boolean.TRUE.equals(wasSet)) { | ||
statistics.incPuts(name); | ||
return null; | ||
} | ||
|
@@ -322,9 +322,11 @@ void lock(String name) { | |
private Boolean doLock(String name, Object contextualKey, @Nullable Object contextualValue, | ||
RedisConnection connection) { | ||
|
||
Expiration expiration = Expiration.from(this.lockTtl.getTimeToLive(contextualKey, contextualValue)); | ||
byte[] cacheLockKey = createCacheLockKey(name); | ||
|
||
return connection.stringCommands().set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT); | ||
Expiration expiration = toExpiration(contextualKey, contextualValue); | ||
|
||
return connection.stringCommands().set(cacheLockKey, new byte[0], expiration, SetOption.SET_IF_ABSENT); | ||
} | ||
|
||
/** | ||
|
@@ -378,29 +380,40 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c | |
Thread.sleep(this.sleepTime.toMillis()); | ||
} | ||
} catch (InterruptedException cause) { | ||
|
||
// Re-interrupt current Thread to allow other participants to react. | ||
Thread.currentThread().interrupt(); | ||
|
||
throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name), | ||
cause); | ||
String message = "Interrupted while waiting to unlock cache %s".formatted(name); | ||
throw new PessimisticLockingFailureException(message, cause); | ||
} finally { | ||
this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs); | ||
} | ||
} | ||
|
||
boolean doCheckLock(String name, RedisConnection connection) { | ||
return ObjectUtils.nullSafeEquals(connection.keyCommands().exists(createCacheLockKey(name)), true); | ||
Boolean cacheLockExists = connection.keyCommands().exists(createCacheLockKey(name)); | ||
return Boolean.TRUE.equals(cacheLockExists); | ||
} | ||
|
||
byte[] createCacheLockKey(String name) { | ||
return (name + "~lock").getBytes(StandardCharsets.UTF_8); | ||
} | ||
|
||
private ReactiveRedisConnectionFactory getReactiveConnectionFactory() { | ||
return (ReactiveRedisConnectionFactory) this.connectionFactory; | ||
} | ||
|
||
private static boolean shouldExpireWithin(@Nullable Duration ttl) { | ||
return ttl != null && !ttl.isZero() && !ttl.isNegative(); | ||
} | ||
|
||
private Expiration toExpiration(Duration ttl) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These tiny methods are unnecessary and generate a lot of noise making it harder to read the code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this: return connection.stringCommands().set(wrappedKey, wrappedValue,
Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert()); Is easier to read than this: stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert()) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically, I have even seen things likes this: return connection.stringCommands().set(ByteBuffer.wrap(key), ByteBuffer.wrap(value),
Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert()); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Additionally, these methods can be use as Lambda's, such as in |
||
return Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS); | ||
} | ||
|
||
private Expiration toExpiration(Object key, @Nullable Object value) { | ||
return Expiration.from(this.lockTtl.getTimeToLive(key, value)); | ||
} | ||
|
||
/** | ||
* Interface for asynchronous cache retrieval. | ||
* | ||
|
@@ -419,8 +432,8 @@ interface AsyncCacheWriter { | |
* @param name the cache name from which to retrieve the cache entry. | ||
* @param key the cache entry key. | ||
* @param ttl optional TTL to set for Time-to-Idle eviction. | ||
* @return a future that completes either with a value if the value exists or completing with {@code null} if the | ||
* cache does not contain an entry. | ||
* @return a future that completes either with a value if the value exists or completing with {@code null} | ||
* if the cache does not contain an entry. | ||
*/ | ||
CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl); | ||
|
||
|
@@ -463,8 +476,8 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul | |
} | ||
|
||
/** | ||
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using | ||
* {@link ReactiveRedisConnectionFactory}. | ||
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations | ||
* using {@link ReactiveRedisConnectionFactory}. | ||
* | ||
* @since 3.2 | ||
*/ | ||
|
@@ -481,11 +494,13 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur | |
return doWithConnection(connection -> { | ||
|
||
ByteBuffer wrappedKey = ByteBuffer.wrap(key); | ||
|
||
Mono<?> cacheLockCheck = isLockingCacheWriter() ? waitForLock(connection, name) : Mono.empty(); | ||
|
||
ReactiveStringCommands stringCommands = connection.stringCommands(); | ||
|
||
Mono<ByteBuffer> get = shouldExpireWithin(ttl) | ||
? stringCommands.getEx(wrappedKey, Expiration.from(ttl)) | ||
? stringCommands.getEx(wrappedKey, toExpiration(ttl)) | ||
: stringCommands.get(wrappedKey); | ||
|
||
return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture(); | ||
|
@@ -498,75 +513,97 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul | |
return doWithConnection(connection -> { | ||
|
||
Mono<?> mono = isLockingCacheWriter() | ||
? doStoreWithLocking(name, key, value, ttl, connection) | ||
? doLockStoreUnlock(name, key, value, ttl, connection) | ||
: doStore(key, value, ttl, connection); | ||
|
||
return mono.then().toFuture(); | ||
}); | ||
} | ||
|
||
private Mono<Boolean> doStoreWithLocking(String name, byte[] key, byte[] value, @Nullable Duration ttl, | ||
ReactiveRedisConnection connection) { | ||
|
||
return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection), | ||
unused -> doUnlock(name, connection)); | ||
} | ||
|
||
private Mono<Boolean> doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl, | ||
ReactiveRedisConnection connection) { | ||
|
||
ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey); | ||
ByteBuffer wrappedValue = ByteBuffer.wrap(value); | ||
|
||
if (shouldExpireWithin(ttl)) { | ||
return connection.stringCommands().set(wrappedKey, wrappedValue, | ||
Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert()); | ||
} else { | ||
return connection.stringCommands().set(wrappedKey, wrappedValue); | ||
} | ||
ReactiveStringCommands stringCommands = connection.stringCommands(); | ||
|
||
return shouldExpireWithin(ttl) | ||
? stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert()) | ||
: stringCommands.set(wrappedKey, wrappedValue); | ||
} | ||
|
||
private Mono<Boolean> doLockStoreUnlock(String name, byte[] key, byte[] value, @Nullable Duration ttl, | ||
ReactiveRedisConnection connection) { | ||
|
||
Mono<Object> lock = doLock(name, key, value, connection); | ||
|
||
Function<Object, Mono<Boolean>> store = unused -> doStore(key, value, ttl, connection); | ||
Function<Object, Mono<Void>> unlock = unused -> doUnlock(name, connection); | ||
|
||
return Mono.usingWhen(lock, store, unlock); | ||
} | ||
|
||
private Mono<Object> doLock(String name, Object contextualKey, @Nullable Object contextualValue, | ||
ReactiveRedisConnection connection) { | ||
|
||
ByteBuffer key = ByteBuffer.wrap(createCacheLockKey(name)); | ||
ByteBuffer key = toCacheLockKey(name); | ||
ByteBuffer value = ByteBuffer.wrap(new byte[0]); | ||
Expiration expiration = Expiration.from(lockTtl.getTimeToLive(contextualKey, contextualValue)); | ||
|
||
Expiration expiration = toExpiration(contextualKey, contextualValue); | ||
|
||
return connection.stringCommands().set(key, value, expiration, SetOption.SET_IF_ABSENT) // | ||
// Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function. | ||
.thenReturn(Boolean.TRUE); | ||
} | ||
|
||
private Mono<Void> doUnlock(String name, ReactiveRedisConnection connection) { | ||
return connection.keyCommands().del(ByteBuffer.wrap(createCacheLockKey(name))).then(); | ||
return connection.keyCommands().del(toCacheLockKey(name)).then(); | ||
} | ||
|
||
private Mono<Void> waitForLock(ReactiveRedisConnection connection, String cacheName) { | ||
|
||
AtomicLong lockWaitTimeNs = new AtomicLong(); | ||
byte[] cacheLockKey = createCacheLockKey(cacheName); | ||
AtomicLong lockWaitNanoTime = new AtomicLong(); | ||
|
||
Consumer<org.reactivestreams.Subscription> setNanoTimeOnLockWait = subscription -> | ||
lockWaitNanoTime.set(System.nanoTime()); | ||
|
||
Flux<Long> wait = Flux.interval(Duration.ZERO, sleepTime); | ||
Mono<Boolean> exists = connection.keyCommands().exists(ByteBuffer.wrap(cacheLockKey)).filter(it -> !it); | ||
Consumer<SignalType> recordStatistics = signalType -> | ||
statistics.incLockTime(cacheName, System.nanoTime() - lockWaitNanoTime.get()); | ||
|
||
return wait.doOnSubscribe(subscription -> lockWaitTimeNs.set(System.nanoTime())) // | ||
.flatMap(it -> exists) // | ||
.doFinally(signalType -> statistics.incLockTime(cacheName, System.nanoTime() - lockWaitTimeNs.get())) // | ||
Function<Long, Mono<Boolean>> doWhileCacheLockExists = lockWaitTime -> connection.keyCommands() | ||
.exists(toCacheLockKey(cacheName)).filter(cacheLockKeyExists -> !cacheLockKeyExists); | ||
|
||
return waitInterval(sleepTime) // | ||
.doOnSubscribe(setNanoTimeOnLockWait) // | ||
.flatMap(doWhileCacheLockExists) // | ||
.doFinally(recordStatistics) // | ||
.next() // | ||
.then(); | ||
} | ||
|
||
private Flux<Long> waitInterval(Duration period) { | ||
return Flux.interval(Duration.ZERO, period); | ||
} | ||
|
||
private ByteBuffer toCacheLockKey(String cacheName) { | ||
return ByteBuffer.wrap(createCacheLockKey(cacheName)); | ||
} | ||
|
||
private <T> CompletableFuture<T> doWithConnection( | ||
Function<ReactiveRedisConnection, CompletableFuture<T>> callback) { | ||
|
||
ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory; | ||
Mono<ReactiveRedisConnection> reactiveConnection = | ||
Mono.fromSupplier(getReactiveConnectionFactory()::getReactiveConnection); | ||
|
||
Function<ReactiveRedisConnection, Mono<T>> commandExecution = connection -> | ||
Mono.fromCompletionStage(callback.apply(connection)); | ||
|
||
Function<ReactiveRedisConnection, Mono<Void>> connectionClose = ReactiveRedisConnection::closeLater; | ||
|
||
Mono<T> result = Mono.usingWhen(reactiveConnection, commandExecution, connectionClose); | ||
|
||
return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), // | ||
it -> Mono.fromCompletionStage(callback.apply(it)), // | ||
ReactiveRedisConnection::closeLater) // | ||
.toFuture(); | ||
return result.toFuture(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What has happened to the rest of the Javadoc? GH shows that we only document
sleepTime
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only "must not be null" on
@param
tags is redundant and non-informative.In general, I'd argue if we are not going to take the time to write meaningful Javadoc, then there is no point to have Javadoc on the method at all.