From 977b33ab3485e2ab1db1b0ce68e4c14aec709ddd Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 20 Jan 2025 22:10:58 +0800 Subject: [PATCH] [fix][broker]Fix repeated acquire pending reads quota --- .../impl/cache/PendingReadsManager.java | 8 +- .../impl/cache/RangeEntryCacheImpl.java | 26 +- .../InflightReadsLimiterIntegrationTest.java | 231 ++++++++++++++++++ .../impl/cache/PendingReadsManagerTest.java | 2 +- 4 files changed, 251 insertions(+), 16 deletions(-) create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java index 8b2f3e25f1cbb..d733b54dd1304 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java @@ -362,7 +362,7 @@ public void readEntriesFailed(ManagedLedgerException exception, }; rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry, - shouldCacheEntry, readFromRightCallback, null); + shouldCacheEntry, readFromRightCallback, null, false); } @Override @@ -372,7 +372,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4 } }; rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry, - shouldCacheEntry, readFromLeftCallback, null); + shouldCacheEntry, readFromLeftCallback, null, false); } else if (missingOnLeft != null) { AsyncCallbacks.ReadEntriesCallback readFromLeftCallback = new AsyncCallbacks.ReadEntriesCallback() { @@ -395,7 +395,7 @@ public void readEntriesFailed(ManagedLedgerException exception, } }; rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry, - shouldCacheEntry, readFromLeftCallback, null); + shouldCacheEntry, readFromLeftCallback, null, false); } else if (missingOnRight != null) { AsyncCallbacks.ReadEntriesCallback readFromRightCallback = new AsyncCallbacks.ReadEntriesCallback() { @@ -418,7 +418,7 @@ public void readEntriesFailed(ManagedLedgerException exception, } }; rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry, - shouldCacheEntry, readFromRightCallback, null); + shouldCacheEntry, readFromRightCallback, null, false); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index cb006a5f0cea9..d52fc8535b55b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -57,7 +57,7 @@ public class RangeEntryCacheImpl implements EntryCache { /** * Overhead per-entry to take into account the envelope. */ - private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64; + public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64; private final RangeEntryCacheManagerImpl manager; final ManagedLedgerImpl ml; @@ -102,7 +102,7 @@ public String getName() { } @VisibleForTesting - InflightReadsLimiter getPendingReadsLimiter() { + public InflightReadsLimiter getPendingReadsLimiter() { return manager.getInflightReadsLimiter(); } @@ -282,7 +282,7 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, final ReadEntriesCallback callback, Object ctx) { try { - asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx); + asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, true); } catch (Throwable t) { log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t); // invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt @@ -295,16 +295,20 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole @SuppressWarnings({ "unchecked", "rawtypes" }) void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, - final ReadEntriesCallback callback, Object ctx) { - asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null); + final ReadEntriesCallback callback, Object ctx, boolean withLimits) { + asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null, withLimits); } void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, - final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) { - - final AsyncCallbacks.ReadEntriesCallback callback = - handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry, - originalCallback, ctx, handle); + final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle, + boolean withLimits) { + AsyncCallbacks.ReadEntriesCallback callback; + if (withLimits) { + callback = handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry, originalCallback, ctx, + handle); + } else { + callback = originalCallback; + } if (callback == null) { return; } @@ -382,7 +386,7 @@ private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle l } ml.getExecutor().execute(() -> { asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, - originalCallback, ctx, newHandle); + originalCallback, ctx, newHandle, true); }); return null; } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java new file mode 100644 index 0000000000000..b57dea6a5bb4d --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.impl.cache.InflightReadsLimiter; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl; +import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +public class InflightReadsLimiterIntegrationTest extends MockedBookKeeperTestCase { + + @DataProvider + public Object[][] readMissingCases() { + return new Object[][]{ + {"missRight"}, + {"missLeft"}, + {"bothMiss"} + }; + } + + @Test(dataProvider = "readMissingCases") + public void testPreciseLimitation(String missingCase) throws Exception { + final long start1 = 50; + final long start2 = "missLeft".endsWith(missingCase) || "bothMiss".equals(missingCase) ? 30 : 50; + final long end1 = 99; + final long end2 = "missRight".endsWith(missingCase) || "bothMiss".equals(missingCase) ? 109 : 99; + final HashSet secondReadEntries = new HashSet<>(); + if (start2 < start1) { + secondReadEntries.add(start2); + } + if (end2 > end1) { + secondReadEntries.add(end1 + 1); + } + final int readCount1 = (int) (end1 - start1 + 1); + final int readCount2 = (int) (end2 - start2 + 1); + + final DefaultThreadFactory threadFactory = new DefaultThreadFactory(UUID.randomUUID().toString()); + final ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(100000); + ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig(); + factoryConfig.setCacheEvictionIntervalMs(3600 * 1000); + factoryConfig.setManagedLedgerMaxReadsInFlightSize(1000_000); + final ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig); + final ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("my_test_ledger", config); + final RangeEntryCacheImpl entryCache = (RangeEntryCacheImpl) ml.entryCache; + final RangeEntryCacheManagerImpl rangeEntryCacheManager = + (RangeEntryCacheManagerImpl) factory.getEntryCacheManager(); + final InflightReadsLimiter limiter = rangeEntryCacheManager.getInflightReadsLimiter(); + final long totalCapacity =limiter.getRemainingBytes(); + // final ManagedCursorImpl c1 = (ManagedCursorImpl) ml.openCursor("c1"); + for (byte i = 1; i < 127; i++) { + log.info("add entry: " + i); + ml.addEntry(new byte[]{i}); + } + // Evict cached entries. + entryCache.evictEntries(ml.currentLedgerSize); + Assert.assertEquals(entryCache.getSize(), 0); + + CountDownLatch readCompleteSignal1 = new CountDownLatch(1); + CountDownLatch readCompleteSignal2 = new CountDownLatch(1); + CountDownLatch firstReadingStarted = new CountDownLatch(1); + LedgerHandle currentLedger = ml.currentLedger; + LedgerHandle spyCurrentLedger = Mockito.spy(currentLedger); + ml.currentLedger = spyCurrentLedger; + Answer answer = invocation -> { + long firstEntry = (long) invocation.getArguments()[0]; + log.info("reading entry: {}", firstEntry); + if (firstEntry == start1) { + // Wait 3s to make + firstReadingStarted.countDown(); + readCompleteSignal1.await(); + Object res = invocation.callRealMethod(); + return res; + } else if(secondReadEntries.contains(firstEntry)) { + final CompletableFuture res = new CompletableFuture<>(); + threadFactory.newThread(() -> { + try { + readCompleteSignal2.await(); + CompletableFuture future = + (CompletableFuture) invocation.callRealMethod(); + future.thenAccept(v -> { + res.complete(v); + }).exceptionally(ex -> { + res.completeExceptionally(ex); + return null; + }); + } catch (Throwable ex) { + res.completeExceptionally(ex); + } + }).start(); + return res; + } else { + return invocation.callRealMethod(); + } + }; + doAnswer(answer).when(spyCurrentLedger).readAsync(anyLong(), anyLong()); + doAnswer(answer).when(spyCurrentLedger).readUnconfirmedAsync(anyLong(), anyLong()); + + // Initialize "entryCache.estimatedEntrySize" to the correct value. + Object ctx = new Object(); + SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback(); + entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx); + cb0.entries.join(); + Long sizePerEntry1 = WhiteboxImpl.getInternalState(entryCache, "estimatedEntrySize"); + Assert.assertEquals(sizePerEntry1, 1); + Awaitility.await().untilAsserted(() -> { + long remainingBytes =limiter.getRemainingBytes(); + Assert.assertEquals(remainingBytes, totalCapacity); + }); + log.info("remainingBytes 0: {}", limiter.getRemainingBytes()); + + // Concurrency reading. + + SimpleReadEntriesCallback cb1 = new SimpleReadEntriesCallback(); + SimpleReadEntriesCallback cb2 = new SimpleReadEntriesCallback(); + threadFactory.newThread(() -> { + entryCache.asyncReadEntry(spyCurrentLedger, start1, end1, true, cb1, ctx); + }).start(); + threadFactory.newThread(() -> { + try { + firstReadingStarted.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx); + }).start(); + + long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1); + long remainingBytesExpected1 = totalCapacity - bytesAcquired1; + log.info("acquired : {}", bytesAcquired1); + log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1); + Awaitility.await().untilAsserted(() -> { + log.info("remainingBytes 0: {}", limiter.getRemainingBytes()); + Assert.assertEquals(limiter.getRemainingBytes(), remainingBytesExpected1); + }); + + // Complete the read1. + Thread.sleep(3000); + readCompleteSignal1.countDown(); + cb1.entries.join(); + Long sizePerEntry2 = WhiteboxImpl.getInternalState(entryCache, "estimatedEntrySize"); + Assert.assertEquals(sizePerEntry2, 1); + long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1); + long remainingBytesExpected2 = totalCapacity - bytesAcquired2; + log.info("acquired : {}", bytesAcquired2); + log.info("remainingBytesExpected 1: {}", remainingBytesExpected2); + Awaitility.await().untilAsserted(() -> { + log.info("remainingBytes 1: {}", limiter.getRemainingBytes()); + Assert.assertEquals(limiter.getRemainingBytes(), remainingBytesExpected2); + }); + + readCompleteSignal2.countDown(); + cb2.entries.join(); + Long sizePerEntry3 = WhiteboxImpl.getInternalState(entryCache, "estimatedEntrySize"); + Assert.assertEquals(sizePerEntry3, 1); + Awaitility.await().untilAsserted(() -> { + long remainingBytes = limiter.getRemainingBytes(); + log.info("remainingBytes 2: {}", remainingBytes); + Assert.assertEquals(remainingBytes, totalCapacity); + }); + // cleanup + ml.delete(); + factory.shutdown(); + } + + private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) { + return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + } + + class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback { + + CompletableFuture> entries = new CompletableFuture<>(); + + @Override + public void readEntriesComplete(List entriesRead, Object ctx) { + List list = new ArrayList<>(entriesRead.size()); + for (Entry entry : entriesRead) { + byte b = entry.getDataBuffer().readByte(); + list.add(b); + entry.release(); + } + this.entries.complete(list); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + this.entries.completeExceptionally(exception); + } + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java index 01976f648aba4..383568c17e83d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java @@ -108,7 +108,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(), - anyBoolean(), any(), any()); + anyBoolean(), any(), any(), anyBoolean()); lh = mock(ReadHandle.class); ml = mock(ManagedLedgerImpl.class);