diff --git a/pom.xml b/pom.xml
index 05c41f878..225b9fa9b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
32.0.1-jre
2.0.9
2.2
- <s3stream.version>0.5.4-SNAPSHOT</s3stream.version>
+ <s3stream.version>0.5.5-SNAPSHOT</s3stream.version>
23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index 07a0c5180..77b2d436e 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
- <version>0.5.4-SNAPSHOT</version>
+ <version>0.5.5-SNAPSHOT</version>
5.5.0
5.10.0
diff --git a/s3stream/src/main/java/com/automq/stream/s3/Config.java b/s3stream/src/main/java/com/automq/stream/s3/Config.java
index f77d23f8b..341a84793 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/Config.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/Config.java
@@ -39,7 +39,7 @@ public class Config {
private int walWriteRateLimit = 3000;
private long walUploadThreshold = 100 * 1024 * 1024;
private int streamSplitSize = 16777216;
- private int objectBlockSize = 8388608;
+ private int objectBlockSize = 1048576;
private int objectPartSize = 16777216;
private long blockCacheSize = 100 * 1024 * 1024;
private int streamObjectCompactionIntervalMinutes = 60;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java
index 24efd95f1..97970ea6e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java
@@ -47,19 +47,19 @@ public class BlockCache implements DirectByteBufAlloc.OOMHandler {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final List<CacheEvictListener> cacheEvictListeners = new ArrayList<>();
public BlockCache(long maxSize) {
this.maxSize = maxSize;
DirectByteBufAlloc.registerOOMHandlers(this);
}
+ public void registerListener(CacheEvictListener listener) {
+ cacheEvictListeners.add(listener);
+ }
+
public void put(long streamId, List<StreamRecordBatch> records) {
- try {
- writeLock.lock();
- put0(streamId, -1, records);
- } finally {
- writeLock.unlock();
- }
+ put(streamId, -1, records);
}
public void put(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
@@ -86,13 +86,13 @@ void put0(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
}
if (raAsyncOffset < startOffset || raAsyncOffset >= endOffset) {
- LOGGER.error("raAsyncOffset out of range, stream={}, raAsyncOffset: {}, startOffset: {}, endOffset: {}", streamId, raAsyncOffset, startOffset, endOffset);
+ LOGGER.warn("raAsyncOffset out of range, stream={}, raAsyncOffset: {}, startOffset: {}, endOffset: {}", streamId, raAsyncOffset, startOffset, endOffset);
}
int size = records.stream().mapToInt(StreamRecordBatch::size).sum();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("[S3BlockCache] put block cache, stream={}, {}-{}, total bytes: {} ", streamId, startOffset, endOffset, size);
+ LOGGER.debug("[S3BlockCache] put block cache, stream={}, {}-{}, raAsyncOffset: {}, total bytes: {} ", streamId, startOffset, endOffset, raAsyncOffset, size);
}
// remove overlapped part.
@@ -103,7 +103,7 @@ void put0(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
break;
}
if (isWithinRange(raAsyncOffset, cacheBlock.firstOffset, cacheBlock.lastOffset) && cacheBlock.readAheadRecord == null) {
- cacheBlock.readAheadRecord = new ReadAheadRecord(endOffset, size);
+ cacheBlock.readAheadRecord = new ReadAheadRecord(endOffset);
}
// overlap is a rare case, so removeIf is fine for the performance.
records.removeIf(record -> {
@@ -128,7 +128,7 @@ void put0(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
partSize += record.size();
} else {
ReadAheadRecord raRecord = isWithinRange(raAsyncOffset, batchList.getFirst().getBaseOffset(), batchList.getLast().getLastOffset()) ?
- new ReadAheadRecord(endOffset, size) : null;
+ new ReadAheadRecord(endOffset) : null;
put(streamId, streamCache, new CacheBlock(batchList, raRecord));
batchList = new LinkedList<>();
batchList.add(record);
@@ -138,7 +138,7 @@ void put0(long streamId, long raAsyncOffset, List<StreamRecordBatch> records) {
}
if (!batchList.isEmpty()) {
ReadAheadRecord raRecord = isWithinRange(raAsyncOffset, batchList.getFirst().getBaseOffset(), batchList.getLast().getLastOffset()) ?
- new ReadAheadRecord(endOffset, size) : null;
+ new ReadAheadRecord(endOffset) : null;
put(streamId, streamCache, new CacheBlock(batchList, raRecord));
}
}
@@ -183,7 +183,6 @@ boolean checkRange0(long streamId, long startOffset, int maxBytes) {
return nextMaxBytes <= 0;
}
-
/**
* Get records from cache.
* Note: the records is retained, the caller should release it.
@@ -273,6 +272,10 @@ private void ensureCapacity(int size) {
}
private int ensureCapacity0(int size, boolean forceEvict) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[S3BlockCache] block cache size: {}/{}, ensure size: {} ", this.size.get(), maxSize, size);
+ }
+
if (!forceEvict && (maxSize - this.size.get() >= size)) {
return 0;
}
@@ -294,6 +297,7 @@ private int ensureCapacity0(int size, boolean forceEvict) {
} else {
cacheBlock.free();
evictBytes += cacheBlock.size;
+ cacheEvictListeners.forEach(listener -> listener.onCacheEvict(entry.getKey().streamId, cacheBlock.firstOffset, cacheBlock.lastOffset, cacheBlock.size));
if (forceEvict) {
if (evictBytes >= size) {
return evictBytes;
@@ -373,4 +377,8 @@ public List<ReadAheadRecord> getReadAheadRecords() {
return readAheadRecords;
}
}
+
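+ /**
+ * Listener notified when a cached block is evicted, so read-ahead bookkeeping can account for the evicted range.
+ */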
+ public interface CacheEvictListener {
+ void onCacheEvict(long streamId, long startOffset, long endOffset, int size);
+ }
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
index f75347d9f..549486b22 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3.cache;
+import com.automq.stream.s3.Config;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metrics.TimerUtil;
@@ -38,7 +39,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -46,19 +46,21 @@
public class DefaultS3BlockCache implements S3BlockCache {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3BlockCache.class);
- private static final Integer MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB;
- public static final Integer MAX_READ_AHEAD_SIZE = 40 * 1024 * 1024; // 40MB
+ public static final Integer MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB;
private final ObjectReaderLRUCache objectReaderLRU = new ObjectReaderLRUCache(MAX_OBJECT_READER_SIZE);
private final Map<ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTasks = new ConcurrentHashMap<>();
- private final Semaphore readAheadLimiter = new Semaphore(16);
private final BlockCache cache;
private final ExecutorService mainExecutor;
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final DataBlockReadAccumulator dataBlockReadAccumulator;
+ private final int blockSize;
+ private final ReadAheadManager readAheadManager;
- public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3Operator s3Operator) {
- this.cache = new BlockCache(cacheBytesSize);
+ public DefaultS3BlockCache(Config config, ObjectManager objectManager, S3Operator s3Operator) {
+ this.blockSize = config.objectBlockSize();
+ this.cache = new BlockCache(config.blockCacheSize());
+ this.readAheadManager = new ReadAheadManager(blockSize, this.cache);
this.mainExecutor = Threads.newFixedThreadPoolWithMonitor(
2,
"s3-block-cache-main",
@@ -78,11 +80,13 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data, stream={}, {}-{}, total bytes: {} ", streamId, startOffset, endOffset, maxBytes);
}
+ this.readAheadManager.updateReadProgress(streamId, startOffset);
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
+ ReadAheadAgent agent = this.readAheadManager.getOrCreateReadAheadAgent(streamId, startOffset);
// submit read task to mainExecutor to avoid read slower the caller thread.
mainExecutor.execute(() -> FutureUtil.exec(() ->
- FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, true), readCf), readCf, LOGGER, "read")
+ FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, agent, true), readCf), readCf, LOGGER, "read")
);
readCf.whenComplete((ret, ex) -> {
if (ex != null) {
@@ -91,17 +95,24 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
return;
}
+ this.readAheadManager.updateReadResult(streamId, startOffset, ret.getRecords().get(ret.getRecords().size() - 1).getLastOffset(),
+ ret.getRecords().stream().mapToInt(StreamRecordBatch::size).sum());
+
if (ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT) {
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE).inc();
} else {
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).inc();
}
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {} ",
+ ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, maxBytes);
+ }
OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_BLOCK_CACHE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
});
return readCf;
}
- public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, boolean awaitReadAhead) {
+ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, ReadAheadAgent agent, boolean awaitReadAhead) {
if (startOffset >= endOffset || maxBytes <= 0) {
return CompletableFuture.completedFuture(new ReadDataBlock(Collections.emptyList(), CacheAccessType.BLOCK_CACHE_MISS));
}
@@ -112,7 +123,7 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, boolean awaitReadAhead) {
CompletableFuture<Void> readAheadCf = inflightReadAheadTasks.get(new ReadAheadTaskKey(streamId, startOffset));
if (readAheadCf != null) {
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
- readAheadCf.whenComplete((nil, ex) -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, false), readCf));
+ readAheadCf.whenComplete((nil, ex) -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, agent, false), readCf));
return readCf;
}
}
@@ -124,6 +135,7 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, boolean awaitReadAhead) {
BlockCache.GetCacheResult cacheRst = cache.get(streamId, nextStartOffset, endOffset, nextMaxBytes);
List<StreamRecordBatch> cacheRecords = cacheRst.getRecords();
if (!cacheRecords.isEmpty()) {
+ asyncReadAhead(streamId, agent, cacheRst.getReadAheadRecords());
nextStartOffset = cacheRecords.get(cacheRecords.size() - 1).getLastOffset();
nextMaxBytes -= Math.min(nextMaxBytes, cacheRecords.stream().mapToInt(StreamRecordBatch::size).sum());
if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
@@ -131,14 +143,13 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, boolean awaitReadAhead) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data hit cache, stream={}, {}-{}, total bytes: {} ", streamId, startOffset, endOffset, maxBytes);
}
- asyncReadAhead(streamId, cacheRst.getReadAheadRecords());
return CompletableFuture.completedFuture(new ReadDataBlock(cacheRecords, CacheAccessType.BLOCK_CACHE_HIT));
} else {
// cache partially hit
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data partially hit cache, stream={}, {}-{}, total bytes: {} ", streamId, nextStartOffset, endOffset, nextMaxBytes);
}
- return read0(streamId, nextStartOffset, endOffset, nextMaxBytes, true).thenApply(rst -> {
+ return read0(streamId, nextStartOffset, endOffset, nextMaxBytes, agent, true).thenApply(rst -> {
List<StreamRecordBatch> records = new ArrayList<>(cacheRecords);
records.addAll(rst.getRecords());
return new ReadDataBlock(records, CacheAccessType.BLOCK_CACHE_MISS);
@@ -160,6 +171,8 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, boolean awaitReadAhead) {
if (!readAheadRecords.isEmpty()) {
long readEndOffset = rst.getRecords().get(rst.getRecords().size() - 1).getLastOffset();
cache.put(streamId, readEndOffset, readAheadRecords);
+ agent.updateReadAheadResult(readAheadRecords.get(readAheadRecords.size() - 1).getLastOffset(),
+ readAheadRecords.stream().mapToInt(StreamRecordBatch::size).sum());
}
return rst;
});
@@ -261,14 +274,13 @@ private CompletableFuture<ReadDataBlock> readFromS3(long streamId, long endOffset, ReadContext context) {
}, mainExecutor);
}
- private void asyncReadAhead(long streamId, List<ReadAheadRecord> readAheadRecords) {
+ private void asyncReadAhead(long streamId, ReadAheadAgent agent, List<ReadAheadRecord> readAheadRecords) {
if (readAheadRecords.isEmpty()) {
return;
}
ReadAheadRecord lastRecord = readAheadRecords.get(readAheadRecords.size() - 1);
- long nextRaOffset = lastRecord.nextRaOffset;
- int currRaSizeSum = readAheadRecords.stream().mapToInt(ReadAheadRecord::currRaSize).sum();
- int nextRaSize = Math.min(MAX_READ_AHEAD_SIZE, currRaSizeSum * 2);
+ long nextRaOffset = lastRecord.nextRAOffset();
+ int nextRaSize = agent.getNextReadAheadSize();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] async read ahead, stream={}, {}-{}, total bytes: {} ",
@@ -287,18 +299,15 @@ private void asyncReadAhead(long streamId, List<ReadAheadRecord> readAheadRecords) {
if (objects.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
- if (!readAheadLimiter.tryAcquire()) {
- // if inflight read ahead tasks exceed limit, skip this read ahead.
- return CompletableFuture.completedFuture(null);
- }
ReadContext context = new ReadContext(objects, nextRaOffset, nextRaSize);
return readFromS3(streamId, NOOP_OFFSET, context).thenAccept((rst) -> {
- readAheadLimiter.release();
rst.getRecords().forEach(StreamRecordBatch::release);
List<StreamRecordBatch> records = context.readAheadRecords;
if (!records.isEmpty()) {
- cache.put(streamId, records);
+ cache.put(streamId, records.get(0).getBaseOffset(), records);
+ agent.updateReadAheadResult(records.get(records.size() - 1).getLastOffset(),
+ records.stream().mapToInt(StreamRecordBatch::size).sum());
}
});
}, mainExecutor).whenComplete((nil, ex) -> {
@@ -345,7 +354,7 @@ record ReadAheadTaskKey(long streamId, long startOffset) {
}
- public record ReadAheadRecord(long nextRaOffset, int currRaSize) {
+ public record ReadAheadRecord(long nextRAOffset) {
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
new file mode 100644
index 000000000..294fc117f
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.cache;
+
+import com.automq.stream.s3.metrics.TimerUtil;
+import com.automq.stream.utils.LogContext;
+import com.google.common.base.Objects;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
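+ /**
+ * Tracks the sequential read progress of a single stream reader and sizes the next read-ahead
+ * based on the observed consumption speed and recently evicted cache ranges.
+ */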
+public class ReadAheadAgent {
+ private final Logger logger;
+ private static final Integer MAX_READ_AHEAD_SIZE = 40 * 1024 * 1024; // 40MB
+ private static final Integer S3_OPERATION_DELAY = 400; // 400ms
+ private final Lock lock = new ReentrantLock();
+ private final TimerUtil timer;
+ private final long streamId;
+ private final int dataBlockSize;
+ private final List<Pair<Long, Long>> evictedOffsetRanges = new ArrayList<>();
+ private double bytesPerMillis;
+ private long readCount;
+ private long lastReadOffset;
+ private int lastReadSize;
+ private long readAheadEndOffset;
+ private int lastReadAheadSize;
+
+ public ReadAheadAgent(int dataBlockSize, long streamId, long startOffset) {
+ this.logger = new LogContext(String.format("[S3BlockCache] stream=%d ", streamId)).logger(ReadAheadAgent.class);
+ this.timer = new TimerUtil();
+ this.dataBlockSize = dataBlockSize;
+ this.streamId = streamId;
+ this.lastReadOffset = startOffset;
+ this.readCount = 0;
+ logger.info("create read ahead agent for stream={}, startOffset={}", streamId, startOffset);
+ }
+
+ public void updateReadProgress(long startOffset) {
+ try {
+ lock.lock();
+ if (startOffset != lastReadOffset) {
+ logger.error("update read progress for stream={} failed, offset not match: expected offset {}, but get {}", streamId, lastReadOffset, startOffset);
+ return;
+ }
+ long timeElapsed = timer.elapsedAs(TimeUnit.MILLISECONDS);
+ double lastReadSpeed = (double) this.lastReadSize / timeElapsed;
+ readCount++;
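+ // weighted moving average of the consumption speed: the latest sample is weighted readCount / (readCount + 1),
+ // so the estimate is smoothed across the first few reads and then closely tracks the most recent speed.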
+ double factor = (double) readCount / (1 + readCount);
+ bytesPerMillis = (1 - factor) * bytesPerMillis + factor * lastReadSpeed;
+ if (logger.isDebugEnabled()) {
+ logger.debug("update read progress offset {}, lastReadSpeed: {} bytes/s, corrected speed: {} bytes/s", startOffset, lastReadSpeed * 1000, bytesPerMillis * 1000);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void updateReadResult(long startOffset, long endOffset, int size) {
+ try {
+ lock.lock();
+ if (startOffset != lastReadOffset) {
+ logger.error("update read result for stream={} failed, offset not match: expected offset {}, but get {}", streamId, lastReadOffset, startOffset);
+ return;
+ }
+ this.lastReadSize = size;
+ this.lastReadOffset = endOffset;
+ timer.reset();
+ if (logger.isDebugEnabled()) {
+ logger.debug("update read result offset {}-{}, size: {}, readAheadOffset: {}", startOffset, endOffset, size, readAheadEndOffset);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void updateReadAheadResult(long readAheadEndOffset, int readAheadSize) {
+ try {
+ lock.lock();
+ this.readAheadEndOffset = readAheadEndOffset;
+ this.lastReadAheadSize = readAheadSize;
+ if (logger.isDebugEnabled()) {
+ logger.debug("update read ahead offset {}, size: {}, lastReadOffset: {}", readAheadEndOffset, readAheadSize, lastReadOffset);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int getNextReadAheadSize() {
+ try {
+ lock.lock();
+ // drop evicted ranges that no longer overlap the read-ahead window [lastReadOffset, readAheadEndOffset)
+ this.evictedOffsetRanges.removeIf(range -> range.getLeft() >= readAheadEndOffset || range.getRight() <= lastReadOffset);
+ int nextSize = calculateNextSize();
+ this.lastReadAheadSize = nextSize;
+ if (logger.isDebugEnabled()) {
+ logger.debug("get next read ahead size {}, {}", nextSize, this);
+ }
+ return nextSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private int calculateNextSize() {
+ long totalEvictedSize = this.evictedOffsetRanges.stream().mapToLong(range -> {
+ long left = Math.max(range.getLeft(), lastReadOffset);
+ long right = Math.min(range.getRight(), readAheadEndOffset);
+ return right - left;
+ }).sum();
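+ // size the next read-ahead to roughly one S3 round trip (S3_OPERATION_DELAY) of consumption at the
+ // estimated speed, discounted by the fraction of the current read-ahead window already evicted from
+ // cache, and clamped to [dataBlockSize, MAX_READ_AHEAD_SIZE].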
+ double evictedFraction = (double) totalEvictedSize / (readAheadEndOffset - lastReadOffset);
+ int nextSize = (int) (bytesPerMillis * S3_OPERATION_DELAY * (1 - evictedFraction));
+ nextSize = Math.max(dataBlockSize, Math.min(nextSize, MAX_READ_AHEAD_SIZE));
+ return nextSize;
+ }
+
+ public long getStreamId() {
+ return streamId;
+ }
+
+ public long getReadAheadOffset() {
+ try {
+ lock.lock();
+ return readAheadEndOffset;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public long getLastReadAheadSize() {
+ try {
+ lock.lock();
+ return lastReadAheadSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public long getLastReadOffset() {
+ try {
+ lock.lock();
+ return lastReadOffset;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int getLastReadSize() {
+ try {
+ lock.lock();
+ return lastReadSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public double getBytesPerMillis() {
+ try {
+ lock.lock();
+ return bytesPerMillis;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void evict(long startOffset, long endOffset) {
+ try {
+ lock.lock();
+ if (startOffset >= endOffset
+ || lastReadOffset >= readAheadEndOffset
+ || endOffset < lastReadOffset
+ || startOffset > readAheadEndOffset) {
+ return;
+ }
+ logger.info("evict range [{}, {}], lastReadOffset: {}, readAheadOffset: {}", startOffset, endOffset, lastReadOffset, readAheadEndOffset);
+ this.evictedOffsetRanges.add(Pair.of(startOffset, endOffset));
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ReadAheadAgent agent = (ReadAheadAgent) o;
+ return streamId == agent.streamId && lastReadOffset == agent.lastReadOffset;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(streamId, lastReadOffset);
+ }
+
+ @Override
+ public String toString() {
+ return "ReadAheadAgent{" +
+ "stream=" + streamId +
+ ", bytesPerMillis=" + bytesPerMillis +
+ ", lastReadOffset=" + lastReadOffset +
+ ", lastReadSize=" + lastReadSize +
+ ", readAheadEndOffset=" + readAheadEndOffset +
+ ", evictedOffsetRanges=" + evictedOffsetRanges +
+ '}';
+ }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadManager.java b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadManager.java
new file mode 100644
index 000000000..4dda7f2b8
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadManager.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.cache;
+
+import com.automq.stream.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
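+ /**
+ * Maintains a ReadAheadAgent per active (stream, offset) read position, re-keys agents as reads advance,
+ * and forwards block cache evictions to the agents whose windows they affect.
+ */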
+public class ReadAheadManager implements BlockCache.CacheEvictListener {
+ private static final Logger LOGGER = new LogContext("[S3BlockCache] ").logger(ReadAheadManager.class);
+ private static final Integer MAX_READ_AHEAD_AGENT_NUM = 1024;
+ // <streamId, <startOffset, ReadAheadAgent>>
+ private final Map<Long, NavigableMap<Long, ReadAheadAgent>> readAheadAgentMap;
+ private final LRUCache<ReadAheadAgent, Void> readAheadAgentLRUCache = new LRUCache<>();
+ private final int dataBlockSize;
+ private final BlockCache blockCache;
+
+ public ReadAheadManager(int dataBlockSize, BlockCache blockCache) {
+ this.dataBlockSize = dataBlockSize;
+ this.readAheadAgentMap = new ConcurrentHashMap<>();
+ this.blockCache = blockCache;
+ this.blockCache.registerListener(this);
+ }
+
+ public void updateReadResult(long streamId, long startOffset, long endOffset, int size) {
+ NavigableMap<Long, ReadAheadAgent> agentMap = readAheadAgentMap.get(streamId);
+ if (agentMap != null) {
+ synchronized (agentMap) {
+ ReadAheadAgent agent = agentMap.get(startOffset);
+ if (agent == null) {
+ return;
+ }
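+ // re-key the agent from startOffset to endOffset so the next sequential read (starting at endOffset)
+ // finds it, and refresh its position in the LRU cache.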
+ readAheadAgentLRUCache.remove(agent);
+ agent.updateReadResult(startOffset, endOffset, size);
+ agentMap.remove(startOffset);
+ agentMap.put(endOffset, agent);
+ readAheadAgentLRUCache.put(agent, null);
+ }
+ }
+ }
+
+ public void updateReadProgress(long streamId, long startOffset) {
+ NavigableMap<Long, ReadAheadAgent> agentMap = readAheadAgentMap.get(streamId);
+ if (agentMap != null) {
+ synchronized (agentMap) {
+ ReadAheadAgent agent = agentMap.get(startOffset);
+ if (agent == null) {
+ return;
+ }
+ agent.updateReadProgress(startOffset);
+ readAheadAgentLRUCache.touch(agent);
+ }
+ }
+ }
+
+ public ReadAheadAgent getReadAheadAgent(long streamId, long startOffset) {
+ NavigableMap<Long, ReadAheadAgent> agentMap = readAheadAgentMap.get(streamId);
+ if (agentMap != null) {
+ synchronized (agentMap) {
+ ReadAheadAgent agent = agentMap.get(startOffset);
+ if (agent != null) {
+ readAheadAgentLRUCache.touch(agent);
+ }
+ return agent;
+ }
+ }
+ return null;
+ }
+
+ public ReadAheadAgent getOrCreateReadAheadAgent(long streamId, long startOffset) {
+ NavigableMap<Long, ReadAheadAgent> agentMap = readAheadAgentMap.computeIfAbsent(streamId, k -> new TreeMap<>());
+ synchronized (agentMap) {
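+ // cap the number of live agents: evict the least recently used agents before creating a new one.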
+ while (readAheadAgentLRUCache.size() > MAX_READ_AHEAD_AGENT_NUM) {
+ Map.Entry<ReadAheadAgent, Void> entry = readAheadAgentLRUCache.pop();
+ if (entry == null) {
+ LOGGER.error("read ahead agent num exceed limit, but no agent can be evicted");
+ return null;
+ }
+ ReadAheadAgent agent = entry.getKey();
+ agentMap.remove(agent.getLastReadOffset());
+ LOGGER.info("evict read ahead agent for stream={}, startOffset={}", agent.getStreamId(), agent.getLastReadOffset());
+ }
+ return agentMap.computeIfAbsent(startOffset, k -> {
+ ReadAheadAgent agent = new ReadAheadAgent(dataBlockSize, streamId, k);
+ readAheadAgentLRUCache.put(agent, null);
+ LOGGER.info("put read ahead agent for stream={}, startOffset={}, total agent num={}", agent.getStreamId(), agent.getLastReadOffset(), readAheadAgentLRUCache.size());
+ return agent;
+ });
+ }
+ }
+
+ Set<ReadAheadAgent> getReadAheadAgents() {
+ return readAheadAgentLRUCache.cache.keySet();
+ }
+
+ @Override
+ public void onCacheEvict(long streamId, long startOffset, long endOffset, int size) {
+ NavigableMap<Long, ReadAheadAgent> agentMap = readAheadAgentMap.get(streamId);
+ if (agentMap != null) {
+ synchronized (agentMap) {
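+ // locate the agents whose [lastReadOffset, readAheadEndOffset] windows may overlap the evicted
+ // range and let each of them record the eviction.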
+ Long floor = agentMap.floorKey(startOffset);
+ if (floor == null) {
+ floor = agentMap.firstKey();
+ }
+ Long ceil = agentMap.ceilingKey(endOffset);
+ if (ceil == null) {
+ ceil = agentMap.lastKey();
+ }
+ NavigableMap<Long, ReadAheadAgent> subMap = agentMap.subMap(floor, true, ceil, Objects.equals(ceil, agentMap.lastKey()));
+ for (Map.Entry<Long, ReadAheadAgent> entry : subMap.entrySet()) {
+ ReadAheadAgent agent = entry.getValue();
+ agent.evict(startOffset, endOffset);
+ }
+ }
+ }
+ }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
new file mode 100644
index 000000000..d62ae3b79
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.cache;
+
+import com.automq.stream.s3.ObjectReader;
+import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.objects.ObjectManager;
+import com.automq.stream.s3.operator.S3Operator;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class StreamReader {
+ private final S3Operator operator;
+ private final ObjectManager objectManager;
+ private final ObjectReaderLRUCache objectReaders;
+
+ public StreamReader(int maxObjectReaderSize, S3Operator operator, ObjectManager objectManager) {
+ this.operator = operator;
+ this.objectManager = objectManager;
+ this.objectReaders = new ObjectReaderLRUCache(maxObjectReaderSize);
+ }
+
+ //TODO: limit concurrent object read number
+ public void read(long streamId, long startOffset, long endOffset, int maxBytes) {
+ CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, startOffset, endOffset, 2);
+
+ }
+
+ private List<ObjectReader.DataBlockIndex> getDataBlockIndexList(long streamId, long startOffset, long endOffset, int maxBytes) {
+ return null;
+ }
+}
diff --git a/s3stream/src/test/java/com/automq/stream/s3/DefaultS3BlockCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/DefaultS3BlockCacheTest.java
index 53e210775..eece46d9c 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/DefaultS3BlockCacheTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/DefaultS3BlockCacheTest.java
@@ -49,12 +49,15 @@ public class DefaultS3BlockCacheTest {
ObjectManager objectManager;
S3Operator s3Operator;
DefaultS3BlockCache s3BlockCache;
+ Config config;
@BeforeEach
public void setup() {
+ config = new Config();
+ config.blockCacheSize(0);
objectManager = Mockito.mock(ObjectManager.class);
s3Operator = new MemoryS3Operator();
- s3BlockCache = new DefaultS3BlockCache(0, objectManager, s3Operator);
+ s3BlockCache = new DefaultS3BlockCache(config, objectManager, s3Operator);
}
@Test
@@ -96,7 +99,8 @@ public void testRead() throws Exception {
public void testRead_readAhead() throws ExecutionException, InterruptedException {
objectManager = Mockito.mock(ObjectManager.class);
s3Operator = Mockito.spy(new MemoryS3Operator());
- s3BlockCache = new DefaultS3BlockCache(1024 * 1024, objectManager, s3Operator);
+ config.blockCacheSize(1024 * 1024);
+ s3BlockCache = new DefaultS3BlockCache(config, objectManager, s3Operator);
ObjectWriter objectWriter = ObjectWriter.writer(0, s3Operator, 1024, 1024);
objectWriter.write(233, List.of(
diff --git a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
index 669af9872..07977e555 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
@@ -62,14 +62,17 @@ public class S3StorageTest {
StreamManager streamManager;
ObjectManager objectManager;
S3Storage storage;
+ Config config;
@BeforeEach
public void setup() {
+ config = new Config();
+ config.blockCacheSize(0);
objectManager = mock(ObjectManager.class);
streamManager = mock(StreamManager.class);
S3Operator s3Operator = new MemoryS3Operator();
- storage = new S3Storage(new Config(), new MemoryWriteAheadLog(),
- streamManager, objectManager, new DefaultS3BlockCache(0L, objectManager, s3Operator), s3Operator);
+ storage = new S3Storage(config, new MemoryWriteAheadLog(),
+ streamManager, objectManager, new DefaultS3BlockCache(config, objectManager, s3Operator), s3Operator);
}
@Test
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/BlockCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/BlockCacheTest.java
index aa801f5ff..d8fb5314c 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/BlockCacheTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/BlockCacheTest.java
@@ -100,7 +100,7 @@ public void testPutGet3() {
assertEquals(26L, records.get(0).getBaseOffset());
assertEquals(30L, records.get(1).getBaseOffset());
assertEquals(1, rst.getReadAheadRecords().size());
- assertEquals(new DefaultS3BlockCache.ReadAheadRecord(40L, 5), rst.getReadAheadRecords().get(0));
+ assertEquals(new DefaultS3BlockCache.ReadAheadRecord(40L), rst.getReadAheadRecords().get(0));
}
@Test
@@ -161,8 +161,7 @@ public void testReadAhead() {
BlockCache.GetCacheResult rst = blockCache.get(233L, 10, 11, Integer.MAX_VALUE);
assertEquals(1, rst.getRecords().size());
assertEquals(10L, rst.getRecords().get(0).getBaseOffset());
- assertEquals(12, rst.getReadAheadRecords().get(0).nextRaOffset());
- assertEquals(1025 * 1024, rst.getReadAheadRecords().get(0).currRaSize());
+ assertEquals(12, rst.getReadAheadRecords().get(0).nextRAOffset());
// repeat read the block, the readahead mark is clear.
rst = blockCache.get(233L, 10, 11, Integer.MAX_VALUE);
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/ReadAheadManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/ReadAheadManagerTest.java
new file mode 100644
index 000000000..878025e01
--- /dev/null
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/ReadAheadManagerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.cache;
+
+import com.automq.stream.utils.Threads;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class ReadAheadManagerTest {
+
+ @Test
+ public void testUpdateAgent() {
+ BlockCache blockCache = Mockito.mock(BlockCache.class);
+ ReadAheadManager manager = new ReadAheadManager(10, blockCache);
+ manager.updateReadProgress(233L, 0);
+ Assertions.assertNull(manager.getReadAheadAgent(233L, 0));
+ manager.getOrCreateReadAheadAgent(233L, 0);
+ ReadAheadAgent agent = manager.getReadAheadAgent(233L, 0);
+ Assertions.assertNotNull(agent);
+ agent.updateReadAheadResult(1024, 1224);
+ manager.updateReadResult(233L, 0, 512, 612);
+ Assertions.assertEquals(1024, agent.getReadAheadOffset());
+ Assertions.assertEquals(1224, agent.getLastReadAheadSize());
+ Assertions.assertEquals(512, agent.getLastReadOffset());
+ Assertions.assertEquals(612, agent.getLastReadSize());
+ Assertions.assertEquals(0, agent.getBytesPerMillis());
+
+ Threads.sleep(1000);
+ manager.updateReadProgress(233L, 512);
+ Assertions.assertEquals(0.306, agent.getBytesPerMillis(), 0.01);
+ Assertions.assertEquals(122, agent.getNextReadAheadSize(), 5);
+ manager.updateReadResult(233L, 512, 1024, 612);
+ agent.updateReadAheadResult(2048, 1224);
+
+ manager.onCacheEvict(233L, 0, 256, 257);
+ manager.onCacheEvict(233L, 768, 1345, 712);
+ manager.onCacheEvict(233L, 1678, 1789, 299);
+
+ Assertions.assertEquals(70, agent.getNextReadAheadSize(), 1);
+ }
+
+ @Test
+ public void testReadAheadAgents() {
+ BlockCache blockCache = Mockito.mock(BlockCache.class);
+ ReadAheadManager manager = new ReadAheadManager(10, blockCache);
+ manager.updateReadProgress(233L, 0);
+ manager.getOrCreateReadAheadAgent(233L, 0);
+ manager.updateReadResult(233L, 0, 10, 10);
+
+ manager.updateReadProgress(233L, 10);
+ manager.getOrCreateReadAheadAgent(233L, 10);
+ manager.updateReadResult(233L, 10, 20, 10);
+
+ manager.updateReadProgress(233L, 20);
+ manager.getOrCreateReadAheadAgent(233L, 20);
+ manager.updateReadResult(233L, 20, 50, 30);
+
+ Assertions.assertEquals(1, manager.getReadAheadAgents().size());
+ }
+}
diff --git a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
index 8c0daec55..f7525146f 100644
--- a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
+++ b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
@@ -92,7 +92,7 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.walPath(), s3Config.walCapacity()).config(s3Config).build();
- S3BlockCache blockCache = new DefaultS3BlockCache(s3Config.blockCacheSize(), objectManager, defaultOperator);
+ S3BlockCache blockCache = new DefaultS3BlockCache(s3Config, objectManager, defaultOperator);
// Build the s3 storage
this.storage = new S3Storage(s3Config, writeAheadLog, streamManager, objectManager, blockCache, defaultOperator);