From fb65b0a132c1c3bae1999cb7f49371d8ae4533ca Mon Sep 17 00:00:00 2001 From: "xiaofan.chen" Date: Thu, 8 Aug 2024 13:55:59 +0800 Subject: [PATCH] refactor: remove tryEnterUnsafe/exitUnsafe --- .../context/BatchFlushEndPointContext.java | 21 ++------------- .../protocol/DefaultBatchFlushEndpoint.java | 26 +++++-------------- 2 files changed, 9 insertions(+), 38 deletions(-) diff --git a/src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java b/src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java index c7dff147bd..1939c62a55 100644 --- a/src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java +++ b/src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java @@ -41,31 +41,14 @@ public HasOngoingSendLoop() { * * @return true if entered the loop, false if already have a running loop. */ - public boolean tryEnterSafeGetVolatile() { + public boolean tryEnter() { return safe.get() == 0 && /* rare case if QPS is high */ safe.compareAndSet(0, 1); } - public void exitSafe() { + public void exit() { safe.set(0); } - /** - * This method is not thread safe, can only be used from single thread. - * - * @return true if the value was updated - */ - public boolean tryEnterUnsafe() { - if (unsafe) { - return false; - } - unsafe = true; - return true; - } - - public void exitUnsafe() { - unsafe = false; - } - } BatchFlushEndPointContext() { diff --git a/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java index 5f1132d8e3..0ca5739bf6 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java @@ -601,8 +601,8 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) { LettuceAssert.assertState(chan.eventLoop().inEventLoop(), "must be called in event loop thread"); // Schedule directly - if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) { - scheduleSendJobInEventLoopIfNeeded(chan); + if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) { + loopSend(chan); } // Otherwise: // someone will do the job for us @@ -610,7 +610,7 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) { private void scheduleSendJobIfNeeded(final ContextualChannel chan) { final EventLoop eventLoop = chan.eventLoop(); - if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) { + if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) { // Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get): // 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls // Avg latency: 3.2956217278663s @@ -618,7 +618,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { // 2. uses eventLoop.execute() directly // Avg latency: 3.2677197021496998s // Avg QPS: 476925.0751855796/s - eventLoop.execute(() -> scheduleSendJobInEventLoopIfNeeded(chan)); + eventLoop.execute(() -> loopSend(chan)); } // Otherwise: @@ -629,16 +629,6 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { // second loopSend0(), which will call poll() } - private void scheduleSendJobInEventLoopIfNeeded(final ContextualChannel chan) { - // Guarantee only 1 send loop. - BatchFlushEndPointContext.HasOngoingSendLoop hasOngoingSendLoop = chan.context.batchFlushEndPointContext.hasOngoingSendLoop; - if (hasOngoingSendLoop.tryEnterUnsafe()) { - loopSend(chan); - } else { - hasOngoingSendLoop.exitSafe(); - } - } - private void loopSend(final ContextualChannel chan) { final ConnectionContext connectionContext = chan.context; final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext.batchFlushEndPointContext; @@ -651,7 +641,7 @@ private void loopSend(final ContextualChannel chan) { } private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext, final ContextualChannel chan, - int remainingSpinnCount, final boolean exitedSafe) { + int remainingSpinnCount, final boolean exited) { do { final int count = pollBatch(batchFlushEndPointContext, chan); if (count < 0) { @@ -669,12 +659,10 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext return; } - if (exitedSafe) { + if (!exited) { // The send loop will be triggered later when a new task is added, - batchFlushEndPointContext.hasOngoingSendLoop.exitUnsafe(); - } else { // // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call. - batchFlushEndPointContext.hasOngoingSendLoop.exitSafe(); + batchFlushEndPointContext.hasOngoingSendLoop.exit(); // // Guarantee thread-safety: no dangling tasks in the queue. loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, true); // chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100,