Skip to content

Commit d48c0a6

Browse files
poorbarcodenikhil-ctds
authored andcommitted
[fix][broker] Fix repeatedly acquired pending reads quota (apache#23869)
(cherry picked from commit 331a997) (cherry picked from commit 0132d93)
1 parent 159b721 commit d48c0a6

File tree

4 files changed

+251
-16
lines changed

4 files changed

+251
-16
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ public void readEntriesFailed(ManagedLedgerException exception,
362362
};
363363
rangeEntryCache.asyncReadEntry0(lh,
364364
missingOnRight.startEntry, missingOnRight.endEntry,
365-
shouldCacheEntry, readFromRightCallback, null);
365+
shouldCacheEntry, readFromRightCallback, null, false);
366366
}
367367

368368
@Override
@@ -372,7 +372,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4
372372
}
373373
};
374374
rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
375-
shouldCacheEntry, readFromLeftCallback, null);
375+
shouldCacheEntry, readFromLeftCallback, null, false);
376376
} else if (missingOnLeft != null) {
377377
AsyncCallbacks.ReadEntriesCallback readFromLeftCallback =
378378
new AsyncCallbacks.ReadEntriesCallback() {
@@ -395,7 +395,7 @@ public void readEntriesFailed(ManagedLedgerException exception,
395395
}
396396
};
397397
rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
398-
shouldCacheEntry, readFromLeftCallback, null);
398+
shouldCacheEntry, readFromLeftCallback, null, false);
399399
} else if (missingOnRight != null) {
400400
AsyncCallbacks.ReadEntriesCallback readFromRightCallback =
401401
new AsyncCallbacks.ReadEntriesCallback() {
@@ -418,7 +418,7 @@ public void readEntriesFailed(ManagedLedgerException exception,
418418
}
419419
};
420420
rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry,
421-
shouldCacheEntry, readFromRightCallback, null);
421+
shouldCacheEntry, readFromRightCallback, null, false);
422422
}
423423
}
424424

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

+15-11
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class RangeEntryCacheImpl implements EntryCache {
5656
/**
5757
* Overhead per-entry to take into account the envelope.
5858
*/
59-
private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
59+
public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
6060

6161
private final RangeEntryCacheManagerImpl manager;
6262
final ManagedLedgerImpl ml;
@@ -101,7 +101,7 @@ public String getName() {
101101
}
102102

103103
@VisibleForTesting
104-
InflightReadsLimiter getPendingReadsLimiter() {
104+
public InflightReadsLimiter getPendingReadsLimiter() {
105105
return manager.getInflightReadsLimiter();
106106
}
107107

@@ -281,7 +281,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
281281
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
282282
final ReadEntriesCallback callback, Object ctx) {
283283
try {
284-
asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx);
284+
asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, true);
285285
} catch (Throwable t) {
286286
log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t);
287287
// invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt
@@ -294,16 +294,20 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
294294

295295
@SuppressWarnings({ "unchecked", "rawtypes" })
296296
void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
297-
final ReadEntriesCallback callback, Object ctx) {
298-
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);
297+
final ReadEntriesCallback callback, Object ctx, boolean withLimits) {
298+
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null, withLimits);
299299
}
300300

301301
void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
302-
final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
303-
304-
final AsyncCallbacks.ReadEntriesCallback callback =
305-
handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
306-
originalCallback, ctx, handle);
302+
final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle,
303+
boolean withLimits) {
304+
AsyncCallbacks.ReadEntriesCallback callback;
305+
if (withLimits) {
306+
callback = handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry, originalCallback, ctx,
307+
handle);
308+
} else {
309+
callback = originalCallback;
310+
}
307311
if (callback == null) {
308312
return;
309313
}
@@ -381,7 +385,7 @@ private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle l
381385
}
382386
ml.getExecutor().execute(() -> {
383387
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
384-
originalCallback, ctx, newHandle);
388+
originalCallback, ctx, newHandle, true);
385389
});
386390
return null;
387391
} else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.impl;
20+
21+
import static org.mockito.ArgumentMatchers.anyLong;
22+
import static org.mockito.Mockito.doAnswer;
23+
import io.netty.util.concurrent.DefaultThreadFactory;
24+
import java.util.ArrayList;
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.UUID;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CountDownLatch;
30+
import lombok.extern.slf4j.Slf4j;
31+
import org.apache.bookkeeper.client.LedgerHandle;
32+
import org.apache.bookkeeper.client.api.LedgerEntries;
33+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
34+
import org.apache.bookkeeper.mledger.Entry;
35+
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
36+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
37+
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
38+
import org.apache.bookkeeper.mledger.impl.cache.InflightReadsLimiter;
39+
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
40+
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
41+
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
42+
import org.awaitility.Awaitility;
43+
import org.awaitility.reflect.WhiteboxImpl;
44+
import org.mockito.Mockito;
45+
import org.mockito.stubbing.Answer;
46+
import org.testng.Assert;
47+
import org.testng.annotations.DataProvider;
48+
import org.testng.annotations.Test;
49+
50+
@Slf4j
51+
public class InflightReadsLimiterIntegrationTest extends MockedBookKeeperTestCase {
52+
53+
@DataProvider
54+
public Object[][] readMissingCases() {
55+
return new Object[][]{
56+
{"missRight"},
57+
{"missLeft"},
58+
{"bothMiss"}
59+
};
60+
}
61+
62+
@Test(dataProvider = "readMissingCases")
63+
public void testPreciseLimitation(String missingCase) throws Exception {
64+
final long start1 = 50;
65+
final long start2 = "missLeft".endsWith(missingCase) || "bothMiss".equals(missingCase) ? 30 : 50;
66+
final long end1 = 99;
67+
final long end2 = "missRight".endsWith(missingCase) || "bothMiss".equals(missingCase) ? 109 : 99;
68+
final HashSet<Long> secondReadEntries = new HashSet<>();
69+
if (start2 < start1) {
70+
secondReadEntries.add(start2);
71+
}
72+
if (end2 > end1) {
73+
secondReadEntries.add(end1 + 1);
74+
}
75+
final int readCount1 = (int) (end1 - start1 + 1);
76+
final int readCount2 = (int) (end2 - start2 + 1);
77+
78+
final DefaultThreadFactory threadFactory = new DefaultThreadFactory(UUID.randomUUID().toString());
79+
final ManagedLedgerConfig config = new ManagedLedgerConfig();
80+
config.setMaxEntriesPerLedger(100000);
81+
ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
82+
factoryConfig.setCacheEvictionIntervalMs(3600 * 1000);
83+
factoryConfig.setManagedLedgerMaxReadsInFlightSize(1000_000);
84+
final ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig);
85+
final ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
86+
final RangeEntryCacheImpl entryCache = (RangeEntryCacheImpl) ml.entryCache;
87+
final RangeEntryCacheManagerImpl rangeEntryCacheManager =
88+
(RangeEntryCacheManagerImpl) factory.getEntryCacheManager();
89+
final InflightReadsLimiter limiter = rangeEntryCacheManager.getInflightReadsLimiter();
90+
final long totalCapacity =limiter.getRemainingBytes();
91+
// final ManagedCursorImpl c1 = (ManagedCursorImpl) ml.openCursor("c1");
92+
for (byte i = 1; i < 127; i++) {
93+
log.info("add entry: " + i);
94+
ml.addEntry(new byte[]{i});
95+
}
96+
// Evict cached entries.
97+
entryCache.evictEntries(ml.currentLedgerSize);
98+
Assert.assertEquals(entryCache.getSize(), 0);
99+
100+
CountDownLatch readCompleteSignal1 = new CountDownLatch(1);
101+
CountDownLatch readCompleteSignal2 = new CountDownLatch(1);
102+
CountDownLatch firstReadingStarted = new CountDownLatch(1);
103+
LedgerHandle currentLedger = ml.currentLedger;
104+
LedgerHandle spyCurrentLedger = Mockito.spy(currentLedger);
105+
ml.currentLedger = spyCurrentLedger;
106+
Answer answer = invocation -> {
107+
long firstEntry = (long) invocation.getArguments()[0];
108+
log.info("reading entry: {}", firstEntry);
109+
if (firstEntry == start1) {
110+
// Wait 3s to make
111+
firstReadingStarted.countDown();
112+
readCompleteSignal1.await();
113+
Object res = invocation.callRealMethod();
114+
return res;
115+
} else if(secondReadEntries.contains(firstEntry)) {
116+
final CompletableFuture res = new CompletableFuture<>();
117+
threadFactory.newThread(() -> {
118+
try {
119+
readCompleteSignal2.await();
120+
CompletableFuture<LedgerEntries> future =
121+
(CompletableFuture<LedgerEntries>) invocation.callRealMethod();
122+
future.thenAccept(v -> {
123+
res.complete(v);
124+
}).exceptionally(ex -> {
125+
res.completeExceptionally(ex);
126+
return null;
127+
});
128+
} catch (Throwable ex) {
129+
res.completeExceptionally(ex);
130+
}
131+
}).start();
132+
return res;
133+
} else {
134+
return invocation.callRealMethod();
135+
}
136+
};
137+
doAnswer(answer).when(spyCurrentLedger).readAsync(anyLong(), anyLong());
138+
doAnswer(answer).when(spyCurrentLedger).readUnconfirmedAsync(anyLong(), anyLong());
139+
140+
// Initialize "entryCache.estimatedEntrySize" to the correct value.
141+
Object ctx = new Object();
142+
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
143+
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
144+
cb0.entries.join();
145+
Long sizePerEntry1 = WhiteboxImpl.getInternalState(entryCache, "estimatedEntrySize");
146+
Assert.assertEquals(sizePerEntry1, 1);
147+
Awaitility.await().untilAsserted(() -> {
148+
long remainingBytes =limiter.getRemainingBytes();
149+
Assert.assertEquals(remainingBytes, totalCapacity);
150+
});
151+
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
152+
153+
// Concurrency reading.
154+
155+
SimpleReadEntriesCallback cb1 = new SimpleReadEntriesCallback();
156+
SimpleReadEntriesCallback cb2 = new SimpleReadEntriesCallback();
157+
threadFactory.newThread(() -> {
158+
entryCache.asyncReadEntry(spyCurrentLedger, start1, end1, true, cb1, ctx);
159+
}).start();
160+
threadFactory.newThread(() -> {
161+
try {
162+
firstReadingStarted.await();
163+
} catch (InterruptedException e) {
164+
throw new RuntimeException(e);
165+
}
166+
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx);
167+
}).start();
168+
169+
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1);
170+
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
171+
log.info("acquired : {}", bytesAcquired1);
172+
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
173+
Awaitility.await().untilAsserted(() -> {
174+
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
175+
Assert.assertEquals(limiter.getRemainingBytes(), remainingBytesExpected1);
176+
});
177+
178+
// Complete the read1.
179+
Thread.sleep(3000);
180+
readCompleteSignal1.countDown();
181+
cb1.entries.join();
182+
Long sizePerEntry2 = WhiteboxImpl.getInternalState(entryCache, "estimatedEntrySize");
183+
Assert.assertEquals(sizePerEntry2, 1);
184+
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1);
185+
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
186+
log.info("acquired : {}", bytesAcquired2);
187+
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
188+
Awaitility.await().untilAsserted(() -> {
189+
log.info("remainingBytes 1: {}", limiter.getRemainingBytes());
190+
Assert.assertEquals(limiter.getRemainingBytes(), remainingBytesExpected2);
191+
});
192+
193+
readCompleteSignal2.countDown();
194+
cb2.entries.join();
195+
Long sizePerEntry3 = WhiteboxImpl.getInternalState(entryCache, "estimatedEntrySize");
196+
Assert.assertEquals(sizePerEntry3, 1);
197+
Awaitility.await().untilAsserted(() -> {
198+
long remainingBytes = limiter.getRemainingBytes();
199+
log.info("remainingBytes 2: {}", remainingBytes);
200+
Assert.assertEquals(remainingBytes, totalCapacity);
201+
});
202+
// cleanup
203+
ml.delete();
204+
factory.shutdown();
205+
}
206+
207+
private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) {
208+
return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
209+
}
210+
211+
class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback {
212+
213+
CompletableFuture<List<Byte>> entries = new CompletableFuture<>();
214+
215+
@Override
216+
public void readEntriesComplete(List<Entry> entriesRead, Object ctx) {
217+
List<Byte> list = new ArrayList<>(entriesRead.size());
218+
for (Entry entry : entriesRead) {
219+
byte b = entry.getDataBuffer().readByte();
220+
list.add(b);
221+
entry.release();
222+
}
223+
this.entries.complete(list);
224+
}
225+
226+
@Override
227+
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
228+
this.entries.completeExceptionally(exception);
229+
}
230+
}
231+
}

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
110110
return null;
111111
}
112112
}).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(),
113-
anyBoolean(), any(), any());
113+
anyBoolean(), any(), any(), anyBoolean());
114114

115115
lh = mock(ReadHandle.class);
116116
ml = mock(ManagedLedgerImpl.class);

0 commit comments

Comments
 (0)