Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PositionInfo Util serialization fix and test #272

Merged
merged 3 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
Expand Down Expand Up @@ -133,13 +132,6 @@
@SuppressWarnings("checkstyle:javadoctype")
public class ManagedCursorImpl implements ManagedCursor {

private static final FastThreadLocal<LightMLDataFormats.PositionInfo> piThreadLocal = new FastThreadLocal<>() {
@Override
protected LightMLDataFormats.PositionInfo initialValue() {
return new LightMLDataFormats.PositionInfo();
}
};

private static final Comparator<Entry> ENTRY_COMPARATOR = (e1, e2) -> {
if (e1.getLedgerId() != e2.getLedgerId()) {
return e1.getLedgerId() < e2.getLedgerId() ? -1 : 1;
Expand Down Expand Up @@ -488,7 +480,7 @@ public boolean removeProperty(String key) {
if (lastMarkDeleteEntry != null) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties = last.properties;
if (properties != null && properties.containsKey(key)) {
if (properties != null) {
properties.remove(key);
}
return last;
Expand Down Expand Up @@ -2066,7 +2058,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
}
callback.markDeleteFailed(
new ManagedLedgerException("Reset cursor in progress - unable to mark delete position "
+ position.toString()),
+ position),
ctx);
return;
}
Expand Down Expand Up @@ -3277,13 +3269,6 @@ private void buildBatchEntryDeletionIndexInfoList(
}
}

private static ByteBuf toByteBuf(LightMLDataFormats.PositionInfo pi) {
int size = pi.getSerializedSize();
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(size, size);
pi.writeTo(buf);
return buf;
}

void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
long now = System.nanoTime();
PositionImpl position = mdEntry.newPosition;
Expand Down Expand Up @@ -3525,7 +3510,7 @@ boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
if (ledger.getFactory().isMetadataServiceAvailable()
&& (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
|| lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000L))
&& (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
// It's safe to modify the timestamp since this method will be only called from a callback, implying that
// calls will be serialized on one single thread
Expand Down Expand Up @@ -3664,7 +3649,6 @@ private void asyncDeleteLedger(final LedgerHandle lh, int retry) {
ledger.getScheduledExecutor().schedule(() -> asyncDeleteLedger(lh, retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
}
return;
} else {
log.info("[{}][{}] Successfully closed & deleted ledger {} in cursor", ledger.getName(), name,
lh.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;

final class PositionInfoUtils {

Expand All @@ -47,18 +42,18 @@ static ByteBuf serializePositionInfo(ManagedCursorImpl.MarkDeleteEntry mdEntry,
int size = Math.max(lastSerializedSize, 64 * 1024);
ByteBuf _b = PulsarByteBufAllocator.DEFAULT.buffer(size);

int _writeIdx = _b.writerIndex();
LightProtoCodec.writeVarInt(_b, PositionInfo._LEDGER_ID_TAG);
LightProtoCodec.writeVarInt64(_b, position.getLedgerId());
LightProtoCodec.writeVarInt(_b, PositionInfo._ENTRY_ID_TAG);
LightProtoCodec.writeVarInt64(_b, position.getEntryId());

MessageRange _item = new MessageRange();
NestedPositionInfo lower = _item.setLowerEndpoint();
NestedPositionInfo upper = _item.setUpperEndpoint();
rangeScanner.accept(new IndividuallyDeletedMessagesRangeConsumer() {
@Override
public void acceptRange(long lowerLegerId, long lowerEntryId, long upperLedgerId, long upperEntryId) {
_item.clear();
NestedPositionInfo lower = _item.setLowerEndpoint();
NestedPositionInfo upper = _item.setUpperEndpoint();
lower.setLedgerId(lowerLegerId);
lower.setEntryId(lowerEntryId);
upper.setLedgerId(upperLedgerId);
Expand All @@ -82,15 +77,14 @@ public void acceptRange(long lowerLegerId, long lowerEntryId, long upperLedgerId
}

final BatchedEntryDeletionIndexInfo batchDeletedIndexInfo = new BatchedEntryDeletionIndexInfo();
final NestedPositionInfo nestedPositionInfo = batchDeletedIndexInfo.setPosition();

batchDeletedIndexesScanner.accept(new BatchedEntryDeletionIndexInfoConsumer() {
@Override
public void acceptRange(long ledgerId, long entryId, long[] array) {
batchDeletedIndexInfo.clear();
final NestedPositionInfo nestedPositionInfo = batchDeletedIndexInfo.setPosition();
nestedPositionInfo.setLedgerId(ledgerId);
nestedPositionInfo.setEntryId(entryId);
List<Long> deleteSet = new ArrayList<>(array.length);
batchDeletedIndexInfo.clearDeleteSet();
for (long l : array) {
batchDeletedIndexInfo.addDeleteSet(l);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@
package org.apache.bookkeeper.mledger.impl;

import static org.testng.Assert.*;

import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.util.Map;
import java.util.List;

import org.apache.bookkeeper.mledger.proto.LightMLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

public class PositionInfoUtilsTest {
private static final Logger log = LoggerFactory.getLogger(PositionInfoUtilsTest.class);

@Test
public void testSerializeDeserialize() throws Exception {
PositionImpl position = new PositionImpl(1, 2);
Expand Down Expand Up @@ -88,4 +97,118 @@ public void testSerializeDeserializeEmpty() throws Exception {
assertEquals(0, positionInfoParsed.getBatchedEntryDeletionIndexInfoCount());
result.release();
}

@Test
public void testSerializeDeserialize2() throws Exception {
PositionImpl position = new PositionImpl(1, 2);
ManagedCursorImpl.MarkDeleteEntry entry = new ManagedCursorImpl.MarkDeleteEntry(position,
Map.of("foo", 1L), null, null);

final int numRanges = 10000;
ByteBuf result = PositionInfoUtils.serializePositionInfo(entry, position, (scanner) -> {
for (int i = 0; i < numRanges; i++) {
scanner.acceptRange(i*4 + 1, i*4 + 2, i*4 + 3, i*4 + 4);
}
}, (scanner) -> {
long[] array = {7L, 8L};
for (int i = 0; i < numRanges; i++) {
scanner.acceptRange(i*2 + 1, i*2 + 2, array);
}
}, 1024);

// deserialize PIUtils -> lightproto
final int idx = result.readerIndex();
LightMLDataFormats.PositionInfo lighPositionInfoParsed = new LightMLDataFormats.PositionInfo();
lighPositionInfoParsed.parseFrom(result, result.readableBytes());
result.readerIndex(idx);

validateLightproto(lighPositionInfoParsed, numRanges);

// serialize lightproto
int serializedSz = lighPositionInfoParsed.getSerializedSize();
ByteBuf lightResult = PulsarByteBufAllocator.DEFAULT.buffer(serializedSz);
lighPositionInfoParsed.writeTo(lightResult);

byte[] light = ByteBufUtil.getBytes(lightResult);
byte[] util = ByteBufUtil.getBytes(result);

assertEquals(light.length, util.length);

for (int i = 0; i < light.length; i++) {
if (light[i] != util[i]) {
log.error("Mismatch at index {} light={} util={}", i, light[i], util[i]);
}
}

assertEquals(light, util);

// deserialize lightproto -> protobuf
parseProtobufAndValidate(light, numRanges);

// deserialize PIUtils -> protobuf
parseProtobufAndValidate(util, numRanges);

result.release();
lightResult.release();
}

private static void validateLightproto(LightMLDataFormats.PositionInfo lighPositionInfoParsed, int numRanges) {
assertEquals(1, lighPositionInfoParsed.getLedgerId());
assertEquals(2, lighPositionInfoParsed.getEntryId());

assertEquals(1, lighPositionInfoParsed.getPropertiesCount());
assertEquals("foo", lighPositionInfoParsed.getPropertyAt(0).getName());
assertEquals(1, lighPositionInfoParsed.getPropertyAt(0).getValue());

assertEquals(numRanges, lighPositionInfoParsed.getIndividualDeletedMessagesCount());
int curr = 0;
for (int i = 0; i < numRanges; i++) {
assertEquals(i*4 + 1, lighPositionInfoParsed.getIndividualDeletedMessageAt(curr).getLowerEndpoint().getLedgerId());
assertEquals(i*4 + 2, lighPositionInfoParsed.getIndividualDeletedMessageAt(curr).getLowerEndpoint().getEntryId());
assertEquals(i*4 + 3, lighPositionInfoParsed.getIndividualDeletedMessageAt(curr).getUpperEndpoint().getLedgerId());
assertEquals(i*4 + 4, lighPositionInfoParsed.getIndividualDeletedMessageAt(curr).getUpperEndpoint().getEntryId());
curr++;
}

assertEquals(numRanges, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfosCount());
curr = 0;
for (int i = 0; i < numRanges; i++) {
assertEquals(i*2 + 1, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfoAt(curr).getPosition().getLedgerId());
assertEquals(i*2 + 2, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfoAt(curr).getPosition().getEntryId());
assertEquals(7L, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfoAt(curr).getDeleteSetAt(0));
assertEquals(8L, lighPositionInfoParsed.getBatchedEntryDeletionIndexInfoAt(curr).getDeleteSetAt(1));
curr++;
}
}

private static void parseProtobufAndValidate(byte[] data, int numRanges) throws InvalidProtocolBufferException {
MLDataFormats.PositionInfo positionInfoParsed = MLDataFormats.PositionInfo.parseFrom(data);

assertEquals(1, positionInfoParsed.getLedgerId());
assertEquals(2, positionInfoParsed.getEntryId());

assertEquals(1, positionInfoParsed.getPropertiesCount());
assertEquals("foo", positionInfoParsed.getProperties(0).getName());
assertEquals(1, positionInfoParsed.getProperties(0).getValue());

assertEquals(numRanges, positionInfoParsed.getIndividualDeletedMessagesCount());
int curr = 0;
for (int i = 0; i < numRanges; i++) {
assertEquals(i*4 + 1, positionInfoParsed.getIndividualDeletedMessages(curr).getLowerEndpoint().getLedgerId());
assertEquals(i*4 + 2, positionInfoParsed.getIndividualDeletedMessages(curr).getLowerEndpoint().getEntryId());
assertEquals(i*4 + 3, positionInfoParsed.getIndividualDeletedMessages(curr).getUpperEndpoint().getLedgerId());
assertEquals(i*4 + 4, positionInfoParsed.getIndividualDeletedMessages(curr).getUpperEndpoint().getEntryId());
curr++;
}

assertEquals(numRanges, positionInfoParsed.getBatchedEntryDeletionIndexInfoCount());
curr = 0;
for (int i = 0; i < numRanges; i++) {
assertEquals(i*2 + 1, positionInfoParsed.getBatchedEntryDeletionIndexInfo(curr).getPosition().getLedgerId());
assertEquals(i*2 + 2, positionInfoParsed.getBatchedEntryDeletionIndexInfo(curr).getPosition().getEntryId());
assertEquals(List.of(7L, 8L), positionInfoParsed.getBatchedEntryDeletionIndexInfo(curr).getDeleteSetList());
curr++;
}
}

}
Loading