Skip to content

Commit

Permalink
Revert "feat(alloc): policies of memory allocator (#957)"
Browse files Browse the repository at this point in the history
This reverts commit 7dcdb78.
  • Loading branch information
Chillax-0v0 authored Mar 8, 2024
1 parent 7dcdb78 commit ab8c20e
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 89 deletions.
42 changes: 10 additions & 32 deletions s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@

public class ByteBufAlloc {
public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT"));
public static final boolean ALLOCATOR_USAGE_UNPOOLED = Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_UNPOOLED"));
public static final boolean BUFFER_USAGE_HEAPED = Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED"));

private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufAlloc.class);
private static final AbstractByteBufAllocator ALLOC = ALLOCATOR_USAGE_UNPOOLED ? UnpooledByteBufAllocator.DEFAULT : PooledByteBufAllocator.DEFAULT;
private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>();
private static long lastMetricLogTime = System.currentTimeMillis();
private static final Map<Integer, String> ALLOC_TYPE = new HashMap<>();
Expand All @@ -47,15 +50,6 @@ public class ByteBufAlloc {
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
public static ByteBufAllocMetric byteBufAllocMetric = null;

/**
* The policy used to allocate memory.
*/
private static ByteBufAllocPolicy policy = ByteBufAllocPolicy.UNPOOLED_HEAP;
/**
* The allocator used to allocate memory. It should be updated when {@link #policy} is updated.
*/
private static AbstractByteBufAllocator allocator = getAllocatorByPolicy(policy);

static {
registerAllocType(DEFAULT, "default");
registerAllocType(ENCODE_RECORD, "write_record");
Expand All @@ -71,17 +65,8 @@ public class ByteBufAlloc {

}

/**
* Set the policy used to allocate memory.
*/
public static void setPolicy(ByteBufAllocPolicy policy) {
LOGGER.info("Set alloc policy to {}", policy);
ByteBufAlloc.policy = policy;
ByteBufAlloc.allocator = getAllocatorByPolicy(policy);
}

public static CompositeByteBuf compositeByteBuffer() {
return allocator.compositeDirectBuffer(Integer.MAX_VALUE);
return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE);
}

public static ByteBuf byteBuffer(int initCapacity) {
Expand All @@ -105,9 +90,9 @@ public static ByteBuf byteBuffer(int initCapacity, int type) {
ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric();
LOGGER.info("Buffer usage: {}", ByteBufAlloc.byteBufAllocMetric);
}
return new WrappedByteBuf(policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity), () -> usage.add(-initCapacity));
return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} else {
return policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity);
return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity);
}
} catch (OutOfMemoryError e) {
if (MEMORY_USAGE_DETECT) {
Expand All @@ -129,13 +114,6 @@ public static void registerAllocType(int type, String name) {
ALLOC_TYPE.put(type, name);
}

private static AbstractByteBufAllocator getAllocatorByPolicy(ByteBufAllocPolicy policy) {
if (policy.isPooled()) {
return PooledByteBufAllocator.DEFAULT;
}
return UnpooledByteBufAllocator.DEFAULT;
}

public static class ByteBufAllocMetric {
private final long usedMemory;
private final long allocatedMemory;
Expand All @@ -145,8 +123,8 @@ public ByteBufAllocMetric() {
USAGE_STATS.forEach((k, v) -> {
detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue());
});
ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) allocator).metric();
this.usedMemory = policy.isDirect() ? metric.usedDirectMemory() : metric.usedHeapMemory();
ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) ALLOC).metric();
this.usedMemory = BUFFER_USAGE_HEAPED ? metric.usedHeapMemory() : metric.usedDirectMemory();
this.allocatedMemory = this.detail.values().stream().mapToLong(Long::longValue).sum();
}

Expand All @@ -169,9 +147,9 @@ public String toString() {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
}
sb.append(", pooled=");
sb.append(policy.isPooled());
sb.append(!ALLOCATOR_USAGE_UNPOOLED);
sb.append(", direct=");
sb.append(policy.isDirect());
sb.append(!BUFFER_USAGE_HEAPED);
sb.append("}");
return sb.toString();
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;

import static com.automq.stream.s3.ByteBufAllocPolicy.POOLED_DIRECT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doReturn;
Expand All @@ -63,7 +62,6 @@ public class CompactionTestBase {
protected S3Operator s3Operator;

public void setUp() throws Exception {
ByteBufAlloc.setPolicy(POOLED_DIRECT);
streamManager = Mockito.mock(MemoryMetadataManager.class);
when(streamManager.getStreams(Mockito.anyList())).thenReturn(CompletableFuture.completedFuture(
List.of(new StreamMetadata(STREAM_0, 0, 0, 20, StreamState.OPENED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class CompactionUploaderTest extends CompactionTestBase {

@BeforeEach
public void setUp() throws Exception {
super.setUp();
s3Operator = new MemoryS3Operator();
objectManager = new MemoryMetadataManager();
config = mock(Config.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.automq.stream.s3.objects.ObjectStreamRange;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand All @@ -32,11 +31,6 @@
@Tag("S3Unit")
public class CompactionUtilTest extends CompactionTestBase {

@BeforeEach
public void setUp() throws Exception {
super.setUp();
}

@Test
public void testMergeStreamDataBlocks() {
List<StreamDataBlock> streamDataBlocks = List.of(
Expand Down

0 comments on commit ab8c20e

Please sign in to comment.