Refine and clarify operations in the async cache implementation #2744

Open · wants to merge 2 commits into base: main
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0-GH-2743-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
DefaultRedisCacheWriter.java
@@ -17,13 +17,15 @@

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.dao.PessimisticLockingFailureException;
@@ -38,7 +40,7 @@
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;


/**
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
@@ -47,11 +49,11 @@
* <p>
* {@link DefaultRedisCacheWriter} can be used in
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
* {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
* operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
* command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
* requests and potential command wait times.
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
* aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
* multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
* by setting an explicit lock key and checking against presence of this key which leads to additional requests
* and potential command wait times.
*
* @author Christoph Strobl
* @author Mark Paluch
@@ -61,8 +63,12 @@
*/
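As a side note on the two modes described in this Javadoc, a minimal configuration sketch follows. It is not part of this PR; it only uses the two factory methods named above (both exist on RedisCacheWriter), and the helper class name is made up for illustration.

import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.RedisConnectionFactory;

class CacheWriterModeSketch {

    // Non-locking: fastest, but multi-step operations such as putIfAbsent may interleave
    // with other writers because they span several Redis commands.
    RedisCacheWriter nonLocking(RedisConnectionFactory factory) {
        return RedisCacheWriter.nonLockingRedisCacheWriter(factory);
    }

    // Locking: serializes those multi-step operations via an explicit "<name>~lock" key,
    // at the cost of extra round trips and possible waits on that key.
    RedisCacheWriter locking(RedisConnectionFactory factory) {
        return RedisCacheWriter.lockingRedisCacheWriter(factory);
    }
}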
class DefaultRedisCacheWriter implements RedisCacheWriter {

private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
public static final boolean FLUX_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux", null);

private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT =
ClassUtils.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);

private final AsyncCacheWriter asyncCacheWriter;

private final BatchStrategy batchStrategy;

@@ -74,8 +80,6 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {

private final TtlFunction lockTtl;

private final AsyncCacheWriter asyncCacheWriter;

/**
* @param connectionFactory must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
@@ -86,8 +90,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {

/**
* @param connectionFactory must not be {@literal null}.
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
* to disable locking.
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
* Use {@link Duration#ZERO} to disable locking.
* @param batchStrategy must not be {@literal null}.
*/
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) {
@@ -96,8 +100,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {

/**
* @param connectionFactory must not be {@literal null}.
Member commented:
What has happened to the rest of the Javadoc? GH shows that we only document sleepTime.

@jxblum (Contributor, Author) commented on Oct 18, 2023:
Only "must not be null" on @param tags is redundant and non-informative.
In general, I'd argue that if we are not going to take the time to write meaningful Javadoc, then there is no point in having Javadoc on the method at all.
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
* to disable locking.
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
* Use {@link Duration#ZERO} to disable locking.
Member commented:
Please use the Spring Data formatter settings. There's no point in reformatting with a different formatter, as the next committer is going to apply a different format again, resulting in a lot of change noise.

@jxblum (Contributor, Author) commented on Oct 18, 2023:
I manually format Javadoc.
The Spring Data source code formatter for Javadoc needs to be fixed.
The excessive indentation is causing unnatural line breaks (sentences ending in "and", "or", or "While" are bad writing style), creating orphans, and doing other inconsistent things that do not align with the core Spring Framework's Javadoc guidelines.

* @param lockTtl Lock TTL function must not be {@literal null}.
* @param cacheStatisticsCollector must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
@@ -116,12 +120,13 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
this.lockTtl = lockTtl;
this.statistics = cacheStatisticsCollector;
this.batchStrategy = batchStrategy;
this.asyncCacheWriter = isAsyncCacheSupportEnabled() ? new AsynchronousCacheWriterDelegate()
: UnsupportedAsyncCacheWriter.INSTANCE;
}

if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) {
asyncCacheWriter = new AsynchronousCacheWriterDelegate();
} else {
asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE;
}
private boolean isAsyncCacheSupportEnabled() {
return REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && FLUX_PRESENT
&& this.connectionFactory instanceof ReactiveRedisConnectionFactory;
}

@Override
@@ -168,7 +173,8 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur

if (cachedValue != null) {
statistics.incHits(name);
} else {
}
else {
statistics.incMisses(name);
}
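The hit/miss branch above only increments counters. A minimal sketch of that accounting pattern with plain JDK types, not part of the PR (the real counterpart is the CacheStatisticsCollector passed into the writer; names below are illustrative):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

class HitMissCountersSketch {

    private final ConcurrentMap<String, LongAdder> hits = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, LongAdder> misses = new ConcurrentHashMap<>();

    // Same branch as above: a non-null cached value counts as a hit, null as a miss.
    void record(String cacheName, byte[] cachedValue) {
        ConcurrentMap<String, LongAdder> target = (cachedValue != null) ? hits : misses;
        target.computeIfAbsent(cacheName, name -> new LongAdder()).increment();
    }
}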

@@ -186,8 +192,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
execute(name, connection -> {

if (shouldExpireWithin(ttl)) {
connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
SetOption.upsert());
connection.stringCommands().set(key, value, toExpiration(ttl), SetOption.upsert());
} else {
connection.stringCommands().set(key, value);
}
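put(..) only issues an expiring SET when the TTL is usable; the decision lives in shouldExpireWithin(..) and the conversion in toExpiration(..). A dependency-free sketch of that pair, not part of the PR (the Expiration stand-in is reduced to plain milliseconds):

import java.time.Duration;

class TtlDecisionSketch {

    // Mirrors shouldExpireWithin(..): only a present, positive, non-zero TTL expires the entry.
    static boolean shouldExpireWithin(Duration ttl) {
        return ttl != null && !ttl.isZero() && !ttl.isNegative();
    }

    // Stand-in for toExpiration(..): the writer hands the TTL to Redis as milliseconds.
    static long toExpirationMillis(Duration ttl) {
        return ttl.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(shouldExpireWithin(Duration.ofSeconds(30)));  // true  -> SET with expiration
        System.out.println(shouldExpireWithin(Duration.ZERO));           // false -> plain SET
    }
}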
@@ -224,16 +229,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat

try {

boolean put;

if (shouldExpireWithin(ttl)) {
put = ObjectUtils.nullSafeEquals(
connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true);
} else {
put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true);
}
Boolean wasSet = shouldExpireWithin(ttl)
? connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent())
: connection.stringCommands().setNX(key, value);

if (put) {
if (Boolean.TRUE.equals(wasSet)) {
statistics.incPuts(name);
return null;
}
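The rewritten branch keeps the putIfAbsent contract: a successful SET ... NX stores the value and the method returns null, otherwise the caller sees the value that is already present. A sketch of that contract against an in-memory map instead of Redis, for illustration only:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PutIfAbsentContractSketch {

    private final ConcurrentMap<String, byte[]> store = new ConcurrentHashMap<>();

    // null return = "stored" (mirrors the Boolean.TRUE.equals(wasSet) branch above);
    // a non-null return is the value that was already cached.
    byte[] putIfAbsent(String key, byte[] value) {
        return store.putIfAbsent(key, value);
    }
}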
@@ -322,9 +322,11 @@ void lock(String name) {
private Boolean doLock(String name, Object contextualKey, @Nullable Object contextualValue,
RedisConnection connection) {

Expiration expiration = Expiration.from(this.lockTtl.getTimeToLive(contextualKey, contextualValue));
byte[] cacheLockKey = createCacheLockKey(name);

return connection.stringCommands().set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT);
Expiration expiration = toExpiration(contextualKey, contextualValue);

return connection.stringCommands().set(cacheLockKey, new byte[0], expiration, SetOption.SET_IF_ABSENT);
}
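doLock(..), doCheckLock(..) and unlocking map to SET <name>~lock NX with the lock TTL, EXISTS, and DEL. A sketch of that protocol on an in-memory map, ignoring TTL and not part of the PR (names are illustrative):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CacheLockProtocolSketch {

    private final ConcurrentMap<String, byte[]> redisLike = new ConcurrentHashMap<>();

    private String lockKey(String cacheName) {
        return cacheName + "~lock";  // same suffix as createCacheLockKey(..)
    }

    // doLock(..): only succeeds while no other writer holds the lock key (SET .. NX).
    boolean tryLock(String cacheName) {
        return redisLike.putIfAbsent(lockKey(cacheName), new byte[0]) == null;
    }

    // doCheckLock(..): the cache is locked while the key exists (EXISTS).
    boolean isLocked(String cacheName) {
        return redisLike.containsKey(lockKey(cacheName));
    }

    // Unlocking removes the key (DEL).
    void unlock(String cacheName) {
        redisLike.remove(lockKey(cacheName));
    }
}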

/**
@@ -378,29 +380,40 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
Thread.sleep(this.sleepTime.toMillis());
}
} catch (InterruptedException cause) {

// Re-interrupt current Thread to allow other participants to react.
Thread.currentThread().interrupt();

throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
cause);
String message = "Interrupted while waiting to unlock cache %s".formatted(name);
throw new PessimisticLockingFailureException(message, cause);
} finally {
this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
}
}

boolean doCheckLock(String name, RedisConnection connection) {
return ObjectUtils.nullSafeEquals(connection.keyCommands().exists(createCacheLockKey(name)), true);
Boolean cacheLockExists = connection.keyCommands().exists(createCacheLockKey(name));
return Boolean.TRUE.equals(cacheLockExists);
}

byte[] createCacheLockKey(String name) {
return (name + "~lock").getBytes(StandardCharsets.UTF_8);
}

private ReactiveRedisConnectionFactory getReactiveConnectionFactory() {
return (ReactiveRedisConnectionFactory) this.connectionFactory;
}

private static boolean shouldExpireWithin(@Nullable Duration ttl) {
return ttl != null && !ttl.isZero() && !ttl.isNegative();
}

private Expiration toExpiration(Duration ttl) {
Member commented:
These tiny methods are unnecessary and generate a lot of noise, making it harder to read the code.

@jxblum (Contributor, Author) commented on Oct 18, 2023:
So this:

    return connection.stringCommands().set(wrappedKey, wrappedValue,
            Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());

is easier to read than this?

    stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert())
@jxblum commented:
Technically, I have even seen things like this:

    return connection.stringCommands().set(ByteBuffer.wrap(key), ByteBuffer.wrap(value),
            Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
@jxblum commented:
Additionally, these methods can be used as lambdas (method references), for example in .map(Function<S, T>) calls, which is useful in the context of Optionals, Streams, etc.
return Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS);
}

private Expiration toExpiration(Object key, @Nullable Object value) {
return Expiration.from(this.lockTtl.getTimeToLive(key, value));
}
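On the lambda point from the review thread: a single-argument helper like toExpiration(Duration) can indeed be passed as a method reference. A small, dependency-free illustration, not part of the PR (types simplified to plain milliseconds):

import java.time.Duration;
import java.util.Optional;

class MethodReferenceSketch {

    private long toExpirationMillis(Duration ttl) {
        return ttl.toMillis();
    }

    // The helper doubles as a Function, e.g. inside Optional or Stream pipelines.
    long resolve(Duration ttl, long defaultMillis) {
        return Optional.ofNullable(ttl)
                .map(this::toExpirationMillis)
                .orElse(defaultMillis);
    }
}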

/**
* Interface for asynchronous cache retrieval.
*
@@ -419,8 +432,8 @@ interface AsyncCacheWriter {
* @param name the cache name from which to retrieve the cache entry.
* @param key the cache entry key.
* @param ttl optional TTL to set for Time-to-Idle eviction.
* @return a future that completes either with a value if the value exists or completing with {@code null} if the
* cache does not contain an entry.
* @return a future that completes either with a value if the value exists or completing with {@code null}
* if the cache does not contain an entry.
*/
CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl);
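From the caller's side, the contract above means the future completes with the raw bytes or with null when the cache has no entry. A minimal consumption sketch, not part of the PR (names illustrative):

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

class AsyncRetrieveCallerSketch {

    String resolveOrNull(CompletableFuture<byte[]> retrieved) {
        // Completion with null signals "no cache entry"; otherwise decode the stored bytes.
        return retrieved
                .thenApply(bytes -> bytes == null ? null : new String(bytes, StandardCharsets.UTF_8))
                .join();
    }
}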

@@ -463,8 +476,8 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
}

/**
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
* {@link ReactiveRedisConnectionFactory}.
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
* using {@link ReactiveRedisConnectionFactory}.
*
* @since 3.2
*/
@@ -481,11 +494,13 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
return doWithConnection(connection -> {

ByteBuffer wrappedKey = ByteBuffer.wrap(key);

Mono<?> cacheLockCheck = isLockingCacheWriter() ? waitForLock(connection, name) : Mono.empty();

ReactiveStringCommands stringCommands = connection.stringCommands();

Mono<ByteBuffer> get = shouldExpireWithin(ttl)
? stringCommands.getEx(wrappedKey, Expiration.from(ttl))
? stringCommands.getEx(wrappedKey, toExpiration(ttl))
: stringCommands.get(wrappedKey);

return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture();
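The shape of this pipeline is: optional lock wait, then GET (or GETEX with TTL), then adapt to a CompletableFuture. A reactor-only sketch with the Redis calls replaced by in-memory Monos, not part of the PR (assumes reactor-core on the classpath; everything else is a stand-in):

import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

class ReactiveRetrieveShapeSketch {

    CompletableFuture<String> retrieve(boolean locking, String key) {

        // Stand-ins for waitForLock(connection, name) and stringCommands().get(..)/getEx(..).
        Mono<?> cacheLockCheck = locking ? Mono.just("lock released") : Mono.empty();
        Mono<String> get = Mono.just("cached-value-for-" + key);

        // Run the GET only after the (possibly empty) lock check completes.
        return cacheLockCheck.then(get).toFuture();
    }

    public static void main(String[] args) {
        System.out.println(new ReactiveRetrieveShapeSketch().retrieve(true, "k").join());
    }
}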
@@ -498,75 +513,97 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
return doWithConnection(connection -> {

Mono<?> mono = isLockingCacheWriter()
? doStoreWithLocking(name, key, value, ttl, connection)
? doLockStoreUnlock(name, key, value, ttl, connection)
: doStore(key, value, ttl, connection);

return mono.then().toFuture();
});
}

private Mono<Boolean> doStoreWithLocking(String name, byte[] key, byte[] value, @Nullable Duration ttl,
ReactiveRedisConnection connection) {

return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection),
unused -> doUnlock(name, connection));
}

private Mono<Boolean> doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl,
ReactiveRedisConnection connection) {

ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey);
ByteBuffer wrappedValue = ByteBuffer.wrap(value);

if (shouldExpireWithin(ttl)) {
return connection.stringCommands().set(wrappedKey, wrappedValue,
Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
} else {
return connection.stringCommands().set(wrappedKey, wrappedValue);
}
ReactiveStringCommands stringCommands = connection.stringCommands();

return shouldExpireWithin(ttl)
? stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert())
: stringCommands.set(wrappedKey, wrappedValue);
}

private Mono<Boolean> doLockStoreUnlock(String name, byte[] key, byte[] value, @Nullable Duration ttl,
ReactiveRedisConnection connection) {

Mono<Object> lock = doLock(name, key, value, connection);

Function<Object, Mono<Boolean>> store = unused -> doStore(key, value, ttl, connection);
Function<Object, Mono<Void>> unlock = unused -> doUnlock(name, connection);

return Mono.usingWhen(lock, store, unlock);
}
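The refactored method is a textbook Mono.usingWhen: the lock is the resource, the store is the resource closure, and the unlock is the asynchronous cleanup that runs whether or not the store succeeds. A self-contained reactor sketch of that shape, not part of the PR (stand-in Monos, not Redis calls):

import reactor.core.publisher.Mono;

class UsingWhenLockSketch {

    Mono<Boolean> lockStoreUnlock() {

        Mono<Object> lock = Mono.just((Object) "lock acquired");   // doLock(..) must emit a value

        return Mono.usingWhen(
                lock,
                acquired -> Mono.just(true),                       // doStore(..)
                acquired -> Mono.empty());                         // doUnlock(..), always invoked
    }

    public static void main(String[] args) {
        System.out.println(new UsingWhenLockSketch().lockStoreUnlock().block());  // true
    }
}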

private Mono<Object> doLock(String name, Object contextualKey, @Nullable Object contextualValue,
ReactiveRedisConnection connection) {

ByteBuffer key = ByteBuffer.wrap(createCacheLockKey(name));
ByteBuffer key = toCacheLockKey(name);
ByteBuffer value = ByteBuffer.wrap(new byte[0]);
Expiration expiration = Expiration.from(lockTtl.getTimeToLive(contextualKey, contextualValue));

Expiration expiration = toExpiration(contextualKey, contextualValue);

return connection.stringCommands().set(key, value, expiration, SetOption.SET_IF_ABSENT) //
// Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function.
.thenReturn(Boolean.TRUE);
}

private Mono<Void> doUnlock(String name, ReactiveRedisConnection connection) {
return connection.keyCommands().del(ByteBuffer.wrap(createCacheLockKey(name))).then();
return connection.keyCommands().del(toCacheLockKey(name)).then();
}

private Mono<Void> waitForLock(ReactiveRedisConnection connection, String cacheName) {

AtomicLong lockWaitTimeNs = new AtomicLong();
byte[] cacheLockKey = createCacheLockKey(cacheName);
AtomicLong lockWaitNanoTime = new AtomicLong();

Consumer<org.reactivestreams.Subscription> setNanoTimeOnLockWait = subscription ->
lockWaitNanoTime.set(System.nanoTime());

Flux<Long> wait = Flux.interval(Duration.ZERO, sleepTime);
Mono<Boolean> exists = connection.keyCommands().exists(ByteBuffer.wrap(cacheLockKey)).filter(it -> !it);
Consumer<SignalType> recordStatistics = signalType ->
statistics.incLockTime(cacheName, System.nanoTime() - lockWaitNanoTime.get());

return wait.doOnSubscribe(subscription -> lockWaitTimeNs.set(System.nanoTime())) //
.flatMap(it -> exists) //
.doFinally(signalType -> statistics.incLockTime(cacheName, System.nanoTime() - lockWaitTimeNs.get())) //
Function<Long, Mono<Boolean>> doWhileCacheLockExists = lockWaitTime -> connection.keyCommands()
.exists(toCacheLockKey(cacheName)).filter(cacheLockKeyExists -> !cacheLockKeyExists);

return waitInterval(sleepTime) //
.doOnSubscribe(setNanoTimeOnLockWait) //
.flatMap(doWhileCacheLockExists) //
.doFinally(recordStatistics) //
.next() //
.then();
}

private Flux<Long> waitInterval(Duration period) {
return Flux.interval(Duration.ZERO, period);
}
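waitForLock(..) is now a small polling pipeline: tick at the configured interval, check EXISTS on each tick, keep only the ticks where the lock key is gone, and complete on the first one. A reactor sketch with the EXISTS call replaced by an AtomicBoolean, not part of the PR (assumes reactor-core; names illustrative):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class WaitForLockPollingSketch {

    private final AtomicBoolean lockExists = new AtomicBoolean(true);

    Mono<Void> waitForLock(Duration sleepTime) {
        return Flux.interval(Duration.ZERO, sleepTime)
                // Stand-in for EXISTS <name>~lock: only emit when the lock is gone.
                .flatMap(tick -> Mono.just(lockExists.get()).filter(exists -> !exists))
                .next()   // the first "unlocked" signal completes the wait
                .then();
    }

    void releaseLock() {
        lockExists.set(false);
    }
}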

private ByteBuffer toCacheLockKey(String cacheName) {
return ByteBuffer.wrap(createCacheLockKey(cacheName));
}

private <T> CompletableFuture<T> doWithConnection(
Function<ReactiveRedisConnection, CompletableFuture<T>> callback) {

ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory;
Mono<ReactiveRedisConnection> reactiveConnection =
Mono.fromSupplier(getReactiveConnectionFactory()::getReactiveConnection);

Function<ReactiveRedisConnection, Mono<T>> commandExecution = connection ->
Mono.fromCompletionStage(callback.apply(connection));

Function<ReactiveRedisConnection, Mono<Void>> connectionClose = ReactiveRedisConnection::closeLater;

Mono<T> result = Mono.usingWhen(reactiveConnection, commandExecution, connectionClose);

return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), //
it -> Mono.fromCompletionStage(callback.apply(it)), //
ReactiveRedisConnection::closeLater) //
.toFuture();
return result.toFuture();
}
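doWithConnection(..) uses the same Mono.usingWhen shape for the connection lifecycle and bridges between CompletableFuture and Mono in both directions: Mono.fromCompletionStage inbound, toFuture() outbound. A reactor sketch of that bridge with the connection reduced to a String, not part of the PR (assumes reactor-core; names illustrative):

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import reactor.core.publisher.Mono;

class DoWithResourceSketch {

    static <T> CompletableFuture<T> doWithResource(Function<String, CompletableFuture<T>> callback) {

        Mono<String> resource = Mono.fromSupplier(() -> "connection");   // getReactiveConnection()

        return Mono.usingWhen(
                        resource,
                        connection -> Mono.fromCompletionStage(callback.apply(connection)),
                        connection -> Mono.empty())                      // closeLater()
                .toFuture();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> length =
                doWithResource(connection -> CompletableFuture.completedFuture(connection.length()));
        System.out.println(length.join());  // 10
    }
}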
}
}