diff --git a/README.md b/README.md index d860fff..6e99821 100644 --- a/README.md +++ b/README.md @@ -219,9 +219,10 @@ The following configuration values are available: | deltaEtagsPrefix | redis | delta:etags | The prefix for delta etags redis keys | | lockPrefix | redis | rest-storage:locks | The prefix for lock redis keys | | resourceCleanupAmount | redis | 100000 | The maximum amount of resources to clean in a single cleanup run | -| resourceCleanupIntervalSec | redis | | The interval (in seconds) how often to peform the storage cleanup. When set to _null_ no periodic storage cleanup is peformed | +| resourceCleanupIntervalSec | redis | | The interval (in seconds) how often to perform the storage cleanup. When set to _null_ no periodic storage cleanup is performed | | rejectStorageWriteOnLowMemory | redis | false | When set to _true_, PUT requests with the x-importance-level header can be rejected when memory gets low | | freeMemoryCheckIntervalMs | redis | 60000 | The interval in milliseconds to calculate the actual memory usage | +| redisReadyCheckIntervalMs | redis | -1 | The interval in milliseconds to calculate the "ready state" of redis. When value < 1, no "ready state" will be calculated | ### Configuration util diff --git a/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java b/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java index 2a91874..f84dfbc 100644 --- a/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java +++ b/src/main/java/org/swisspush/reststorage/RedisRestStorageRunner.java @@ -20,6 +20,7 @@ public static void main(String[] args) { .storageType(ModuleConfiguration.StorageType.redis) .redisReconnectAttempts(-1) .redisPoolRecycleTimeoutMs(-1) + .redisReadyCheckIntervalMs(10000) .resourceCleanupIntervalSec(10); Vertx.vertx().deployVerticle(new RestStorageMod(), diff --git a/src/main/java/org/swisspush/reststorage/RestStorageHandler.java b/src/main/java/org/swisspush/reststorage/RestStorageHandler.java index 95e1882..45c7332 100644 --- a/src/main/java/org/swisspush/reststorage/RestStorageHandler.java +++ b/src/main/java/org/swisspush/reststorage/RestStorageHandler.java @@ -8,7 +8,6 @@ import io.vertx.core.http.HttpServerResponse; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.core.spi.ExecutorServiceFactory; import io.vertx.core.streams.Pump; import io.vertx.ext.auth.authentication.AuthenticationProvider; import io.vertx.ext.web.Router; diff --git a/src/main/java/org/swisspush/reststorage/RestStorageMod.java b/src/main/java/org/swisspush/reststorage/RestStorageMod.java index baa6713..fccfb73 100644 --- a/src/main/java/org/swisspush/reststorage/RestStorageMod.java +++ b/src/main/java/org/swisspush/reststorage/RestStorageMod.java @@ -6,6 +6,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.swisspush.reststorage.exception.RestStorageExceptionFactory; +import org.swisspush.reststorage.redis.DefaultRedisProvider; import org.swisspush.reststorage.redis.RedisProvider; import org.swisspush.reststorage.redis.RedisStorage; import org.swisspush.reststorage.util.ModuleConfiguration; diff --git a/src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java b/src/main/java/org/swisspush/reststorage/redis/DefaultRedisProvider.java similarity index 90% rename from src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java rename to src/main/java/org/swisspush/reststorage/redis/DefaultRedisProvider.java index f8ca4d6..062403d 100644 --- a/src/main/java/org/swisspush/reststorage/DefaultRedisProvider.java +++ b/src/main/java/org/swisspush/reststorage/redis/DefaultRedisProvider.java @@ -1,4 +1,4 @@ -package org.swisspush.reststorage; +package org.swisspush.reststorage.redis; import io.vertx.core.Future; import io.vertx.core.Promise; @@ -11,7 +11,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.swisspush.reststorage.exception.RestStorageExceptionFactory; -import org.swisspush.reststorage.redis.RedisProvider; import org.swisspush.reststorage.util.ModuleConfiguration; import java.util.ArrayList; @@ -36,26 +35,42 @@ public class DefaultRedisProvider implements RedisProvider { private Redis redis; private final AtomicBoolean connecting = new AtomicBoolean(); private RedisConnection client; + private RedisReadyProvider readyProvider; private final AtomicReference> connectPromiseRef = new AtomicReference<>(); public DefaultRedisProvider( - Vertx vertx, - ModuleConfiguration configuration, - RestStorageExceptionFactory exceptionFactory + Vertx vertx, + ModuleConfiguration configuration, + RestStorageExceptionFactory exceptionFactory ) { this.vertx = vertx; this.configuration = configuration; this.exceptionFactory = exceptionFactory; + + maybeInitRedisReadyProvider(); + } + + private void maybeInitRedisReadyProvider() { + if (configuration.getRedisReadyCheckIntervalMs() > 0) { + this.readyProvider = new DefaultRedisReadyProvider(vertx, configuration.getRedisReadyCheckIntervalMs()); + } } @Override public Future redis() { - if (redisAPI != null) { - return Future.succeededFuture(redisAPI); - } else { + if(redisAPI == null) { return setupRedisClient(); } + if(readyProvider == null) { + return Future.succeededFuture(redisAPI); + } + return readyProvider.ready(redisAPI).compose(ready -> { + if (ready) { + return Future.succeededFuture(redisAPI); + } + return Future.failedFuture("Not yet ready!"); + }); } private boolean reconnectEnabled() { diff --git a/src/main/java/org/swisspush/reststorage/redis/DefaultRedisReadyProvider.java b/src/main/java/org/swisspush/reststorage/redis/DefaultRedisReadyProvider.java new file mode 100644 index 0000000..1d3d6d0 --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/redis/DefaultRedisReadyProvider.java @@ -0,0 +1,93 @@ +package org.swisspush.reststorage.redis; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.redis.client.RedisAPI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Default implementation of the {@link RedisReadyProvider} based on the INFO command in Redis + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public class DefaultRedisReadyProvider implements RedisReadyProvider { + + private static final Logger log = LoggerFactory.getLogger(DefaultRedisReadyProvider.class); + private static final String DELIMITER = ":"; + private static final String LOADING = "loading"; + final AtomicBoolean redisReady = new AtomicBoolean(true); + final AtomicBoolean updateRedisReady = new AtomicBoolean(true); + + /** + * Constructor defining the "ready-state" update interval + * @param vertx + * @param updateIntervalMs interval in ms how often to update the "ready-state" + */ + public DefaultRedisReadyProvider(Vertx vertx, int updateIntervalMs) { + vertx.setPeriodic(updateIntervalMs, l -> { + updateRedisReady.set(true); + }); + } + + @Override + public Future ready(RedisAPI redisAPI) { + if(updateRedisReady.compareAndSet(true, false)){ + return updateRedisReadyState(redisAPI); + } + return Future.succeededFuture(redisReady.get()); + } + + /** + * Call the INFO command in Redis with a constraint to persistence related information + * + * @param redisAPI + * @return async boolean true when Redis is ready, otherwise false + */ + public Future updateRedisReadyState(RedisAPI redisAPI) { + return redisAPI.info(List.of("Persistence")).compose(response -> { + boolean ready = getReadyStateFromResponse(response.toString()); + redisReady.set(ready); + return Future.succeededFuture(ready); + }, throwable -> { + log.error("Error reading redis info", throwable); + redisReady.set(false); + return Future.succeededFuture(false); + }); + } + + /** + * Check the response having a loading:0 entry. If so, Redis is ready. When the response contains a + * loading:1 entry or not related entry at all, we consider Redis to be not ready + * + * @param persistenceInfo the response from Redis _INFO_ command + * @return boolean true when Redis is ready, otherwise false + */ + private boolean getReadyStateFromResponse(String persistenceInfo) { + byte loadingValue; + try { + Optional loadingOpt = persistenceInfo + .lines() + .filter(source -> source.startsWith(LOADING + DELIMITER)) + .findAny(); + if (loadingOpt.isEmpty()) { + log.warn("No 'loading' section received from redis. Unable to calculate ready state"); + return false; + } + loadingValue = Byte.parseByte(loadingOpt.get().split(DELIMITER)[1]); + if (loadingValue == 0) { + return true; + } + + } catch (NumberFormatException ex) { + log.warn("Invalid 'loading' section received from redis. Unable to calculate ready state"); + return false; + } + + return false; + } +} diff --git a/src/main/java/org/swisspush/reststorage/redis/RedisReadyProvider.java b/src/main/java/org/swisspush/reststorage/redis/RedisReadyProvider.java new file mode 100644 index 0000000..62fd56a --- /dev/null +++ b/src/main/java/org/swisspush/reststorage/redis/RedisReadyProvider.java @@ -0,0 +1,21 @@ +package org.swisspush.reststorage.redis; + +import io.vertx.core.Future; +import io.vertx.redis.client.RedisAPI; + +/** + * Provides the "ready state" of the Redis database. The connection to Redis may be already established, but Redis is not + * yet ready to be used + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +public interface RedisReadyProvider { + + /** + * Get the "ready state" of the Redis database. + * + * @param redisAPI API to access redis database + * @return An async boolean true when Redis can be used. Returns async boolean false otherwise or in case of an error + */ + Future ready(RedisAPI redisAPI); +} diff --git a/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java b/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java index 447fdba..d5384c8 100644 --- a/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java +++ b/src/main/java/org/swisspush/reststorage/redis/RedisStorage.java @@ -213,6 +213,7 @@ public Future> calculateCurrentMemoryUsage() { log.error("Unable to get memory information from redis", exceptionFactory.newException("redisProvider.redis() failed", ev.cause())); promise.complete(Optional.empty()); + return; } var redisAPI = ev.result(); redisAPI.info(Collections.singletonList("memory"), memoryInfo -> { diff --git a/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java b/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java index 5e5604f..2e13921 100644 --- a/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java +++ b/src/main/java/org/swisspush/reststorage/util/ModuleConfiguration.java @@ -50,6 +50,7 @@ public enum StorageType { private int redisReconnectAttempts = 0; private int redisReconnectDelaySec = 30; private int redisPoolRecycleTimeoutMs = 180_000; + private int redisReadyCheckIntervalMs = -1; private String redisPassword = null; private String redisUser = null; private String expirablePrefix = "rest-storage:expirable"; @@ -170,6 +171,11 @@ public ModuleConfiguration redisPoolRecycleTimeoutMs(int redisPoolRecycleTimeout return this; } + public ModuleConfiguration redisReadyCheckIntervalMs(int redisReadyCheckIntervalMs) { + this.redisReadyCheckIntervalMs = redisReadyCheckIntervalMs; + return this; + } + @Deprecated(since = "3.0.17") public ModuleConfiguration redisAuth(String redisAuth) { this.redisAuth = redisAuth; @@ -357,6 +363,10 @@ public int getRedisPoolRecycleTimeoutMs() { return redisPoolRecycleTimeoutMs; } + public int getRedisReadyCheckIntervalMs() { + return redisReadyCheckIntervalMs; + } + public boolean isRedisEnableTls() { return redisEnableTls; } diff --git a/src/test/java/org/swisspush/reststorage/redis/DefaultRedisReadyProviderTest.java b/src/test/java/org/swisspush/reststorage/redis/DefaultRedisReadyProviderTest.java new file mode 100644 index 0000000..a888d30 --- /dev/null +++ b/src/test/java/org/swisspush/reststorage/redis/DefaultRedisReadyProviderTest.java @@ -0,0 +1,106 @@ +package org.swisspush.reststorage.redis; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.redis.client.RedisAPI; +import io.vertx.redis.client.impl.types.BulkType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.swisspush.reststorage.util.ResourcesUtils; + +import static org.mockito.Mockito.*; + +/** + * Tests for the {@link DefaultRedisReadyProvider} class + * + * @author https://github.com/mcweba [Marc-Andre Weber] + */ +@RunWith(VertxUnitRunner.class) +public class DefaultRedisReadyProviderTest { + + private final String REDIS_INFO_LOADING = ResourcesUtils.loadResource("redis_info_persistance_loading_1", true); + private final String REDIS_INFO_NOT_LOADING = ResourcesUtils.loadResource("redis_info_persistance_loading_0", true); + + private Vertx vertx; + private RedisAPI redisAPI; + private DefaultRedisReadyProvider readyProvider; + + @Before + public void setUp() { + this.vertx = Vertx.vertx(); + redisAPI = Mockito.mock(RedisAPI.class); + readyProvider = new DefaultRedisReadyProvider(vertx, 1000); + } + + private void assertReadiness(TestContext testContext, AsyncResult event, Boolean expectedReadiness) { + testContext.assertTrue(event.succeeded()); + testContext.assertEquals(expectedReadiness, event.result()); + } + + @Test + public void testRedisReady(TestContext testContext) { + Async async = testContext.async(); + Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_NOT_LOADING), false))); + + readyProvider.ready(redisAPI).onComplete(event -> { + assertReadiness(testContext, event, true); + async.complete(); + }); + } + + @Test + public void testRedisReadyMultipleCalls(TestContext testContext) { + Async async = testContext.async(); + Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_NOT_LOADING), false))); + + readyProvider.ready(redisAPI).onComplete(event -> { + assertReadiness(testContext, event, true); + readyProvider.ready(redisAPI).onComplete(event2 -> { + assertReadiness(testContext, event2, true); + async.complete(); + }); + }); + + verify(redisAPI, times(1)).info(any()); + } + + @Test + public void testRedisNotReady(TestContext testContext) { + Async async = testContext.async(); + Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_LOADING), false))); + + readyProvider.ready(redisAPI).onComplete(event -> { + assertReadiness(testContext, event, false); + async.complete(); + }); + } + + @Test + public void testRedisNotReadyInvalidInfoResponse(TestContext testContext) { + Async async = testContext.async(); + Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer("some invalid info response"), false))); + + readyProvider.ready(redisAPI).onComplete(event -> { + assertReadiness(testContext, event, false); + async.complete(); + }); + } + + @Test + public void testRedisNotReadyExceptionWhenAccessingRedisAPI(TestContext testContext) { + Async async = testContext.async(); + Mockito.when(redisAPI.info(any())).thenReturn(Future.failedFuture("Boooom")); + + readyProvider.ready(redisAPI).onComplete(event -> { + assertReadiness(testContext, event, false); + async.complete(); + }); + } +} diff --git a/src/test/resources/redis_info_persistance_loading_0 b/src/test/resources/redis_info_persistance_loading_0 new file mode 100644 index 0000000..2521abd --- /dev/null +++ b/src/test/resources/redis_info_persistance_loading_0 @@ -0,0 +1,31 @@ +# Persistence +loading:0 +async_loading:0 +current_cow_peak:0 +current_cow_size:0 +current_cow_size_age:0 +current_fork_perc:0.00 +current_save_keys_processed:0 +current_save_keys_total:0 +rdb_changes_since_last_save:108 +rdb_bgsave_in_progress:0 +rdb_last_save_time:1718713249 +rdb_last_bgsave_status:ok +rdb_last_bgsave_time_sec:0 +rdb_current_bgsave_time_sec:-1 +rdb_saves:296 +rdb_last_cow_size:0 +rdb_last_load_keys_expired:0 +rdb_last_load_keys_loaded:0 +aof_enabled:0 +aof_rewrite_in_progress:0 +aof_rewrite_scheduled:0 +aof_last_rewrite_time_sec:-1 +aof_current_rewrite_time_sec:-1 +aof_last_bgrewrite_status:ok +aof_rewrites:0 +aof_rewrites_consecutive_failures:0 +aof_last_write_status:ok +aof_last_cow_size:0 +module_fork_in_progress:0 +module_fork_last_cow_size:0 diff --git a/src/test/resources/redis_info_persistance_loading_1 b/src/test/resources/redis_info_persistance_loading_1 new file mode 100644 index 0000000..de55047 --- /dev/null +++ b/src/test/resources/redis_info_persistance_loading_1 @@ -0,0 +1,31 @@ +# Persistence +loading:1 +async_loading:0 +current_cow_peak:0 +current_cow_size:0 +current_cow_size_age:0 +current_fork_perc:0.00 +current_save_keys_processed:0 +current_save_keys_total:0 +rdb_changes_since_last_save:108 +rdb_bgsave_in_progress:0 +rdb_last_save_time:1718713249 +rdb_last_bgsave_status:ok +rdb_last_bgsave_time_sec:0 +rdb_current_bgsave_time_sec:-1 +rdb_saves:296 +rdb_last_cow_size:0 +rdb_last_load_keys_expired:0 +rdb_last_load_keys_loaded:0 +aof_enabled:0 +aof_rewrite_in_progress:0 +aof_rewrite_scheduled:0 +aof_last_rewrite_time_sec:-1 +aof_current_rewrite_time_sec:-1 +aof_last_bgrewrite_status:ok +aof_rewrites:0 +aof_rewrites_consecutive_failures:0 +aof_last_write_status:ok +aof_last_cow_size:0 +module_fork_in_progress:0 +module_fork_last_cow_size:0