Skip to content

Commit

Permalink
fix: two bugs: 1, autoBatchFlushEndPointContext.add() should always b…
Browse files Browse the repository at this point in the history
…e before autoBatchFlushEndPointContext.done(1) othewise the flyingTaskNum could be negative; 2, make sure lastEventLoop is never null
  • Loading branch information
okg-cxf committed Aug 29, 2024
1 parent a80453e commit d7dcf54
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 36 deletions.
11 changes: 5 additions & 6 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import io.lettuce.core.event.command.CommandListener;
import io.lettuce.core.event.connection.ConnectEvent;
import io.lettuce.core.event.connection.ConnectionCreatedEvent;
Expand Down Expand Up @@ -65,6 +64,7 @@
import io.netty.util.concurrent.Future;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import reactor.core.publisher.Mono;

/**
* Base Redis client. This class holds the netty infrastructure, {@link ClientOptions} and the basic connection procedure. This
Expand All @@ -80,8 +80,8 @@
* @author Mark Paluch
* @author Jongyeol Choi
* @author Poorva Gokhale
* @since 3.0
* @see ClientResources
* @since 3.0
*/
public abstract class AbstractRedisClient implements AutoCloseable {

Expand Down Expand Up @@ -202,7 +202,6 @@ protected void setOptions(ClientOptions clientOptions) {
*
* @return the {@link ClientResources} for this client.
* @since 6.0
*
*/
public ClientResources getResources() {
return clientResources;
Expand Down Expand Up @@ -509,8 +508,8 @@ public void close() {
* @param quietPeriod the quiet period to allow the executor gracefully shut down.
* @param timeout the maximum amount of time to wait until the backing executor is shutdown regardless if a task was
* submitted during the quiet period.
* @since 5.0
* @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit)
* @since 5.0
*/
public void shutdown(Duration quietPeriod, Duration timeout) {
shutdown(quietPeriod.toNanos(), timeout.toNanos(), TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -542,8 +541,8 @@ public void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) {
* should be discarded after calling shutdown. The shutdown is executed without quiet time and a timeout of 2
* {@link TimeUnit#SECONDS}.
*
* @since 4.4
* @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit)
* @since 4.4
*/
public CompletableFuture<Void> shutdownAsync() {
return shutdownAsync(0, 2, TimeUnit.SECONDS);
Expand All @@ -558,8 +557,8 @@ public CompletableFuture<Void> shutdownAsync() {
* @param timeout the maximum amount of time to wait until the backing executor is shutdown regardless if a task was
* submitted during the quiet period.
* @param timeUnit the unit of {@code quietPeriod} and {@code timeout}.
* @since 4.4
* @see EventExecutorGroup#shutdownGracefully(long, long, TimeUnit)
* @since 4.4
*/
public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,6 @@
*/
package io.lettuce.core.protocol;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.ContextualChannel;
Expand All @@ -55,11 +37,29 @@
import io.netty.channel.EventLoop;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Recycler;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

/**
* Default {@link Endpoint} implementation.
*
Expand Down Expand Up @@ -161,7 +161,7 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {

private final boolean canFire;

private volatile EventLoop lastEventLoop = null;
private volatile EventExecutor lastEventExecutor;

private volatile Throwable connectionError;

Expand Down Expand Up @@ -202,6 +202,7 @@ protected DefaultAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResou
this.callbackOnClose = callbackOnClose;
this.writeSpinCount = clientOptions.getAutoBatchFlushOptions().getWriteSpinCount();
this.batchSize = clientOptions.getAutoBatchFlushOptions().getBatchSize();
this.lastEventExecutor = clientResources.eventExecutorGroup().next();
}

@Override
Expand Down Expand Up @@ -322,7 +323,7 @@ public void notifyChannelActive(Channel channel) {
return;
}

this.lastEventLoop = channel.eventLoop();
this.lastEventExecutor = channel.eventLoop();
this.connectionError = null;
this.inProtectMode = false;
this.logPrefix = null;
Expand Down Expand Up @@ -585,7 +586,7 @@ private void resetInternal() {
if (chan.context.initialState.isConnected()) {
chan.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset());
}
LettuceAssert.assertState(lastEventLoop.inEventLoop(), "must be called in lastEventLoop thread");
LettuceAssert.assertState(lastEventExecutor.inEventLoop(), "must be called in lastEventLoop thread");
cancelCommands("resetInternal");
}

Expand Down Expand Up @@ -727,22 +728,23 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint
}

if (o instanceof RedisCommand<?, ?, ?>) {
autoBatchFlushEndPointContext.add(1);
RedisCommand<?, ?, ?> cmd = (RedisCommand<?, ?, ?>) o;
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false));
count++;
} else {
@SuppressWarnings("unchecked")
Collection<? extends RedisCommand<?, ?, ?>> commands = (Collection<? extends RedisCommand<?, ?, ?>>) o;
final int commandsSize = commands.size(); // size() could be expensive for some collections so cache it!
autoBatchFlushEndPointContext.add(commandsSize);
for (RedisCommand<?, ?, ?> cmd : commands) {
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false));
}
count += commands.size();
count += commandsSize;
}
}

if (count > 0) {
autoBatchFlushEndPointContext.add(count);

channelFlush(chan);
if (autoBatchFlushEndPointContext.hasRetryableFailedToSendCommands()) {
// Wait for onConnectionClose event()
Expand All @@ -755,7 +757,7 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint
private void trySetEndpointQuiescence(ContextualChannel chan) {
final EventLoop eventLoop = chan.eventLoop();
LettuceAssert.isTrue(eventLoop.inEventLoop(), "unexpected: not in event loop");
LettuceAssert.isTrue(eventLoop == lastEventLoop, "unexpected: lastEventLoop not match");
LettuceAssert.isTrue(eventLoop == lastEventExecutor, "unexpected: lastEventLoop not match");

final ConnectionContext connectionContext = chan.context;
final @Nullable ConnectionContext.CloseStatus closeStatus = connectionContext.getCloseStatus();
Expand Down Expand Up @@ -1019,14 +1021,14 @@ private ChannelFuture channelWrite(Channel channel, RedisCommand<?, ?, ?> comman
* is terminated (state is RECONNECT_FAILED/ENDPOINT_CLOSED)
*/
private void syncAfterTerminated(Runnable runnable) {
final EventLoop localLastEventLoop = lastEventLoop;
LettuceAssert.notNull(localLastEventLoop, "lastEventLoop must not be null after terminated");
if (localLastEventLoop.inEventLoop()) {
final EventExecutor localLastEventExecutor = lastEventExecutor;
if (localLastEventExecutor.inEventLoop()) {
runnable.run();
} else {
localLastEventLoop.execute(() -> {
localLastEventExecutor.execute(() -> {
runnable.run();
LettuceAssert.isTrue(lastEventLoop == localLastEventLoop, "lastEventLoop must not be changed after terminated");
LettuceAssert.isTrue(lastEventExecutor == localLastEventExecutor,
"lastEventLoop must not be changed after terminated");
});
}
}
Expand Down

0 comments on commit d7dcf54

Please sign in to comment.