Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: schedule in eventloop for batchWrite of endpoint #2947

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Recycler;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -379,38 +380,44 @@ private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
}

if (reliability == Reliability.AT_LEAST_ONCE) {
} else if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channelFuture.addListener(RetryListener.newInstance(this, command));
}
}

private void writeToChannelAndFlush(Collection<? extends RedisCommand<?, ?, ?>> commands) {
final Channel chan = this.channel;
final EventLoop eventLoop = chan.eventLoop();

QUEUE_SIZE.addAndGet(this, commands.size());

if (reliability == Reliability.AT_MOST_ONCE) {

// cancel on exceptions and remove from queue, because there is no housekeeping
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(command).addListener(AtMostOnceWriteListener.newInstance(this, command));
}
executeInEventLoop(eventLoop, () -> {
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(chan, command).addListener(AtMostOnceWriteListener.newInstance(this, command));
}
channelFlush(chan);
});
} else if (reliability == Reliability.AT_LEAST_ONCE) {
executeInEventLoop(eventLoop, () -> {
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(chan, command).addListener(RetryListener.newInstance(this, command));
}
channelFlush(chan);
});
}
}

if (reliability == Reliability.AT_LEAST_ONCE) {

// commands are ok to stay within the queue, reconnect will retrigger them
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(command).addListener(RetryListener.newInstance(this, command));
}
private void executeInEventLoop(EventLoop eventLoop, Runnable runnable) {
if (eventLoop.inEventLoop()) {
runnable.run();
} else {
eventLoop.execute(runnable);
Copy link
Contributor

@Roiocam Roiocam Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it will limit the throughput because it will has only few thread write, in the legacy way, both IOGroup and ExecutorGroup can execute.

can you verify the performance improvement not just JMH case?

}

channelFlush();
}

private void channelFlush() {
private void channelFlush(Channel channel) {

if (debugEnabled) {
logger.debug("{} write() channelFlush", logPrefix());
Expand All @@ -419,7 +426,7 @@ private void channelFlush() {
channel.flush();
}

private ChannelFuture channelWrite(RedisCommand<?, ?, ?> command) {
private ChannelFuture channelWrite(Channel channel, RedisCommand<?, ?, ?> command) {

if (debugEnabled) {
logger.debug("{} write() channelWrite command {}", logPrefix(), command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
Expand All @@ -27,6 +31,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand All @@ -46,6 +51,7 @@
import io.netty.channel.EventLoop;
import io.netty.handler.codec.EncoderException;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.mockito.stubbing.Answer;

/**
* @author Mark Paluch
Expand All @@ -64,6 +70,9 @@ class DefaultEndpointUnitTests {
@Mock
private Channel channel;

@Mock
private EventLoop eventLoop;

@Mock
private ConnectionFacade connectionFacade;

Expand Down Expand Up @@ -117,6 +126,13 @@ void before() {
return promise;
});

when(channel.eventLoop()).thenReturn(eventLoop);
doAnswer((Answer<Void>) invocation -> {
Runnable runnable = (Runnable) invocation.getArguments()[0];
runnable.run(); // Directly execute the runnable
return null;
}).when(eventLoop).execute(any(Runnable.class));

sut = new DefaultEndpoint(ClientOptions.create(), clientResources);
sut.setConnectionFacade(connectionFacade);
}
Expand Down Expand Up @@ -337,8 +353,6 @@ void retryListenerDoesNotRetryCompletedCommands() {

DefaultEndpoint.RetryListener listener = DefaultEndpoint.RetryListener.newInstance(sut, command);

when(channel.eventLoop()).thenReturn(mock(EventLoop.class));

command.complete();
promise.tryFailure(new Exception());

Expand Down
Loading