diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 03439f93ccad8..a24251450b4f4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -505,8 +505,10 @@ public int getMaxUnackedRangesToPersistInMetadataStore() { return maxUnackedRangesToPersistInMetadataStore; } - public void setMaxUnackedRangesToPersistInMetadataStore(int maxUnackedRangesToPersistInMetadataStore) { + public ManagedLedgerConfig setMaxUnackedRangesToPersistInMetadataStore( + int maxUnackedRangesToPersistInMetadataStore) { this.maxUnackedRangesToPersistInMetadataStore = maxUnackedRangesToPersistInMetadataStore; + return this; } /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index e27814eadd0b5..b39fd231cdc06 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -59,6 +59,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.LongStream; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -91,12 +92,15 @@ import org.apache.bookkeeper.mledger.ScanOutcome; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap; import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder; import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.util.DateFormatter; @@ -606,9 +610,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac } Position position = PositionFactory.create(positionInfo.getLedgerId(), positionInfo.getEntryId()); - if (positionInfo.getIndividualDeletedMessagesCount() > 0) { - recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); - } + recoverIndividualDeletedMessages(positionInfo); if (getConfig().isDeletionAtBatchIndexLevelEnabled() && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); @@ -627,6 +629,45 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac } } + public void recoverIndividualDeletedMessages(PositionInfo positionInfo) { + if (positionInfo.getIndividualDeletedMessagesCount() > 0) { + recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); + } else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) { + List rangeList = positionInfo.getIndividualDeletedMessageRangesList(); + try { + Map rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey, + list -> list.getValuesList().stream().mapToLong(i -> i).toArray())); + individualDeletedMessages.build(rangeMap); + } catch (Exception e) { + log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(), + name, e); + } + } + } + + private List buildLongPropertiesMap(Map properties) { + if (properties.isEmpty()) { + return Collections.emptyList(); + } + List longListMap = new ArrayList<>(); + MutableInt serializedSize = new MutableInt(); + properties.forEach((id, ranges) -> { + if (ranges == null || ranges.length <= 0) { + return; + } + org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap.Builder lmBuilder = LongListMap.newBuilder() + .setKey(id); + for (long range : ranges) { + lmBuilder.addValues(range); + } + LongListMap lm = lmBuilder.build(); + longListMap.add(lm); + serializedSize.add(lm.getSerializedSize()); + }); + individualDeletedMessagesSerializedSize = serializedSize.toInteger(); + return longListMap; + } + private void recoverIndividualDeletedMessages(List individualDeletedMessagesList) { lock.writeLock().lock(); try { @@ -3125,12 +3166,23 @@ private List buildBatchEntryDeletio void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) { Position position = mdEntry.newPosition; - PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId()) + Builder piBuilder = PositionInfo.newBuilder().setLedgerId(position.getLedgerId()) .setEntryId(position.getEntryId()) - .addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()) .addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList()) - .addAllProperties(buildPropertiesMap(mdEntry.properties)).build(); + .addAllProperties(buildPropertiesMap(mdEntry.properties)); + Map internalRanges = null; + try { + internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist()); + } catch (Exception e) { + log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e); + } + if (internalRanges != null && !internalRanges.isEmpty()) { + piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges)); + } else { + piBuilder.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()); + } + PositionInfo pi = piBuilder.build(); if (log.isDebugEnabled()) { log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(), diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java index a55e6444b2fd9..11cce409bec54 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet; @@ -142,6 +143,16 @@ public Range lastRange() { return rangeSet.lastRange(); } + @Override + public Map toRanges(int maxRanges) { + return rangeSet.toRanges(maxRanges); + } + + @Override + public void build(Map internalRange) { + rangeSet.build(internalRange); + } + @Override public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) { return rangeSet.cardinality(lowerKey, lowerValue, upperKey, upperValue); @@ -176,4 +187,22 @@ public boolean isDirtyLedgers(long ledgerId) { public String toString() { return rangeSet.toString(); } + + @Override + public int hashCode() { + return rangeSet.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RangeSetWrapper)) { + return false; + } + if (this == obj) { + return true; + } + @SuppressWarnings("rawtypes") + RangeSetWrapper set = (RangeSetWrapper) obj; + return this.rangeSet.equals(set.rangeSet); + } } diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index fdffed6762db7..f196649df0fdf 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -82,6 +82,7 @@ message PositionInfo { // Store which index in the batch message has been deleted repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5; + repeated LongListMap individualDeletedMessageRanges = 6; } message NestedPositionInfo { @@ -89,6 +90,11 @@ message NestedPositionInfo { required int64 entryId = 2; } +message LongListMap { + required int64 key = 1; + repeated int64 values = 2; +} + message MessageRange { required NestedPositionInfo lowerEndpoint = 1; required NestedPositionInfo upperEndpoint = 2; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 8913c4013b4ab..1067cda441f6a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3223,7 +3223,7 @@ public void testOutOfOrderDeletePersistenceIntoLedgerWithClose() throws Exceptio managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(10); ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig); - ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName); + final ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName); List addedPositions = new ArrayList<>(); for (int i = 0; i < totalAddEntries; i++) { @@ -3269,7 +3269,8 @@ public void operationFailed(MetaStoreException e) { LedgerEntry entry = seq.nextElement(); PositionInfo positionInfo; positionInfo = PositionInfo.parseFrom(entry.getEntry()); - individualDeletedMessagesCount.set(positionInfo.getIndividualDeletedMessagesCount()); + c1.recoverIndividualDeletedMessages(positionInfo); + individualDeletedMessagesCount.set(c1.getIndividuallyDeletedMessagesSet().asRanges().size()); } catch (Exception e) { } latch.countDown(); @@ -3286,12 +3287,12 @@ public void operationFailed(MetaStoreException e) { @Cleanup("shutdown") ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); ledger = (ManagedLedgerImpl) factory2.open(ledgerName, managedLedgerConfig); - c1 = (ManagedCursorImpl) ledger.openCursor("c1"); + ManagedCursorImpl reopenCursor = (ManagedCursorImpl) ledger.openCursor("c1"); // verify cursor has been recovered - assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2); + assertEquals(reopenCursor.getNumberOfEntriesInBacklog(false), totalAddEntries / 2); // try to read entries which should only read non-deleted positions - List entries = c1.readEntries(totalAddEntries); + List entries = reopenCursor.readEntries(totalAddEntries); assertEquals(entries.size(), totalAddEntries / 2); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index cd1dcf05c3708..9635376a782d3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import io.netty.buffer.ByteBuf; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -34,7 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import lombok.Cleanup; + import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperTestClient; import org.apache.bookkeeper.client.api.DigestType; @@ -53,9 +52,13 @@ import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; +import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.awaitility.Awaitility; import org.testng.annotations.Test; +import io.netty.buffer.ByteBuf; +import lombok.Cleanup; + public class ManagedLedgerBkTest extends BookKeeperClusterTestCase { public ManagedLedgerBkTest() { @@ -587,4 +590,44 @@ public void testPeriodicRollover() throws Exception { Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId); } + /** + * This test validates that cursor serializes and deserializes individual-ack list from the bk-ledger. + * + * @throws Exception + */ + @Test + public void testUnackmessagesAndRecovery() throws Exception { + ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig(); + factoryConf.setMaxCacheSize(0); + + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf); + + ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1) + .setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1) + .setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1); + ManagedLedger ledger = factory.open("my_test_unack_messages", config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + int totalEntries = 100; + for (int i = 0; i < totalEntries; i++) { + Position p = ledger.addEntry("entry".getBytes()); + if (i % 2 == 0) { + cursor.delete(p); + } + } + + LongPairRangeSet unackMessagesBefore = cursor.getIndividuallyDeletedMessagesSet(); + + ledger.close(); + + // open and recover cursor + ledger = factory.open("my_test_unack_messages", config); + cursor = (ManagedCursorImpl) ledger.openCursor("c1"); + + LongPairRangeSet unackMessagesAfter = cursor.getIndividuallyDeletedMessagesSet(); + assertTrue(unackMessagesBefore.equals(unackMessagesAfter)); + + ledger.close(); + factory.shutdown(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java index 232088afb94fe..e975671fa12e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java @@ -80,7 +80,7 @@ public void testRecoverFromNodeDeletion() throws Exception { Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals( brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId()))); pulsar.getLocalMetadataStore().delete(brokerMetadataPath, Optional.empty()); - Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals( + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> Assert.assertEquals( brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId()))); // If the node is deleted by unregister(), it should not recreate the path diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java index 6e45401978546..51f4a9ac51c90 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java @@ -18,16 +18,21 @@ */ package org.apache.pulsar.common.util.collections; +import static java.util.BitSet.valueOf; import static java.util.Objects.requireNonNull; import com.google.common.collect.BoundType; import com.google.common.collect.Range; import java.util.ArrayList; import java.util.BitSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; +import java.util.Objects; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.mutable.MutableInt; /** @@ -253,6 +258,42 @@ public Range lastRange() { return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper)); } + @Override + public Map toRanges(int maxRanges) { + Map internalBitSetMap = new HashMap<>(); + AtomicInteger rangeCount = new AtomicInteger(); + rangeBitSetMap.forEach((id, bmap) -> { + if (rangeCount.getAndAdd(bmap.cardinality()) > maxRanges) { + return; + } + internalBitSetMap.put(id, bmap.toLongArray()); + }); + return internalBitSetMap; + } + + @Override + public void build(Map internalRange) { + internalRange.forEach((id, ranges) -> rangeBitSetMap.put(id, valueOf(ranges))); + } + + @Override + public int hashCode() { + return Objects.hashCode(rangeBitSetMap); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ConcurrentOpenLongPairRangeSet)) { + return false; + } + if (this == obj) { + return true; + } + @SuppressWarnings("rawtypes") + ConcurrentOpenLongPairRangeSet set = (ConcurrentOpenLongPairRangeSet) obj; + return this.rangeBitSetMap.equals(set.rangeBitSetMap); + } + @Override public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) { NavigableMap subMap = rangeBitSetMap.subMap(lowerKey, true, upperKey, true); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java index 8aad5587dfd38..df74857245bb3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import lombok.EqualsAndHashCode; @@ -136,6 +137,19 @@ public interface LongPairRangeSet> { */ Range lastRange(); + default Map toRanges(int maxRanges) { + throw new UnsupportedOperationException(); + } + + /** + * Build {@link LongPairRangeSet} using internal ranges returned by {@link #toRanges(int)} . + * + * @param ranges + */ + default void build(Map ranges) { + throw new UnsupportedOperationException(); + } + /** * Return the number bit sets to true from lower (inclusive) to upper (inclusive). */ diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java index 6df6d414871ec..3076c6c5c5fa1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java @@ -18,16 +18,21 @@ */ package org.apache.pulsar.common.util.collections; +import static java.util.BitSet.valueOf; import static java.util.Objects.requireNonNull; import com.google.common.collect.BoundType; import com.google.common.collect.Range; import java.util.ArrayList; import java.util.BitSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; +import java.util.Objects; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; import org.apache.commons.lang.mutable.MutableInt; @@ -250,6 +255,47 @@ public Range lastRange() { return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper)); } + @Override + public Map toRanges(int maxRanges) { + Map internalBitSetMap = new HashMap<>(); + AtomicInteger rangeCount = new AtomicInteger(); + rangeBitSetMap.forEach((id, bmap) -> { + if (rangeCount.getAndAdd(bmap.cardinality()) > maxRanges) { + return; + } + internalBitSetMap.put(id, bmap.toLongArray()); + }); + return internalBitSetMap; + } + + @Override + public void build(Map internalRange) { + internalRange.forEach((id, ranges) -> { + BitSet bitset = createNewBitSet(); + bitset.or(valueOf(ranges)); + rangeBitSetMap.put(id, bitset); + }); + } + + + @Override + public int hashCode() { + return Objects.hashCode(rangeBitSetMap); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof OpenLongPairRangeSet)) { + return false; + } + if (this == obj) { + return true; + } + @SuppressWarnings("rawtypes") + OpenLongPairRangeSet set = (OpenLongPairRangeSet) obj; + return this.rangeBitSetMap.equals(set.rangeBitSetMap); + } + @Override public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) { NavigableMap subMap = rangeBitSetMap.subMap(lowerKey, true, upperKey, true);