From d7dcf54ddf555b83b2aec65806817d3fef610d90 Mon Sep 17 00:00:00 2001 From: "xiaofan.chen" Date: Thu, 29 Aug 2024 16:49:33 +0800 Subject: [PATCH] fix: two bugs: 1, autoBatchFlushEndPointContext.add() should always be before autoBatchFlushEndPointContext.done(1) othewise the flyingTaskNum could be negative; 2, make sure lastEventLoop is never null --- .../io/lettuce/core/AbstractRedisClient.java | 11 ++-- .../DefaultAutoBatchFlushEndpoint.java | 62 ++++++++++--------- 2 files changed, 37 insertions(+), 36 deletions(-) diff --git a/src/main/java/io/lettuce/core/AbstractRedisClient.java b/src/main/java/io/lettuce/core/AbstractRedisClient.java index 9adff50624..0def69d209 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisClient.java +++ b/src/main/java/io/lettuce/core/AbstractRedisClient.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import reactor.core.publisher.Mono; import io.lettuce.core.event.command.CommandListener; import io.lettuce.core.event.connection.ConnectEvent; import io.lettuce.core.event.connection.ConnectionCreatedEvent; @@ -65,6 +64,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import reactor.core.publisher.Mono; /** * Base Redis client. This class holds the netty infrastructure, {@link ClientOptions} and the basic connection procedure. This @@ -80,8 +80,8 @@ * @author Mark Paluch * @author Jongyeol Choi * @author Poorva Gokhale - * @since 3.0 * @see ClientResources + * @since 3.0 */ public abstract class AbstractRedisClient implements AutoCloseable { @@ -202,7 +202,6 @@ protected void setOptions(ClientOptions clientOptions) { * * @return the {@link ClientResources} for this client. * @since 6.0 - * */ public ClientResources getResources() { return clientResources; @@ -509,8 +508,8 @@ public void close() { * @param quietPeriod the quiet period to allow the executor gracefully shut down. * @param timeout the maximum amount of time to wait until the backing executor is shutdown regardless if a task was * submitted during the quiet period. - * @since 5.0 * @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit) + * @since 5.0 */ public void shutdown(Duration quietPeriod, Duration timeout) { shutdown(quietPeriod.toNanos(), timeout.toNanos(), TimeUnit.NANOSECONDS); @@ -542,8 +541,8 @@ public void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) { * should be discarded after calling shutdown. The shutdown is executed without quiet time and a timeout of 2 * {@link TimeUnit#SECONDS}. * - * @since 4.4 * @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit) + * @since 4.4 */ public CompletableFuture shutdownAsync() { return shutdownAsync(0, 2, TimeUnit.SECONDS); @@ -558,8 +557,8 @@ public CompletableFuture shutdownAsync() { * @param timeout the maximum amount of time to wait until the backing executor is shutdown regardless if a task was * submitted during the quiet period. * @param timeUnit the unit of {@code quietPeriod} and {@code timeout}. - * @since 4.4 * @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit) + * @since 4.4 */ public CompletableFuture shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) { diff --git a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java index 7326aff060..c5c84e30f2 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java @@ -15,24 +15,6 @@ */ package io.lettuce.core.protocol; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Deque; -import java.util.HashSet; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.Consumer; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - import io.lettuce.core.ClientOptions; import io.lettuce.core.ConnectionEvents; import io.lettuce.core.ContextualChannel; @@ -55,11 +37,29 @@ import io.netty.channel.EventLoop; import io.netty.handler.codec.EncoderException; import io.netty.util.Recycler; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Deque; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + /** * Default {@link Endpoint} implementation. * @@ -161,7 +161,7 @@ protected static void cancelCommandOnEndpointClose(RedisCommand cmd) { private final boolean canFire; - private volatile EventLoop lastEventLoop = null; + private volatile EventExecutor lastEventExecutor; private volatile Throwable connectionError; @@ -202,6 +202,7 @@ protected DefaultAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResou this.callbackOnClose = callbackOnClose; this.writeSpinCount = clientOptions.getAutoBatchFlushOptions().getWriteSpinCount(); this.batchSize = clientOptions.getAutoBatchFlushOptions().getBatchSize(); + this.lastEventExecutor = clientResources.eventExecutorGroup().next(); } @Override @@ -322,7 +323,7 @@ public void notifyChannelActive(Channel channel) { return; } - this.lastEventLoop = channel.eventLoop(); + this.lastEventExecutor = channel.eventLoop(); this.connectionError = null; this.inProtectMode = false; this.logPrefix = null; @@ -585,7 +586,7 @@ private void resetInternal() { if (chan.context.initialState.isConnected()) { chan.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset()); } - LettuceAssert.assertState(lastEventLoop.inEventLoop(), "must be called in lastEventLoop thread"); + LettuceAssert.assertState(lastEventExecutor.inEventLoop(), "must be called in lastEventLoop thread"); cancelCommands("resetInternal"); } @@ -727,22 +728,23 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint } if (o instanceof RedisCommand) { + autoBatchFlushEndPointContext.add(1); RedisCommand cmd = (RedisCommand) o; channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false)); count++; } else { @SuppressWarnings("unchecked") Collection> commands = (Collection>) o; + final int commandsSize = commands.size(); // size() could be expensive for some collections so cache it! + autoBatchFlushEndPointContext.add(commandsSize); for (RedisCommand cmd : commands) { channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false)); } - count += commands.size(); + count += commandsSize; } } if (count > 0) { - autoBatchFlushEndPointContext.add(count); - channelFlush(chan); if (autoBatchFlushEndPointContext.hasRetryableFailedToSendCommands()) { // Wait for onConnectionClose event() @@ -755,7 +757,7 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint private void trySetEndpointQuiescence(ContextualChannel chan) { final EventLoop eventLoop = chan.eventLoop(); LettuceAssert.isTrue(eventLoop.inEventLoop(), "unexpected: not in event loop"); - LettuceAssert.isTrue(eventLoop == lastEventLoop, "unexpected: lastEventLoop not match"); + LettuceAssert.isTrue(eventLoop == lastEventExecutor, "unexpected: lastEventLoop not match"); final ConnectionContext connectionContext = chan.context; final @Nullable ConnectionContext.CloseStatus closeStatus = connectionContext.getCloseStatus(); @@ -1019,14 +1021,14 @@ private ChannelFuture channelWrite(Channel channel, RedisCommand comman * is terminated (state is RECONNECT_FAILED/ENDPOINT_CLOSED) */ private void syncAfterTerminated(Runnable runnable) { - final EventLoop localLastEventLoop = lastEventLoop; - LettuceAssert.notNull(localLastEventLoop, "lastEventLoop must not be null after terminated"); - if (localLastEventLoop.inEventLoop()) { + final EventExecutor localLastEventExecutor = lastEventExecutor; + if (localLastEventExecutor.inEventLoop()) { runnable.run(); } else { - localLastEventLoop.execute(() -> { + localLastEventExecutor.execute(() -> { runnable.run(); - LettuceAssert.isTrue(lastEventLoop == localLastEventLoop, "lastEventLoop must not be changed after terminated"); + LettuceAssert.isTrue(lastEventExecutor == localLastEventExecutor, + "lastEventLoop must not be changed after terminated"); }); } }