From 7a54c8bc2d55a07ecfa0b8336d14fe3f65fd2d02 Mon Sep 17 00:00:00 2001 From: "xiaofan.chen" Date: Fri, 16 Aug 2024 12:58:55 +0800 Subject: [PATCH] fix: activiation command should be sent immediately upon channelActive events --- .../io/lettuce/core/ContextualChannel.java | 4 - .../StatefulRedisClusterConnectionImpl.java | 18 +++- .../DefaultAutoBatchFlushEndpoint.java | 101 ++++++++++++------ .../io/lettuce/core/utils/ExceptionUtils.java | 20 ++-- 4 files changed, 95 insertions(+), 48 deletions(-) diff --git a/src/main/java/io/lettuce/core/ContextualChannel.java b/src/main/java/io/lettuce/core/ContextualChannel.java index 188698e0cf..384fd043ca 100644 --- a/src/main/java/io/lettuce/core/ContextualChannel.java +++ b/src/main/java/io/lettuce/core/ContextualChannel.java @@ -27,10 +27,6 @@ public class ContextualChannel implements Channel { public final ConnectionContext context; - public ConnectionContext getContext() { - return context; - } - public Channel getDelegate() { return delegate; } diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java index 3e89689016..640321d10d 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java @@ -19,8 +19,6 @@ */ package io.lettuce.core.cluster; -import static io.lettuce.core.protocol.CommandType.*; - import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; import java.time.Duration; @@ -57,6 +55,12 @@ import io.lettuce.core.protocol.ConnectionIntent; import io.lettuce.core.protocol.ConnectionWatchdog; import io.lettuce.core.protocol.RedisCommand; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import static io.lettuce.core.protocol.CommandType.AUTH; +import static io.lettuce.core.protocol.CommandType.READONLY; +import static io.lettuce.core.protocol.CommandType.READWRITE; /** * A thread-safe connection to a Redis Cluster. Multiple threads may share one {@link StatefulRedisClusterConnectionImpl} @@ -70,6 +74,8 @@ public class StatefulRedisClusterConnectionImpl extends RedisChannelHandler implements StatefulRedisClusterConnection { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(StatefulRedisClusterConnectionImpl.class); + private final ClusterPushHandler pushHandler; protected final RedisCodec codec; @@ -208,7 +214,13 @@ public CompletableFuture> getConnectionAsync(Strin public void activated() { super.activated(); - async.clusterMyId().thenAccept(connectionState::setNodeId); + async.clusterMyId().whenComplete((nodeId, throwable) -> { + if (throwable != null) { + logger.warn("Failed to retrieve current cluster node ID: {}", throwable); + } else { + connectionState.setNodeId(nodeId); + } + }); } ClusterDistributionChannelWriter getClusterDistributionChannelWriter() { diff --git a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java index b8bb3b6f62..2bb6d1d8ac 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java @@ -65,6 +65,7 @@ * * @author Mark Paluch */ +@SuppressWarnings("DuplicatedCode") public class DefaultAutoBatchFlushEndpoint implements RedisChannelWriter, AutoBatchFlushEndpoint, PushHandler { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AutoBatchFlushEndpoint.class); @@ -235,10 +236,9 @@ public List getPushListeners() { @Override public RedisCommand write(RedisCommand command) { - LettuceAssert.notNull(command, "Command must not be null"); - - final Throwable validation = validateWrite(1); + final ContextualChannel chan = this.channel; + final Throwable validation = validateWrite(chan, 1, inActivation); if (validation != null) { command.completeExceptionally(validation); return command; @@ -246,16 +246,17 @@ public RedisCommand write(RedisCommand command) { try { if (inActivation) { + // needs write and flush activation command immediately, cannot queue it. command = processActivationCommand(command); - } - - this.taskQueue.offer(command); - QUEUE_SIZE.incrementAndGet(this); + writeAndFlushActivationCommand(chan, command); + } else { + this.taskQueue.offer(command); + QUEUE_SIZE.incrementAndGet(this); - if (autoFlushCommands) { - flushCommands(); + if (autoFlushCommands) { + flushCommands(); + } } - } finally { if (debugEnabled) { logger.debug("{} write() done", logPrefix()); @@ -268,10 +269,10 @@ public RedisCommand write(RedisCommand command) { @SuppressWarnings("unchecked") @Override public Collection> write(Collection> commands) { - LettuceAssert.notNull(commands, "Commands must not be null"); - final Throwable validation = validateWrite(commands.size()); + final ContextualChannel chan = this.channel; + final Throwable validation = validateWrite(chan, commands.size(), inActivation); if (validation != null) { commands.forEach(it -> it.completeExceptionally(validation)); return (Collection>) commands; @@ -279,14 +280,16 @@ public RedisCommand write(RedisCommand command) { try { if (inActivation) { + // needs write and flush activation commands immediately, cannot queue it. commands = processActivationCommands(commands); - } - - this.taskQueue.offer(commands); - QUEUE_SIZE.addAndGet(this, commands.size()); + writeAndFlushActivationCommands(chan, commands); + } else { + this.taskQueue.offer(commands); + QUEUE_SIZE.addAndGet(this, commands.size()); - if (autoFlushCommands) { - flushCommands(); + if (autoFlushCommands) { + flushCommands(); + } } } finally { if (debugEnabled) { @@ -297,6 +300,19 @@ public RedisCommand write(RedisCommand command) { return (Collection>) commands; } + private void writeAndFlushActivationCommand(ContextualChannel chan, RedisCommand command) { + channelWrite(chan, command).addListener(WrittenToChannel.newInstance(this, chan, command, true)); + channelFlush(chan); + } + + private void writeAndFlushActivationCommands(ContextualChannel chan, + Collection> commands) { + for (RedisCommand command : commands) { + channelWrite(chan, command).addListener(WrittenToChannel.newInstance(this, chan, command, true)); + } + channelFlush(chan); + } + @Override public void notifyChannelActive(Channel channel) { final ContextualChannel contextualChannel = new ContextualChannel(channel, ConnectionContext.State.CONNECTED); @@ -494,8 +510,8 @@ public void close() { } @Override + @SuppressWarnings("java:S125" /* The comments are necessary to prove the correctness code */) public CompletableFuture closeAsync() { - if (debugEnabled) { logger.debug("{} closeAsync()", logPrefix()); } @@ -650,7 +666,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { // 2. hasOngoingSendLoop.safe.get() == 1 (volatile read) synchronizes-before // hasOngoingSendLoop.safe.set(0) (volatile write) in first loopSend0() // 3. hasOngoingSendLoop.safe.set(0) (volatile write) synchronizes-before - // second loopSend0(), which will call poll() (volatile read of producerIndex) + // taskQueue.isEmpty() (volatile read of producerIndex), which guarantees to see the offered task. } private void loopSend(final ContextualChannel chan, boolean entered) { @@ -703,13 +719,13 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint if (o instanceof RedisCommand) { RedisCommand cmd = (RedisCommand) o; - channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd)); + channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false)); count++; } else { @SuppressWarnings("unchecked") Collection> commands = (Collection>) o; for (RedisCommand cmd : commands) { - channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd)); + channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false)); } count += commands.size(); } @@ -770,6 +786,7 @@ private void onEndpointQuiescence() { } // neither connectionWatchdog nor doReconnectOnEndpointQuiescence could be null + // noinspection DataFlowIssue connectionWatchdog.reconnectOnAutoBatchFlushEndpointQuiescence(); } @@ -839,6 +856,7 @@ private final void onReconnectFailed() { } @SafeVarargs + @SuppressWarnings("java:S3776" /* Suppress cognitive complexity warning */) private final void fulfillCommands(String message, Consumer> commandConsumer, Queue>... queues) { int totalCancelledTaskNum = 0; @@ -901,7 +919,6 @@ private Throwable getFailedToReconnectReason() { } private RedisCommand processActivationCommand(RedisCommand command) { - if (!ActivationCommand.isActivationCommand(command)) { return new ActivationCommand<>(command); } @@ -926,7 +943,7 @@ private RedisCommand processActivationCommand(RedisCommand clientOptions.getRequestQueueSize()) { + if (!isActivationCommand /* activation command should never be excluded due to queue full */ && boundedQueues + && queueSize + commands > clientOptions.getRequestQueueSize()) { return new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops."); } - final ContextualChannel chan = this.channel; - switch (chan.context.initialState) { + final ConnectionContext.State initialState = chan.context.initialState; + final boolean rejectCommandsWhileDisconnectedLocal = this.rejectCommandsWhileDisconnected || isActivationCommand; + switch (initialState) { case ENDPOINT_CLOSED: return new RedisException("Connection is closed"); case RECONNECT_FAILED: return failedToReconnectReason; case WILL_RECONNECT: case CONNECTING: - return rejectCommandsWhileDisconnected ? new RedisException("Currently not connected. Commands are rejected.") + return rejectCommandsWhileDisconnectedLocal + ? new RedisException("Currently not connected. Commands are rejected.") : null; case CONNECTED: - return !chan.isActive() && rejectCommandsWhileDisconnected ? new RedisException("Connection is closed") : null; + return !chan.isActive() && rejectCommandsWhileDisconnectedLocal ? new RedisException("Channel is closed") + : null; default: - throw new IllegalStateException("unexpected state: " + chan.context.initialState); + throw new IllegalStateException("unexpected state: " + initialState); } } @@ -1023,6 +1044,8 @@ protected WrittenToChannel newObject(Recycler.Handle handle) { private RedisCommand cmd; + private boolean isActivationCommand; + private ContextualChannel chan; private WrittenToChannel(Recycler.Handle handle) { @@ -1035,21 +1058,32 @@ private WrittenToChannel(Recycler.Handle handle) { * @return new instance */ static WrittenToChannel newInstance(DefaultAutoBatchFlushEndpoint endpoint, ContextualChannel chan, - RedisCommand command) { + RedisCommand command, boolean isActivationCommand) { WrittenToChannel entry = RECYCLER.get(); entry.endpoint = endpoint; entry.chan = chan; entry.cmd = command; + entry.isActivationCommand = isActivationCommand; + + LettuceAssert.assertState(isActivationCommand == ActivationCommand.isActivationCommand(command), + "unexpected: isActivationCommand not match"); return entry; } @Override public void operationComplete(Future future) { - final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext = chan.context.autoBatchFlushEndPointContext; try { + if (isActivationCommand) { + if (!future.isSuccess()) { + cmd.completeExceptionally(future.cause()); + } + return; + } + + final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext = chan.context.autoBatchFlushEndPointContext; QUEUE_SIZE.decrementAndGet(endpoint); autoBatchFlushEndPointContext.done(1); @@ -1105,7 +1139,7 @@ private Throwable checkSendResult(Future sendFuture) { } private boolean shouldNotRetry(Throwable cause) { - return endpoint.reliability == Reliability.AT_MOST_ONCE || ActivationCommand.isActivationCommand(cmd) + return endpoint.reliability == Reliability.AT_MOST_ONCE || ExceptionUtils.oneOf(cause, SHOULD_NOT_RETRY_EXCEPTION_TYPES); } @@ -1129,6 +1163,7 @@ private void recycle() { this.endpoint = null; this.chan = null; this.cmd = null; + this.isActivationCommand = false; handle.recycle(this); } diff --git a/src/main/java/io/lettuce/core/utils/ExceptionUtils.java b/src/main/java/io/lettuce/core/utils/ExceptionUtils.java index 49ed6e548c..50220abc08 100644 --- a/src/main/java/io/lettuce/core/utils/ExceptionUtils.java +++ b/src/main/java/io/lettuce/core/utils/ExceptionUtils.java @@ -1,17 +1,17 @@ package io.lettuce.core.utils; -import io.lettuce.core.output.CommandOutput; -import io.lettuce.core.protocol.RedisCommand; -import io.netty.channel.socket.ChannelOutputShutdownException; -import io.netty.util.internal.logging.InternalLogger; - import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Set; -import java.util.function.Function; + +import io.lettuce.core.output.CommandOutput; +import io.lettuce.core.protocol.RedisCommand; +import io.netty.channel.socket.ChannelOutputShutdownException; +import io.netty.util.internal.logging.InternalLogLevel; +import io.netty.util.internal.logging.InternalLogger; public class ExceptionUtils { @@ -26,12 +26,16 @@ public static void maybeLogSendError(InternalLogger logger, Throwable cause) { return; } + final String message = "Unexpected exception during request: {}"; + final InternalLogLevel logLevel; + if (cause instanceof IOException && (SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage()) || cause instanceof ChannelOutputShutdownException)) { - logger.debug("[maybeLogSendError] error during request: {}", cause.getMessage(), cause); + logLevel = InternalLogLevel.DEBUG; } else { - logger.error("[maybeLogSendError][attention] unexpected exception during request: {}", cause.getMessage(), cause); + logLevel = InternalLogLevel.WARN; } + logger.log(logLevel, message, cause.toString(), cause); } /**