diff --git a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java index dfe72bede..63be46327 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java @@ -28,11 +28,8 @@ public class ByteBufAlloc { public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT")); - public static final boolean ALLOCATOR_USAGE_UNPOOLED = Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_UNPOOLED")); - public static final boolean BUFFER_USAGE_HEAPED = Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED")); private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufAlloc.class); - private static final AbstractByteBufAllocator ALLOC = ALLOCATOR_USAGE_UNPOOLED ? UnpooledByteBufAllocator.DEFAULT : PooledByteBufAllocator.DEFAULT; private static final Map USAGE_STATS = new ConcurrentHashMap<>(); private static long lastMetricLogTime = System.currentTimeMillis(); private static final Map ALLOC_TYPE = new HashMap<>(); @@ -50,6 +47,15 @@ public class ByteBufAlloc { public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10; public static ByteBufAllocMetric byteBufAllocMetric = null; + /** + * The policy used to allocate memory. + */ + private static ByteBufAllocPolicy policy = ByteBufAllocPolicy.UNPOOLED_HEAP; + /** + * The allocator used to allocate memory. It should be updated when {@link #policy} is updated. + */ + private static AbstractByteBufAllocator allocator = getAllocatorByPolicy(policy); + static { registerAllocType(DEFAULT, "default"); registerAllocType(ENCODE_RECORD, "write_record"); @@ -65,8 +71,17 @@ public class ByteBufAlloc { } + /** + * Set the policy used to allocate memory. + */ + public static void setPolicy(ByteBufAllocPolicy policy) { + LOGGER.info("Set alloc policy to {}", policy); + ByteBufAlloc.policy = policy; + ByteBufAlloc.allocator = getAllocatorByPolicy(policy); + } + public static CompositeByteBuf compositeByteBuffer() { - return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE); + return allocator.compositeDirectBuffer(Integer.MAX_VALUE); } public static ByteBuf byteBuffer(int initCapacity) { @@ -90,9 +105,9 @@ public static ByteBuf byteBuffer(int initCapacity, int type) { ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric(); LOGGER.info("Buffer usage: {}", ByteBufAlloc.byteBufAllocMetric); } - return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity)); + return new WrappedByteBuf(policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity), () -> usage.add(-initCapacity)); } else { - return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity); + return policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity); } } catch (OutOfMemoryError e) { if (MEMORY_USAGE_DETECT) { @@ -114,6 +129,13 @@ public static void registerAllocType(int type, String name) { ALLOC_TYPE.put(type, name); } + private static AbstractByteBufAllocator getAllocatorByPolicy(ByteBufAllocPolicy policy) { + if (policy.isPooled()) { + return PooledByteBufAllocator.DEFAULT; + } + return UnpooledByteBufAllocator.DEFAULT; + } + public static class ByteBufAllocMetric { private final long usedMemory; private final long allocatedMemory; @@ -123,8 +145,8 @@ public ByteBufAllocMetric() { USAGE_STATS.forEach((k, v) -> { detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue()); }); - ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) ALLOC).metric(); - this.usedMemory = BUFFER_USAGE_HEAPED ? metric.usedHeapMemory() : metric.usedDirectMemory(); + ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) allocator).metric(); + this.usedMemory = policy.isDirect() ? metric.usedDirectMemory() : metric.usedHeapMemory(); this.allocatedMemory = this.detail.values().stream().mapToLong(Long::longValue).sum(); } @@ -147,9 +169,9 @@ public String toString() { sb.append(entry.getKey()).append("=").append(entry.getValue()).append(","); } sb.append(", pooled="); - sb.append(!ALLOCATOR_USAGE_UNPOOLED); + sb.append(policy.isPooled()); sb.append(", direct="); - sb.append(!BUFFER_USAGE_HEAPED); + sb.append(policy.isDirect()); sb.append("}"); return sb.toString(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAllocPolicy.java b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAllocPolicy.java new file mode 100644 index 000000000..11c4c84df --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAllocPolicy.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024, AutoMQ CO.,LTD. + * + * Use of this software is governed by the Business Source License + * included in the file BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3; + +public enum ByteBufAllocPolicy { + + /** + * Allocate memory from the heap without pooling. + */ + UNPOOLED_HEAP(false, false), + + /** + * Use pooled direct memory. + */ + POOLED_DIRECT(true, true); + + /** + * Whether the buffer should be pooled or not. + */ + private final boolean pooled; + + /** + * Whether the buffer should be direct or not. + */ + private final boolean direct; + + ByteBufAllocPolicy(boolean pooled, boolean direct) { + this.pooled = pooled; + this.direct = direct; + } + + public boolean isPooled() { + return pooled; + } + + public boolean isDirect() { + return direct; + } +} diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java index 0bbea6925..e28b30071 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import org.mockito.Mockito; +import static com.automq.stream.s3.ByteBufAllocPolicy.POOLED_DIRECT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; @@ -62,6 +63,7 @@ public class CompactionTestBase { protected S3Operator s3Operator; public void setUp() throws Exception { + ByteBufAlloc.setPolicy(POOLED_DIRECT); streamManager = Mockito.mock(MemoryMetadataManager.class); when(streamManager.getStreams(Mockito.anyList())).thenReturn(CompletableFuture.completedFuture( List.of(new StreamMetadata(STREAM_0, 0, 0, 20, StreamState.OPENED), diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java index c4e68ac1e..9ce121a61 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java @@ -45,6 +45,7 @@ public class CompactionUploaderTest extends CompactionTestBase { @BeforeEach public void setUp() throws Exception { + super.setUp(); s3Operator = new MemoryS3Operator(); objectManager = new MemoryMetadataManager(); config = mock(Config.class); diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java index abd9396d8..85e727b9a 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java @@ -21,6 +21,7 @@ import com.automq.stream.s3.objects.ObjectStreamRange; import java.util.List; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -31,6 +32,11 @@ @Tag("S3Unit") public class CompactionUtilTest extends CompactionTestBase { + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + } + @Test public void testMergeStreamDataBlocks() { List streamDataBlocks = List.of(