From 3241d42e5474cbbc7e5a2c0fea0ee444d89dfca8 Mon Sep 17 00:00:00 2001 From: "xiaofan.chen" Date: Thu, 8 Aug 2024 16:57:34 +0800 Subject: [PATCH] chore: handle eventLoop.inEventLoop() case of scheduleSendJobIfNeeded() --- .../protocol/DefaultBatchFlushEndpoint.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java index 0ca5739bf6..54cbd6c511 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java @@ -601,15 +601,17 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) { LettuceAssert.assertState(chan.eventLoop().inEventLoop(), "must be called in event loop thread"); // Schedule directly - if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) { - loopSend(chan); - } - // Otherwise: - // someone will do the job for us + loopSend(chan, false); } private void scheduleSendJobIfNeeded(final ContextualChannel chan) { final EventLoop eventLoop = chan.eventLoop(); + if (eventLoop.inEventLoop()) { + // Possible in reactive() mode. + loopSend(chan, false); + return; + } + if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) { // Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get): // 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls @@ -618,7 +620,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { // 2. uses eventLoop.execute() directly // Avg latency: 3.2677197021496998s // Avg QPS: 476925.0751855796/s - eventLoop.execute(() -> loopSend(chan)); + eventLoop.execute(() -> loopSend(chan, true)); } // Otherwise: @@ -629,7 +631,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { // second loopSend0(), which will call poll() } - private void loopSend(final ContextualChannel chan) { + private void loopSend(final ContextualChannel chan, boolean entered) { final ConnectionContext connectionContext = chan.context; final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext.batchFlushEndPointContext; if (connectionContext.isChannelInactiveEventFired() || batchFlushEndPointContext.hasRetryableFailedToSendTasks()) { @@ -637,11 +639,11 @@ private void loopSend(final ContextualChannel chan) { } LettuceAssert.assertState(channel == chan, "unexpected: channel not match but closeStatus == null"); - loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false); + loopSend0(batchFlushEndPointContext, chan, writeSpinCount, entered); } private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext, final ContextualChannel chan, - int remainingSpinnCount, final boolean exited) { + int remainingSpinnCount, final boolean entered) { do { final int count = pollBatch(batchFlushEndPointContext, chan); if (count < 0) { @@ -655,16 +657,16 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext if (remainingSpinnCount <= 0) { // Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread. - chan.eventLoop().execute(() -> loopSend(chan)); + chan.eventLoop().execute(() -> loopSend(chan, entered)); return; } - if (!exited) { + if (entered) { // The send loop will be triggered later when a new task is added, // // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call. batchFlushEndPointContext.hasOngoingSendLoop.exit(); // // Guarantee thread-safety: no dangling tasks in the queue. - loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, true); + loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, false); // chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100, // TimeUnit.NANOSECONDS); }