From 1816b5e318e9e57943a752962c80a8403e265754 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=93=E5=90=9B?= Date: Thu, 10 Oct 2024 22:03:51 +0800 Subject: [PATCH] [fix][broker] Avoid orphan ledgers in BucketDelayedDeliveryTracker (#22802) (cherry picked from commit 8b6b3370f921435a61132e4f26f6428907dc69d8) (cherry picked from commit 7299f1af75ff39a9f046bd8f2c6e77dfff5ceb57) --- .../BucketDelayedDeliveryTrackerFactory.java | 7 ++-- .../BookkeeperBucketSnapshotStorage.java | 18 ++++++++--- .../bucket/BucketDelayedDeliveryTracker.java | 30 +++++++++++++++-- .../bucket/BucketNotExistException.java | 32 +++++++++++++++++++ .../delayed/bucket/ImmutableBucket.java | 10 +++--- 5 files changed, 83 insertions(+), 14 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java index 69a08bd2be4a9..2ee09a0f04c7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java @@ -119,10 +119,9 @@ public CompletableFuture cleanResidualSnapshots(ManagedCursor cursor) { FutureUtil.Sequencer sequencer = FutureUtil.Sequencer.create(); cursorProperties.forEach((k, v) -> { if (k != null && v != null && k.startsWith(BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX)) { - CompletableFuture future = sequencer.sequential(() -> { - return cursor.removeCursorProperty(k) - .thenCompose(__ -> bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v))); - }); + CompletableFuture future = sequencer.sequential(() -> + bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v)) + .thenCompose(__ -> cursor.removeCursorProperty(k))); futures.add(future); } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index e99f39b382f56..4c4df419133c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -205,7 +205,10 @@ private CompletableFuture openLedger(Long ledgerId) { BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()), LedgerPassword, (rc, handle, ctx) -> { - if (rc != BKException.Code.OK) { + if (rc == BKException.Code.NoSuchLedgerExistsException) { + // If the ledger does not exist, throw BucketNotExistException + future.completeExceptionally(noSuchLedgerException("Open ledger", ledgerId)); + } else if (rc != BKException.Code.OK) { future.completeExceptionally(bkException("Open ledger", rc, ledgerId)); } else { future.complete(handle); @@ -265,10 +268,11 @@ CompletableFuture> getLedgerEntry(LedgerHandle ledger, private CompletableFuture deleteLedger(long ledgerId) { CompletableFuture future = new CompletableFuture<>(); bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> { - if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Delete ledger", rc, ledgerId)); - } else { + if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.OK) { + // If the ledger does not exist or has been deleted, we can treat it as success future.complete(null); + } else { + future.completeExceptionally(bkException("Delete ledger", rc, ledgerId)); } }, null); return future; @@ -279,4 +283,10 @@ private static BucketSnapshotPersistenceException bkException(String operation, + " - ledger=" + ledgerId + " - operation=" + operation; return new BucketSnapshotPersistenceException(message); } + + private static BucketNotExistException noSuchLedgerException(String operation, long ledgerId) { + String message = BKException.getMessage(BKException.Code.NoSuchLedgerExistsException) + + " - ledger=" + ledgerId + " - operation=" + operation; + return new BucketNotExistException(message); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 1cbae674bd512..c23bc17c30365 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -179,8 +179,7 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT futures = new HashMap<>(immutableBucketMap.size()); for (Map.Entry, ImmutableBucket> entry : immutableBucketMap.entrySet()) { Range key = entry.getKey(); - ImmutableBucket immutableBucket = entry.getValue(); - futures.put(key, immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime)); + futures.put(key, handleRecoverBucketSnapshotEntry(entry.getValue())); } try { @@ -231,6 +230,33 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT return numberDelayedMessages.getValue(); } + /** + * Handle the BucketNotExistException when recover bucket snapshot entry. + * The non exist bucket will be added to `toBeDeletedBucketMap` and deleted from `immutableBuckets` + * in the next step. + * + * @param bucket + * @return + */ + private CompletableFuture> handleRecoverBucketSnapshotEntry(ImmutableBucket bucket) { + CompletableFuture> f = new CompletableFuture<>(); + bucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime) + .whenComplete((v, e) -> { + if (e == null) { + f.complete(v); + } else { + if (e instanceof BucketNotExistException) { + // If the bucket does not exist, return an empty list, + // the bucket will be deleted from `immutableBuckets` in the next step. + f.complete(Collections.emptyList()); + } else { + f.completeExceptionally(e); + } + } + }); + return f; + } + private synchronized void putAndCleanOverlapRange(Range range, ImmutableBucket immutableBucket, Map, ImmutableBucket> toBeDeletedBucketMap) { RangeMap subRangeMap = immutableBuckets.subRangeMap(range); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java new file mode 100644 index 0000000000000..f6c16a1595f54 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import org.apache.pulsar.broker.service.BrokerServiceException; + +public class BucketNotExistException extends BrokerServiceException.PersistenceException { + + public BucketNotExistException(Throwable t) { + super(t); + } + + public BucketNotExistException(String msg) { + super(msg); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 0932f51f350ce..a1944a21ea794 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -193,9 +193,10 @@ CompletableFuture asyncDeleteBucketSnapshot(BucketDelayedMessageIndexStats stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.delete); String bucketKey = bucketKey(); long bucketId = getAndUpdateBucketId(); - return removeBucketCursorProperty(bucketKey).thenCompose(__ -> - executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId), - BucketSnapshotPersistenceException.class, MaxRetryTimes)).whenComplete((__, ex) -> { + + return executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId), + BucketSnapshotPersistenceException.class, MaxRetryTimes) + .whenComplete((__, ex) -> { if (ex != null) { log.error("[{}] Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}", dispatcherName, bucketId, bucketKey, ex); @@ -208,7 +209,8 @@ CompletableFuture asyncDeleteBucketSnapshot(BucketDelayedMessageIndexStats stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete, System.currentTimeMillis() - deleteStartTime); } - }); + }) + .thenCompose(__ -> removeBucketCursorProperty(bucketKey)); } CompletableFuture clear(BucketDelayedMessageIndexStats stats) {