feat(s3stream): dynamically adjust read ahead speed (AutoMQ#713)
Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Nov 24, 2023
1 parent 670b671 commit 7835905
Showing 13 changed files with 557 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.5.4-SNAPSHOT</s3stream.version>
<s3stream.version>0.5.5-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
2 changes: 1 addition & 1 deletion s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.5.4-SNAPSHOT</version>
<version>0.5.5-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
2 changes: 1 addition & 1 deletion s3stream/src/main/java/com/automq/stream/s3/Config.java
@@ -39,7 +39,7 @@ public class Config {
private int walWriteRateLimit = 3000;
private long walUploadThreshold = 100 * 1024 * 1024;
private int streamSplitSize = 16777216;
private int objectBlockSize = 8388608;
private int objectBlockSize = 1048576;
private int objectPartSize = 16777216;
private long blockCacheSize = 100 * 1024 * 1024;
private int streamObjectCompactionIntervalMinutes = 60;
32 changes: 20 additions & 12 deletions s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java
@@ -47,19 +47,19 @@ public class BlockCache implements DirectByteBufAlloc.OOMHandler {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final List<CacheEvictListener> cacheEvictListeners = new ArrayList<>();

public BlockCache(long maxSize) {
this.maxSize = maxSize;
DirectByteBufAlloc.registerOOMHandlers(this);
}

public void registerListener(CacheEvictListener listener) {
cacheEvictListeners.add(listener);
}

public void put(long streamId, List<StreamRecordBatch> records) {
try {
writeLock.lock();
put0(streamId, -1, records);
} finally {
writeLock.unlock();
}
put(streamId, -1, records);
}

public void put(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
@@ -86,13 +86,13 @@ void put0(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
}

if (raAsyncOffset < startOffset || raAsyncOffset >= endOffset) {
LOGGER.error("raAsyncOffset out of range, stream={}, raAsyncOffset: {}, startOffset: {}, endOffset: {}", streamId, raAsyncOffset, startOffset, endOffset);
LOGGER.warn("raAsyncOffset out of range, stream={}, raAsyncOffset: {}, startOffset: {}, endOffset: {}", streamId, raAsyncOffset, startOffset, endOffset);
}

int size = records.stream().mapToInt(StreamRecordBatch::size).sum();

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] put block cache, stream={}, {}-{}, total bytes: {} ", streamId, startOffset, endOffset, size);
LOGGER.debug("[S3BlockCache] put block cache, stream={}, {}-{}, raAsyncOffset: {}, total bytes: {} ", streamId, startOffset, endOffset, raAsyncOffset, size);
}

// remove overlapped part.
@@ -103,7 +103,7 @@ void put0(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
break;
}
if (isWithinRange(raAsyncOffset, cacheBlock.firstOffset, cacheBlock.lastOffset) && cacheBlock.readAheadRecord == null) {
cacheBlock.readAheadRecord = new ReadAheadRecord(endOffset, size);
cacheBlock.readAheadRecord = new ReadAheadRecord(endOffset);
}
// overlap is a rare case, so removeIf is fine for the performance.
records.removeIf(record -> {
@@ -128,7 +128,7 @@ void put0(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
partSize += record.size();
} else {
ReadAheadRecord raRecord = isWithinRange(raAsyncOffset, batchList.getFirst().getBaseOffset(), batchList.getLast().getLastOffset()) ?
new ReadAheadRecord(endOffset, size) : null;
new ReadAheadRecord(endOffset) : null;
put(streamId, streamCache, new CacheBlock(batchList, raRecord));
batchList = new LinkedList<>();
batchList.add(record);
Expand All @@ -138,7 +138,7 @@ void put0(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
}
if (!batchList.isEmpty()) {
ReadAheadRecord raRecord = isWithinRange(raAsyncOffset, batchList.getFirst().getBaseOffset(), batchList.getLast().getLastOffset()) ?
new ReadAheadRecord(endOffset, size) : null;
new ReadAheadRecord(endOffset) : null;
put(streamId, streamCache, new CacheBlock(batchList, raRecord));
}
}
@@ -183,7 +183,6 @@ boolean checkRange0(long streamId, long startOffset, int maxBytes) {
return nextMaxBytes <= 0;
}


/**
* Get records from cache.
* Note: the records is retained, the caller should release it.
@@ -273,6 +272,10 @@ private void ensureCapacity(int size) {
}

private int ensureCapacity0(int size, boolean forceEvict) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] block cache size: {}/{}, ensure size: {} ", this.size.get(), maxSize, size);
}

if (!forceEvict && (maxSize - this.size.get() >= size)) {
return 0;
}
@@ -294,6 +297,7 @@ private int ensureCapacity0(int size, boolean forceEvict) {
} else {
cacheBlock.free();
evictBytes += cacheBlock.size;
cacheEvictListeners.forEach(listener -> listener.onCacheEvict(entry.getKey().streamId, cacheBlock.firstOffset, cacheBlock.lastOffset, cacheBlock.size));
if (forceEvict) {
if (evictBytes >= size) {
return evictBytes;
@@ -373,4 +377,8 @@ public List<ReadAheadRecord> getReadAheadRecords() {
return readAheadRecords;
}
}

public interface CacheEvictListener {
void onCacheEvict(long streamId, long startOffset, long endOffset, int size);
}
}
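
The new CacheEvictListener interface and registerListener method give other components a way to react when cached blocks are evicted. A minimal usage sketch, assuming a listener that simply logs evicted ranges (the wrapper class and listener body below are illustrative, not part of this change):

import com.automq.stream.s3.cache.BlockCache;

public class EvictionLoggingExample {
    public static void main(String[] args) {
        BlockCache cache = new BlockCache(100L * 1024 * 1024); // 100 MiB block cache
        // CacheEvictListener declares a single method, so a lambda works here.
        cache.registerListener((streamId, startOffset, endOffset, size) ->
                System.out.printf("evicted stream=%d, range=%d-%d, bytes=%d%n",
                        streamId, startOffset, endOffset, size));
    }
}

Presumably this listener is the feedback channel that lets the read-ahead logic notice when prefetched data is evicted before it is read, though the consuming side lives in one of the changed files not shown here.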
s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -17,6 +17,7 @@

package com.automq.stream.s3.cache;

import com.automq.stream.s3.Config;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metrics.TimerUtil;
@@ -38,27 +39,28 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;

public class DefaultS3BlockCache implements S3BlockCache {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3BlockCache.class);
private static final Integer MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB;
public static final Integer MAX_READ_AHEAD_SIZE = 40 * 1024 * 1024; // 40MB
public static final Integer MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB;
private final ObjectReaderLRUCache objectReaderLRU = new ObjectReaderLRUCache(MAX_OBJECT_READER_SIZE);
private final Map<ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTasks = new ConcurrentHashMap<>();
private final Semaphore readAheadLimiter = new Semaphore(16);
private final BlockCache cache;
private final ExecutorService mainExecutor;
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final DataBlockReadAccumulator dataBlockReadAccumulator;
private final int blockSize;
private final ReadAheadManager readAheadManager;

public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3Operator s3Operator) {
this.cache = new BlockCache(cacheBytesSize);
public DefaultS3BlockCache(Config config, ObjectManager objectManager, S3Operator s3Operator) {
this.blockSize = config.objectBlockSize();
this.cache = new BlockCache(config.blockCacheSize());
this.readAheadManager = new ReadAheadManager(blockSize, this.cache);
this.mainExecutor = Threads.newFixedThreadPoolWithMonitor(
2,
"s3-block-cache-main",
@@ -78,11 +80,13 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data, stream={}, {}-{}, total bytes: {} ", streamId, startOffset, endOffset, maxBytes);
}
this.readAheadManager.updateReadProgress(streamId, startOffset);
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
ReadAheadAgent agent = this.readAheadManager.getOrCreateReadAheadAgent(streamId, startOffset);
// submit read task to mainExecutor to avoid read slower the caller thread.
mainExecutor.execute(() -> FutureUtil.exec(() ->
FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, true), readCf), readCf, LOGGER, "read")
FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, agent, true), readCf), readCf, LOGGER, "read")
);
readCf.whenComplete((ret, ex) -> {
if (ex != null) {
Expand All @@ -91,17 +95,24 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
return;
}

this.readAheadManager.updateReadResult(streamId, startOffset, ret.getRecords().get(ret.getRecords().size() - 1).getLastOffset(),
ret.getRecords().stream().mapToInt(StreamRecordBatch::size).sum());

if (ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT) {
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE).inc();
} else {
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).inc();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {} ",
ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, maxBytes);
}
OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_BLOCK_CACHE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
});
return readCf;
}

public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, boolean awaitReadAhead) {
public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, ReadAheadAgent agent, boolean awaitReadAhead) {
if (startOffset >= endOffset || maxBytes <= 0) {
return CompletableFuture.completedFuture(new ReadDataBlock(Collections.emptyList(), CacheAccessType.BLOCK_CACHE_MISS));
}
@@ -112,7 +123,7 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
CompletableFuture<Void> readAheadCf = inflightReadAheadTasks.get(new ReadAheadTaskKey(streamId, startOffset));
if (readAheadCf != null) {
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
readAheadCf.whenComplete((nil, ex) -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, false), readCf));
readAheadCf.whenComplete((nil, ex) -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, agent, false), readCf));
return readCf;
}
}
@@ -124,21 +135,21 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
BlockCache.GetCacheResult cacheRst = cache.get(streamId, nextStartOffset, endOffset, nextMaxBytes);
List<StreamRecordBatch> cacheRecords = cacheRst.getRecords();
if (!cacheRecords.isEmpty()) {
asyncReadAhead(streamId, agent, cacheRst.getReadAheadRecords());
nextStartOffset = cacheRecords.get(cacheRecords.size() - 1).getLastOffset();
nextMaxBytes -= Math.min(nextMaxBytes, cacheRecords.stream().mapToInt(StreamRecordBatch::size).sum());
if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
// cache hit
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data hit cache, stream={}, {}-{}, total bytes: {} ", streamId, startOffset, endOffset, maxBytes);
}
asyncReadAhead(streamId, cacheRst.getReadAheadRecords());
return CompletableFuture.completedFuture(new ReadDataBlock(cacheRecords, CacheAccessType.BLOCK_CACHE_HIT));
} else {
// cache partially hit
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data partially hit cache, stream={}, {}-{}, total bytes: {} ", streamId, nextStartOffset, endOffset, nextMaxBytes);
}
return read0(streamId, nextStartOffset, endOffset, nextMaxBytes, true).thenApply(rst -> {
return read0(streamId, nextStartOffset, endOffset, nextMaxBytes, agent, true).thenApply(rst -> {
List<StreamRecordBatch> records = new ArrayList<>(cacheRecords);
records.addAll(rst.getRecords());
return new ReadDataBlock(records, CacheAccessType.BLOCK_CACHE_MISS);
@@ -160,6 +171,8 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
if (!readAheadRecords.isEmpty()) {
long readEndOffset = rst.getRecords().get(rst.getRecords().size() - 1).getLastOffset();
cache.put(streamId, readEndOffset, readAheadRecords);
agent.updateReadAheadResult(readAheadRecords.get(readAheadRecords.size() - 1).getLastOffset(),
readAheadRecords.stream().mapToInt(StreamRecordBatch::size).sum());
}
return rst;
});
@@ -261,14 +274,13 @@ private CompletableFuture<ReadDataBlock> readFromS3(long streamId, long endOffse
}, mainExecutor);
}

private void asyncReadAhead(long streamId, List<ReadAheadRecord> readAheadRecords) {
private void asyncReadAhead(long streamId, ReadAheadAgent agent, List<ReadAheadRecord> readAheadRecords) {
if (readAheadRecords.isEmpty()) {
return;
}
ReadAheadRecord lastRecord = readAheadRecords.get(readAheadRecords.size() - 1);
long nextRaOffset = lastRecord.nextRaOffset;
int currRaSizeSum = readAheadRecords.stream().mapToInt(ReadAheadRecord::currRaSize).sum();
int nextRaSize = Math.min(MAX_READ_AHEAD_SIZE, currRaSizeSum * 2);
long nextRaOffset = lastRecord.nextRAOffset();
int nextRaSize = agent.getNextReadAheadSize();

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] async read ahead, stream={}, {}-{}, total bytes: {} ",
@@ -287,18 +299,15 @@ private void asyncReadAhead(long streamId, List<ReadAheadRecord> readAheadRecord
if (objects.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
if (!readAheadLimiter.tryAcquire()) {
// if inflight read ahead tasks exceed limit, skip this read ahead.
return CompletableFuture.completedFuture(null);
}

ReadContext context = new ReadContext(objects, nextRaOffset, nextRaSize);
return readFromS3(streamId, NOOP_OFFSET, context).thenAccept((rst) -> {
readAheadLimiter.release();
rst.getRecords().forEach(StreamRecordBatch::release);
List<StreamRecordBatch> records = context.readAheadRecords;
if (!records.isEmpty()) {
cache.put(streamId, records);
cache.put(streamId, records.get(0).getBaseOffset(), records);
agent.updateReadAheadResult(records.get(records.size() - 1).getLastOffset(),
records.stream().mapToInt(StreamRecordBatch::size).sum());
}
});
}, mainExecutor).whenComplete((nil, ex) -> {
@@ -345,7 +354,7 @@ record ReadAheadTaskKey(long streamId, long startOffset) {

}

public record ReadAheadRecord(long nextRaOffset, int currRaSize) {
public record ReadAheadRecord(long nextRAOffset) {
}

}
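
The fixed policy in asyncReadAhead (nextRaSize = Math.min(MAX_READ_AHEAD_SIZE, currRaSizeSum * 2)) and the 16-permit Semaphore are replaced by a per-stream ReadAheadAgent obtained from the new ReadAheadManager; the agent and manager themselves are in changed files not shown above. A hypothetical sketch of the kind of feedback-driven sizing such an agent could implement; the class name, fields, and constants below are illustrative assumptions, not the actual implementation:

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: grow the next read-ahead window while prefetched data keeps
// getting consumed, and back off when prefetched data is evicted before being read.
public class ReadAheadSizer {
    private static final int MIN_SIZE = 1024 * 1024;       // assumed floor: 1 MiB
    private static final int MAX_SIZE = 40 * 1024 * 1024;  // assumed cap: 40 MiB
    private final AtomicInteger nextSize = new AtomicInteger(MIN_SIZE);

    // Called when scheduling the next asynchronous read-ahead.
    public int getNextReadAheadSize() {
        return nextSize.get();
    }

    // Called after a read-ahead completes and its records land in the block cache.
    public void updateReadAheadResult(long lastOffset, int sizeInBytes) {
        nextSize.updateAndGet(cur -> Math.min(MAX_SIZE, cur * 2));
    }

    // Called from a BlockCache.CacheEvictListener when prefetched data is evicted unread.
    public void onCacheEvict(long streamId, long startOffset, long endOffset, int size) {
        nextSize.updateAndGet(cur -> Math.max(MIN_SIZE, cur / 2));
    }
}

The method names mirror how the agent is used in the diff: getNextReadAheadSize() when scheduling the next read-ahead, and updateReadAheadResult(lastOffset, sizeInBytes) once prefetched records are cached.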