Skip to content

Commit

Permalink
post-rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed Sep 16, 2024
1 parent d5f5fa7 commit 1c405fe
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ private Throwable tryCompleteCursorRecovery(LedgerHandle lh, byte[] data) {
}
}

PositionImpl position = new PositionImpl(positionInfo);
Position position = PositionFactory.create(positionInfo.getLedgerId(), positionInfo.getEntryId());
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
Expand Down Expand Up @@ -3333,9 +3333,9 @@ private void buildBatchEntryDeletionIndexInfoList(
return;
}
int count = 0;
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
Iterator<Map.Entry<Position, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && count < maxBatchDeletedIndexToPersist) {
Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();
Map.Entry<Position, BitSetRecyclable> entry = iterator.next();
long[] array = entry.getValue().toLongArray();
consumer.acceptRange(entry.getKey().getLedgerId(), entry.getKey().getEntryId(), array);
count++;
Expand Down Expand Up @@ -3436,7 +3436,7 @@ private void writeToBookKeeperLastChunk(LedgerHandle lh,
VoidCallback callback,
ByteBuf data,
int totalLength,
PositionImpl position,
Position position,
Runnable onFinished) {
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;

final class PositionInfoUtils {
Expand All @@ -35,7 +37,7 @@ interface BatchedEntryDeletionIndexInfoConsumer {
void acceptRange(long ledgerId, long entryId, long[] array);
}

static ByteBuf serializePositionInfo(ManagedCursorImpl.MarkDeleteEntry mdEntry, PositionImpl position,
static ByteBuf serializePositionInfo(ManagedCursorImpl.MarkDeleteEntry mdEntry, Position position,
Consumer<IndividuallyDeletedMessagesRangeConsumer> rangeScanner,
Consumer<BatchedEntryDeletionIndexInfoConsumer> batchDeletedIndexesScanner,
int lastSerializedSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3634,7 +3634,7 @@ public void testRecoverCursorCorruptLastEntry() throws Exception {
assertEquals(c.getReadPosition().getEntryId(), 0);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);

c.resetCursor(PositionImpl.LATEST);
c.resetCursor(PositionFactory.LATEST);

// A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
Expand All @@ -3647,7 +3647,7 @@ public void testRecoverCursorCorruptLastEntry() throws Exception {
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);

c.resetCursor(PositionImpl.LATEST);
c.resetCursor(PositionFactory.LATEST);
//corrupt last entry
LedgerHandle cursorLedger = (LedgerHandle)FieldUtils.readDeclaredField(c, "cursorLedger", true);
// can't parse json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import java.util.List;

import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.proto.LightMLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand All @@ -38,7 +40,7 @@ public class PositionInfoUtilsTest {

@Test
public void testSerializeDeserialize() throws Exception {
PositionImpl position = new PositionImpl(1, 2);
Position position = PositionFactory.create(1, 2);
ManagedCursorImpl.MarkDeleteEntry entry = new ManagedCursorImpl.MarkDeleteEntry(position,
Map.of("foo", 1L), null, null);

Expand Down Expand Up @@ -79,7 +81,7 @@ public void testSerializeDeserialize() throws Exception {

@Test
public void testSerializeDeserializeEmpty() throws Exception {
PositionImpl position = new PositionImpl(1, 2);
Position position = PositionFactory.create(1, 2);
ManagedCursorImpl.MarkDeleteEntry entry = new ManagedCursorImpl.MarkDeleteEntry(position,
null, null, null);

Expand All @@ -100,7 +102,7 @@ public void testSerializeDeserializeEmpty() throws Exception {

@Test
public void testSerializeDeserialize2() throws Exception {
PositionImpl position = new PositionImpl(1, 2);
Position position = PositionFactory.create(1, 2);
ManagedCursorImpl.MarkDeleteEntry entry = new ManagedCursorImpl.MarkDeleteEntry(position,
Map.of("foo", 1L), null, null);

Expand Down

0 comments on commit 1c405fe

Please sign in to comment.