Skip to content

Commit

Permalink
removed usage of byte[] where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed Sep 23, 2024
1 parent 8db72f4 commit 1397faf
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -636,7 +634,7 @@ private void recoverFromLedgerByEntryId(ManagedCursorInfo info,
}

LedgerEntry entry = seq.nextElement();
byte[] data = entry.getEntry();
ByteBuf data = entry.getEntryBuffer();
try {
ChunkSequenceFooter chunkSequenceFooter = parseChunkSequenceFooter(data);
if (chunkSequenceFooter.numParts > 0) {
Expand Down Expand Up @@ -672,23 +670,28 @@ private void readChunkSequence(VoidCallback callback, LedgerHandle lh,
lh.asyncReadEntries(startPos, endPos, new AsyncCallback.ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
CompositeByteBuf buffer = PulsarByteBufAllocator.DEFAULT.compositeBuffer();

AtomicInteger readableBytes = new AtomicInteger(0);
entries.asIterator().forEachRemaining(entry -> {
log.info("pos {} len {} bytes ", entry.getEntryId(), entry.getLength());
try {
buffer.write(entry.getEntry());
} catch (IOException err) {
throw new RuntimeException(err);
if (log.isInfoEnabled()) {
log.debug("pos {} len {} bytes ", entry.getEntryId(), entry.getLength());
}
ByteBuf part = entry.getEntryBuffer();
buffer.addComponent(part);
readableBytes.addAndGet(part.readableBytes());
});
byte[] result = buffer.toByteArray();
buffer.writerIndex(readableBytes.get())
.readerIndex(0);

log.info("Read {} chunks, total of {} bytes, expected {} bytes", chunkSequenceFooter.numParts,
result.length, chunkSequenceFooter.length);
if (result.length != chunkSequenceFooter.length) {
buffer.readableBytes(), chunkSequenceFooter.length);
if (buffer.readableBytes() != chunkSequenceFooter.length) {
callback.operationFailed(ManagedLedgerException.getManagedLedgerException(new IOException(
"Expected " + chunkSequenceFooter.length + " bytes but read " + result.length + " bytes")));
"Expected " + chunkSequenceFooter.length + " bytes but read "
+ buffer.readableBytes() + " bytes")));
}
Throwable res = tryCompleteCursorRecovery(lh, result);
Throwable res = tryCompleteCursorRecovery(lh, buffer);
if (res == null) {
callback.operationComplete();
} else {
Expand All @@ -709,32 +712,42 @@ public static final class ChunkSequenceFooter {
private int length;
}

private ChunkSequenceFooter parseChunkSequenceFooter(byte[] data) throws IOException {
if (data.length == 0 || data[0] != '{') {
private ChunkSequenceFooter parseChunkSequenceFooter(ByteBuf data) throws IOException {
// getChar() doesn't move the reader index
if (data.readableBytes() == 0 || data.getByte(0) != '{') {
// this is not JSON
return ChunkSequenceFooter.NOT_CHUNKED;
}
return ObjectMapperFactory.getMapper().getObjectMapper().readValue(data, ChunkSequenceFooter.class);

try {
return ObjectMapperFactory.getMapper().getObjectMapper()
.readValue(data.toString(StandardCharsets.UTF_8), ChunkSequenceFooter.class);
} catch (JsonProcessingException e) {
return ChunkSequenceFooter.NOT_CHUNKED;
}
}

private Throwable tryCompleteCursorRecovery(LedgerHandle lh, byte[] data) {
mbean.addReadCursorLedgerSize(data.length);
private Throwable tryCompleteCursorRecovery(LedgerHandle lh, ByteBuf data) {
mbean.addReadCursorLedgerSize(data.readableBytes());

try {
data = decompressDataIfNeeded(data, lh);
} catch (Throwable e) {
data.release();
log.error("[{}] Failed to decompress position info from ledger {} for cursor {}: {}", ledger.getName(),
lh.getId(), name, e);
return e;
}

PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(data);
positionInfo = PositionInfo.parseFrom(data.nioBuffer());
} catch (InvalidProtocolBufferException e) {
log.error("[{}] Failed to parse position info from ledger {} for cursor {}: {}", ledger.getName(),
lh.getId(), name, e);
return e;
} finally {
data.release();
}

Map<String, Long> recoveredProperties = Collections.emptyMap();
Expand Down Expand Up @@ -3492,42 +3505,39 @@ private ByteBuf compressDataIfNeeded(ByteBuf data, LedgerHandle lh) {
result.readerIndex(0)
.writerIndex(4 + compressedSize);

int ratio = (int) (compressedSize * 100.0 / uncompressedSize);
log.info("[{}] Cursor {} Compressed data size {} bytes (with {}, original size {} bytes, ratio {}%)",
ledger.getName(), name, compressedSize, pulsarCursorInfoCompressionString, uncompressedSize, ratio);
if (log.isInfoEnabled()) {
int ratio = (int) (compressedSize * 100.0 / uncompressedSize);
log.info("[{}] Cursor {} Compressed data size {} bytes (with {}, original size {} bytes, ratio {}%)",
ledger.getName(), name, compressedSize, pulsarCursorInfoCompressionString,
uncompressedSize, ratio);
}
return result;
} finally {
data.release();
}
}

static byte[] decompressDataIfNeeded(byte[] data, LedgerHandle lh) {
static ByteBuf decompressDataIfNeeded(ByteBuf data, LedgerHandle lh) {
byte[] pulsarCursorInfoCompression =
lh.getCustomMetadata().get(METADATA_PROPERTY_CURSOR_COMPRESSION_TYPE);
if (pulsarCursorInfoCompression != null) {
String pulsarCursorInfoCompressionString = new String(pulsarCursorInfoCompression);
if (log.isDebugEnabled()) {
log.debug("Ledger {} compression {} decompressing {} bytes, full {}",
lh.getId(), pulsarCursorInfoCompressionString, data.length,
lh.getId(), pulsarCursorInfoCompressionString, data.readableBytes(),
ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(data)));
}
ByteArrayInputStream input = new ByteArrayInputStream(data);
DataInputStream dataInputStream = new DataInputStream(input);
try {
int uncompressedSize = dataInputStream.readInt();
byte[] compressedData = dataInputStream.readAllBytes();
// this moves readerIndex
int uncompressedSize = data.readInt();
CompressionCodec compressionCodec = CompressionCodecProvider.getCompressionCodec(
CompressionType.valueOf(pulsarCursorInfoCompressionString));
ByteBuf decode = compressionCodec.decode(Unpooled.wrappedBuffer(compressedData), uncompressedSize);
try {
return ByteBufUtil.getBytes(decode);
} finally {
decode.release();
}
ByteBuf decode = compressionCodec.decode(data, uncompressedSize);
return decode;
} catch (IOException | MalformedInputException error) {
log.error("Cannot decompress cursor position using {}. Payload is {}",
pulsarCursorInfoCompressionString,
ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(data)), error);
ByteBufUtil.prettyHexDump(data), error);
throw new RuntimeException(error);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void testCloseCursor() throws Exception {
ledger.addEntry(new byte[]{3});
ledger.addEntry(new byte[]{4});
ledger.addEntry(new byte[]{5});
// Persistent cursor info to ledger.
// Persist cursor info to ledger.
c1.delete(PositionFactory.create(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId()));
Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0);
// Make cursor ledger can not work.
Expand Down Expand Up @@ -3276,9 +3276,9 @@ public void operationFailed(MetaStoreException e) {
try {
LedgerEntry entry = seq.nextElement();
PositionInfo positionInfo;
byte[] data = entry.getEntry();
ByteBuf data = entry.getEntryBuffer();
data = ManagedCursorImpl.decompressDataIfNeeded(data, lh);
positionInfo = PositionInfo.parseFrom(data);
positionInfo = PositionInfo.parseFrom(data.nioBuffer());
individualDeletedMessagesCount.set(positionInfo.getIndividualDeletedMessagesCount());
} catch (Exception e) {
}
Expand Down

0 comments on commit 1397faf

Please sign in to comment.