Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix repeatedly acquired pending reads quota #23869

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public void readEntriesFailed(ManagedLedgerException exception,
};
rangeEntryCache.asyncReadEntry0(lh,
missingOnRight.startEntry, missingOnRight.endEntry,
shouldCacheEntry, readFromRightCallback, null);
shouldCacheEntry, readFromRightCallback, null, false);
}

@Override
Expand All @@ -372,7 +372,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4
}
};
rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
shouldCacheEntry, readFromLeftCallback, null);
shouldCacheEntry, readFromLeftCallback, null, false);
} else if (missingOnLeft != null) {
AsyncCallbacks.ReadEntriesCallback readFromLeftCallback =
new AsyncCallbacks.ReadEntriesCallback() {
Expand All @@ -395,7 +395,7 @@ public void readEntriesFailed(ManagedLedgerException exception,
}
};
rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
shouldCacheEntry, readFromLeftCallback, null);
shouldCacheEntry, readFromLeftCallback, null, false);
} else if (missingOnRight != null) {
AsyncCallbacks.ReadEntriesCallback readFromRightCallback =
new AsyncCallbacks.ReadEntriesCallback() {
Expand All @@ -418,7 +418,7 @@ public void readEntriesFailed(ManagedLedgerException exception,
}
};
rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry,
shouldCacheEntry, readFromRightCallback, null);
shouldCacheEntry, readFromRightCallback, null, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class RangeEntryCacheImpl implements EntryCache {
/**
* Overhead per-entry to take into account the envelope.
*/
private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;
public static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64;

private final RangeEntryCacheManagerImpl manager;
final ManagedLedgerImpl ml;
Expand Down Expand Up @@ -102,7 +102,7 @@ public String getName() {
}

@VisibleForTesting
InflightReadsLimiter getPendingReadsLimiter() {
public InflightReadsLimiter getPendingReadsLimiter() {
return manager.getInflightReadsLimiter();
}

Expand Down Expand Up @@ -282,7 +282,7 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
try {
asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx);
asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, true);
} catch (Throwable t) {
log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t);
// invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt
Expand All @@ -295,16 +295,20 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole

@SuppressWarnings({ "unchecked", "rawtypes" })
void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);
final ReadEntriesCallback callback, Object ctx, boolean withLimits) {
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null, withLimits);
}

void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {

final AsyncCallbacks.ReadEntriesCallback callback =
handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
originalCallback, ctx, handle);
final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle,
boolean withLimits) {
AsyncCallbacks.ReadEntriesCallback callback;
if (withLimits) {
callback = handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry, originalCallback, ctx,
handle);
} else {
callback = originalCallback;
}
if (callback == null) {
return;
}
Expand Down Expand Up @@ -382,7 +386,7 @@ private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle l
}
ml.getExecutor().execute(() -> {
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
originalCallback, ctx, newHandle);
originalCallback, ctx, newHandle, true);
});
return null;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.cache.InflightReadsLimiter;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
public class InflightReadsLimiterIntegrationTest extends MockedBookKeeperTestCase {

@DataProvider
public Object[][] readMissingCases() {
return new Object[][]{
{"missRight"},
{"missLeft"},
{"bothMiss"}
};
}

@Test(dataProvider = "readMissingCases")
public void testPreciseLimitation(String missingCase) throws Exception {
final long start1 = 50;
final long start2 = "missLeft".endsWith(missingCase) || "bothMiss".equals(missingCase) ? 30 : 50;
final long end1 = 99;
final long end2 = "missRight".endsWith(missingCase) || "bothMiss".equals(missingCase) ? 109 : 99;
final HashSet<Long> secondReadEntries = new HashSet<>();
if (start2 < start1) {
secondReadEntries.add(start2);
}
if (end2 > end1) {
secondReadEntries.add(end1 + 1);
}
final int readCount1 = (int) (end1 - start1 + 1);
final int readCount2 = (int) (end2 - start2 + 1);

final DefaultThreadFactory threadFactory = new DefaultThreadFactory(UUID.randomUUID().toString());
final ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(100000);
ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
factoryConfig.setCacheEvictionIntervalMs(3600 * 1000);
factoryConfig.setManagedLedgerMaxReadsInFlightSize(1000_000);
final ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig);
final ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
final RangeEntryCacheImpl entryCache = (RangeEntryCacheImpl) ml.entryCache;
final RangeEntryCacheManagerImpl rangeEntryCacheManager =
(RangeEntryCacheManagerImpl) factory.getEntryCacheManager();
final InflightReadsLimiter limiter = rangeEntryCacheManager.getInflightReadsLimiter();
final long totalCapacity =limiter.getRemainingBytes();
// final ManagedCursorImpl c1 = (ManagedCursorImpl) ml.openCursor("c1");
for (byte i = 1; i < 127; i++) {
log.info("add entry: " + i);
ml.addEntry(new byte[]{i});
}
// Evict cached entries.
entryCache.evictEntries(ml.currentLedgerSize);
Assert.assertEquals(entryCache.getSize(), 0);

CountDownLatch readCompleteSignal1 = new CountDownLatch(1);
CountDownLatch readCompleteSignal2 = new CountDownLatch(1);
CountDownLatch firstReadingStarted = new CountDownLatch(1);
LedgerHandle currentLedger = ml.currentLedger;
LedgerHandle spyCurrentLedger = Mockito.spy(currentLedger);
ml.currentLedger = spyCurrentLedger;
Answer answer = invocation -> {
long firstEntry = (long) invocation.getArguments()[0];
log.info("reading entry: {}", firstEntry);
if (firstEntry == start1) {
// Wait 3s to make
firstReadingStarted.countDown();
readCompleteSignal1.await();
Object res = invocation.callRealMethod();
return res;
} else if(secondReadEntries.contains(firstEntry)) {
final CompletableFuture res = new CompletableFuture<>();
threadFactory.newThread(() -> {
try {
readCompleteSignal2.await();
CompletableFuture<LedgerEntries> future =
(CompletableFuture<LedgerEntries>) invocation.callRealMethod();
future.thenAccept(v -> {
res.complete(v);
}).exceptionally(ex -> {
res.completeExceptionally(ex);
return null;
});
} catch (Throwable ex) {
res.completeExceptionally(ex);
}
}).start();
return res;
} else {
return invocation.callRealMethod();
}
};
doAnswer(answer).when(spyCurrentLedger).readAsync(anyLong(), anyLong());
doAnswer(answer).when(spyCurrentLedger).readUnconfirmedAsync(anyLong(), anyLong());

// Initialize "entryCache.estimatedEntrySize" to the correct value.
Object ctx = new Object();
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
cb0.entries.join();
Long sizePerEntry1 = WhiteboxImpl.getInternalState(entryCache, "estimatedEntrySize");
Assert.assertEquals(sizePerEntry1, 1);
Awaitility.await().untilAsserted(() -> {
long remainingBytes =limiter.getRemainingBytes();
Assert.assertEquals(remainingBytes, totalCapacity);
});
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());

// Concurrency reading.

SimpleReadEntriesCallback cb1 = new SimpleReadEntriesCallback();
SimpleReadEntriesCallback cb2 = new SimpleReadEntriesCallback();
threadFactory.newThread(() -> {
entryCache.asyncReadEntry(spyCurrentLedger, start1, end1, true, cb1, ctx);
}).start();
threadFactory.newThread(() -> {
try {
firstReadingStarted.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx);
}).start();

long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1);
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
log.info("acquired : {}", bytesAcquired1);
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
Awaitility.await().untilAsserted(() -> {
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
Assert.assertEquals(limiter.getRemainingBytes(), remainingBytesExpected1);
});

// Complete the read1.
Thread.sleep(3000);
readCompleteSignal1.countDown();
cb1.entries.join();
Long sizePerEntry2 = WhiteboxImpl.getInternalState(entryCache, "estimatedEntrySize");
Assert.assertEquals(sizePerEntry2, 1);
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1);
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
log.info("acquired : {}", bytesAcquired2);
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
Awaitility.await().untilAsserted(() -> {
log.info("remainingBytes 1: {}", limiter.getRemainingBytes());
Assert.assertEquals(limiter.getRemainingBytes(), remainingBytesExpected2);
});

readCompleteSignal2.countDown();
cb2.entries.join();
Long sizePerEntry3 = WhiteboxImpl.getInternalState(entryCache, "estimatedEntrySize");
Assert.assertEquals(sizePerEntry3, 1);
Awaitility.await().untilAsserted(() -> {
long remainingBytes = limiter.getRemainingBytes();
log.info("remainingBytes 2: {}", remainingBytes);
Assert.assertEquals(remainingBytes, totalCapacity);
});
// cleanup
ml.delete();
factory.shutdown();
}

private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) {
return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
}

class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback {

CompletableFuture<List<Byte>> entries = new CompletableFuture<>();

@Override
public void readEntriesComplete(List<Entry> entriesRead, Object ctx) {
List<Byte> list = new ArrayList<>(entriesRead.size());
for (Entry entry : entriesRead) {
byte b = entry.getDataBuffer().readByte();
list.add(b);
entry.release();
}
this.entries.complete(list);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
this.entries.completeExceptionally(exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return null;
}
}).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(),
anyBoolean(), any(), any());
anyBoolean(), any(), any(), anyBoolean());

lh = mock(ReadHandle.class);
ml = mock(ManagedLedgerImpl.class);
Expand Down
Loading