try ledger recovery from previous entries in case of corrupt/missing footer of the chunked data (apache#282)

(cherry picked from commit 6e72ecb)
dlg99 committed May 28, 2024
1 parent bfc84fb commit 794ae7d
Showing 3 changed files with 188 additions and 74 deletions.
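
What the change does: cursor-position snapshots that exceed maxPositionChunkSize are written as a sequence of ledger entries followed by a small JSON footer, and when the last entry of the cursor ledger cannot be decoded (corrupt payload, or a footer that never made it to the ledger), the new recoverFromLedgerByEntryId path retries with entryId - 1 instead of failing the whole cursor recovery. The snippet below is a minimal, self-contained sketch of that backward-walking strategy, not the actual Pulsar/BookKeeper code: it substitutes a synchronous in-memory list for the ledger and an invented Snapshot/tryParse pair for the PositionInfo parsing done in the diff.

import java.util.List;
import java.util.Optional;

public class BackwardRecoverySketch {

    // Invented stand-in for a decoded cursor snapshot (not the real PositionInfo).
    record Snapshot(long ledgerId, long entryId) { }

    // Invented parser: returns empty when the entry payload is corrupt.
    static Optional<Snapshot> tryParse(byte[] payload) {
        try {
            String[] parts = new String(payload).split(":");
            return Optional.of(new Snapshot(Long.parseLong(parts[0]), Long.parseLong(parts[1])));
        } catch (RuntimeException e) {
            return Optional.empty(); // corrupt or truncated entry
        }
    }

    // Start at the last entry and step backwards until an entry parses.
    // An empty result corresponds to rewinding to the rollback position in the real code path.
    static Optional<Snapshot> recover(List<byte[]> ledgerEntries) {
        for (int entryId = ledgerEntries.size() - 1; entryId >= 0; entryId--) {
            Optional<Snapshot> parsed = tryParse(ledgerEntries.get(entryId));
            if (parsed.isPresent()) {
                return parsed;
            }
            // decode failed: fall through and try the previous entry
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<byte[]> entries = List.of(
                "7:42".getBytes(),  // valid snapshot
                "{{".getBytes());   // corrupt last entry, like the one the new test appends
        System.out.println(recover(entries)); // Optional[Snapshot[ledgerId=7, entryId=42]]
    }
}

In the real implementation the per-entry read is asynchronous (lh.asyncReadEntries) and the terminal fallback is initialize(getRollbackPosition(info), ...) rather than an empty Optional, but the control flow is the same: step back one entry per decode failure until something parses or the ledger is exhausted.
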
@@ -40,13 +40,15 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -75,6 +77,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
@@ -244,6 +247,8 @@ public class ManagedCursorImpl implements ManagedCursor {
// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

protected int maxPositionChunkSize = 1024 * 1024;

static class MarkDeleteEntry {
final PositionImpl newPosition;
final MarkDeleteCallback callback;
@@ -581,71 +586,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac

// Read the last entry in the ledger
long lastEntryInLedger = lh.getLastAddConfirmed();

if (lastEntryInLedger < 0) {
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
}

lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
if (log.isDebugEnabled()) {
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
if (isBkErrorNotRecoverable(rc1)) {
log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));

callback.operationFailed(createManagedLedgerException(rc1));
return;
}

LedgerEntry entry = seq.nextElement();
mbean.addReadCursorLedgerSize(entry.getLength());
PositionInfo positionInfo;
try {
byte[] data = entry.getEntry();
data = decompressDataIfNeeded(data, lh);
positionInfo = PositionInfo.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
callback.operationFailed(new ManagedLedgerException(e));
return;
}
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, positionInfo);

Map<String, Long> recoveredProperties = Collections.emptyMap();
if (positionInfo.getPropertiesCount() > 0) {
// Recover properties map
recoveredProperties = new HashMap<>();
for (int i = 0; i < positionInfo.getPropertiesCount(); i++) {
LongProperty property = positionInfo.getProperties(i);
recoveredProperties.put(property.getName(), property.getValue());
}
}

log.info("[{}] Cursor {} recovered with recoveredProperties {}, individualDeletedMessagesCount {}",
ledger.getName(), name, recoveredProperties, positionInfo.getIndividualDeletedMessagesCount());

PositionImpl position = new PositionImpl(positionInfo);
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
if (getConfig().isDeletionAtBatchIndexLevelEnabled()
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
recoveredCursor(position, recoveredProperties, cursorProperties, lh);
callback.operationComplete();
}, null);
recoverFromLedgerByEntryId(info, callback, lh, lastEntryInLedger);
};

try {
bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback,
null);
@@ -656,6 +599,101 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}
}

private void recoverFromLedgerByEntryId(ManagedCursorInfo info,
VoidCallback callback,
LedgerHandle lh,
long entryId) {
long ledgerId = lh.getId();

if (entryId < 0) {
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No valid entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
}

lh.asyncReadEntries(entryId, entryId, (rc1, lh1, seq, ctx1) -> {
if (log.isDebugEnabled()) {
log.debug("[{}] readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
if (isBkErrorNotRecoverable(rc1)) {
log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));

callback.operationFailed(createManagedLedgerException(rc1));
return;
}

LedgerEntry entry = seq.nextElement();
byte[] data = entry.getEntry();
try {
ChunkSequenceFooter chunkSequenceFooter = parseChunkSequenceFooter(data);
if (chunkSequenceFooter.numParts > 0) {
readChunkSequence(callback, lh, entryId, chunkSequenceFooter);
} else {
Throwable res = tryCompleteCursorRecovery(lh, data);
if (res == null) {
callback.operationComplete();
} else {
log.warn("[{}] Error recovering from metadata ledger {} entry {} for cursor {}. "
+ "Will try recovery from previous entry.",
ledger.getName(), ledgerId, entryId, name, res);
//try recovery from previous entry
recoverFromLedgerByEntryId(info, callback, lh, entryId - 1);
}
}
} catch (IOException error) {
log.error("Cannot parse footer", error);
log.warn("[{}] Error recovering from metadata ledger {} entry {} for cursor {}, cannot parse footer. "
+ "Will try recovery from previous entry.",
ledger.getName(), ledgerId, entryId, name, error);
recoverFromLedgerByEntryId(info, callback, lh, entryId - 1);
}
}, null);
}

private void readChunkSequence(VoidCallback callback, LedgerHandle lh,
long footerPosition, ChunkSequenceFooter chunkSequenceFooter) {
long startPos = footerPosition - chunkSequenceFooter.numParts;
long endPos = footerPosition - 1;
log.info("readChunkSequence from pos {}, num parts {}, startPos {}, endPos {}",
footerPosition, chunkSequenceFooter.numParts, startPos, endPos);
lh.asyncReadEntries(startPos, endPos, new AsyncCallback.ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
entries.asIterator().forEachRemaining(entry -> {
log.info("pos {} len {} bytes ", entry.getEntryId(), entry.getLength());
try {
buffer.write(entry.getEntry());
} catch (IOException err) {
throw new RuntimeException(err);
}
});
byte[] result = buffer.toByteArray();
log.info("Read {} chunks, total of {} bytes, expected {} bytes", chunkSequenceFooter.numParts,
result.length, chunkSequenceFooter.length);
if (result.length != chunkSequenceFooter.length) {
callback.operationFailed(ManagedLedgerException.getManagedLedgerException(new IOException(
"Expected " + chunkSequenceFooter.length + " bytes but read " + result.length + " bytes")));
}
Throwable res = tryCompleteCursorRecovery(lh, result);
if (res == null) {
callback.operationComplete();
} else {
callback.operationFailed(new ManagedLedgerException(res));
}
}
}, null);
}

@AllArgsConstructor
@NoArgsConstructor
@Getter
@@ -675,16 +713,15 @@ private ChunkSequenceFooter parseChunkSequenceFooter(byte[] data) throws IOExcep
return ObjectMapperFactory.getMapper().getObjectMapper().readValue(data, ChunkSequenceFooter.class);
}

private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byte[] data) {
private Throwable tryCompleteCursorRecovery(LedgerHandle lh, byte[] data) {
mbean.addReadCursorLedgerSize(data.length);

try {
data = decompressDataIfNeeded(data, lh);
} catch (Throwable e) {
log.error("[{}] Failed to decompress position info from ledger {} for cursor {}: {}", ledger.getName(),
lh.getId(), name, e);
callback.operationFailed(new ManagedLedgerException(e));
return;
return e;
}

PositionInfo positionInfo;
@@ -693,8 +730,7 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt
} catch (InvalidProtocolBufferException e) {
log.error("[{}] Failed to parse position info from ledger {} for cursor {}: {}", ledger.getName(),
lh.getId(), name, e);
callback.operationFailed(new ManagedLedgerException(e));
return;
return e;
}

Map<String, Long> recoveredProperties = Collections.emptyMap();
@@ -716,7 +752,7 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
recoveredCursor(position, recoveredProperties, cursorProperties, lh);
callback.operationComplete();
return null;
}

private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
@@ -3282,6 +3318,7 @@ private void buildBatchEntryDeletionIndexInfoList(
}

void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
checkArgument(maxPositionChunkSize > 0, "maxPositionChunkSize must be greater than zero");
long now = System.nanoTime();
PositionImpl position = mdEntry.newPosition;

@@ -3302,10 +3339,9 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin

long endCompress = System.nanoTime();

int maxSize = 1024 * 1024;
int offset = 0;
final int len = data.readableBytes();
int numParts = 1 + (len / maxSize);
int numParts = 1 + (len / maxPositionChunkSize);

if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} Appending to ledger={} position={} data size {} bytes, numParts {}",
@@ -3328,7 +3364,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
int part = 0;
while (part != numParts) {
int remaining = len - offset;
int currentLen = Math.min(maxSize, remaining);
int currentLen = Math.min(maxPositionChunkSize, remaining);
boolean isLast = part == numParts - 1;

if (log.isDebugEnabled()) {
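
The write side of the feature is the chunking scheme in persistPositionToLedger: the serialized PositionInfo is split into parts of at most maxPositionChunkSize bytes (numParts = 1 + len / maxPositionChunkSize), each part is added as its own entry, and a JSON ChunkSequenceFooter carrying numParts and length is added last. On recovery, readChunkSequence fetches the numParts entries that precede the footer, concatenates them, and verifies the total size; a footer that does not parse is what sends recovery back to the previous entry. The sketch below shows that round trip under simplified assumptions: an in-memory List<byte[]> stands in for the ledger, a plain byte[] for the protobuf payload, and a bare Jackson ObjectMapper for Pulsar's ObjectMapperFactory. Only the numParts/length field names come from the diff; everything else is illustrative.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkedSnapshotSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Same field names (numParts, length) as the ChunkSequenceFooter in the diff;
    // the class itself is a simplified stand-in.
    public static class Footer {
        public int numParts;
        public int length;
    }

    // Split the payload into chunks of at most maxChunkSize bytes and append a JSON footer last.
    static void write(List<byte[]> ledger, byte[] data, int maxChunkSize) throws IOException {
        int numParts = 1 + (data.length / maxChunkSize); // same arithmetic as persistPositionToLedger
        int offset = 0;
        for (int part = 0; part < numParts; part++) {
            int currentLen = Math.min(maxChunkSize, data.length - offset);
            ledger.add(Arrays.copyOfRange(data, offset, offset + currentLen));
            offset += currentLen;
        }
        Footer footer = new Footer();
        footer.numParts = numParts;
        footer.length = data.length;
        ledger.add(MAPPER.writeValueAsBytes(footer)); // footer is the last entry
    }

    // Read the footer from the last entry, then reassemble the numParts entries before it.
    static byte[] read(List<byte[]> ledger) throws IOException {
        int footerPos = ledger.size() - 1;
        Footer footer = MAPPER.readValue(ledger.get(footerPos), Footer.class);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        for (int pos = footerPos - footer.numParts; pos < footerPos; pos++) {
            buffer.write(ledger.get(pos));
        }
        byte[] result = buffer.toByteArray();
        if (result.length != footer.length) {
            throw new IOException("Expected " + footer.length + " bytes but read " + result.length);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> ledger = new ArrayList<>();
        byte[] payload = "some serialized PositionInfo bytes".getBytes();
        write(ledger, payload, 5);
        System.out.println(new String(read(ledger))); // prints the original payload
    }
}

The remaining hunks add a regression test, testRecoverCursorCorruptLastEntry, which forces chunking with maxPositionChunkSize = 2, appends deliberately corrupt trailing entries to the cursor ledger, and asserts that recovery still completes at the last good snapshot, plus a one-line constructor adjustment to a mocked ledger handle used in another test.
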
@@ -75,6 +75,7 @@
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -98,6 +99,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.FutureUtil;
@@ -3618,6 +3620,82 @@ public void operationFailed(ManagedLedgerException exception) {
assertEquals(c.getReadPosition(), readPositionBeforeRecover);
assertEquals(c.getNumberOfEntries(), 2L);
}

@Test(timeOut = 20000)
public void testRecoverCursorCorruptLastEntry() throws Exception {
ManagedLedger ml = factory.open("testRecoverCursorCorruptLastEntry");
ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);
// force chunking
c.maxPositionChunkSize = 2;

// A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
assertEquals(c.getReadPosition().getEntryId(), 0);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);

c.resetCursor(PositionImpl.LATEST);

// A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
assertEquals(c.getReadPosition().getEntryId(), 0);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);

// Trigger the lastConfirmedEntry to move forward
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);

c.resetCursor(PositionImpl.LATEST);
//corrupt last entry
LedgerHandle cursorLedger = (LedgerHandle)FieldUtils.readDeclaredField(c, "cursorLedger", true);
// can't parse json
cursorLedger.addEntry("{{".getBytes());
// can't parse PositionInfo protobuf
cursorLedger.addEntry("aa".getBytes());

assertEquals(c.getMarkDeletedPosition().getEntryId(), 3);
assertEquals(c.getReadPosition().getEntryId(), 4);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3);

// Publish messages to move the lastConfirmedEntry field forward
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);

final Position markDeleteBeforeRecover = c.getMarkDeletedPosition();
final Position readPositionBeforeRecover = c.getReadPosition();

ManagedCursorInfo info = ManagedCursorInfo.newBuilder()
.setCursorsLedgerId(c.getCursorLedger())
.setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
.setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
.setLastActive(0L)
.build();

CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
c.recoverFromLedger(info, new VoidCallback() {
@Override
public void operationComplete() {
latch.countDown();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
failed.set(true);
latch.countDown();
}
});

latch.await();
if (failed.get()) {
fail("Cursor recovery should not fail");
}
assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover);
assertEquals(c.getReadPosition(), readPositionBeforeRecover);
assertEquals(c.getNumberOfEntries(), 2L);
}

@Test
void testAlwaysInactive() throws Exception {
ManagedLedger ml = factory.open("testAlwaysInactive");
@@ -3256,7 +3256,7 @@ public void testManagedLedgerWithAddEntryTimeOut() throws Exception {
class MockLedgerHandle extends PulsarMockLedgerHandle {
public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd)
throws GeneralSecurityException {
super(bk, id, digest, passwd);
super(bk, id, digest, passwd, null);
}

@Override