diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index b77b8ca6f80d..ff01be10870b 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -508,7 +508,10 @@ private Object findInCaches(CacheOperationContext context, Object key, if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { CompletableFuture result = cache.retrieve(key); if (result != null) { - return result.thenCompose(value -> (CompletableFuture) evaluate( + return result.exceptionally(ex -> { + getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); + return null; + }).thenCompose(value -> (CompletableFuture) evaluate( (value != null ? CompletableFuture.completedFuture(unwrapCacheValue(value)) : null), invoker, method, contexts)); } @@ -1136,12 +1139,30 @@ public Object findInCaches(CacheOperationContext context, Cache cache, Object ke if (adapter.isMultiValue()) { return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture)) .switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts))) - .flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts))); + .flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts)) + .onErrorResume(RuntimeException.class, ex -> { + try { + getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); + return evaluate(null, invoker, method, contexts); + } + catch (RuntimeException exception) { + return Flux.error(exception); + } + })); } else { return adapter.fromPublisher(Mono.fromFuture(cachedFuture) .switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts))) - .flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts))); + .flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)) + .onErrorResume(RuntimeException.class, ex -> { + try { + getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); + return evaluate(null, invoker, method, contexts); + } + catch (RuntimeException exception) { + return Mono.error(exception); + } + })); } } return NOT_HANDLED; diff --git a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java index 06130951af90..6adf6a2ebf0e 100644 --- a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java +++ b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java @@ -18,8 +18,10 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import reactor.core.publisher.Flux; @@ -29,12 +31,15 @@ import org.springframework.cache.CacheManager; import org.springframework.cache.concurrent.ConcurrentMapCache; import org.springframework.cache.concurrent.ConcurrentMapCacheManager; +import org.springframework.cache.interceptor.CacheErrorHandler; +import org.springframework.cache.interceptor.LoggingCacheErrorHandler; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; /** * Tests for annotation-based caching methods that use reactive operators. @@ -113,6 +118,51 @@ void cacheHitDetermination(Class configClass) { ctx.close(); } + @Test + void cacheErrorHandlerWithLoggingCacheErrorHandler() { + AnnotationConfigApplicationContext ctx = + new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class, ErrorHandlerCachingConfiguration.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + + Object key = new Object(); + Long r1 = service.cacheFuture(key).join(); + + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheFuture").isEqualTo(0L); + + key = new Object(); + + r1 = service.cacheMono(key).block(); + + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheMono").isEqualTo(1L); + + key = new Object(); + + r1 = service.cacheFlux(key).blockFirst(); + + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheFlux blockFirst").isEqualTo(2L); + } + + @Test + void cacheErrorHandlerWithSimpleCacheErrorHandler() { + AnnotationConfigApplicationContext ctx = + new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + + Throwable completableFuturThrowable = catchThrowable(() -> service.cacheFuture(new Object()).join()); + assertThat(completableFuturThrowable).isInstanceOf(CompletionException.class) + .extracting(Throwable::getCause) + .isInstanceOf(UnsupportedOperationException.class); + + Throwable monoThrowable = catchThrowable(() -> service.cacheMono(new Object()).block()); + assertThat(monoThrowable).isInstanceOf(UnsupportedOperationException.class); + + Throwable fluxThrowable = catchThrowable(() -> service.cacheFlux(new Object()).blockFirst()); + assertThat(fluxThrowable).isInstanceOf(UnsupportedOperationException.class); + } + @ParameterizedTest @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, EarlyCacheHitDeterminationWithoutNullValuesConfig.class, @@ -139,7 +189,6 @@ void fluxCacheDoesntDependOnFirstRequest(Class configClass) { ctx.close(); } - @CacheConfig(cacheNames = "first") static class ReactiveCacheableService { @@ -242,4 +291,41 @@ public void put(Object key, @Nullable Object value) { } } + @Configuration + static class ErrorHandlerCachingConfiguration implements CachingConfigurer { + + @Bean + @Override + public CacheErrorHandler errorHandler() { + return new LoggingCacheErrorHandler(); + } + } + + @Configuration(proxyBeanMethods = false) + @EnableCaching + static class ExceptionCacheManager { + + @Bean + CacheManager cacheManager() { + return new ConcurrentMapCacheManager("first") { + @Override + protected Cache createConcurrentMapCache(String name) { + return new ConcurrentMapCache(name, isAllowNullValues()) { + @Override + public CompletableFuture retrieve(Object key) { + return CompletableFuture.supplyAsync(() -> { + throw new UnsupportedOperationException("Test exception on retrieve"); + }); + } + + @Override + public void put(Object key, @Nullable Object value) { + throw new UnsupportedOperationException("Test exception on put"); + } + }; + } + }; + } + } + }