diff --git a/docs/advanced-usage.md b/docs/advanced-usage.md index 19e8c2b9d..057922ab6 100644 --- a/docs/advanced-usage.md +++ b/docs/advanced-usage.md @@ -327,7 +327,7 @@ client.setOptions(ClientOptions.builder() PING before activating connection -pingBefor eActivateConnection +pingBeforeActivateConnection true @@ -362,8 +362,21 @@ queued commands.

refuse commands and cancel these with an exception.

+Replay filter +replayFilter +(cmd) -> false + + +

Since: 6.6

+

Controls which commands are to be filtered out in case the driver +attempts to reconnect to the server. Returning false means +that the command would not be filtered out.

+

This flag has no effect in case the autoReconnect feature is not +enabled.

+ + Cancel commands on reconnect failure -cancelCommand sOnReconnectFailure +cancelCommandsOnReconnectFailure false @@ -486,7 +499,7 @@ store/trust store.

Timeout Options timeoutOptions -Do n ot timeout commands. +Do not timeout commands.

Since: 5.1

@@ -550,7 +563,7 @@ client.setOptions(ClusterClientOptions.builder() Periodic cluster topology refresh -en ablePeriodicRefresh +enablePeriodicRefresh false @@ -2399,14 +2412,14 @@ independent connections to Redis. Lettuce provides two levels of consistency; these are the rules for Redis command sends: -Depending on the chosen consistency level: +#### Depending on the chosen consistency level -- **at-most-once execution**, i. e. no guaranteed execution +- **at-most-once execution**, i.e. no guaranteed execution -- **at-least-once execution**, i. e. guaranteed execution (with [some +- **at-least-once execution**, i.e. guaranteed execution (with [some exceptions](#exceptions-to-at-least-once)) -Always: +#### Always - command ordering in the order of invocations @@ -2602,9 +2615,44 @@ re-established, queued commands are re-sent for execution. While a connection failure persists, issued commands are buffered. To change into *at-most-once* consistency level, disable auto-reconnect -mode. Connections cannot be longer reconnected and thus no retries are -issued. Not successfully commands are canceled. New commands are -rejected. +mode. Connections can no longer be reconnected and thus no retries are +issued. Unsuccessful commands are canceled. New commands are rejected. + +#### Controlling replay of commands in *at-lease-once* mode + +!!! NOTE + This feature is only available since Lettuce 6.6 + +One can achieve a more fine-grained control over the commands that are +replayed after a reconnection by using the option to specify a filter +predicate. This option is part of the ClientOptions configuration. See +[Client Options](advanced-usage.md#client-options) for further reference. + +``` java +Predicate > filter = cmd -> + cmd.getType().toString().equalsIgnoreCase("DECR"); + +client.setOptions(ClientOptions.builder() + .autoReconnect(true) + .replayFilter(filter) + .build()); +``` + +The code above would filter out all `DECR` commands from being replayed +after a reconnection. Another, perhaps more popular example, would be: + +``` java +Predicate > filter = cmd -> true; + +client.setOptions(ClientOptions.builder() + .autoReconnect(true) + .replayFilter(filter) + .build()); +``` + +... which disables any command replay, but still allows the driver to +re-connect, basically providing a way to have auto-reconnect without +auto-replay of commands. ### Clustered operations diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index 3fd635e4e..cf77d9d38 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.ServiceConfigurationError; import java.util.ServiceLoader; +import java.util.function.Predicate; import io.lettuce.core.api.StatefulConnection; import io.lettuce.core.internal.LettuceAssert; @@ -34,6 +35,7 @@ import io.lettuce.core.protocol.DecodeBufferPolicy; import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.protocol.ReadOnlyCommands; +import io.lettuce.core.protocol.RedisCommand; import io.lettuce.core.resource.ClientResources; import reactor.core.publisher.Mono; @@ -49,6 +51,8 @@ public class ClientOptions implements Serializable { public static final boolean DEFAULT_AUTO_RECONNECT = true; + public static final Predicate> DEFAULT_REPLAY_FILTER = (cmd) -> false; + public static final int DEFAULT_BUFFER_USAGE_RATIO = 3; public static final boolean DEFAULT_CANCEL_CMD_RECONNECT_FAIL = false; @@ -91,6 +95,8 @@ public class ClientOptions implements Serializable { private final boolean autoReconnect; + private final Predicate> replayFilter; + private final boolean cancelCommandsOnReconnectFailure; private final DecodeBufferPolicy decodeBufferPolicy; @@ -125,6 +131,7 @@ public class ClientOptions implements Serializable { protected ClientOptions(Builder builder) { this.autoReconnect = builder.autoReconnect; + this.replayFilter = builder.replayFilter; this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure; this.decodeBufferPolicy = builder.decodeBufferPolicy; this.disconnectedBehavior = builder.disconnectedBehavior; @@ -145,6 +152,7 @@ protected ClientOptions(Builder builder) { protected ClientOptions(ClientOptions original) { this.autoReconnect = original.isAutoReconnect(); + this.replayFilter = original.getReplayFilter(); this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure(); this.decodeBufferPolicy = original.getDecodeBufferPolicy(); this.disconnectedBehavior = original.getDisconnectedBehavior(); @@ -198,6 +206,8 @@ public static class Builder { private boolean autoReconnect = DEFAULT_AUTO_RECONNECT; + private Predicate> replayFilter = DEFAULT_REPLAY_FILTER; + private boolean cancelCommandsOnReconnectFailure = DEFAULT_CANCEL_CMD_RECONNECT_FAIL; private DecodeBufferPolicy decodeBufferPolicy = DecodeBufferPolicies.ratio(DEFAULT_BUFFER_USAGE_RATIO); @@ -245,6 +255,21 @@ public Builder autoReconnect(boolean autoReconnect) { return this; } + /** + * When {@link #autoReconnect(boolean)} is set to true, this {@link Predicate} is used to filter commands to replay when + * the connection is reestablished after a disconnect. Returning false means the command will not be + * filtered out and will be replayed. Defaults to replaying all queued commands. + * + * @param replayFilter a {@link Predicate} to filter commands to replay. Must not be {@code null}. + * @see #DEFAULT_REPLAY_FILTER + * @return {@code this} + * @since 6.6 + */ + public Builder replayFilter(Predicate> replayFilter) { + this.replayFilter = replayFilter; + return this; + } + /** * Allows cancelling queued commands in case a reconnect fails.Defaults to {@code false}. See * {@link #DEFAULT_CANCEL_CMD_RECONNECT_FAIL}. This flag is deprecated and should not be used as it can lead to race @@ -526,13 +551,13 @@ public ClientOptions.Builder mutate() { Builder builder = new Builder(); builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()) - .decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior()) - .reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands()) - .publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection()) - .protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize()) - .scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions()) - .sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()) - .timeoutOptions(getTimeoutOptions()); + .replayFilter(getReplayFilter()).decodeBufferPolicy(getDecodeBufferPolicy()) + .disconnectedBehavior(getDisconnectedBehavior()).reauthenticateBehavior(getReauthenticateBehaviour()) + .readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler()) + .pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion()) + .requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser()) + .socketOptions(getSocketOptions()).sslOptions(getSslOptions()) + .suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions()); return builder; } @@ -550,6 +575,16 @@ public boolean isAutoReconnect() { return autoReconnect; } + /** + * Controls which {@link RedisCommand} will be replayed after a re-connect. The {@link Predicate} returns true + * if command should be filtered out and not replayed. Defaults to {@link #DEFAULT_REPLAY_FILTER}. + * + * @return the currently set {@link Predicate} used to filter out commands to replay + */ + public Predicate> getReplayFilter() { + return replayFilter; + } + /** * If this flag is {@code true} any queued commands will be canceled when a reconnect fails within the activation sequence. * Default is {@code false}. diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index 502ce4a76..79f2f05f1 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.function.Supplier; import io.lettuce.core.ClientOptions; @@ -81,6 +82,8 @@ public class DefaultEndpoint implements RedisChannelWriter, Endpoint, PushHandle private final Reliability reliability; + private final Predicate> replayFilter; + private final ClientOptions clientOptions; private final ClientResources clientResources; @@ -139,6 +142,7 @@ public DefaultEndpoint(ClientOptions clientOptions, ClientResources clientResour this.clientOptions = clientOptions; this.clientResources = clientResources; this.reliability = clientOptions.isAutoReconnect() ? Reliability.AT_LEAST_ONCE : Reliability.AT_MOST_ONCE; + this.replayFilter = clientOptions.getReplayFilter(); this.disconnectedBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize()); this.commandBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize()); this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE; @@ -343,6 +347,13 @@ private void writeToDisconnectedBuffer(RedisCommand command) { return; } + if (replayFilter.test(command)) { + if (debugEnabled) { + logger.debug("{} writeToDisconnectedBuffer() Filtering out command {}", logPrefix(), command); + } + return; + } + if (debugEnabled) { logger.debug("{} writeToDisconnectedBuffer() buffering (disconnected) command {}", logPrefix(), command); } @@ -1033,10 +1044,16 @@ private void doComplete(Future future) { private void potentiallyRequeueCommands(Channel channel, RedisCommand sentCommand, Collection> sentCommands) { + // do not requeue commands that are done if (sentCommand != null && sentCommand.isDone()) { return; } + // do not requeue commands that are to be filtered out + if (this.endpoint.replayFilter.test(sentCommand)) { + return; + } + if (sentCommands != null) { boolean foundToSend = false; diff --git a/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java index 12b2cd66c..c11c28c30 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java @@ -56,6 +56,7 @@ class ClusterNodeEndpointUnitTests { @BeforeEach void before() { + when(clientOptions.getReplayFilter()).thenReturn((cmd) -> false); when(clientOptions.getRequestQueueSize()).thenReturn(1000); when(clientOptions.getDisconnectedBehavior()).thenReturn(ClientOptions.DisconnectedBehavior.DEFAULT); diff --git a/src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java b/src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java index 7dbac3bf9..df420ce7d 100644 --- a/src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java +++ b/src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java @@ -8,8 +8,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.protocol.RedisCommand; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -372,6 +374,54 @@ void retryAfterConnectionIsDisconnected() throws Exception { verificationConnection.getStatefulConnection().close(); } + @Test + void retryAfterConnectionIsDisconnectedButFiltered() throws Exception { + // Do not replay DECR commands after reconnect for some reason + Predicate> filter = cmd -> cmd.getType().toString().equalsIgnoreCase("DECR"); + + client.setOptions(ClientOptions.builder().autoReconnect(true).replayFilter(filter) + .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build()); + + // needs to be increased on slow systems...perhaps... + client.setDefaultTimeout(3, TimeUnit.SECONDS); + + StatefulRedisConnection connection = client.connect(); + RedisCommands verificationConnection = client.connect().sync(); + + connection.sync().set(key, "1"); + + ConnectionWatchdog connectionWatchdog = ConnectionTestUtil.getConnectionWatchdog(connection); + connectionWatchdog.setListenOnChannelInactive(false); + + connection.async().quit(); + while (connection.isOpen()) { + Delay.delay(Duration.ofMillis(100)); + } + + assertThat(connection.async().incr(key).await(1, TimeUnit.SECONDS)).isFalse(); + assertThat(connection.async().decr(key).await(1, TimeUnit.SECONDS)).isFalse(); + assertThat(connection.async().decr(key).await(1, TimeUnit.SECONDS)).isFalse(); + + assertThat(verificationConnection.get("key")).isEqualTo("1"); + + assertThat(ConnectionTestUtil.getDisconnectedBuffer(connection).size()).isGreaterThan(0); + assertThat(ConnectionTestUtil.getCommandBuffer(connection)).isEmpty(); + + connectionWatchdog.setListenOnChannelInactive(true); + connectionWatchdog.scheduleReconnect(); + + while (!ConnectionTestUtil.getCommandBuffer(connection).isEmpty() + || !ConnectionTestUtil.getDisconnectedBuffer(connection).isEmpty()) { + Delay.delay(Duration.ofMillis(10)); + } + + assertThat(connection.sync().get(key)).isEqualTo("2"); + assertThat(verificationConnection.get(key)).isEqualTo("2"); + + connection.close(); + verificationConnection.getStatefulConnection().close(); + } + private Throwable getException(RedisFuture command) { try { command.get();