From 9839d6a4074666556b93af766da07cdcb7429976 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Wed, 28 Feb 2024 10:43:56 +0800 Subject: [PATCH] feat(s3stream): use sequential memory alloc for stream set object compaction Signed-off-by: Shichao Nie --- ...eBufSeqAlloc.java => ByteBufSeqAlloc.java} | 7 ++++--- .../com/automq/stream/s3/ByteBufAlloc.java | 20 +++++++++---------- .../stream/s3/StreamRecordBatchCodec.java | 4 ++-- .../s3/compact/operator/DataBlockReader.java | 6 ++++-- .../s3/metrics/S3StreamMetricsManager.java | 8 ++++---- ...llocTest.java => ByteBufSeqAllocTest.java} | 10 +++++----- 6 files changed, 29 insertions(+), 26 deletions(-) rename s3stream/src/main/java/com/automq/stream/{DirectByteBufSeqAlloc.java => ByteBufSeqAlloc.java} (93%) rename s3stream/src/test/java/com/automq/stream/{DirectByteBufSeqAllocTest.java => ByteBufSeqAllocTest.java} (80%) diff --git a/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java b/s3stream/src/main/java/com/automq/stream/ByteBufSeqAlloc.java similarity index 93% rename from s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java rename to s3stream/src/main/java/com/automq/stream/ByteBufSeqAlloc.java index 854ae2120..c8e32d7d0 100644 --- a/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/ByteBufSeqAlloc.java @@ -16,14 +16,15 @@ import io.netty.buffer.CompositeByteBuf; import java.util.concurrent.atomic.AtomicReference; -public class DirectByteBufSeqAlloc { +public class ByteBufSeqAlloc { public static final int HUGE_BUF_SIZE = 8 * 1024 * 1024; // why not use ThreadLocal? the partition open has too much threads - final AtomicReference[] hugeBufArray = new AtomicReference[8]; + final AtomicReference[] hugeBufArray; private final int allocType; - public DirectByteBufSeqAlloc(int allocType) { + public ByteBufSeqAlloc(int allocType, int concurrency) { this.allocType = allocType; + hugeBufArray = new AtomicReference[concurrency]; for (int i = 0; i < hugeBufArray.length; i++) { hugeBufArray[i] = new AtomicReference<>(new HugeBuf(ByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType))); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java index faa45a37e..dfe72bede 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java @@ -48,7 +48,7 @@ public class ByteBufAlloc { public static final int STREAM_OBJECT_COMPACTION_WRITE = 8; public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9; public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10; - public static DirectByteBufAllocMetric directByteBufAllocMetric = null; + public static ByteBufAllocMetric byteBufAllocMetric = null; static { registerAllocType(DEFAULT, "default"); @@ -87,8 +87,8 @@ public static ByteBuf byteBuffer(int initCapacity, int type) { if (now - lastMetricLogTime > 60000) { // it's ok to be not thread safe lastMetricLogTime = now; - ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); - LOGGER.info("Direct Memory usage: {}", ByteBufAlloc.directByteBufAllocMetric); + ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric(); + LOGGER.info("Buffer usage: {}", ByteBufAlloc.byteBufAllocMetric); } return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity)); } else { @@ -96,12 +96,12 @@ public static ByteBuf byteBuffer(int initCapacity, int type) { } } catch (OutOfMemoryError e) { if (MEMORY_USAGE_DETECT) { - ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); - LOGGER.error("alloc direct buffer OOM, {}", ByteBufAlloc.directByteBufAllocMetric, e); + ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric(); + LOGGER.error("alloc buffer OOM, {}", ByteBufAlloc.byteBufAllocMetric, e); } else { - LOGGER.error("alloc direct buffer OOM", e); + LOGGER.error("alloc buffer OOM", e); } - System.err.println("alloc direct buffer OOM"); + System.err.println("alloc buffer OOM"); Runtime.getRuntime().halt(1); throw e; } @@ -114,12 +114,12 @@ public static void registerAllocType(int type, String name) { ALLOC_TYPE.put(type, name); } - public static class DirectByteBufAllocMetric { + public static class ByteBufAllocMetric { private final long usedMemory; private final long allocatedMemory; private final Map detail = new HashMap<>(); - public DirectByteBufAllocMetric() { + public ByteBufAllocMetric() { USAGE_STATS.forEach((k, v) -> { detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue()); }); @@ -138,7 +138,7 @@ public Map getDetailedMap() { @Override public String toString() { - StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usedMemory="); + StringBuilder sb = new StringBuilder("ByteBufAllocMetric{usedMemory="); sb.append(usedMemory); sb.append(", allocatedMemory="); sb.append(allocatedMemory); diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java index eae64872e..a7a4033ad 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java @@ -11,7 +11,7 @@ package com.automq.stream.s3; -import com.automq.stream.DirectByteBufSeqAlloc; +import com.automq.stream.ByteBufSeqAlloc; import com.automq.stream.s3.model.StreamRecordBatch; import io.netty.buffer.ByteBuf; @@ -26,7 +26,7 @@ public class StreamRecordBatchCodec { + 8 // baseOffset + 4 // lastOffsetDelta + 4; // payload length - private static final DirectByteBufSeqAlloc ENCODE_ALLOC = new DirectByteBufSeqAlloc(ENCODE_RECORD); + private static final ByteBufSeqAlloc ENCODE_ALLOC = new ByteBufSeqAlloc(ENCODE_RECORD, 8); public static ByteBuf encode(StreamRecordBatch streamRecord) { int totalLength = HEADER_SIZE + streamRecord.size(); // payload diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java index 17cf005df..8c9db2b63 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java @@ -11,6 +11,7 @@ package com.automq.stream.s3.compact.operator; +import com.automq.stream.ByteBufSeqAlloc; import com.automq.stream.s3.ByteBufAlloc; import com.automq.stream.s3.DataBlockIndex; import com.automq.stream.s3.ObjectReader; @@ -39,6 +40,7 @@ //TODO: refactor to reduce duplicate code with ObjectWriter public class DataBlockReader { private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReader.class); + private static final ByteBufSeqAlloc DIRECT_ALLOC = new ByteBufSeqAlloc(STREAM_SET_OBJECT_COMPACTION_READ, 1); private final S3ObjectMetadata metadata; private final String objectKey; private final S3Operator s3Operator; @@ -195,7 +197,7 @@ private CompletableFuture rangeRead0(long start, long end) { if (throttleBucket == null) { return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> { // convert heap buffer to direct buffer - ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ); + ByteBuf directBuf = DIRECT_ALLOC.byteBuffer(buf.readableBytes()); directBuf.writeBytes(buf); buf.release(); return directBuf; @@ -205,7 +207,7 @@ private CompletableFuture rangeRead0(long start, long end) { .thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> { // convert heap buffer to direct buffer - ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ); + ByteBuf directBuf = DIRECT_ALLOC.byteBuffer(buf.readableBytes()); directBuf.writeBytes(buf); buf.release(); return directBuf; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 74571a852..5da77fbcf 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -272,8 +272,8 @@ public static void initMetrics(Meter meter, String prefix) { .setUnit("bytes") .ofLongs() .buildWithCallback(result -> { - if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.directByteBufAllocMetric != null) { - Map allocateSizeMap = ByteBufAlloc.directByteBufAllocMetric.getDetailedMap(); + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.byteBufAllocMetric != null) { + Map allocateSizeMap = ByteBufAlloc.byteBufAllocMetric.getDetailedMap(); for (Map.Entry entry : allocateSizeMap.entrySet()) { result.record(entry.getValue(), ALLOC_TYPE_ATTRIBUTES.get(entry.getKey())); } @@ -284,8 +284,8 @@ public static void initMetrics(Meter meter, String prefix) { .setUnit("bytes") .ofLongs() .buildWithCallback(result -> { - if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.directByteBufAllocMetric != null) { - result.record(ByteBufAlloc.directByteBufAllocMetric.getUsedMemory(), metricsConfig.getBaseAttributes()); + if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.byteBufAllocMetric != null) { + result.record(ByteBufAlloc.byteBufAllocMetric.getUsedMemory(), metricsConfig.getBaseAttributes()); } }); } diff --git a/s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java b/s3stream/src/test/java/com/automq/stream/ByteBufSeqAllocTest.java similarity index 80% rename from s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java rename to s3stream/src/test/java/com/automq/stream/ByteBufSeqAllocTest.java index 5764e9304..1a398ab4e 100644 --- a/s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java +++ b/s3stream/src/test/java/com/automq/stream/ByteBufSeqAllocTest.java @@ -19,13 +19,13 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DirectByteBufSeqAllocTest { +public class ByteBufSeqAllocTest { @Test public void testAlloc() { - DirectByteBufSeqAlloc alloc = new DirectByteBufSeqAlloc(0); + ByteBufSeqAlloc alloc = new ByteBufSeqAlloc(0, 1); - AtomicReference bufRef = alloc.hugeBufArray[Math.abs(Thread.currentThread().hashCode() % alloc.hugeBufArray.length)]; + AtomicReference bufRef = alloc.hugeBufArray[Math.abs(Thread.currentThread().hashCode() % alloc.hugeBufArray.length)]; ByteBuf buf1 = alloc.byteBuffer(12); buf1.writeLong(1); @@ -36,7 +36,7 @@ public void testAlloc() { buf2.writeInt(4); buf2.writeLong(5); - ByteBuf buf3 = alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12 - 20 - 4); + ByteBuf buf3 = alloc.byteBuffer(ByteBufSeqAlloc.HUGE_BUF_SIZE - 12 - 20 - 4); ByteBuf oldHugeBuf = bufRef.get().buf; @@ -64,7 +64,7 @@ public void testAlloc() { ByteBuf oldHugeBuf2 = bufRef.get().buf; - alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12).release(); + alloc.byteBuffer(ByteBufSeqAlloc.HUGE_BUF_SIZE - 12).release(); alloc.byteBuffer(12).release(); assertEquals(0, oldHugeBuf2.refCnt()); }