feat(s3stream): support network throttle with priority (#598)
Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Nov 8, 2023
1 parent bbc1808 commit e8aaf94
Showing 11 changed files with 97 additions and 120 deletions.
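The change replaces the single ThrottleStrategy.THROTTLE constant with two priority levels: THROTTLE_1 is passed on the latency-sensitive data-block read path, while THROTTLE_2 is passed for background traffic such as stream object copy, data block index parsing, compaction block reads, and object writes. The enum itself lives in com.automq.stream.s3.network.ThrottleStrategy and is not shown in this diff; the following is only a minimal sketch of what a priority-carrying strategy could look like (the BYPASS constant and the numeric priority values are assumptions, not part of this commit).

// Hypothetical sketch; the real definition is in com.automq.stream.s3.network.ThrottleStrategy.
public enum ThrottleStrategy {
    BYPASS(0),      // assumed: traffic that is never throttled
    THROTTLE_1(1),  // higher-priority throttled traffic, e.g. cache-miss reads
    THROTTLE_2(2);  // lower-priority throttled traffic, e.g. compaction and copy

    private final int priority; // assumed: smaller value is served first

    ThrottleStrategy(int priority) {
        this.priority = priority;
    }

    public int priority() {
        return priority;
    }
}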
@@ -17,6 +17,7 @@

package com.automq.stream.s3;

import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.utils.CloseableIterator;
import com.automq.stream.api.ErrorCode;
import com.automq.stream.api.StreamClientException;
@@ -73,7 +74,7 @@ public CompletableFuture<List<DataBlockIndex>> find(long streamId, long startOff
}

public CompletableFuture<DataBlock> read(DataBlockIndex block) {
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition());
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.THROTTLE_1);
return rangeReadCf.thenApply(buf -> new DataBlock(buf, block.recordCount()));
}

@@ -46,7 +46,7 @@ public class StreamObjectCopier {
public StreamObjectCopier(long objectId, S3Operator s3Operator) {
this.s3Operator = s3Operator;
// TODO: use a better clusterName
this.writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE);
this.writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2);
this.completedObjects = new LinkedList<>();
this.nextObjectDataStartPosition = 0;
this.blockCount = 0;
49 changes: 16 additions & 33 deletions s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java
@@ -41,7 +41,7 @@
public class BlockCache implements DirectByteBufAlloc.OOMHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(BlockCache.class);
static final int BLOCK_SIZE = 1024 * 1024;
static final int MAX_READAHEAD_SIZE = 16 * 1024 * 1024;
static final int MAX_READ_AHEAD_SIZE = 16 * 1024 * 1024;
private final long maxSize;
final Map<Long, StreamCache> stream2cache = new HashMap<>();
private final LRUCache<CacheKey, Integer> inactive = new LRUCache<>();
@@ -75,8 +75,8 @@ void put0(long streamId, List<StreamRecordBatch> records) {
long startOffset = records.get(0).getBaseOffset();
long endOffset = records.get(records.size() - 1).getLastOffset();

// generate readahead.
Readahead readahead = genReadahead(streamId, records);
// generate read ahead.
ReadAhead readahead = genReadahead(streamId, records);

// remove overlapped part.
Map.Entry<Long, CacheBlock> floorEntry = streamCache.blocks.floorEntry(startOffset);
@@ -109,7 +109,7 @@ void put0(long streamId, List<StreamRecordBatch> records) {
part.add(record);
partSize += record.size();
} else {
// put readahead to the first block.
// put read ahead to the first block.
put(streamId, streamCache, new CacheBlock(part, readahead));
readahead = null;
part = new ArrayList<>(records.size() / 2);
@@ -150,7 +150,7 @@ public GetCacheResult get0(long streamId, long startOffset, long endOffset, int
NavigableMap<Long, CacheBlock> streamCacheBlocks = streamCache.blocks.tailMap(floorEntry != null ? floorEntry.getKey() : startOffset, true);
long nextStartOffset = startOffset;
int nextMaxBytes = maxBytes;
Readahead readahead = null;
ReadAhead readahead = null;
LinkedList<StreamRecordBatch> records = new LinkedList<>();
for (Map.Entry<Long, CacheBlock> entry : streamCacheBlocks.entrySet()) {
CacheBlock cacheBlock = entry.getValue();
@@ -261,7 +261,7 @@ private void put(long streamId, StreamCache streamCache, CacheBlock cacheBlock)
}


Readahead genReadahead(long streamId, List<StreamRecordBatch> records) {
ReadAhead genReadahead(long streamId, List<StreamRecordBatch> records) {
if (records.isEmpty()) {
return null;
}
@@ -274,18 +274,16 @@ Readahead genReadahead(long streamId, List<StreamRecordBatch> records) {
size = alignBlockSize(size / 2);
streamCache.evict = false;
} else {
if (size < MAX_READAHEAD_SIZE / 2) {
if (size < MAX_READ_AHEAD_SIZE / 2) {
// exponential growth
size = size * 2;
} else {
// linear growth
size += BLOCK_SIZE;
}
}
size = Math.min(Math.max(size, BLOCK_SIZE), MAX_READAHEAD_SIZE);
return new Readahead(startOffset, size);
size = Math.min(Math.max(size, BLOCK_SIZE), MAX_READ_AHEAD_SIZE);
return new ReadAhead(startOffset, size);
}

int alignBlockSize(int size) {
@@ -343,9 +341,9 @@ public static class CacheBlock {
long firstOffset;
long lastOffset;
int size;
Readahead readahead;
ReadAhead readahead;

public CacheBlock(List<StreamRecordBatch> records, Readahead readahead) {
public CacheBlock(List<StreamRecordBatch> records, ReadAhead readahead) {
this.records = records;
this.firstOffset = records.get(0).getBaseOffset();
this.lastOffset = records.get(records.size() - 1).getLastOffset();
@@ -361,9 +359,9 @@ public void free() {

public static class GetCacheResult {
private final List<StreamRecordBatch> records;
private final Readahead readahead;
private final ReadAhead readahead;

private GetCacheResult(List<StreamRecordBatch> records, Readahead readahead) {
private GetCacheResult(List<StreamRecordBatch> records, ReadAhead readahead) {
this.records = records;
this.readahead = readahead;
}
@@ -372,15 +370,15 @@ public static GetCacheResult empty() {
return new GetCacheResult(Collections.emptyList(), null);
}

public static GetCacheResult of(List<StreamRecordBatch> records, Readahead readahead) {
public static GetCacheResult of(List<StreamRecordBatch> records, ReadAhead readahead) {
return new GetCacheResult(records, readahead);
}

public List<StreamRecordBatch> getRecords() {
return records;
}

public Optional<Readahead> getReadahead() {
public Optional<ReadAhead> getReadAhead() {
if (readahead == null) {
return Optional.empty();
} else {
@@ -389,22 +387,7 @@ public Optional<Readahead> getReadahead() {
}
}

public static class Readahead {
private final long startOffset;
private final int size;

public Readahead(long startOffset, int size) {
this.startOffset = startOffset;
this.size = size;
}

public long getStartOffset() {
return startOffset;
}

public int getSize() {
return size;
}
public record ReadAhead(long startOffset, int size) {
}

}
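The BlockCache changes above rename Readahead to ReadAhead (now a record) and MAX_READAHEAD_SIZE to MAX_READ_AHEAD_SIZE while keeping the sizing policy in genReadahead: the next read-ahead window doubles while it is below half of MAX_READ_AHEAD_SIZE, then grows by one block at a time, and is always clamped to the range [BLOCK_SIZE, MAX_READ_AHEAD_SIZE]; when the stream's previous read-ahead was evicted before being used (the evict flag above), the window is halved instead. A stand-alone sketch of that policy, assuming alignBlockSize (not shown in this commit) rounds up to a whole block:

// Illustrative, self-contained version of the sizing policy in BlockCache#genReadahead.
final class ReadAheadSizing {
    static final int BLOCK_SIZE = 1024 * 1024;                // 1 MiB
    static final int MAX_READ_AHEAD_SIZE = 16 * 1024 * 1024;  // 16 MiB

    static int nextSize(int size, boolean previousReadAheadEvicted) {
        if (previousReadAheadEvicted) {
            // back off: the last read-ahead was evicted before it was consumed
            size = alignBlockSize(size / 2);
        } else if (size < MAX_READ_AHEAD_SIZE / 2) {
            // exponential growth while the window is still small
            size = size * 2;
        } else {
            // linear growth of one block once the window is large
            size += BLOCK_SIZE;
        }
        // always stay within [BLOCK_SIZE, MAX_READ_AHEAD_SIZE]
        return Math.min(Math.max(size, BLOCK_SIZE), MAX_READ_AHEAD_SIZE);
    }

    static int alignBlockSize(int size) {
        // assumed behaviour: round up to the next whole block
        return (size + BLOCK_SIZE - 1) / BLOCK_SIZE * BLOCK_SIZE;
    }
}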
@@ -36,7 +36,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,8 +49,8 @@
public class DefaultS3BlockCache implements S3BlockCache {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3BlockCache.class);
private final LRUCache<Long, ObjectReader> objectReaderLRU = new LRUCache<>();
private final Map<ReadingTaskKey, CompletableFuture<?>> readaheadTasks = new ConcurrentHashMap<>();
private final Semaphore readaheadLimiter = new Semaphore(16);
private final Map<ReadingTaskKey, CompletableFuture<?>> readAheadTasks = new ConcurrentHashMap<>();
private final Semaphore readAheadLimiter = new Semaphore(16);
private final BlockCache cache;
private final ExecutorService mainExecutor;
private final ObjectManager objectManager;
@@ -66,7 +65,7 @@ public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3O
LOGGER);
this.objectManager = objectManager;
this.s3Operator = s3Operator;
dataBlockReadAccumulator = new DataBlockReadAccumulator(dataBlockRecords -> {
this.dataBlockReadAccumulator = new DataBlockReadAccumulator(dataBlockRecords -> {
List<StreamRecordBatch> records = dataBlockRecords.records();
if (!records.isEmpty()) {
long streamId = records.get(0).getStreamId();
@@ -101,36 +100,37 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
return readCf;
}

public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, boolean awaitReadahead) {
public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, boolean awaitReadAhead) {
if (startOffset >= endOffset || maxBytes <= 0) {
return CompletableFuture.completedFuture(new ReadDataBlock(Collections.emptyList()));
}

if (awaitReadahead) {
// expect readahead will fill the cache with the data we need.
CompletableFuture<?> readaheadCf = readaheadTasks.get(new ReadingTaskKey(streamId, startOffset));
if (readaheadCf != null) {
if (awaitReadAhead) {
// expect read ahead will fill the cache with the data we need.
CompletableFuture<?> readAheadCf = readAheadTasks.get(new ReadingTaskKey(streamId, startOffset));
if (readAheadCf != null) {
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
readaheadCf.whenComplete((nil, ex) -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, false), readCf));
readAheadCf.whenComplete((nil, ex) -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, false), readCf));
return readCf;
}
}

long nextStartOffset = startOffset;
int nextMaxBytes = maxBytes;

// 1. get from cache
BlockCache.GetCacheResult cacheRst = cache.get(streamId, nextStartOffset, endOffset, nextMaxBytes);
List<StreamRecordBatch> cacheRecords = cacheRst.getRecords();
if (!cacheRecords.isEmpty()) {
nextStartOffset = cacheRecords.get(cacheRecords.size() - 1).getLastOffset();
nextMaxBytes -= Math.min(nextMaxBytes, cacheRecords.stream().mapToInt(StreamRecordBatch::size).sum());
}
cacheRst.getReadahead().ifPresent(readahead -> backgroundReadahead(streamId, readahead));
cacheRst.getReadAhead().ifPresent(readAhead -> backgroundReadAhead(streamId, readAhead));
if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
return CompletableFuture.completedFuture(new ReadDataBlock(cacheRecords));
}
// 2. get from s3

// 2. get from s3
ReadContext context = new ReadContext(Collections.emptyList(), nextStartOffset, nextMaxBytes);
CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, nextStartOffset, endOffset, 2);
return getObjectsCf.thenComposeAsync(objects -> {
@@ -159,7 +159,7 @@ private CompletableFuture<ReadDataBlock> readFromS3(long streamId, long endOffse
context.objects = objects;
context.objectIndex = 0;
if (context.objects.isEmpty()) {
if (endOffset == -1L) { // background readahead
if (endOffset == -1L) { // background read ahead
return true;
} else {
LOGGER.error("[BUG] fail to read, expect objects not empty, streamId={}, startOffset={}, endOffset={}",
@@ -238,31 +238,31 @@ private CompletableFuture<ReadDataBlock> readFromS3(long streamId, long endOffse
}, mainExecutor);
}

private void backgroundReadahead(long streamId, BlockCache.Readahead readahead) {
private void backgroundReadAhead(long streamId, BlockCache.ReadAhead readahead) {
mainExecutor.execute(() -> {
CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, readahead.getStartOffset(), NOOP_OFFSET, 2);
CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, readahead.startOffset(), NOOP_OFFSET, 2);
getObjectsCf.thenAccept(objects -> {
if (objects.isEmpty()) {
return;
}
if (!readaheadLimiter.tryAcquire()) {
// if inflight readahead tasks exceed limit, skip this readahead.
if (!readAheadLimiter.tryAcquire()) {
// if inflight read ahead tasks exceed limit, skip this read ahead.
return;
}

CompletableFuture<ReadDataBlock> readaheadCf = readFromS3(streamId, NOOP_OFFSET,
new ReadContext(objects, readahead.getStartOffset(), readahead.getSize()));
ReadingTaskKey readingTaskKey = new ReadingTaskKey(streamId, readahead.getStartOffset());
readaheadTasks.put(readingTaskKey, readaheadCf);
readaheadCf
CompletableFuture<ReadDataBlock> readAheadCf = readFromS3(streamId, NOOP_OFFSET,
new ReadContext(objects, readahead.startOffset(), readahead.size()));
ReadingTaskKey readingTaskKey = new ReadingTaskKey(streamId, readahead.startOffset());
readAheadTasks.put(readingTaskKey, readAheadCf);
readAheadCf
.whenComplete((rst, ex) -> {
readaheadLimiter.release();
readAheadLimiter.release();
if (ex != null) {
LOGGER.error("background readahead {} fail", readahead, ex);
LOGGER.error("background read ahead {} fail", readahead, ex);
} else {
rst.getRecords().forEach(StreamRecordBatch::release);
}
readaheadTasks.remove(readingTaskKey, readaheadCf);
readAheadTasks.remove(readingTaskKey, readAheadCf);
});
});
});
@@ -300,27 +300,8 @@ public ReadContext(List<S3ObjectMetadata> objects, long startOffset, int maxByte

}

static class ReadingTaskKey {
final long streamId;
final long startOffset;

public ReadingTaskKey(long streamId, long startOffset) {
this.streamId = streamId;
this.startOffset = startOffset;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReadingTaskKey that = (ReadingTaskKey) o;
return streamId == that.streamId && startOffset == that.startOffset;
}
record ReadingTaskKey(long streamId, long startOffset) {

@Override
public int hashCode() {
return Objects.hash(streamId, startOffset);
}
}

}
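The DefaultS3BlockCache changes above are mostly the read-ahead renaming plus the conversion of ReadingTaskKey to a record, but the background read-ahead pattern they touch is worth spelling out: at most 16 read-ahead reads are in flight at once (a Semaphore that is only tried with tryAcquire, never blocked on), and every in-flight task is registered under its (streamId, startOffset) so a foreground read arriving at the same offset can wait for it instead of issuing a second S3 read. A minimal self-contained sketch of that pattern, with the actual S3 read stubbed out:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Sketch of the dedup-and-limit pattern around backgroundReadAhead / readAheadTasks.
final class ReadAheadScheduler {
    record ReadingTaskKey(long streamId, long startOffset) { }

    private final Map<ReadingTaskKey, CompletableFuture<?>> readAheadTasks = new ConcurrentHashMap<>();
    private final Semaphore readAheadLimiter = new Semaphore(16);

    void backgroundReadAhead(long streamId, long startOffset, int size) {
        if (!readAheadLimiter.tryAcquire()) {
            return; // too many read-ahead tasks in flight: skip rather than block
        }
        ReadingTaskKey key = new ReadingTaskKey(streamId, startOffset);
        CompletableFuture<Void> cf = readFromS3(streamId, startOffset, size);
        readAheadTasks.put(key, cf);
        cf.whenComplete((nil, ex) -> {
            readAheadLimiter.release();
            readAheadTasks.remove(key, cf);
        });
    }

    CompletableFuture<?> pendingReadAhead(long streamId, long startOffset) {
        // a foreground read can chain onto this future instead of reading S3 again
        return readAheadTasks.get(new ReadingTaskKey(streamId, startOffset));
    }

    private CompletableFuture<Void> readFromS3(long streamId, long startOffset, int size) {
        return CompletableFuture.completedFuture(null); // placeholder for the real read
    }
}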
@@ -68,7 +68,7 @@ public void parseDataBlockIndex() {
}

public void parseDataBlockIndex(long startPosition) {
s3Operator.rangeRead(objectKey, startPosition, metadata.objectSize(), ThrottleStrategy.THROTTLE)
s3Operator.rangeRead(objectKey, startPosition, metadata.objectSize(), ThrottleStrategy.THROTTLE_2)
.thenAccept(buf -> {
try {
indexBlockCf.complete(IndexBlock.parse(buf, metadata.objectSize(), metadata.objectId()));
@@ -184,10 +184,10 @@ private void readContinuousBlocks0(List<StreamDataBlock> streamDataBlocks) {

private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
if (throttleBucket == null) {
return s3Operator.rangeRead(objectKey, start, end);
return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2);
} else {
return throttleBucket.asScheduler().consume(end - start + 1, bucketCbExecutor)
.thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE));
.thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2));
}
}

@@ -51,7 +51,7 @@ public DataBlockWriter(long objectId, S3Operator s3Operator, int partSizeThresho
waitingUploadBlocks = new LinkedList<>();
waitingUploadBlockCfs = new ConcurrentHashMap<>();
completedBlocks = new LinkedList<>();
writer = s3Operator.writer(objectKey, ThrottleStrategy.THROTTLE);
writer = s3Operator.writer(objectKey, ThrottleStrategy.THROTTLE_2);
}

public long getObjectId() {
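Taken together, the call-site changes route foreground, latency-sensitive traffic (data-block reads on the cache-miss path) through THROTTLE_1 and background traffic (object writes, stream object copy, compaction reads, index parsing) through THROTTLE_2. The limiter that actually consumes these priorities is not part of this commit; purely as an illustration of the idea in the title, a priority-aware throttle could queue requests and release them in priority order as bandwidth tokens become available:

import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.PriorityBlockingQueue;

// Illustrative only: not the limiter used by s3stream. Requests queue with a
// priority (smaller = more urgent) and are released as tokens are refilled.
final class PriorityNetworkThrottle {
    private record Pending(int priority, long bytes, CompletableFuture<Void> cf) { }

    private final PriorityBlockingQueue<Pending> queue =
            new PriorityBlockingQueue<>(11, Comparator.comparingInt(Pending::priority));
    private long availableTokens;

    PriorityNetworkThrottle(long initialTokens) {
        this.availableTokens = initialTokens;
    }

    synchronized CompletableFuture<Void> acquire(int priority, long bytes) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        queue.add(new Pending(priority, bytes, cf));
        drain();
        return cf;
    }

    synchronized void refill(long tokens) {
        availableTokens += tokens;
        drain();
    }

    private void drain() {
        Pending head;
        while ((head = queue.peek()) != null && head.bytes() <= availableTokens) {
            queue.poll();
            availableTokens -= head.bytes();
            head.cf().complete(null); // the caller may now issue its network request
        }
    }
}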