Skip to content

Commit

Permalink
[fix][broker] Support large number of unack message store for cursor …
Browse files Browse the repository at this point in the history
…recovery (#9292)
  • Loading branch information
rdhabalia authored Oct 1, 2024
1 parent 5b98d37 commit 9eeffe5
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,10 @@ public int getMaxUnackedRangesToPersistInMetadataStore() {
return maxUnackedRangesToPersistInMetadataStore;
}

public void setMaxUnackedRangesToPersistInMetadataStore(int maxUnackedRangesToPersistInMetadataStore) {
public ManagedLedgerConfig setMaxUnackedRangesToPersistInMetadataStore(
int maxUnackedRangesToPersistInMetadataStore) {
this.maxUnackedRangesToPersistInMetadataStore = maxUnackedRangesToPersistInMetadataStore;
return this;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
Expand Down Expand Up @@ -91,12 +92,15 @@
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down Expand Up @@ -606,9 +610,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

Position position = PositionFactory.create(positionInfo.getLedgerId(), positionInfo.getEntryId());
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
recoverIndividualDeletedMessages(positionInfo);
if (getConfig().isDeletionAtBatchIndexLevelEnabled()
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
Expand All @@ -627,6 +629,45 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}
}

public void recoverIndividualDeletedMessages(PositionInfo positionInfo) {
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
} else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) {
List<LongListMap> rangeList = positionInfo.getIndividualDeletedMessageRangesList();
try {
Map<Long, long[]> rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));
individualDeletedMessages.build(rangeMap);
} catch (Exception e) {
log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(),
name, e);
}
}
}

private List<LongListMap> buildLongPropertiesMap(Map<Long, long[]> properties) {
if (properties.isEmpty()) {
return Collections.emptyList();
}
List<LongListMap> longListMap = new ArrayList<>();
MutableInt serializedSize = new MutableInt();
properties.forEach((id, ranges) -> {
if (ranges == null || ranges.length <= 0) {
return;
}
org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap.Builder lmBuilder = LongListMap.newBuilder()
.setKey(id);
for (long range : ranges) {
lmBuilder.addValues(range);
}
LongListMap lm = lmBuilder.build();
longListMap.add(lm);
serializedSize.add(lm.getSerializedSize());
});
individualDeletedMessagesSerializedSize = serializedSize.toInteger();
return longListMap;
}

private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
lock.writeLock().lock();
try {
Expand Down Expand Up @@ -3125,12 +3166,23 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio

void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
Position position = mdEntry.newPosition;
PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
Builder piBuilder = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList())
.addAllProperties(buildPropertiesMap(mdEntry.properties)).build();
.addAllProperties(buildPropertiesMap(mdEntry.properties));

Map<Long, long[]> internalRanges = null;
try {
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
} catch (Exception e) {
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
}
if (internalRanges != null && !internalRanges.isEmpty()) {
piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges));
} else {
piBuilder.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
}
PositionInfo pi = piBuilder.build();

if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;
Expand Down Expand Up @@ -142,6 +143,16 @@ public Range<T> lastRange() {
return rangeSet.lastRange();
}

@Override
public Map<Long, long[]> toRanges(int maxRanges) {
return rangeSet.toRanges(maxRanges);
}

@Override
public void build(Map<Long, long[]> internalRange) {
rangeSet.build(internalRange);
}

@Override
public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
return rangeSet.cardinality(lowerKey, lowerValue, upperKey, upperValue);
Expand Down Expand Up @@ -176,4 +187,22 @@ public boolean isDirtyLedgers(long ledgerId) {
public String toString() {
return rangeSet.toString();
}

@Override
public int hashCode() {
return rangeSet.hashCode();
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof RangeSetWrapper)) {
return false;
}
if (this == obj) {
return true;
}
@SuppressWarnings("rawtypes")
RangeSetWrapper set = (RangeSetWrapper) obj;
return this.rangeSet.equals(set.rangeSet);
}
}
6 changes: 6 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,19 @@ message PositionInfo {

// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
repeated LongListMap individualDeletedMessageRanges = 6;
}

message NestedPositionInfo {
required int64 ledgerId = 1;
required int64 entryId = 2;
}

message LongListMap {
required int64 key = 1;
repeated int64 values = 2;
}

message MessageRange {
required NestedPositionInfo lowerEndpoint = 1;
required NestedPositionInfo upperEndpoint = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3223,7 +3223,7 @@ public void testOutOfOrderDeletePersistenceIntoLedgerWithClose() throws Exceptio
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(10);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
final ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);

List<Position> addedPositions = new ArrayList<>();
for (int i = 0; i < totalAddEntries; i++) {
Expand Down Expand Up @@ -3269,7 +3269,8 @@ public void operationFailed(MetaStoreException e) {
LedgerEntry entry = seq.nextElement();
PositionInfo positionInfo;
positionInfo = PositionInfo.parseFrom(entry.getEntry());
individualDeletedMessagesCount.set(positionInfo.getIndividualDeletedMessagesCount());
c1.recoverIndividualDeletedMessages(positionInfo);
individualDeletedMessagesCount.set(c1.getIndividuallyDeletedMessagesSet().asRanges().size());
} catch (Exception e) {
}
latch.countDown();
Expand All @@ -3286,12 +3287,12 @@ public void operationFailed(MetaStoreException e) {
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = (ManagedLedgerImpl) factory2.open(ledgerName, managedLedgerConfig);
c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ManagedCursorImpl reopenCursor = (ManagedCursorImpl) ledger.openCursor("c1");
// verify cursor has been recovered
assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);
assertEquals(reopenCursor.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);

// try to read entries which should only read non-deleted positions
List<Entry> entries = c1.readEntries(totalAddEntries);
List<Entry> entries = reopenCursor.readEntries(totalAddEntries);
assertEquals(entries.size(), totalAddEntries / 2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -34,7 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.api.DigestType;
Expand All @@ -53,9 +52,13 @@
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import lombok.Cleanup;

public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {

public ManagedLedgerBkTest() {
Expand Down Expand Up @@ -587,4 +590,44 @@ public void testPeriodicRollover() throws Exception {
Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId);
}

/**
* This test validates that cursor serializes and deserializes individual-ack list from the bk-ledger.
*
* @throws Exception
*/
@Test
public void testUnackmessagesAndRecovery() throws Exception {
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);

ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("my_test_unack_messages", config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");

int totalEntries = 100;
for (int i = 0; i < totalEntries; i++) {
Position p = ledger.addEntry("entry".getBytes());
if (i % 2 == 0) {
cursor.delete(p);
}
}

LongPairRangeSet<Position> unackMessagesBefore = cursor.getIndividuallyDeletedMessagesSet();

ledger.close();

// open and recover cursor
ledger = factory.open("my_test_unack_messages", config);
cursor = (ManagedCursorImpl) ledger.openCursor("c1");

LongPairRangeSet<Position> unackMessagesAfter = cursor.getIndividuallyDeletedMessagesSet();
assertTrue(unackMessagesBefore.equals(unackMessagesAfter));

ledger.close();
factory.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testRecoverFromNodeDeletion() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals(
brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId())));
pulsar.getLocalMetadataStore().delete(brokerMetadataPath, Optional.empty());
Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals(
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> Assert.assertEquals(
brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId())));

// If the node is deleted by unregister(), it should not recreate the path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
*/
package org.apache.pulsar.common.util.collections;

import static java.util.BitSet.valueOf;
import static java.util.Objects.requireNonNull;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.mutable.MutableInt;

/**
Expand Down Expand Up @@ -253,6 +258,42 @@ public Range<T> lastRange() {
return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper));
}

@Override
public Map<Long, long[]> toRanges(int maxRanges) {
Map<Long, long[]> internalBitSetMap = new HashMap<>();
AtomicInteger rangeCount = new AtomicInteger();
rangeBitSetMap.forEach((id, bmap) -> {
if (rangeCount.getAndAdd(bmap.cardinality()) > maxRanges) {
return;
}
internalBitSetMap.put(id, bmap.toLongArray());
});
return internalBitSetMap;
}

@Override
public void build(Map<Long, long[]> internalRange) {
internalRange.forEach((id, ranges) -> rangeBitSetMap.put(id, valueOf(ranges)));
}

@Override
public int hashCode() {
return Objects.hashCode(rangeBitSetMap);
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof ConcurrentOpenLongPairRangeSet)) {
return false;
}
if (this == obj) {
return true;
}
@SuppressWarnings("rawtypes")
ConcurrentOpenLongPairRangeSet set = (ConcurrentOpenLongPairRangeSet) obj;
return this.rangeBitSetMap.equals(set.rangeBitSetMap);
}

@Override
public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
NavigableMap<Long, BitSet> subMap = rangeBitSetMap.subMap(lowerKey, true, upperKey, true);
Expand Down
Loading

0 comments on commit 9eeffe5

Please sign in to comment.