diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index c1b5aaeef..3dab89b0b 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -64,9 +64,7 @@ public class S3Storage implements Storage {
     private final long maxDeltaWALCacheSize;
     private final Config config;
     private final WriteAheadLog deltaWAL;
-    /**
-     * WAL log cache. Single thread mainReadExecutor will ensure the memory safety.
-     */
+    /** WAL log cache */
     private final LogCache deltaWALCache;
     /**
      * WAL out of order callback sequencer. Single thread mainWriteExecutor will ensure the memory safety.
@@ -78,8 +76,6 @@ public class S3Storage implements Storage {
     private final ExecutorService mainWriteExecutor = Threads.newFixedThreadPoolWithMonitor(1,
         "s3-storage-main-write", false, LOGGER);
-    private final ExecutorService mainReadExecutor = Threads.newFixedThreadPoolWithMonitor(1,
-        "s3-storage-main-read", false, LOGGER);
     private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
         ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);
     private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPoolWithMonitor(
@@ -225,7 +221,6 @@ public void shutdown() {
         }
         deltaWAL.shutdownGracefully();
         backgroundExecutor.shutdown();
-        mainReadExecutor.shutdown();
         mainWriteExecutor.shutdown();
     }
@@ -317,7 +312,7 @@ private void tryDrainBackoffRecords() {
     public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
         TimerUtil timerUtil = new TimerUtil();
         CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
-        mainReadExecutor.execute(() -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf));
+        FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf);
         cf.whenComplete((nil, ex) -> OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
         return cf;
     }
@@ -346,7 +341,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
             }
             continuousCheck(rst);
             return new ReadDataBlock(rst, readDataBlock.getCacheAccessType());
-        }, mainReadExecutor).whenComplete((rst, ex) -> {
+        }).whenComplete((rst, ex) -> {
             if (ex != null) {
                 logCacheRecords.forEach(StreamRecordBatch::release);
             }
@@ -380,7 +375,7 @@ public synchronized CompletableFuture<Void> forceUpload(long streamId) {
             }
             FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
             mainWriteExecutor.execute(() -> callbackSequencer.tryFree(streamId));
-        }, mainReadExecutor);
+        });
         return cf;
     }
@@ -396,16 +391,14 @@ private void handleAppendCallback0(WalWriteRequest request) {
         List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
         long walConfirmOffset = callbackSequencer.getWALConfirmOffset();
         waitingAckRequests.forEach(r -> r.record.retain());
-        mainReadExecutor.execute(() -> {
-            for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
-                if (deltaWALCache.put(waitingAckRequest.record)) {
-                    // cache block is full, trigger WAL upload.
-                    deltaWALCache.setConfirmOffset(walConfirmOffset);
-                    LogCache.LogCacheBlock logCacheBlock = deltaWALCache.archiveCurrentBlock();
-                    uploadDeltaWAL(logCacheBlock);
-                }
+        for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
+            if (deltaWALCache.put(waitingAckRequest.record)) {
+                // cache block is full, trigger WAL upload.
+                deltaWALCache.setConfirmOffset(walConfirmOffset);
+                LogCache.LogCacheBlock logCacheBlock = deltaWALCache.archiveCurrentBlock();
+                uploadDeltaWAL(logCacheBlock);
             }
-        });
+        }
         for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
             waitingAckRequest.cf.complete(null);
         }
@@ -495,7 +488,7 @@ private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
     }

     private void freeCache(LogCache.LogCacheBlock cacheBlock) {
-        mainReadExecutor.execute(() -> deltaWALCache.markFree(cacheBlock));
+        deltaWALCache.markFree(cacheBlock);
     }

     /**
@@ -581,7 +574,7 @@ class LogCacheEvictOOMHandler implements DirectByteBufAlloc.OOMHandler {
         public int handle(int memoryRequired) {
             try {
                 CompletableFuture<Integer> cf = new CompletableFuture<>();
-                mainReadExecutor.submit(() -> FutureUtil.exec(() -> cf.complete(deltaWALCache.forceFree(memoryRequired)), cf, LOGGER, "handleOOM"));
+                FutureUtil.exec(() -> cf.complete(deltaWALCache.forceFree(memoryRequired)), cf, LOGGER, "handleOOM");
                 return cf.get();
             } catch (Throwable e) {
                 return 0;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index ed15d746f..45570f2d6 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -25,14 +25,15 @@
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;

 import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET;
@@ -51,6 +52,11 @@ public class LogCache {
     private final AtomicLong size = new AtomicLong();
     private final Consumer<LogCacheBlock> blockFreeListener;

+    // read write lock which guards the LogCache.blocks
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
     public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCount, Consumer<LogCacheBlock> blockFreeListener) {
         this.capacity = capacity;
         this.cacheBlockMaxSize = cacheBlockMaxSize;
@@ -103,8 +109,15 @@ public boolean put(StreamRecordBatch recordBatch) {
      */
     public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
         TimerUtil timerUtil = new TimerUtil();
-        List<StreamRecordBatch> records = get0(streamId, startOffset, endOffset, maxBytes);
-        records.forEach(StreamRecordBatch::retain);
+        List<StreamRecordBatch> records;
+        readLock.lock();
+        try {
+            records = get0(streamId, startOffset, endOffset, maxBytes);
+            records.forEach(StreamRecordBatch::retain);
+        } finally {
+            readLock.unlock();
+        }
+
         if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
             OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE).inc();
         } else {
@@ -119,6 +132,7 @@ public List<StreamRecordBatch> get0(long streamId, long startOffset, long endOff
         long nextStartOffset = startOffset;
         int nextMaxBytes = maxBytes;
         boolean fulfill = false;
+        List<LogCacheBlock> blocks = this.blocks;
         for (LogCacheBlock archiveBlock : blocks) {
             List<StreamRecordBatch> records = archiveBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes);
             if (records.isEmpty()) {
@@ -158,11 +172,16 @@ public List<StreamRecordBatch> get0(long streamId, long startOffset, long endOff
     }

     public LogCacheBlock archiveCurrentBlock() {
-        LogCacheBlock block = activeBlock;
-        block.confirmOffset = confirmOffset;
-        activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
-        blocks.add(activeBlock);
-        return block;
+        writeLock.lock();
+        try {
+            LogCacheBlock block = activeBlock;
+            block.confirmOffset = confirmOffset;
+            activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
+            blocks.add(activeBlock);
+            return block;
+        } finally {
+            writeLock.unlock();
+        }
     }

     public Optional<LogCacheBlock> archiveCurrentBlockIfContains(long streamId) {
@@ -191,30 +210,48 @@ private void tryRealFree() {
         if (size.get() <= capacity * 0.9) {
             return;
         }
-        blocks.removeIf(b -> {
-            if (size.get() <= capacity * 0.9) {
-                return false;
-            }
-            if (b.free) {
-                size.addAndGet(-b.size);
-                blockFreeListener.accept(b);
-                b.free();
-            }
-            return b.free;
+        List<LogCacheBlock> removed = new ArrayList<>();
+        writeLock.lock();
+        try {
+            blocks.removeIf(b -> {
+                if (size.get() <= capacity * 0.9) {
+                    return false;
+                }
+                if (b.free) {
+                    size.addAndGet(-b.size);
+                    removed.add(b);
+                }
+                return b.free;
+            });
+        } finally {
+            writeLock.unlock();
+        }
+        removed.forEach(b -> {
+            blockFreeListener.accept(b);
+            b.free();
         });
     }

     public int forceFree(int required) {
         AtomicInteger freedBytes = new AtomicInteger();
-        blocks.removeIf(block -> {
-            if (!block.free || freedBytes.get() >= required) {
-                return false;
-            }
-            size.addAndGet(-block.size);
-            freedBytes.addAndGet((int) block.size);
-            blockFreeListener.accept(block);
-            block.free();
-            return true;
+        List<LogCacheBlock> removed = new ArrayList<>();
+        writeLock.lock();
+        try {
+            blocks.removeIf(block -> {
+                if (!block.free || freedBytes.get() >= required) {
+                    return false;
+                }
+                size.addAndGet(-block.size);
+                freedBytes.addAndGet((int) block.size);
+                removed.add(block);
+                return true;
+            });
+        } finally {
+            writeLock.unlock();
+        }
+        removed.forEach(b -> {
+            blockFreeListener.accept(b);
+            b.free();
         });
         return freedBytes.get();
     }
@@ -232,10 +269,10 @@ public static class LogCacheBlock {
         private final long blockId;
         private final long maxSize;
         private final int maxStreamCount;
-        private final Map<Long, List<StreamRecordBatch>> map = new HashMap<>();
+        private final Map<Long, List<StreamRecordBatch>> map = new ConcurrentHashMap<>();
         private long size = 0;
         private long confirmOffset;
-        boolean free;
+        volatile boolean free;

         public LogCacheBlock(long maxSize, int maxStreamCount) {
             this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
@@ -253,8 +290,15 @@ public long blockId() {
         }

         public boolean put(StreamRecordBatch recordBatch) {
-            List<StreamRecordBatch> streamCache = map.computeIfAbsent(recordBatch.getStreamId(), id -> new ArrayList<>());
-            streamCache.add(recordBatch);
+            map.compute(recordBatch.getStreamId(), (id, records) -> {
+                if (records == null) {
+                    records = new ArrayList<>();
+                }
+                synchronized (records) {
+                    records.add(recordBatch);
+                }
+                return records;
+            });
             int recordSize = recordBatch.size();
             size += recordSize;
             return size >= maxSize || map.size() >= maxStreamCount;
@@ -265,34 +309,42 @@ public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffs
             if (streamRecords == null) {
                 return Collections.emptyList();
             }
-            if (streamRecords.get(0).getBaseOffset() > startOffset || streamRecords.get(streamRecords.size() - 1).getLastOffset() <= startOffset) {
-                return Collections.emptyList();
-            }
-            StreamRecordBatchList records = new StreamRecordBatchList(streamRecords);
-            int startIndex = records.search(startOffset);
-            if (startIndex == -1) {
-                // mismatched
-                return Collections.emptyList();
-            }
-            int endIndex = -1;
-            int remainingBytesSize = maxBytes;
-            for (int i = startIndex; i < streamRecords.size(); i++) {
-                StreamRecordBatch record = streamRecords.get(i);
-                endIndex = i + 1;
-                remainingBytesSize -= Math.min(remainingBytesSize, record.size());
-                if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
-                    break;
+
+            synchronized (streamRecords) {
+                if (streamRecords.get(0).getBaseOffset() > startOffset || streamRecords.get(streamRecords.size() - 1).getLastOffset() <= startOffset) {
+                    return Collections.emptyList();
+                }
+                StreamRecordBatchList records = new StreamRecordBatchList(streamRecords);
+                int startIndex = records.search(startOffset);
+                if (startIndex == -1) {
+                    // mismatched
+                    return Collections.emptyList();
                 }
+                int endIndex = -1;
+                int remainingBytesSize = maxBytes;
+                for (int i = startIndex; i < streamRecords.size(); i++) {
+                    StreamRecordBatch record = streamRecords.get(i);
+                    endIndex = i + 1;
+                    remainingBytesSize -= Math.min(remainingBytesSize, record.size());
+                    if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
+                        break;
+                    }
+                }
+                return new ArrayList<>(streamRecords.subList(startIndex, endIndex));
             }
-            return streamRecords.subList(startIndex, endIndex);
         }

         StreamRange getStreamRange(long streamId) {
             List<StreamRecordBatch> streamRecords = map.get(streamId);
-            if (streamRecords == null || streamRecords.isEmpty()) {
+            if (streamRecords == null) {
                 return new StreamRange(NOOP_OFFSET, NOOP_OFFSET);
             } else {
-                return new StreamRange(streamRecords.get(0).getBaseOffset(), streamRecords.get(streamRecords.size() - 1).getLastOffset());
+                synchronized (streamRecords) {
+                    if (streamRecords.isEmpty()) {
+                        return new StreamRange(NOOP_OFFSET, NOOP_OFFSET);
+                    }
+                    return new StreamRange(streamRecords.get(0).getBaseOffset(), streamRecords.get(streamRecords.size() - 1).getLastOffset());
+                }
             }
         }
diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java
index 76e7ae416..e2893afd4 100644
--- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java
+++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java
@@ -24,14 +24,16 @@
 public class StreamRecordBatchList extends AbstractOrderedCollection<Long> {

     private final List<ComparableStreamRecordBatch> records;
+    private final int size;

     public StreamRecordBatchList(List<StreamRecordBatch> records) {
         this.records = records.stream().map(ComparableStreamRecordBatch::new).toList();
+        this.size = records.size();
     }

     @Override
     int size() {
-        return records.size();
+        return size;
     }

     @Override
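
Note on the LogCache locking scheme: with mainReadExecutor gone, reads and append callbacks reach LogCache from arbitrary threads, so the blocks list is now guarded by a ReentrantReadWriteLock. Notice that tryRealFree and forceFree only collect evicted blocks while holding the write lock and run blockFreeListener plus the actual free() after the lock is released, keeping callback work (and any re-entrant calls back into the cache) out of the critical section. A minimal sketch of that collect-then-notify pattern; the names Block, CollectThenNotify, and evictFreeBlocks are illustrative stand-ins, not types from this patch:

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.function.Consumer;

    public class CollectThenNotify {
        static final class Block {
            final long size;
            volatile boolean free;
            Block(long size) { this.size = size; }
        }

        private final List<Block> blocks = new LinkedList<>();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final Consumer<Block> freeListener;

        CollectThenNotify(Consumer<Block> freeListener) {
            this.freeListener = freeListener;
        }

        void evictFreeBlocks() {
            List<Block> removed = new ArrayList<>();
            lock.writeLock().lock();
            try {
                // Mutate the shared list only under the write lock, and
                // remember what was evicted instead of notifying here.
                blocks.removeIf(b -> b.free && removed.add(b));
            } finally {
                lock.writeLock().unlock();
            }
            // Listener callbacks (which may do real work, e.g. releasing
            // buffers, or re-enter this class) run outside the lock.
            removed.forEach(freeListener);
        }
    }

Running the listeners outside the lock trades a slightly wider window (a freed block is briefly both "removed" and "not yet released") for no listener-induced lock contention or self-deadlock.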
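Note on LogCacheBlock: the per-stream index moves from HashMap to ConcurrentHashMap, but the value lists stay plain ArrayLists, so every touch of a list is wrapped in synchronized (records), and get() now returns new ArrayList<>(subList(...)) instead of the subList view, which would otherwise alias a backing list that writers keep appending to. A simplified sketch of the same discipline; PerStreamIndex and its Long payload are hypothetical, not the patch's types:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class PerStreamIndex {
        // ConcurrentHashMap makes lookup/insert of the bucket atomic;
        // each value list is then guarded by its own monitor.
        private final Map<Long, List<Long>> map = new ConcurrentHashMap<>();

        void append(long streamId, long offset) {
            map.compute(streamId, (id, records) -> {
                if (records == null) {
                    records = new ArrayList<>();
                }
                synchronized (records) {
                    records.add(offset);
                }
                return records;
            });
        }

        List<Long> snapshotRange(long streamId, int from, int to) {
            List<Long> records = map.get(streamId);
            if (records == null) {
                return List.of();
            }
            synchronized (records) {
                // Copy instead of returning the subList view: the view is
                // still backed by the mutable list.
                return new ArrayList<>(records.subList(from, Math.min(to, records.size())));
            }
        }
    }

map.compute already serializes writers on the same key, so the inner synchronized block mainly exists to pair with readers, which lock the same list monitor before scanning or copying it.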
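Note on StreamRecordBatchList: caching records.size() at construction follows the same snapshot idea. The toList() copy already freezes the elements, so pinning the size alongside it (presumably the intent, since the source list may now grow concurrently under its own monitor) makes the binary-search bounds self-evidently fixed for the wrapper's lifetime. A hedged sketch; SnapshotView is a hypothetical name:

    import java.util.List;

    public class SnapshotView {
        private final List<Long> elements;
        private final int size;

        public SnapshotView(List<Long> source) {
            // Copy once; both the elements and the size are frozen here,
            // even if the source list keeps growing under its own lock.
            this.elements = List.copyOf(source);
            this.size = elements.size();
        }

        int size() {
            return size;
        }

        Long get(int index) {
            return elements.get(index);
        }
    }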