Skip to content

Commit

Permalink
Merge pull request #196 from swisspost/develop
Browse files Browse the repository at this point in the history
PR for release
  • Loading branch information
mcweba authored Jun 25, 2024
2 parents 67f521c + c14fc95 commit 277f7f5
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 11 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,10 @@ The following configuration values are available:
| deltaEtagsPrefix | redis | delta:etags | The prefix for delta etags redis keys |
| lockPrefix | redis | rest-storage:locks | The prefix for lock redis keys |
| resourceCleanupAmount | redis | 100000 | The maximum amount of resources to clean in a single cleanup run |
| resourceCleanupIntervalSec | redis | | The interval (in seconds) how often to peform the storage cleanup. When set to _null_ no periodic storage cleanup is peformed |
| resourceCleanupIntervalSec | redis | | The interval (in seconds) how often to perform the storage cleanup. When set to _null_ no periodic storage cleanup is performed |
| rejectStorageWriteOnLowMemory | redis | false | When set to _true_, PUT requests with the x-importance-level header can be rejected when memory gets low |
| freeMemoryCheckIntervalMs | redis | 60000 | The interval in milliseconds to calculate the actual memory usage |
| redisReadyCheckIntervalMs | redis | -1 | The interval in milliseconds to calculate the "ready state" of redis. When value < 1, no "ready state" will be calculated |

### Configuration util

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>rest-storage</artifactId>
<version>3.1.7-SNAPSHOT</version>
<version>3.1.8-SNAPSHOT</version>
<name>rest-storage</name>
<description>
Persistence for REST resources in the filesystem or a redis database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public static void main(String[] args) {
.storageType(ModuleConfiguration.StorageType.redis)
.redisReconnectAttempts(-1)
.redisPoolRecycleTimeoutMs(-1)
.redisReadyCheckIntervalMs(10000)
.resourceCleanupIntervalSec(10);

Vertx.vertx().deployVerticle(new RestStorageMod(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.ExecutorServiceFactory;
import io.vertx.core.streams.Pump;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
import io.vertx.ext.web.Router;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.reststorage.exception.RestStorageExceptionFactory;
import org.swisspush.reststorage.redis.DefaultRedisProvider;
import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.redis.RedisStorage;
import org.swisspush.reststorage.util.ModuleConfiguration;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.swisspush.reststorage;
package org.swisspush.reststorage.redis;

import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand All @@ -11,7 +11,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.reststorage.exception.RestStorageExceptionFactory;
import org.swisspush.reststorage.redis.RedisProvider;
import org.swisspush.reststorage.util.ModuleConfiguration;

import java.util.ArrayList;
Expand All @@ -36,26 +35,42 @@ public class DefaultRedisProvider implements RedisProvider {
private Redis redis;
private final AtomicBoolean connecting = new AtomicBoolean();
private RedisConnection client;
private RedisReadyProvider readyProvider;

private final AtomicReference<Promise<RedisAPI>> connectPromiseRef = new AtomicReference<>();

public DefaultRedisProvider(
Vertx vertx,
ModuleConfiguration configuration,
RestStorageExceptionFactory exceptionFactory
Vertx vertx,
ModuleConfiguration configuration,
RestStorageExceptionFactory exceptionFactory
) {
this.vertx = vertx;
this.configuration = configuration;
this.exceptionFactory = exceptionFactory;

maybeInitRedisReadyProvider();
}

private void maybeInitRedisReadyProvider() {
if (configuration.getRedisReadyCheckIntervalMs() > 0) {
this.readyProvider = new DefaultRedisReadyProvider(vertx, configuration.getRedisReadyCheckIntervalMs());
}
}

@Override
public Future<RedisAPI> redis() {
if (redisAPI != null) {
return Future.succeededFuture(redisAPI);
} else {
if(redisAPI == null) {
return setupRedisClient();
}
if(readyProvider == null) {
return Future.succeededFuture(redisAPI);
}
return readyProvider.ready(redisAPI).compose(ready -> {
if (ready) {
return Future.succeededFuture(redisAPI);
}
return Future.failedFuture("Not yet ready!");
});
}

private boolean reconnectEnabled() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.swisspush.reststorage.redis;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.redis.client.RedisAPI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Default implementation of the {@link RedisReadyProvider} based on the <code>INFO</code> command in Redis
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class DefaultRedisReadyProvider implements RedisReadyProvider {

private static final Logger log = LoggerFactory.getLogger(DefaultRedisReadyProvider.class);
private static final String DELIMITER = ":";
private static final String LOADING = "loading";
final AtomicBoolean redisReady = new AtomicBoolean(true);
final AtomicBoolean updateRedisReady = new AtomicBoolean(true);

/**
* Constructor defining the "ready-state" update interval
* @param vertx
* @param updateIntervalMs interval in ms how often to update the "ready-state"
*/
public DefaultRedisReadyProvider(Vertx vertx, int updateIntervalMs) {
vertx.setPeriodic(updateIntervalMs, l -> {
updateRedisReady.set(true);
});
}

@Override
public Future<Boolean> ready(RedisAPI redisAPI) {
if(updateRedisReady.compareAndSet(true, false)){
return updateRedisReadyState(redisAPI);
}
return Future.succeededFuture(redisReady.get());
}

/**
* Call the <code>INFO</code> command in Redis with a constraint to persistence related information
*
* @param redisAPI
* @return async boolean true when Redis is ready, otherwise false
*/
public Future<Boolean> updateRedisReadyState(RedisAPI redisAPI) {
return redisAPI.info(List.of("Persistence")).compose(response -> {
boolean ready = getReadyStateFromResponse(response.toString());
redisReady.set(ready);
return Future.succeededFuture(ready);
}, throwable -> {
log.error("Error reading redis info", throwable);
redisReady.set(false);
return Future.succeededFuture(false);
});
}

/**
* Check the response having a <code>loading:0</code> entry. If so, Redis is ready. When the response contains a
* <code>loading:1</code> entry or not related entry at all, we consider Redis to be not ready
*
* @param persistenceInfo the response from Redis _INFO_ command
* @return boolean true when Redis is ready, otherwise false
*/
private boolean getReadyStateFromResponse(String persistenceInfo) {
byte loadingValue;
try {
Optional<String> loadingOpt = persistenceInfo
.lines()
.filter(source -> source.startsWith(LOADING + DELIMITER))
.findAny();
if (loadingOpt.isEmpty()) {
log.warn("No 'loading' section received from redis. Unable to calculate ready state");
return false;
}
loadingValue = Byte.parseByte(loadingOpt.get().split(DELIMITER)[1]);
if (loadingValue == 0) {
return true;
}

} catch (NumberFormatException ex) {
log.warn("Invalid 'loading' section received from redis. Unable to calculate ready state");
return false;
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.swisspush.reststorage.redis;

import io.vertx.core.Future;
import io.vertx.redis.client.RedisAPI;

/**
* Provides the "ready state" of the Redis database. The connection to Redis may be already established, but Redis is not
* yet ready to be used
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public interface RedisReadyProvider {

/**
* Get the "ready state" of the Redis database.
*
* @param redisAPI API to access redis database
* @return An async boolean true when Redis can be used. Returns async boolean false otherwise or in case of an error
*/
Future<Boolean> ready(RedisAPI redisAPI);
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public Future<Optional<Float>> calculateCurrentMemoryUsage() {
log.error("Unable to get memory information from redis",
exceptionFactory.newException("redisProvider.redis() failed", ev.cause()));
promise.complete(Optional.empty());
return;
}
var redisAPI = ev.result();
redisAPI.info(Collections.singletonList("memory"), memoryInfo -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public enum StorageType {
private int redisReconnectAttempts = 0;
private int redisReconnectDelaySec = 30;
private int redisPoolRecycleTimeoutMs = 180_000;
private int redisReadyCheckIntervalMs = -1;
private String redisPassword = null;
private String redisUser = null;
private String expirablePrefix = "rest-storage:expirable";
Expand Down Expand Up @@ -170,6 +171,11 @@ public ModuleConfiguration redisPoolRecycleTimeoutMs(int redisPoolRecycleTimeout
return this;
}

public ModuleConfiguration redisReadyCheckIntervalMs(int redisReadyCheckIntervalMs) {
this.redisReadyCheckIntervalMs = redisReadyCheckIntervalMs;
return this;
}

@Deprecated(since = "3.0.17")
public ModuleConfiguration redisAuth(String redisAuth) {
this.redisAuth = redisAuth;
Expand Down Expand Up @@ -357,6 +363,10 @@ public int getRedisPoolRecycleTimeoutMs() {
return redisPoolRecycleTimeoutMs;
}

public int getRedisReadyCheckIntervalMs() {
return redisReadyCheckIntervalMs;
}

public boolean isRedisEnableTls() {
return redisEnableTls;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.swisspush.reststorage.redis;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.impl.types.BulkType;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.swisspush.reststorage.util.ResourcesUtils;

import static org.mockito.Mockito.*;

/**
* Tests for the {@link DefaultRedisReadyProvider} class
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
@RunWith(VertxUnitRunner.class)
public class DefaultRedisReadyProviderTest {

private final String REDIS_INFO_LOADING = ResourcesUtils.loadResource("redis_info_persistance_loading_1", true);
private final String REDIS_INFO_NOT_LOADING = ResourcesUtils.loadResource("redis_info_persistance_loading_0", true);

private Vertx vertx;
private RedisAPI redisAPI;
private DefaultRedisReadyProvider readyProvider;

@Before
public void setUp() {
this.vertx = Vertx.vertx();
redisAPI = Mockito.mock(RedisAPI.class);
readyProvider = new DefaultRedisReadyProvider(vertx, 1000);
}

private void assertReadiness(TestContext testContext, AsyncResult<Boolean> event, Boolean expectedReadiness) {
testContext.assertTrue(event.succeeded());
testContext.assertEquals(expectedReadiness, event.result());
}

@Test
public void testRedisReady(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_NOT_LOADING), false)));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, true);
async.complete();
});
}

@Test
public void testRedisReadyMultipleCalls(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_NOT_LOADING), false)));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, true);
readyProvider.ready(redisAPI).onComplete(event2 -> {
assertReadiness(testContext, event2, true);
async.complete();
});
});

verify(redisAPI, times(1)).info(any());
}

@Test
public void testRedisNotReady(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer(REDIS_INFO_LOADING), false)));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, false);
async.complete();
});
}

@Test
public void testRedisNotReadyInvalidInfoResponse(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.succeededFuture(BulkType.create(Buffer.buffer("some invalid info response"), false)));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, false);
async.complete();
});
}

@Test
public void testRedisNotReadyExceptionWhenAccessingRedisAPI(TestContext testContext) {
Async async = testContext.async();
Mockito.when(redisAPI.info(any())).thenReturn(Future.failedFuture("Boooom"));

readyProvider.ready(redisAPI).onComplete(event -> {
assertReadiness(testContext, event, false);
async.complete();
});
}
}
Loading

0 comments on commit 277f7f5

Please sign in to comment.