feat(s3stream): add switch for buffered memory
Signed-off-by: Robin Han <[email protected]>
Signed-off-by: SSpirits <[email protected]>
ShadowySpirits committed Feb 27, 2024
1 parent b38b1d0 commit 6629d81
Showing 26 changed files with 117 additions and 106 deletions.
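The switch is implemented in ByteBufAlloc (the second file in the diff below): two new environment variables select between Netty's pooled and unpooled allocators and between heap and direct buffers. A minimal standalone sketch of the resulting selection logic, not the actual class:

```java
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class BufferSwitchSketch {
    // Same flags the commit adds to ByteBufAlloc.
    static final boolean ALLOCATOR_USAGE_POOLED =
        Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_POOLED"));
    static final boolean BUFFER_USAGE_HEAPED =
        Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED"));

    // Pooled vs. unpooled allocator is decided once, at class-load time.
    static final AbstractByteBufAllocator ALLOC =
        ALLOCATOR_USAGE_POOLED ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT;

    // Heap vs. direct is decided per allocation.
    static ByteBuf allocate(int capacity) {
        return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(capacity) : ALLOC.directBuffer(capacity);
    }

    public static void main(String[] args) {
        ByteBuf buf = allocate(1024);
        try {
            buf.writeLong(42L);
        } finally {
            buf.release(); // Netty buffers are reference-counted and must be released
        }
    }
}
```

With both variables unset, Boolean.parseBoolean(null) is false, so the defaults remain the unpooled allocator and direct buffers.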
@@ -11,7 +11,7 @@

package com.automq.stream;

import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.ByteBufAlloc;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.concurrent.atomic.AtomicReference;
@@ -25,14 +25,14 @@ public class DirectByteBufSeqAlloc {
public DirectByteBufSeqAlloc(int allocType) {
this.allocType = allocType;
for (int i = 0; i < hugeBufArray.length; i++) {
hugeBufArray[i] = new AtomicReference<>(new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType)));
hugeBufArray[i] = new AtomicReference<>(new HugeBuf(ByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType)));
}
}

public ByteBuf byteBuffer(int capacity) {
if (capacity >= HUGE_BUF_SIZE) {
// if the request capacity is larger than HUGE_BUF_SIZE, just allocate a new ByteBuf
return DirectByteBufAlloc.byteBuffer(capacity, allocType);
return ByteBufAlloc.byteBuffer(capacity, allocType);
}
int bufIndex = Math.abs(Thread.currentThread().hashCode() % hugeBufArray.length);

@@ -53,13 +53,13 @@ public ByteBuf byteBuffer(int capacity) {
// 1. slice the remaining of the current hugeBuf and release the hugeBuf
// 2. create a new hugeBuf and slice the remaining of the required capacity
// 3. return the composite ByteBuf of the two slices
CompositeByteBuf cbf = DirectByteBufAlloc.compositeByteBuffer();
CompositeByteBuf cbf = ByteBufAlloc.compositeByteBuffer();
int readLength = hugeBuf.buf.capacity() - hugeBuf.nextIndex;
cbf.addComponent(false, hugeBuf.buf.retainedSlice(hugeBuf.nextIndex, readLength));
capacity -= readLength;
hugeBuf.buf.release();

HugeBuf newHugeBuf = new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType));
HugeBuf newHugeBuf = new HugeBuf(ByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType));
bufRef.set(newHugeBuf);

cbf.addComponent(false, newHugeBuf.buf.retainedSlice(0, capacity));
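The file above only retargets DirectByteBufSeqAlloc at the renamed ByteBufAlloc; its behavior is unchanged: small requests are carved out of shared HUGE_BUF_SIZE chunks as retained slices, while oversized requests fall through to ByteBufAlloc. A hypothetical usage sketch (the alloc-type value is made up, and it assumes the returned buffer is write-ready, as its use for record encoding elsewhere in s3stream suggests):

```java
import com.automq.stream.DirectByteBufSeqAlloc;
import io.netty.buffer.ByteBuf;

public class SeqAllocExample {
    public static void main(String[] args) {
        int allocType = 0; // hypothetical; real callers pass a constant registered with ByteBufAlloc
        DirectByteBufSeqAlloc seqAlloc = new DirectByteBufSeqAlloc(allocType);

        // Small allocation: served as a retained slice of a shared chunk, so releasing it
        // only decrements the chunk's reference count.
        ByteBuf buf = seqAlloc.byteBuffer(256);
        try {
            buf.writeBytes(new byte[256]); // assumption: the returned buffer starts write-ready
        } finally {
            buf.release();
        }
    }
}
```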
@@ -12,21 +12,27 @@
package com.automq.stream.s3;

import com.automq.stream.WrappedByteBuf;
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocatorMetric;
import io.netty.buffer.ByteBufAllocatorMetricProvider;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectByteBufAlloc {
public class ByteBufAlloc {
public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT"));
public static final boolean ALLOCATOR_USAGE_POOLED = Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_POOLED"));
public static final boolean BUFFER_USAGE_HEAPED = Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED"));

private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class);
private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufAlloc.class);
private static final AbstractByteBufAllocator ALLOC = ALLOCATOR_USAGE_POOLED ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT;
private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>();
private static long lastMetricLogTime = System.currentTimeMillis();
private static final Map<Integer, String> ALLOC_TYPE = new HashMap<>();
@@ -81,17 +87,17 @@ public static ByteBuf byteBuffer(int initCapacity, int type) {
if (now - lastMetricLogTime > 60000) {
// it's ok to be not thread safe
lastMetricLogTime = now;
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric);
ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.info("Direct Memory usage: {}", ByteBufAlloc.directByteBufAllocMetric);
}
return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} else {
return ALLOC.directBuffer(initCapacity);
return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity);
}
} catch (OutOfMemoryError e) {
if (MEMORY_USAGE_DETECT) {
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e);
ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.error("alloc direct buffer OOM, {}", ByteBufAlloc.directByteBufAllocMetric, e);
} else {
LOGGER.error("alloc direct buffer OOM", e);
}
@@ -109,20 +115,21 @@ public static void registerAllocType(int type, String name) {
}

public static class DirectByteBufAllocMetric {
private final long usedDirectMemory;
private final long allocatedDirectMemory;
private final long usedMemory;
private final long allocatedMemory;
private final Map<String, Long> detail = new HashMap<>();

public DirectByteBufAllocMetric() {
USAGE_STATS.forEach((k, v) -> {
detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue());
});
this.usedDirectMemory = ALLOC.metric().usedDirectMemory();
this.allocatedDirectMemory = this.detail.values().stream().mapToLong(Long::longValue).sum();
ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) ALLOC).metric();
this.usedMemory = BUFFER_USAGE_HEAPED ? metric.usedHeapMemory() : metric.usedDirectMemory();
this.allocatedMemory = this.detail.values().stream().mapToLong(Long::longValue).sum();
}

public long getUsedDirectMemory() {
return usedDirectMemory;
public long getUsedMemory() {
return usedMemory;
}

public Map<String, Long> getDetailedMap() {
@@ -131,14 +138,18 @@ public Map<String, Long> getDetailedMap() {

@Override
public String toString() {
StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usedDirectMemory=");
sb.append(usedDirectMemory);
sb.append(", allocatedDirectMemory=");
sb.append(allocatedDirectMemory);
StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usedMemory=");
sb.append(usedMemory);
sb.append(", allocatedMemory=");
sb.append(allocatedMemory);
sb.append(", detail=");
for (Map.Entry<String, Long> entry : detail.entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
}
sb.append(", pooled=");
sb.append(ALLOCATOR_USAGE_POOLED);
sb.append(", direct=");
sb.append(!BUFFER_USAGE_HEAPED);
sb.append("}");
return sb.toString();
}
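When AUTOMQ_MEMORY_USAGE_DETECT is enabled, allocations are wrapped so that per-type usage counters are maintained and a metric snapshot is logged roughly once a minute; the detail map above is keyed by "type/name" pairs supplied through registerAllocType. A small hypothetical sketch of that flow (the type id and name are made up; the library's own constants such as READ_INDEX_BLOCK are presumably registered the same way):

```java
import com.automq.stream.s3.ByteBufAlloc;
import io.netty.buffer.ByteBuf;

public class AllocTypeExample {
    private static final int MY_TYPE = 100; // hypothetical allocation type id

    public static void main(String[] args) {
        ByteBufAlloc.registerAllocType(MY_TYPE, "my_type");

        ByteBuf buf = ByteBufAlloc.byteBuffer(4096, MY_TYPE);
        try {
            buf.writeInt(1);
        } finally {
            // With AUTOMQ_MEMORY_USAGE_DETECT=true, the buffer is a WrappedByteBuf whose
            // release callback subtracts the capacity from the per-type usage counter.
            buf.release();
        }
    }
}
```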
4 changes: 2 additions & 2 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -28,7 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.DirectByteBufAlloc.READ_INDEX_BLOCK;
import static com.automq.stream.s3.ByteBufAlloc.READ_INDEX_BLOCK;
import static com.automq.stream.s3.ObjectWriter.Footer.FOOTER_SIZE;
import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;

@@ -145,7 +145,7 @@ public static BasicObjectInfo parse(ByteBuf objectTailBuf,

// trim the ByteBuf to avoid extra memory occupy.
ByteBuf indexBlockBuf = objectTailBuf.slice(objectTailBuf.readerIndex() + indexRelativePosition, indexBlockSize);
ByteBuf copy = DirectByteBufAlloc.byteBuffer(indexBlockBuf.readableBytes(), READ_INDEX_BLOCK);
ByteBuf copy = ByteBufAlloc.byteBuffer(indexBlockBuf.readableBytes(), READ_INDEX_BLOCK);
indexBlockBuf.readBytes(copy, indexBlockBuf.readableBytes());
objectTailBuf.release();
indexBlockBuf = copy;
18 changes: 9 additions & 9 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -24,9 +24,9 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_DATA_BLOCK_HEADER;
import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_FOOTER;
import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_INDEX_BLOCK;
import static com.automq.stream.s3.ByteBufAlloc.WRITE_DATA_BLOCK_HEADER;
import static com.automq.stream.s3.ByteBufAlloc.WRITE_FOOTER;
import static com.automq.stream.s3.ByteBufAlloc.WRITE_INDEX_BLOCK;

/**
* Write stream records to a single object.
@@ -130,7 +130,7 @@ private synchronized void tryUploadPart() {
}
}
if (partFull) {
CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer();
CompositeByteBuf partBuf = ByteBufAlloc.compositeByteBuffer();
for (DataBlock block : uploadBlocks) {
waitingUploadBlocksSize -= block.size();
partBuf.addComponent(true, block.buffer());
@@ -145,7 +145,7 @@ private synchronized void tryUploadPart() {
}

public CompletableFuture<Void> close() {
CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
for (DataBlock block : waitingUploadBlocks) {
buf.addComponent(true, block.buffer());
completedBlocks.add(block);
@@ -197,7 +197,7 @@ class IndexBlock {
public IndexBlock() {
long nextPosition = 0;
int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * completedBlocks.size();
buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, WRITE_INDEX_BLOCK);
buf = ByteBufAlloc.byteBuffer(indexBlockSize, WRITE_INDEX_BLOCK);
for (DataBlock block : completedBlocks) {
ObjectStreamRange streamRange = block.getStreamRange();
new DataBlockIndex(streamRange.getStreamId(), streamRange.getStartOffset(), (int) (streamRange.getEndOffset() - streamRange.getStartOffset()),
@@ -230,8 +230,8 @@ class DataBlock {

public DataBlock(long streamId, List<StreamRecordBatch> records) {
this.recordCount = records.size();
this.encodedBuf = DirectByteBufAlloc.compositeByteBuffer();
ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE, WRITE_DATA_BLOCK_HEADER);
this.encodedBuf = ByteBufAlloc.compositeByteBuffer();
ByteBuf header = ByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE, WRITE_DATA_BLOCK_HEADER);
header.writeByte(DATA_BLOCK_MAGIC);
header.writeByte(DATA_BLOCK_DEFAULT_FLAG);
header.writeInt(recordCount);
@@ -266,7 +266,7 @@ class Footer {
private final ByteBuf buf;

public Footer(long indexStartPosition, int indexBlockLength) {
buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE, WRITE_FOOTER);
buf = ByteBufAlloc.byteBuffer(FOOTER_SIZE, WRITE_FOOTER);
// start position of index block
buf.writeLong(indexStartPosition);
// size of index block
2 changes: 1 addition & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -929,7 +929,7 @@ public DeltaWALUploadTaskContext(LogCache.LogCacheBlock cache) {
}
}

class LogCacheEvictOOMHandler implements DirectByteBufAlloc.OOMHandler {
class LogCacheEvictOOMHandler implements ByteBufAlloc.OOMHandler {
@Override
public int handle(int memoryRequired) {
try {
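LogCacheEvictOOMHandler implements the ByteBufAlloc.OOMHandler callback, whose only contract visible in this diff is int handle(int memoryRequired) — presumably invoked when an allocation fails, freeing memory and returning the number of bytes reclaimed. A minimal hypothetical implementation of that interface (how handlers are registered with ByteBufAlloc is not shown here):

```java
import com.automq.stream.s3.ByteBufAlloc;

// Hypothetical handler: a real one, like LogCacheEvictOOMHandler above, evicts cached
// data under memory pressure and reports how many bytes it managed to free.
public class NoopOOMHandler implements ByteBufAlloc.OOMHandler {
    @Override
    public int handle(int memoryRequired) {
        return 0; // nothing freed
    }
}
```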
@@ -32,8 +32,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_OBJECT_COMPACTION_READ;
import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_OBJECT_COMPACTION_WRITE;
import static com.automq.stream.s3.ByteBufAlloc.STREAM_OBJECT_COMPACTION_READ;
import static com.automq.stream.s3.ByteBufAlloc.STREAM_OBJECT_COMPACTION_WRITE;
import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID;
import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;

@@ -174,7 +174,7 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
long compactedStartOffset = objectGroup.get(0).startOffset();
long compactedEndOffset = objectGroup.get(objectGroup.size() - 1).endOffset();
List<Long> compactedObjectIds = new LinkedList<>();
CompositeByteBuf indexes = DirectByteBufAlloc.compositeByteBuffer();
CompositeByteBuf indexes = ByteBufAlloc.compositeByteBuffer();
Writer writer = s3Operator.writer(new Writer.Context(STREAM_OBJECT_COMPACTION_READ), ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2);
long groupStartOffset = -1L;
long groupStartPosition = -1L;
@@ -184,7 +184,7 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
for (S3ObjectMetadata object : objectGroup) {
try (ObjectReader reader = new ObjectReader(object, s3Operator)) {
ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().get();
ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().count() * DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
ByteBuf subIndexes = ByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().count() * DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
Iterator<DataBlockIndex> it = basicObjectInfo.indexBlock().iterator();
long validDataBlockStartPosition = 0;
while (it.hasNext()) {
@@ -219,13 +219,13 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
}
}
if (lastIndex != null) {
ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
ByteBuf subIndexes = ByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
new DataBlockIndex(streamId, groupStartOffset, (int) (lastIndex.endOffset() - groupStartOffset),
groupRecordCount, groupStartPosition, groupSize).encode(subIndexes);
indexes.addComponent(true, subIndexes);
}

CompositeByteBuf indexBlockAndFooter = DirectByteBufAlloc.compositeByteBuffer();
CompositeByteBuf indexBlockAndFooter = ByteBufAlloc.compositeByteBuffer();
indexBlockAndFooter.addComponent(true, indexes);
indexBlockAndFooter.addComponent(true, new ObjectWriter.Footer(nextBlockPosition, indexBlockAndFooter.readableBytes()).buffer());

@@ -15,7 +15,7 @@
import com.automq.stream.s3.model.StreamRecordBatch;
import io.netty.buffer.ByteBuf;

import static com.automq.stream.s3.DirectByteBufAlloc.ENCODE_RECORD;
import static com.automq.stream.s3.ByteBufAlloc.ENCODE_RECORD;

public class StreamRecordBatchCodec {
public static final byte MAGIC_V0 = 0x22;
@@ -56,7 +56,7 @@ public static StreamRecordBatch duplicateDecode(ByteBuf buf) {
long baseOffset = buf.readLong();
int lastOffsetDelta = buf.readInt();
int payloadLength = buf.readInt();
ByteBuf payload = DirectByteBufAlloc.byteBuffer(payloadLength, DirectByteBufAlloc.DECODE_RECORD);
ByteBuf payload = ByteBufAlloc.byteBuffer(payloadLength, ByteBufAlloc.DECODE_RECORD);
buf.readBytes(payload);
return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
}
@@ -11,7 +11,7 @@

package com.automq.stream.s3.cache;

import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.cache.DefaultS3BlockCache.ReadAheadRecord;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.model.StreamRecordBatch;
Expand All @@ -35,7 +35,7 @@

import static com.automq.stream.s3.model.StreamRecordBatch.OBJECT_OVERHEAD;

public class BlockCache implements DirectByteBufAlloc.OOMHandler {
public class BlockCache implements ByteBufAlloc.OOMHandler {
public static final Integer ASYNC_READ_AHEAD_NOOP_OFFSET = -1;
static final int BLOCK_SIZE = 1024 * 1024;
private static final Logger LOGGER = LoggerFactory.getLogger(BlockCache.class);
@@ -11,8 +11,8 @@

package com.automq.stream.s3.compact.operator;

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
Expand All @@ -34,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ;
import static com.automq.stream.s3.ByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ;

//TODO: refactor to reduce duplicate code with ObjectWriter
public class DataBlockReader {
@@ -145,7 +145,7 @@ public void readContinuousBlocks(List<StreamDataBlock> streamDataBlocks, long ma
final int finalEnd = end + 1; // include current block
CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
.thenAccept(v -> {
CompositeByteBuf compositeByteBuf = DirectByteBufAlloc.compositeByteBuffer();
CompositeByteBuf compositeByteBuf = ByteBufAlloc.compositeByteBuffer();
for (int j = 0; j < iterations; j++) {
compositeByteBuf.addComponent(true, bufferMap.get(j));
}
@@ -195,7 +195,7 @@ private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
if (throttleBucket == null) {
return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
// convert heap buffer to direct buffer
ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
directBuf.writeBytes(buf);
buf.release();
return directBuf;
Expand All @@ -205,7 +205,7 @@ private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
.thenCompose(v ->
s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
// convert heap buffer to direct buffer
ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
directBuf.writeBytes(buf);
buf.release();
return directBuf;
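rangeRead0 above repeats one pattern twice: copy the (typically heap) buffer returned by the S3 client into an allocator-managed buffer tagged STREAM_SET_OBJECT_COMPACTION_READ, then release the source. The same idea in isolation, with an illustrative helper name (after this commit the target buffer is direct or heap depending on AUTOMQ_BUFFER_USAGE_HEAPED):

```java
import com.automq.stream.s3.ByteBufAlloc;
import io.netty.buffer.ByteBuf;

public final class BufferCopy {
    private BufferCopy() {
    }

    // Copies src's readable bytes into a buffer from ByteBufAlloc, tagged with the given
    // allocation type for usage accounting, and releases the source buffer.
    public static ByteBuf copyAndRelease(ByteBuf src, int allocType) {
        ByteBuf dst = ByteBufAlloc.byteBuffer(src.readableBytes(), allocType);
        dst.writeBytes(src);
        src.release();
        return dst;
    }
}
```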
[Diffs for the remaining changed files were not loaded.]
