Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(alloc): policies of memory allocator #957

Merged
merged 2 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@

public class ByteBufAlloc {
public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT"));
Chillax-0v0 marked this conversation as resolved.
Show resolved Hide resolved
public static final boolean ALLOCATOR_USAGE_UNPOOLED = Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_UNPOOLED"));
public static final boolean BUFFER_USAGE_HEAPED = Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED"));

private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufAlloc.class);
private static final AbstractByteBufAllocator ALLOC = ALLOCATOR_USAGE_UNPOOLED ? UnpooledByteBufAllocator.DEFAULT : PooledByteBufAllocator.DEFAULT;
private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>();
private static long lastMetricLogTime = System.currentTimeMillis();
private static final Map<Integer, String> ALLOC_TYPE = new HashMap<>();
Expand All @@ -50,6 +47,15 @@ public class ByteBufAlloc {
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
public static ByteBufAllocMetric byteBufAllocMetric = null;

/**
* The policy used to allocate memory.
*/
private static ByteBufAllocPolicy policy = ByteBufAllocPolicy.UNPOOLED_HEAP;
Chillax-0v0 marked this conversation as resolved.
Show resolved Hide resolved
/**
* The allocator used to allocate memory. It should be updated when {@link #policy} is updated.
*/
private static AbstractByteBufAllocator allocator = getAllocatorByPolicy(policy);

static {
registerAllocType(DEFAULT, "default");
registerAllocType(ENCODE_RECORD, "write_record");
Expand All @@ -65,8 +71,17 @@ public class ByteBufAlloc {

}

/**
* Set the policy used to allocate memory.
*/
public static void setPolicy(ByteBufAllocPolicy policy) {
LOGGER.info("Set alloc policy to {}", policy);
ByteBufAlloc.policy = policy;
ByteBufAlloc.allocator = getAllocatorByPolicy(policy);
}

public static CompositeByteBuf compositeByteBuffer() {
return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE);
return allocator.compositeDirectBuffer(Integer.MAX_VALUE);
}

public static ByteBuf byteBuffer(int initCapacity) {
Expand All @@ -90,9 +105,9 @@ public static ByteBuf byteBuffer(int initCapacity, int type) {
ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric();
LOGGER.info("Buffer usage: {}", ByteBufAlloc.byteBufAllocMetric);
}
return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
return new WrappedByteBuf(policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity), () -> usage.add(-initCapacity));
} else {
return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity);
return policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity);
}
} catch (OutOfMemoryError e) {
if (MEMORY_USAGE_DETECT) {
Expand All @@ -114,6 +129,13 @@ public static void registerAllocType(int type, String name) {
ALLOC_TYPE.put(type, name);
}

private static AbstractByteBufAllocator getAllocatorByPolicy(ByteBufAllocPolicy policy) {
if (policy.isPooled()) {
return PooledByteBufAllocator.DEFAULT;
}
return UnpooledByteBufAllocator.DEFAULT;
}

public static class ByteBufAllocMetric {
private final long usedMemory;
private final long allocatedMemory;
Expand All @@ -123,8 +145,8 @@ public ByteBufAllocMetric() {
USAGE_STATS.forEach((k, v) -> {
detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue());
});
ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) ALLOC).metric();
this.usedMemory = BUFFER_USAGE_HEAPED ? metric.usedHeapMemory() : metric.usedDirectMemory();
ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) allocator).metric();
this.usedMemory = policy.isDirect() ? metric.usedDirectMemory() : metric.usedHeapMemory();
this.allocatedMemory = this.detail.values().stream().mapToLong(Long::longValue).sum();
}

Expand All @@ -147,9 +169,9 @@ public String toString() {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
}
sb.append(", pooled=");
sb.append(!ALLOCATOR_USAGE_UNPOOLED);
sb.append(policy.isPooled());
sb.append(", direct=");
sb.append(!BUFFER_USAGE_HEAPED);
sb.append(policy.isDirect());
sb.append("}");
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3;

public enum ByteBufAllocPolicy {

/**
* Allocate memory from the heap without pooling.
*/
UNPOOLED_HEAP(false, false),

/**
* Use pooled direct memory.
*/
POOLED_DIRECT(true, true);

/**
* Whether the buffer should be pooled or not.
*/
private final boolean pooled;

/**
* Whether the buffer should be direct or not.
*/
private final boolean direct;

ByteBufAllocPolicy(boolean pooled, boolean direct) {
this.pooled = pooled;
this.direct = direct;
}

public boolean isPooled() {
return pooled;
}

public boolean isDirect() {
return direct;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;

import static com.automq.stream.s3.ByteBufAllocPolicy.POOLED_DIRECT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doReturn;
Expand All @@ -62,6 +63,7 @@ public class CompactionTestBase {
protected S3Operator s3Operator;

public void setUp() throws Exception {
ByteBufAlloc.setPolicy(POOLED_DIRECT);
streamManager = Mockito.mock(MemoryMetadataManager.class);
when(streamManager.getStreams(Mockito.anyList())).thenReturn(CompletableFuture.completedFuture(
List.of(new StreamMetadata(STREAM_0, 0, 0, 20, StreamState.OPENED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class CompactionUploaderTest extends CompactionTestBase {

@BeforeEach
public void setUp() throws Exception {
super.setUp();
s3Operator = new MemoryS3Operator();
objectManager = new MemoryMetadataManager();
config = mock(Config.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.automq.stream.s3.objects.ObjectStreamRange;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand All @@ -31,6 +32,11 @@
@Tag("S3Unit")
public class CompactionUtilTest extends CompactionTestBase {

@BeforeEach
public void setUp() throws Exception {
super.setUp();
}

@Test
public void testMergeStreamDataBlocks() {
List<StreamDataBlock> streamDataBlocks = List.of(
Expand Down
Loading