Skip to content

Commit

Permalink
Merge branch '6.1.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Jul 2, 2024
2 parents a8e8897 + 8974da2 commit 78d594c
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,10 @@ private Object findInCaches(CacheOperationContext context, Object key,
if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) {
CompletableFuture<?> result = cache.retrieve(key);
if (result != null) {
return result.thenCompose(value -> (CompletableFuture<?>) evaluate(
return result.exceptionally(ex -> {
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
return null;
}).thenCompose(value -> (CompletableFuture<?>) evaluate(
(value != null ? CompletableFuture.completedFuture(unwrapCacheValue(value)) : null),
invoker, method, contexts));
}
Expand Down Expand Up @@ -1136,12 +1139,30 @@ public Object findInCaches(CacheOperationContext context, Cache cache, Object ke
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture))
.switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts)));
.flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts))
.onErrorResume(RuntimeException.class, ex -> {
try {
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
return evaluate(null, invoker, method, contexts);
}
catch (RuntimeException exception) {
return Flux.error(exception);
}
}));
}
else {
return adapter.fromPublisher(Mono.fromFuture(cachedFuture)
.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)));
.flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts))
.onErrorResume(RuntimeException.class, ex -> {
try {
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
return evaluate(null, invoker, method, contexts);
}
catch (RuntimeException exception) {
return Mono.error(exception);
}
}));
}
}
return NOT_HANDLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Flux;
Expand All @@ -29,12 +31,15 @@
import org.springframework.cache.CacheManager;
import org.springframework.cache.concurrent.ConcurrentMapCache;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.cache.interceptor.CacheErrorHandler;
import org.springframework.cache.interceptor.LoggingCacheErrorHandler;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;

/**
* Tests for annotation-based caching methods that use reactive operators.
Expand Down Expand Up @@ -113,6 +118,51 @@ void cacheHitDetermination(Class<?> configClass) {
ctx.close();
}

@Test
void cacheErrorHandlerWithLoggingCacheErrorHandler() {
AnnotationConfigApplicationContext ctx =
new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class, ErrorHandlerCachingConfiguration.class);
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);

Object key = new Object();
Long r1 = service.cacheFuture(key).join();

assertThat(r1).isNotNull();
assertThat(r1).as("cacheFuture").isEqualTo(0L);

key = new Object();

r1 = service.cacheMono(key).block();

assertThat(r1).isNotNull();
assertThat(r1).as("cacheMono").isEqualTo(1L);

key = new Object();

r1 = service.cacheFlux(key).blockFirst();

assertThat(r1).isNotNull();
assertThat(r1).as("cacheFlux blockFirst").isEqualTo(2L);
}

@Test
void cacheErrorHandlerWithSimpleCacheErrorHandler() {
AnnotationConfigApplicationContext ctx =
new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class);
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);

Throwable completableFuturThrowable = catchThrowable(() -> service.cacheFuture(new Object()).join());
assertThat(completableFuturThrowable).isInstanceOf(CompletionException.class)
.extracting(Throwable::getCause)
.isInstanceOf(UnsupportedOperationException.class);

Throwable monoThrowable = catchThrowable(() -> service.cacheMono(new Object()).block());
assertThat(monoThrowable).isInstanceOf(UnsupportedOperationException.class);

Throwable fluxThrowable = catchThrowable(() -> service.cacheFlux(new Object()).blockFirst());
assertThat(fluxThrowable).isInstanceOf(UnsupportedOperationException.class);
}

@ParameterizedTest
@ValueSource(classes = {EarlyCacheHitDeterminationConfig.class,
EarlyCacheHitDeterminationWithoutNullValuesConfig.class,
Expand All @@ -139,7 +189,6 @@ void fluxCacheDoesntDependOnFirstRequest(Class<?> configClass) {
ctx.close();
}


@CacheConfig(cacheNames = "first")
static class ReactiveCacheableService {

Expand Down Expand Up @@ -242,4 +291,41 @@ public void put(Object key, @Nullable Object value) {
}
}

@Configuration
static class ErrorHandlerCachingConfiguration implements CachingConfigurer {

@Bean
@Override
public CacheErrorHandler errorHandler() {
return new LoggingCacheErrorHandler();
}
}

@Configuration(proxyBeanMethods = false)
@EnableCaching
static class ExceptionCacheManager {

@Bean
CacheManager cacheManager() {
return new ConcurrentMapCacheManager("first") {
@Override
protected Cache createConcurrentMapCache(String name) {
return new ConcurrentMapCache(name, isAllowNullValues()) {
@Override
public CompletableFuture<?> retrieve(Object key) {
return CompletableFuture.supplyAsync(() -> {
throw new UnsupportedOperationException("Test exception on retrieve");
});
}

@Override
public void put(Object key, @Nullable Object value) {
throw new UnsupportedOperationException("Test exception on put");
}
};
}
};
}
}

}

0 comments on commit 78d594c

Please sign in to comment.