Skip to content

Commit d4ecfaf

Browse files
poorbarcode authored and srinath-ctds committed
[fix][ml] Topic load timeout due to ml data ledger future never finishes (apache#23772)
### Motivation **Background** There is a mechanism that prevents the callback of ML data ledger creation from being invoked repeatedly: - Start a scheduled task to check whether the creation has timed out. - Receive a callback - Check whether the future (`@param ctx` of `BK.createAsync`) has been done or not. - If done: it means the creation timed out before it completed - Otherwise: it is a real callback from BK. **Issue:** But the timeout event will call the same callback as above, then the steps are as follows, which you can reproduce with the test `testCreateDataLedgerTimeout`: - Start creating a data ledger - Call `BK.createAsync` - Timeout - Mark the future (`@param ctx` of `BK.createAsync`) as completed exceptionally. - Trigger the callback related to ledger creation. - Check whether the future (`@param ctx` of `BK.createAsync`) has been done or not. - If done: do nothing. - Creation is completed. - Trigger the callback related to ledger creation. - Check whether the future (`@param ctx` of `BK.createAsync`) has been done or not. - If done: do nothing. - Issue: The callback for ledger creation will never be called. ![Screenshot 2024-12-24 at 00 14 38](https://github.com/user-attachments/assets/44ed19d2-7238-45a4-9186-c127f6ed14f7) ![Screenshot 2024-12-24 at 00 14 08](https://github.com/user-attachments/assets/349f39ff-7e98-4a09-9af2-f80082339592) ### Modifications Fix the issue ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> ### Matching PR in forked repository PR in forked repository: x (cherry picked from commit 9699dc2) (cherry picked from commit 2189b60)
1 parent 32a8368 commit d4ecfaf

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ public void operationFailed(MetaStoreException e) {
530530
executor.execute(() -> {
531531
mbean.endDataLedgerCreateOp();
532532
if (rc != BKException.Code.OK) {
533+
log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
533534
callback.initializeFailed(createManagedLedgerException(rc));
534535
return;
535536
}
@@ -4192,7 +4193,7 @@ public Clock getClock() {
41924193
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
41934194
if (ctx instanceof CompletableFuture) {
41944195
// ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
4195-
if (((CompletableFuture) ctx).complete(lh)) {
4196+
if (((CompletableFuture) ctx).complete(lh) || rc == BKException.Code.TimeoutException) {
41964197
return false;
41974198
} else {
41984199
if (rc == BKException.Code.OK) {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

+25
Original file line numberDiff line numberDiff line change
@@ -4233,6 +4233,31 @@ public void testNonDurableCursorCreateForInactiveLedger() throws Exception {
42334233
assertNotNull(ml.newNonDurableCursor(Position));
42344234
}
42354235

4236+
@Test(timeOut = 60 * 1000)
4237+
public void testCreateDataLedgerTimeout() throws Exception {
4238+
String mlName = UUID.randomUUID().toString();
4239+
ManagedLedgerFactoryImpl factory = null;
4240+
ManagedLedger ml = null;
4241+
try {
4242+
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
4243+
ManagedLedgerConfig config = new ManagedLedgerConfig();
4244+
config.setMetadataOperationsTimeoutSeconds(5);
4245+
bkc.delay(10 * 1000);
4246+
ml = factory.open(mlName, config);
4247+
fail("Should get a timeout ex");
4248+
} catch (ManagedLedgerException ex) {
4249+
assertTrue(ex.getMessage().contains("timeout"));
4250+
} finally {
4251+
// cleanup.
4252+
if (ml != null) {
4253+
ml.delete();
4254+
}
4255+
if (factory != null) {
4256+
factory.shutdown();
4257+
}
4258+
}
4259+
}
4260+
42364261
/***
42374262
* When a ML tries to create a ledger, it will create a delay task to check if the ledger create request is timeout.
42384263
* But we should guarantee that the delay task should be canceled after the ledger create request responded.

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import com.google.common.collect.Lists;
23+
import io.netty.util.concurrent.DefaultThreadFactory;
2324
import java.util.ArrayList;
2425
import java.util.Arrays;
2526
import java.util.Collection;
@@ -35,6 +36,7 @@
3536
import java.util.concurrent.ConcurrentLinkedQueue;
3637
import java.util.concurrent.ExecutionException;
3738
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
3840
import java.util.concurrent.ScheduledExecutorService;
3941
import java.util.concurrent.TimeUnit;
4042
import java.util.concurrent.atomic.AtomicLong;
@@ -70,6 +72,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
7072

7173
final OrderedExecutor orderedExecutor;
7274
final ExecutorService executor;
75+
final ScheduledExecutorService scheduler;
7376

7477
@Override
7578
public ClientConfiguration getConf() {
@@ -97,6 +100,7 @@ public static Collection<BookieId> getMockEnsemble() {
97100
public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
98101
this.orderedExecutor = orderedExecutor;
99102
this.executor = orderedExecutor.chooseThread();
103+
scheduler = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("mock-bk-scheduler"));
100104
}
101105

102106
@Override
@@ -290,7 +294,7 @@ public void shutdown() {
290294
for (PulsarMockLedgerHandle ledger : ledgers.values()) {
291295
ledger.entries.clear();
292296
}
293-
297+
scheduler.shutdown();
294298
ledgers.clear();
295299
}
296300

@@ -331,6 +335,15 @@ synchronized CompletableFuture<Void> getProgrammedFailure() {
331335
return failures.isEmpty() ? defaultResponse : failures.remove(0);
332336
}
333337

338+
public void delay(long millis) {
339+
CompletableFuture<Void> delayFuture = new CompletableFuture<>();
340+
scheduler.schedule(() -> {
341+
delayFuture.complete(null);
342+
}, millis, TimeUnit.MILLISECONDS);
343+
failures.add(delayFuture);
344+
}
345+
346+
334347
public void failNow(int rc) {
335348
failAfter(0, rc);
336349
}

0 commit comments

Comments
 (0)