diff --git a/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java b/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java
index 59ae5e351..854ae2120 100644
--- a/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java
+++ b/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import java.util.concurrent.atomic.AtomicReference;
@@ -25,14 +25,14 @@ public class DirectByteBufSeqAlloc {
 
     public DirectByteBufSeqAlloc(int allocType) {
         this.allocType = allocType;
         for (int i = 0; i < hugeBufArray.length; i++) {
-            hugeBufArray[i] = new AtomicReference<>(new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType)));
+            hugeBufArray[i] = new AtomicReference<>(new HugeBuf(ByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType)));
         }
     }
 
     public ByteBuf byteBuffer(int capacity) {
         if (capacity >= HUGE_BUF_SIZE) {
             // if the request capacity is larger than HUGE_BUF_SIZE, just allocate a new ByteBuf
-            return DirectByteBufAlloc.byteBuffer(capacity, allocType);
+            return ByteBufAlloc.byteBuffer(capacity, allocType);
         }
         int bufIndex = Math.abs(Thread.currentThread().hashCode() % hugeBufArray.length);
@@ -53,13 +53,13 @@ public ByteBuf byteBuffer(int capacity) {
         // 1. slice the remaining of the current hugeBuf and release the hugeBuf
         // 2. create a new hugeBuf and slice the remaining of the required capacity
         // 3. return the composite ByteBuf of the two slices
-        CompositeByteBuf cbf = DirectByteBufAlloc.compositeByteBuffer();
+        CompositeByteBuf cbf = ByteBufAlloc.compositeByteBuffer();
         int readLength = hugeBuf.buf.capacity() - hugeBuf.nextIndex;
         cbf.addComponent(false, hugeBuf.buf.retainedSlice(hugeBuf.nextIndex, readLength));
         capacity -= readLength;
         hugeBuf.buf.release();
 
-        HugeBuf newHugeBuf = new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType));
+        HugeBuf newHugeBuf = new HugeBuf(ByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType));
         bufRef.set(newHugeBuf);
 
         cbf.addComponent(false, newHugeBuf.buf.retainedSlice(0, capacity));
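Note on the straddling path above (steps 1-3): the tail of the retiring huge buffer and the head of its replacement are stitched into one `CompositeByteBuf` via retained slices, which is why releasing the old buffer right away is safe. A minimal, self-contained sketch of that pattern in plain Netty (sizes and names are illustrative, not AutoMQ code):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

public class StraddleSliceDemo {
    public static void main(String[] args) {
        ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
        ByteBuf oldHuge = alloc.directBuffer(16);
        ByteBuf newHuge = alloc.directBuffer(16);

        int nextIndex = 12; // only 4 bytes left in oldHuge
        int capacity = 10;  // the request needs 10 bytes

        CompositeByteBuf cbf = alloc.compositeDirectBuffer(2);
        int remaining = oldHuge.capacity() - nextIndex;
        // step 1: take the remainder of the old buffer as a retained slice
        cbf.addComponent(false, oldHuge.retainedSlice(nextIndex, remaining));
        oldHuge.release(); // safe: the composite holds its own reference
        // steps 2+3: take the rest from the fresh buffer and return the composite
        cbf.addComponent(false, newHuge.retainedSlice(0, capacity - remaining));

        System.out.println(cbf.capacity()); // 10
        cbf.release();
        newHuge.release();
    }
}
```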
diff --git a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
similarity index 71%
rename from s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
rename to s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
index cdf51d469..faa45a37e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
@@ -12,9 +12,13 @@
 package com.automq.stream.s3;
 
 import com.automq.stream.WrappedByteBuf;
+import io.netty.buffer.AbstractByteBufAllocator;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocatorMetric;
+import io.netty.buffer.ByteBufAllocatorMetricProvider;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -22,11 +26,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DirectByteBufAlloc {
+public class ByteBufAlloc {
     public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT"));
+    public static final boolean ALLOCATOR_USAGE_UNPOOLED = Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_UNPOOLED"));
+    public static final boolean BUFFER_USAGE_HEAPED = Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED"));
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class);
-    private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
+    private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufAlloc.class);
+    private static final AbstractByteBufAllocator ALLOC = ALLOCATOR_USAGE_UNPOOLED ? UnpooledByteBufAllocator.DEFAULT : PooledByteBufAllocator.DEFAULT;
     private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>();
     private static long lastMetricLogTime = System.currentTimeMillis();
     private static final Map<Integer, String> ALLOC_TYPE = new HashMap<>();
@@ -81,17 +87,17 @@ public static ByteBuf byteBuffer(int initCapacity, int type) {
                 if (now - lastMetricLogTime > 60000) {
                     // it's ok to be not thread safe
                     lastMetricLogTime = now;
-                    DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
-                    LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric);
+                    ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
+                    LOGGER.info("Direct Memory usage: {}", ByteBufAlloc.directByteBufAllocMetric);
                 }
-                return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
+                return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
             } else {
-                return ALLOC.directBuffer(initCapacity);
+                return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity);
             }
         } catch (OutOfMemoryError e) {
             if (MEMORY_USAGE_DETECT) {
-                DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
-                LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e);
+                ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
+                LOGGER.error("alloc direct buffer OOM, {}", ByteBufAlloc.directByteBufAllocMetric, e);
             } else {
                 LOGGER.error("alloc direct buffer OOM", e);
             }
@@ -109,20 +115,21 @@ public static void registerAllocType(int type, String name) {
     }
 
     public static class DirectByteBufAllocMetric {
-        private final long usedDirectMemory;
-        private final long allocatedDirectMemory;
+        private final long usedMemory;
+        private final long allocatedMemory;
         private final Map<String, Long> detail = new HashMap<>();
 
         public DirectByteBufAllocMetric() {
             USAGE_STATS.forEach((k, v) -> {
                 detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue());
             });
-            this.usedDirectMemory = ALLOC.metric().usedDirectMemory();
-            this.allocatedDirectMemory = this.detail.values().stream().mapToLong(Long::longValue).sum();
+            ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) ALLOC).metric();
+            this.usedMemory = BUFFER_USAGE_HEAPED ? metric.usedHeapMemory() : metric.usedDirectMemory();
+            this.allocatedMemory = this.detail.values().stream().mapToLong(Long::longValue).sum();
         }
 
-        public long getUsedDirectMemory() {
-            return usedDirectMemory;
+        public long getUsedMemory() {
+            return usedMemory;
         }
 
         public Map<String, Long> getDetailedMap() {
@@ -131,14 +138,18 @@ public Map<String, Long> getDetailedMap() {
 
         @Override
         public String toString() {
-            StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usedDirectMemory=");
-            sb.append(usedDirectMemory);
-            sb.append(", allocatedDirectMemory=");
-            sb.append(allocatedDirectMemory);
+            StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usedMemory=");
+            sb.append(usedMemory);
+            sb.append(", allocatedMemory=");
+            sb.append(allocatedMemory);
             sb.append(", detail=");
             for (Map.Entry<String, Long> entry : detail.entrySet()) {
                 sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
             }
+            sb.append(", pooled=");
+            sb.append(!ALLOCATOR_USAGE_UNPOOLED);
+            sb.append(", direct=");
+            sb.append(!BUFFER_USAGE_HEAPED);
             sb.append("}");
             return sb.toString();
         }
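The rename ships with two new switches: `AUTOMQ_ALLOCATOR_USAGE_UNPOOLED` picks `UnpooledByteBufAllocator.DEFAULT` over `PooledByteBufAllocator.DEFAULT`, and `AUTOMQ_BUFFER_USAGE_HEAPED` routes allocations to `heapBuffer` instead of `directBuffer`, giving four pooled/unpooled x direct/heap combinations. Since `Boolean.parseBoolean(null)` is false, the default with no env vars set stays the old pooled-direct behavior. A sketch of the selection matrix (the demo class is illustrative, not project code):

```java
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class AllocSelectionDemo {
    static final boolean UNPOOLED = Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_UNPOOLED"));
    static final boolean HEAPED = Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED"));
    // One allocator for the process, chosen once at class-load time.
    static final AbstractByteBufAllocator ALLOC =
        UNPOOLED ? UnpooledByteBufAllocator.DEFAULT : PooledByteBufAllocator.DEFAULT;

    static ByteBuf allocate(int capacity) {
        // The second flag picks the buffer kind per allocation.
        return HEAPED ? ALLOC.heapBuffer(capacity) : ALLOC.directBuffer(capacity);
    }

    public static void main(String[] args) {
        ByteBuf buf = allocate(64);
        System.out.println("direct=" + buf.isDirect());
        buf.release();
    }
}
```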
diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
index 3e9969aeb..d5441408a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -28,7 +28,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.automq.stream.s3.DirectByteBufAlloc.READ_INDEX_BLOCK;
+import static com.automq.stream.s3.ByteBufAlloc.READ_INDEX_BLOCK;
 import static com.automq.stream.s3.ObjectWriter.Footer.FOOTER_SIZE;
 import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;
 
@@ -145,7 +145,7 @@ public static BasicObjectInfo parse(ByteBuf objectTailBuf,
             // trim the ByteBuf to avoid extra memory occupy.
             ByteBuf indexBlockBuf = objectTailBuf.slice(objectTailBuf.readerIndex() + indexRelativePosition, indexBlockSize);
-            ByteBuf copy = DirectByteBufAlloc.byteBuffer(indexBlockBuf.readableBytes(), READ_INDEX_BLOCK);
+            ByteBuf copy = ByteBufAlloc.byteBuffer(indexBlockBuf.readableBytes(), READ_INDEX_BLOCK);
             indexBlockBuf.readBytes(copy, indexBlockBuf.readableBytes());
             objectTailBuf.release();
             indexBlockBuf = copy;
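On the "trim the ByteBuf" copy above: a slice pins its entire parent allocation, so copying the index block into a right-sized buffer lets the much larger object tail be released immediately. A sketch of the pattern (sizes illustrative):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class TrimSliceDemo {
    public static void main(String[] args) {
        ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
        ByteBuf tail = alloc.directBuffer(1024 * 1024); // big object tail
        tail.writeZero(1024 * 1024);

        ByteBuf slice = tail.slice(0, 64); // shares tail's 1 MiB allocation
        ByteBuf copy = alloc.directBuffer(slice.readableBytes());
        slice.readBytes(copy, slice.readableBytes()); // 64-byte private copy
        tail.release(); // the 1 MiB buffer can now be reclaimed

        System.out.println(copy.readableBytes()); // 64
        copy.release();
    }
}
```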
diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
index 0d1431a8c..85863fb5e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -24,9 +24,9 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_DATA_BLOCK_HEADER;
-import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_FOOTER;
-import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_INDEX_BLOCK;
+import static com.automq.stream.s3.ByteBufAlloc.WRITE_DATA_BLOCK_HEADER;
+import static com.automq.stream.s3.ByteBufAlloc.WRITE_FOOTER;
+import static com.automq.stream.s3.ByteBufAlloc.WRITE_INDEX_BLOCK;
 
 /**
  * Write stream records to a single object.
@@ -130,7 +130,7 @@ private synchronized void tryUploadPart() {
                 }
             }
             if (partFull) {
-                CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer();
+                CompositeByteBuf partBuf = ByteBufAlloc.compositeByteBuffer();
                 for (DataBlock block : uploadBlocks) {
                     waitingUploadBlocksSize -= block.size();
                     partBuf.addComponent(true, block.buffer());
@@ -145,7 +145,7 @@ private synchronized void tryUploadPart() {
         }
 
         public CompletableFuture<Void> close() {
-            CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
+            CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
             for (DataBlock block : waitingUploadBlocks) {
                 buf.addComponent(true, block.buffer());
                 completedBlocks.add(block);
@@ -197,7 +197,7 @@ class IndexBlock {
         public IndexBlock() {
             long nextPosition = 0;
             int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * completedBlocks.size();
-            buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, WRITE_INDEX_BLOCK);
+            buf = ByteBufAlloc.byteBuffer(indexBlockSize, WRITE_INDEX_BLOCK);
             for (DataBlock block : completedBlocks) {
                 ObjectStreamRange streamRange = block.getStreamRange();
                 new DataBlockIndex(streamRange.getStreamId(), streamRange.getStartOffset(), (int) (streamRange.getEndOffset() - streamRange.getStartOffset()),
@@ -230,8 +230,8 @@ class DataBlock {
 
         public DataBlock(long streamId, List<StreamRecordBatch> records) {
             this.recordCount = records.size();
-            this.encodedBuf = DirectByteBufAlloc.compositeByteBuffer();
-            ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE, WRITE_DATA_BLOCK_HEADER);
+            this.encodedBuf = ByteBufAlloc.compositeByteBuffer();
+            ByteBuf header = ByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE, WRITE_DATA_BLOCK_HEADER);
             header.writeByte(DATA_BLOCK_MAGIC);
             header.writeByte(DATA_BLOCK_DEFAULT_FLAG);
             header.writeInt(recordCount);
@@ -266,7 +266,7 @@ class Footer {
         private final ByteBuf buf;
 
         public Footer(long indexStartPosition, int indexBlockLength) {
-            buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE, WRITE_FOOTER);
+            buf = ByteBufAlloc.byteBuffer(FOOTER_SIZE, WRITE_FOOTER);
             // start position of index block
             buf.writeLong(indexStartPosition);
             // size of index block
import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;
@@ -174,7 +174,7 @@ public Optional compact() throws ExecutionException,
         long compactedStartOffset = objectGroup.get(0).startOffset();
         long compactedEndOffset = objectGroup.get(objectGroup.size() - 1).endOffset();
         List<Long> compactedObjectIds = new LinkedList<>();
-        CompositeByteBuf indexes = DirectByteBufAlloc.compositeByteBuffer();
+        CompositeByteBuf indexes = ByteBufAlloc.compositeByteBuffer();
         Writer writer = s3Operator.writer(new Writer.Context(STREAM_OBJECT_COMPACTION_READ), ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2);
         long groupStartOffset = -1L;
         long groupStartPosition = -1L;
@@ -184,7 +184,7 @@ public Optional compact() throws ExecutionException,
         for (S3ObjectMetadata object : objectGroup) {
             try (ObjectReader reader = new ObjectReader(object, s3Operator)) {
                 ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().get();
-                ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().count() * DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
+                ByteBuf subIndexes = ByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().count() * DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
                 Iterator<DataBlockIndex> it = basicObjectInfo.indexBlock().iterator();
                 long validDataBlockStartPosition = 0;
                 while (it.hasNext()) {
@@ -219,13 +219,13 @@ public Optional compact() throws ExecutionException,
             }
         }
         if (lastIndex != null) {
-            ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
+            ByteBuf subIndexes = ByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
             new DataBlockIndex(streamId, groupStartOffset, (int) (lastIndex.endOffset() - groupStartOffset),
                 groupRecordCount, groupStartPosition, groupSize).encode(subIndexes);
             indexes.addComponent(true, subIndexes);
         }
 
-        CompositeByteBuf indexBlockAndFooter = DirectByteBufAlloc.compositeByteBuffer();
+        CompositeByteBuf indexBlockAndFooter = ByteBufAlloc.compositeByteBuffer();
         indexBlockAndFooter.addComponent(true, indexes);
         indexBlockAndFooter.addComponent(true, new ObjectWriter.Footer(nextBlockPosition, indexBlockAndFooter.readableBytes()).buffer());
diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java
index 8df5e422c..eae64872e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java
@@ -15,7 +15,7 @@
 import com.automq.stream.s3.model.StreamRecordBatch;
 import io.netty.buffer.ByteBuf;
 
-import static com.automq.stream.s3.DirectByteBufAlloc.ENCODE_RECORD;
+import static com.automq.stream.s3.ByteBufAlloc.ENCODE_RECORD;
 
 public class StreamRecordBatchCodec {
     public static final byte MAGIC_V0 = 0x22;
@@ -56,7 +56,7 @@ public static StreamRecordBatch duplicateDecode(ByteBuf buf) {
         long baseOffset = buf.readLong();
         int lastOffsetDelta = buf.readInt();
         int payloadLength = buf.readInt();
-        ByteBuf payload = DirectByteBufAlloc.byteBuffer(payloadLength, DirectByteBufAlloc.DECODE_RECORD);
+        ByteBuf payload = ByteBufAlloc.byteBuffer(payloadLength, ByteBufAlloc.DECODE_RECORD);
         buf.readBytes(payload);
         return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
     }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java
index
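`duplicateDecode` above reads a fixed header and then copies the payload into a freshly allocated, right-sized buffer rather than retaining a slice of the source. A sketch of that walk, with the field order inferred from the reads in the hunk (magic-byte handling elided):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;

public class RecordDecodeDemo {
    public static void main(String[] args) {
        // Encode a record the way the hunk's reads suggest it is laid out.
        ByteBuf buf = Unpooled.buffer();
        buf.writeLong(233L); // streamId
        buf.writeLong(1L);   // epoch
        buf.writeLong(10L);  // baseOffset
        buf.writeInt(1);     // lastOffsetDelta
        byte[] payload = {7, 7, 7};
        buf.writeInt(payload.length);
        buf.writeBytes(payload);

        // Decode: fixed header first, then a private copy of the payload.
        long streamId = buf.readLong();
        long epoch = buf.readLong();
        long baseOffset = buf.readLong();
        int lastOffsetDelta = buf.readInt();
        int payloadLength = buf.readInt();
        ByteBuf copy = ByteBufAllocator.DEFAULT.directBuffer(payloadLength);
        buf.readBytes(copy); // fills copy's writable region (payloadLength bytes)

        System.out.println(streamId + " " + epoch + " " + baseOffset + " "
            + lastOffsetDelta + " " + copy.readableBytes());
        copy.release();
        buf.release();
    }
}
```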
864addc9a..497d3b6da 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.cache;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.cache.DefaultS3BlockCache.ReadAheadRecord;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.model.StreamRecordBatch;
@@ -35,7 +35,7 @@
 
 import static com.automq.stream.s3.model.StreamRecordBatch.OBJECT_OVERHEAD;
 
-public class BlockCache implements DirectByteBufAlloc.OOMHandler {
+public class BlockCache implements ByteBufAlloc.OOMHandler {
     public static final Integer ASYNC_READ_AHEAD_NOOP_OFFSET = -1;
     static final int BLOCK_SIZE = 1024 * 1024;
     private static final Logger LOGGER = LoggerFactory.getLogger(BlockCache.class);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
index 5c785e2d5..17cf005df 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
@@ -11,8 +11,8 @@
 
 package com.automq.stream.s3.compact.operator;
 
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.DataBlockIndex;
-import com.automq.stream.s3.DirectByteBufAlloc;
 import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.StreamDataBlock;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
@@ -34,7 +34,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ;
+import static com.automq.stream.s3.ByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ;
 
 //TODO: refactor to reduce duplicate code with ObjectWriter
 public class DataBlockReader {
@@ -145,7 +145,7 @@ public void readContinuousBlocks(List<StreamDataBlock> streamDataBlocks, long ma
         final int finalEnd = end + 1; // include current block
         CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
             .thenAccept(v -> {
-                CompositeByteBuf compositeByteBuf = DirectByteBufAlloc.compositeByteBuffer();
+                CompositeByteBuf compositeByteBuf = ByteBufAlloc.compositeByteBuffer();
                 for (int j = 0; j < iterations; j++) {
                     compositeByteBuf.addComponent(true, bufferMap.get(j));
                 }
@@ -195,7 +195,7 @@ private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
         if (throttleBucket == null) {
             return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
                 // convert heap buffer to direct buffer
-                ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
+                ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
                 directBuf.writeBytes(buf);
                 buf.release();
                 return directBuf;
@@ -205,7 +205,7 @@ private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
                 .thenCompose(v ->
                     s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
                         // convert heap buffer to direct buffer
-                        ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
+                        ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
                         directBuf.writeBytes(buf);
                         buf.release();
                         return directBuf;
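The two `rangeRead0` branches share one idiom: the SDK may hand back heap buffers, so the bytes are copied into an accounted direct buffer and the source is released immediately. Extracted as a sketch (names illustrative):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;

public class HeapToDirectDemo {
    // Copy an (possibly heap) buffer into a direct one and release the source.
    static ByteBuf toDirect(ByteBufAllocator alloc, ByteBuf maybeHeap) {
        ByteBuf direct = alloc.directBuffer(maybeHeap.readableBytes());
        direct.writeBytes(maybeHeap);
        maybeHeap.release();
        return direct;
    }

    public static void main(String[] args) {
        ByteBuf heap = Unpooled.wrappedBuffer("hello".getBytes());
        ByteBuf direct = toDirect(ByteBufAllocator.DEFAULT, heap);
        System.out.println(direct.isDirect() + " " + direct.readableBytes());
        direct.release();
    }
}
```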
a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
index ba946b48b..bb082540d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
@@ -11,8 +11,8 @@
 
 package com.automq.stream.s3.compact.operator;
 
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.DataBlockIndex;
-import com.automq.stream.s3.DirectByteBufAlloc;
 import com.automq.stream.s3.StreamDataBlock;
 import com.automq.stream.s3.compact.utils.CompactionUtils;
 import com.automq.stream.s3.compact.utils.GroupByLimitPredicate;
@@ -30,8 +30,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_OBJECT_COMPACTION_WRITE;
-import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ;
+import static com.automq.stream.s3.ByteBufAlloc.STREAM_OBJECT_COMPACTION_WRITE;
+import static com.automq.stream.s3.ByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ;
 import static com.automq.stream.s3.operator.Writer.MIN_PART_SIZE;
 
 //TODO: refactor to reduce duplicate code with ObjectWriter
@@ -121,7 +121,7 @@ public CompletableFuture<Void> close() {
     }
 
     private CompositeByteBuf groupWaitingBlocks() {
-        CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
+        CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
         for (StreamDataBlock block : waitingUploadBlocks) {
             buf.addComponent(true, block.getDataCf().join());
             block.releaseRef();
@@ -154,7 +154,7 @@ public IndexBlock() {
             List<DataBlockIndex> dataBlockIndices = CompactionUtils.buildDataBlockIndicesFromGroup(
                 CompactionUtils.groupStreamDataBlocks(completedBlocks, new GroupByLimitPredicate(DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD)));
-            buf = DirectByteBufAlloc.byteBuffer(dataBlockIndices.size() * DataBlockIndex.BLOCK_INDEX_SIZE, DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_WRITE);
+            buf = ByteBufAlloc.byteBuffer(dataBlockIndices.size() * DataBlockIndex.BLOCK_INDEX_SIZE, ByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_WRITE);
             for (DataBlockIndex dataBlockIndex : dataBlockIndices) {
                 dataBlockIndex.encode(buf);
             }
@@ -179,7 +179,7 @@ class Footer {
         private final ByteBuf buf;
 
         public Footer() {
-            buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
+            buf = ByteBufAlloc.byteBuffer(FOOTER_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
             buf.writeLong(indexBlock.position());
             buf.writeInt(indexBlock.size());
             buf.writeZero(40 - 8 - 4);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
index fdbbe7cbb..2bce6ac7b 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
@@ -91,8 +91,8 @@ public class S3StreamMetricsConstant {
     public static final String INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME = "inflight_wal_upload_tasks_count";
     public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size";
     public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size";
-    public static final String ALLOCATED_DIRECT_MEMORY_SIZE_METRIC_NAME = "allocated_direct_memory_size";
-    public static final String USED_DIRECT_MEMORY_SIZE_METRIC_NAME = "used_direct_memory_size";
"used_direct_memory_size"; + public static final String BUFFER_ALLOCATED_MEMORY_SIZE_METRIC_NAME = "buffer_allocated_memory_size"; + public static final String BUFFER_USED_MEMORY_SIZE_METRIC_NAME = "buffer_used_memory_size"; public static final AttributeKey LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type"); public static final AttributeKey LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name"); public static final AttributeKey LABEL_SIZE_NAME = AttributeKey.stringKey("size"); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 1a7a96b7e..74571a852 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -11,13 +11,13 @@ package com.automq.stream.s3.metrics; -import com.automq.stream.s3.DirectByteBufAlloc; -import com.automq.stream.s3.metrics.wrapper.CounterMetric; +import com.automq.stream.s3.ByteBufAlloc; import com.automq.stream.s3.metrics.operations.S3ObjectStage; import com.automq.stream.s3.metrics.operations.S3Operation; import com.automq.stream.s3.metrics.operations.S3Stage; -import com.automq.stream.s3.metrics.wrapper.HistogramMetric; import com.automq.stream.s3.metrics.wrapper.ConfigListener; +import com.automq.stream.s3.metrics.wrapper.CounterMetric; +import com.automq.stream.s3.metrics.wrapper.HistogramMetric; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; @@ -56,8 +56,8 @@ public class S3StreamMetricsManager { private static ObservableLongGauge availableInflightS3ReadQuota = new NoopObservableLongGauge(); private static ObservableLongGauge availableInflightS3WriteQuota = new NoopObservableLongGauge(); private static ObservableLongGauge inflightWALUploadTasksCount = new NoopObservableLongGauge(); - private static ObservableLongGauge allocatedDirectMemorySize = new NoopObservableLongGauge(); - private static ObservableLongGauge usedDirectMemorySize = new NoopObservableLongGauge(); + private static ObservableLongGauge allocatedMemorySize = new NoopObservableLongGauge(); + private static ObservableLongGauge usedMemorySize = new NoopObservableLongGauge(); private static LongCounter compactionReadSizeInTotal = new NoopLongCounter(); private static LongCounter compactionWriteSizeInTotal = new NoopLongCounter(); private static Supplier networkInboundAvailableBandwidthSupplier = () -> 0L; @@ -267,25 +267,25 @@ public static void initMetrics(Meter meter, String prefix) { .setDescription("Compaction write size") .setUnit("bytes") .build(); - allocatedDirectMemorySize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.ALLOCATED_DIRECT_MEMORY_SIZE_METRIC_NAME) - .setDescription("Allocated direct memory size") + allocatedMemorySize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BUFFER_ALLOCATED_MEMORY_SIZE_METRIC_NAME) + .setDescription("Buffer allocated memory size") .setUnit("bytes") .ofLongs() .buildWithCallback(result -> { - if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && DirectByteBufAlloc.directByteBufAllocMetric != null) { - Map allocateSizeMap = DirectByteBufAlloc.directByteBufAllocMetric.getDetailedMap(); + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.directByteBufAllocMetric != null) { + Map allocateSizeMap = 
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index c2c74fad0..42f5558fc 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.operator;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
@@ -317,7 +317,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFut
             .thenAccept(responsePublisher -> {
                 S3ObjectStats.getInstance().objectDownloadSizeStats.record(MetricsLevel.INFO, size);
-                CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
+                CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
                 responsePublisher.subscribe((bytes) -> {
                     // the aws client will copy DefaultHttpContent to heap ByteBuffer
                     buf.addComponent(true, Unpooled.wrappedBuffer(bytes));
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
index 66527143a..bd76c9e7e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.operator;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.S3ObjectStats;
@@ -185,7 +185,7 @@ class ObjectPart {
         private final int partNumber = nextPartNumber.getAndIncrement();
         private final CompletableFuture partCf = new CompletableFuture<>();
         private final ThrottleStrategy throttleStrategy;
-        private CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer();
+        private CompositeByteBuf partBuf = ByteBufAlloc.compositeByteBuffer();
         private CompletableFuture<Void> lastRangeReadCf = CompletableFuture.completedFuture(null);
         private long size;
@@ -203,9 +203,9 @@ public void write(ByteBuf data) {
         public void copyOnWrite() {
             int size = partBuf.readableBytes();
             if (size > 0) {
-                ByteBuf buf = DirectByteBufAlloc.byteBuffer(size, context.allocType());
+                ByteBuf buf = ByteBufAlloc.byteBuffer(size, context.allocType());
                 buf.writeBytes(partBuf.duplicate());
-                CompositeByteBuf copy = DirectByteBufAlloc.compositeByteBuffer().addComponent(true, buf);
+                CompositeByteBuf copy = ByteBufAlloc.compositeByteBuffer().addComponent(true, buf);
                 this.partBuf.release();
                 this.partBuf = copy;
             }
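`copyOnWrite` above flattens the pending composite into one freshly allocated buffer, reading through a `duplicate()` so the original reader/writer indexes stay untouched, then swaps it in so the in-flight upload keeps a stable view while the caller appends. A sketch of the move (plain Netty, without the project's usage accounting):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

public class CopyOnWriteDemo {
    static CompositeByteBuf copyOnWrite(ByteBufAllocator alloc, CompositeByteBuf partBuf) {
        int size = partBuf.readableBytes();
        ByteBuf flat = alloc.directBuffer(size);
        flat.writeBytes(partBuf.duplicate()); // read via a duplicate; indexes untouched
        CompositeByteBuf copy = alloc.compositeDirectBuffer(16).addComponent(true, flat);
        partBuf.release(); // old components go back to the pool
        return copy;
    }

    public static void main(String[] args) {
        ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
        CompositeByteBuf part = alloc.compositeDirectBuffer(16);
        part.addComponent(true, alloc.directBuffer(4).writeInt(42));
        part = copyOnWrite(alloc, part);
        System.out.println(part.readableBytes()); // still 4
        part.release();
    }
}
```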
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
index 3e5e6714b..f074e2935 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.operator;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.S3ObjectStats;
@@ -119,7 +119,7 @@ class ObjectWriter implements Writer {
         // max upload size, when object data size is larger MAX_UPLOAD_SIZE, we should use multi-part upload to upload it.
         static final long MAX_UPLOAD_SIZE = 32L * 1024 * 1024;
         CompletableFuture<Void> cf = new CompletableFuture<>();
-        CompositeByteBuf data = DirectByteBufAlloc.compositeByteBuffer();
+        CompositeByteBuf data = ByteBufAlloc.compositeByteBuffer();
         TimerUtil timerUtil = new TimerUtil();
 
         @Override
@@ -132,9 +132,9 @@ public CompletableFuture<Void> write(ByteBuf part) {
         public void copyOnWrite() {
             int size = data.readableBytes();
             if (size > 0) {
-                ByteBuf buf = DirectByteBufAlloc.byteBuffer(size, context.allocType());
+                ByteBuf buf = ByteBufAlloc.byteBuffer(size, context.allocType());
                 buf.writeBytes(data.duplicate());
-                CompositeByteBuf copy = DirectByteBufAlloc.compositeByteBuffer().addComponent(true, buf);
+                CompositeByteBuf copy = ByteBufAlloc.compositeByteBuffer().addComponent(true, buf);
                 this.data.release();
                 this.data = copy;
             }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java b/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java
index 6d47559ab..cdafae314 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.operator;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import io.netty.buffer.ByteBuf;
 import java.util.concurrent.CompletableFuture;
 
@@ -75,7 +75,7 @@ public interface Writer {
     CompletableFuture<Void> release();
 
     class Context {
-        public static final Context DEFAULT = new Context(DirectByteBufAlloc.DEFAULT);
+        public static final Context DEFAULT = new Context(ByteBufAlloc.DEFAULT);
 
         private final int allocType;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
index 08897e39c..ecee701a0 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.wal;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StorageOperationStats;
@@ -103,7 +103,7 @@ public ByteBuf data() {
             return null;
         }
 
-        data = DirectByteBufAlloc.compositeByteBuffer();
+        data = ByteBufAlloc.compositeByteBuffer();
         for (Supplier<ByteBuf> supplier : records) {
             ByteBuf record = supplier.get();
             data.addComponent(true, record);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
index 5b3d90399..6dad28e1f 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
@@ -11,8 +11,8 @@
 
 package com.automq.stream.s3.wal;
 
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.Config;
-import com.automq.stream.s3.DirectByteBufAlloc;
 import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
@@ -159,7 +159,7 @@ private synchronized void flushWALHeader() {
      */
     private ByteBuf readRecord(long recoverStartOffset,
         Function<Long, Long> logicalToPhysical) throws ReadRecordException {
-        final ByteBuf recordHeader = DirectByteBufAlloc.byteBuffer(RECORD_HEADER_SIZE);
+        final ByteBuf recordHeader = ByteBufAlloc.byteBuffer(RECORD_HEADER_SIZE);
         SlidingWindowService.RecordHeaderCoreData readRecordHeader;
         try {
             readRecordHeader = parseRecordHeader(recoverStartOffset, recordHeader, logicalToPhysical);
@@ -168,7 +168,7 @@ private ByteBuf readRecord(long recoverStartOffset,
         }
 
         int recordBodyLength = readRecordHeader.getRecordBodyLength();
-        ByteBuf recordBody = DirectByteBufAlloc.byteBuffer(recordBodyLength);
+        ByteBuf recordBody = ByteBufAlloc.byteBuffer(recordBodyLength);
         try {
             parseRecordBody(recoverStartOffset, readRecordHeader, recordBody, logicalToPhysical);
         } catch (ReadRecordException e) {
@@ -300,7 +300,7 @@ private long getCurrentStartOffset() {
     private WALHeader tryReadWALHeader(WALChannel walChannel) {
         WALHeader header = null;
         for (int i = 0; i < WAL_HEADER_COUNT; i++) {
-            ByteBuf buf = DirectByteBufAlloc.byteBuffer(WALHeader.WAL_HEADER_SIZE);
+            ByteBuf buf = ByteBufAlloc.byteBuffer(WALHeader.WAL_HEADER_SIZE);
             try {
                 int read = walChannel.retryRead(buf, i * WAL_HEADER_CAPACITY);
                 if (read != WALHeader.WAL_HEADER_SIZE) {
@@ -422,7 +422,7 @@ private ByteBuf recordHeader(ByteBuf body, int crc, long start) {
     }
 
     private ByteBuf record(ByteBuf body, int crc, long start) {
-        CompositeByteBuf record = DirectByteBufAlloc.compositeByteBuffer();
+        CompositeByteBuf record = ByteBufAlloc.compositeByteBuffer();
         crc = 0 == crc ? WALUtil.crc32(body) : crc;
         record.addComponents(true, recordHeader(body, crc, start), body);
         return record;
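`record()` frames a WAL entry as header + body in a single composite, computing the body CRC only when the caller passed 0. A sketch with a deliberately simplified header layout (the real field set lives in `SlidingWindowService.RecordHeaderCoreData`; `WALUtil.crc32` is approximated with `java.util.zip.CRC32`):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.util.zip.CRC32;

public class WalRecordDemo {
    static int crc32(ByteBuf body) {
        CRC32 crc = new CRC32();
        crc.update(body.nioBuffer()); // checksum the readable region
        return (int) crc.getValue();
    }

    static ByteBuf record(ByteBufAllocator alloc, ByteBuf body, int crc, long start) {
        crc = 0 == crc ? crc32(body) : crc; // compute lazily, like the patch
        ByteBuf header = alloc.directBuffer(16);
        header.writeLong(start);                 // simplified: record offset
        header.writeInt(body.readableBytes());   // simplified: body length
        header.writeInt(crc);                    // simplified: body CRC
        CompositeByteBuf record = alloc.compositeDirectBuffer(2);
        record.addComponents(true, header, body); // zero-copy framing
        return record;
    }

    public static void main(String[] args) {
        ByteBuf rec = record(ByteBufAllocator.DEFAULT,
            Unpooled.wrappedBuffer(new byte[]{1, 2, 3}), 0, 0L);
        System.out.println(rec.readableBytes()); // 16 + 3
        rec.release();
    }
}
```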
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
index 0cd549ec2..64964e294 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.wal;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StorageOperationStats;
@@ -445,7 +445,7 @@ public String toString() {
         }
 
         private ByteBuf marshalHeaderExceptCRC() {
-            ByteBuf buf = DirectByteBufAlloc.byteBuffer(RECORD_HEADER_SIZE);
+            ByteBuf buf = ByteBufAlloc.byteBuffer(RECORD_HEADER_SIZE);
             buf.writeInt(magicCode0);
             buf.writeInt(recordBodyLength1);
             buf.writeLong(recordBodyOffset2);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java b/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java
index e1aa4bade..02bf18d43 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.wal;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.wal.util.WALUtil;
 import io.netty.buffer.ByteBuf;
 import java.util.concurrent.atomic.AtomicLong;
@@ -182,7 +182,7 @@ public String toString() {
     }
 
     private ByteBuf marshalHeaderExceptCRC() {
-        ByteBuf buf = DirectByteBufAlloc.byteBuffer(WAL_HEADER_SIZE);
+        ByteBuf buf = ByteBufAlloc.byteBuffer(WAL_HEADER_SIZE);
         buf.writeInt(magicCode0);
         buf.writeLong(capacity1);
         buf.writeLong(trimOffset2.get());
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/BenchTool.java b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/BenchTool.java
index d7f0d84c1..1cf658ca4 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/BenchTool.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/BenchTool.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.wal.benchmark;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.wal.BlockWALService;
 import com.automq.stream.s3.wal.WriteAheadLog;
 import com.automq.stream.s3.wal.util.WALChannel;
@@ -58,7 +58,7 @@ public static void resetWALHeader(String path) throws IOException {
         int capacity = BlockWALService.WAL_HEADER_TOTAL_CAPACITY;
         WALChannel channel = WALChannel.builder(path).capacity(capacity).build();
         channel.open();
-        ByteBuf buf = DirectByteBufAlloc.byteBuffer(capacity);
+        ByteBuf buf = ByteBufAlloc.byteBuffer(capacity);
         buf.writeZero(capacity);
         channel.write(buf, 0);
         buf.release();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java
index 5f9a95511..e9a396c39 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.wal.util;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
@@ -102,7 +102,7 @@ public synchronized void releaseCache() {
      */
     private ByteBuf getCache() {
         if (this.cache == null) {
-            this.cache = DirectByteBufAlloc.byteBuffer(cacheSize);
+            this.cache = ByteBufAlloc.byteBuffer(cacheSize);
         }
         return this.cache;
     }
diff --git a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
index a050c869c..1a644aede 100644
--- a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
+++ b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.utils;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
@@ -305,7 +305,7 @@ protected void readRange(S3AsyncClient readS3Client, String path, CompletableFut
             GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();
             readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
                 .thenAccept(responsePublisher -> {
-                    CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
+                    CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
                     responsePublisher.subscribe((bytes) -> {
                         // the aws client will copy DefaultHttpContent to heap ByteBuffer
                         buf.addComponent(true, Unpooled.wrappedBuffer(bytes));
diff --git a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
index 47dfcdffc..cc4b06be1 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
@@ -107,7 +107,7 @@ public void testGetBasicObjectInfo() throws ExecutionException, InterruptedExcep
     @Test
     public void testReadBlockGroup() throws ExecutionException, InterruptedException {
         S3Operator s3Operator = new MemoryS3Operator();
-        ByteBuf buf = DirectByteBufAlloc.byteBuffer(0);
+        ByteBuf buf = ByteBufAlloc.byteBuffer(0);
         buf.writeBytes(new ObjectWriter.DataBlock(233L, List.of(
             new StreamRecordBatch(233L, 0, 10, 1, TestUtils.random(100)),
             new StreamRecordBatch(233L, 0, 11, 2, TestUtils.random(100))
diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java
index f3749e0be..0bbea6925 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java
@@ -11,8 +11,8 @@
 
 package com.automq.stream.s3.compact;
 
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.DataBlockIndex;
-import com.automq.stream.s3.DirectByteBufAlloc;
 import com.automq.stream.s3.ObjectWriter;
 import com.automq.stream.s3.StreamDataBlock;
 import com.automq.stream.s3.TestUtils;
@@ -225,7 +225,7 @@ protected List<StreamDataBlock> mergeStreamDataBlocksForGroup(List<StreamDataBlock> streamDataBlocks) {
-        CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
+        CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
         for (StreamDataBlock block : streamDataBlocks) {
             buf.addComponent(true, block.getDataCf().join());
         }
diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java
index 36df60e3b..fbb4a2894 100644
---
a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java
@@ -11,7 +11,7 @@
 
 package com.automq.stream.s3.wal;
 
-import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.TestUtils;
 import com.automq.stream.s3.wal.benchmark.WriteBench;
 import com.automq.stream.s3.wal.util.WALBlockDeviceChannel;
@@ -891,7 +891,7 @@ private void write(WALChannel walChannel, long logicOffset, int recordSize) thro
         ByteBuf recordBody = TestUtils.random(recordSize - RECORD_HEADER_SIZE);
         ByteBuf recordHeader = recordHeader(recordBody, logicOffset);
 
-        CompositeByteBuf record = DirectByteBufAlloc.compositeByteBuffer();
+        CompositeByteBuf record = ByteBufAlloc.compositeByteBuffer();
         record.addComponents(true, recordHeader, recordBody);
 
         long position = WALUtil.recordOffsetToPosition(logicOffset, walChannel.capacity(), WAL_HEADER_TOTAL_CAPACITY);