feat(issues690): concurrent read log cache (#701)
Signed-off-by: Robin Han <[email protected]>
superhx authored Nov 22, 2023
1 parent 4365682 commit 81ea9e1
Showing 3 changed files with 119 additions and 72 deletions.
33 changes: 13 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -64,9 +64,7 @@ public class S3Storage implements Storage {
private final long maxDeltaWALCacheSize;
private final Config config;
private final WriteAheadLog deltaWAL;
/**
* WAL log cache. Single thread mainReadExecutor will ensure the memory safety.
*/
/** WAL log cache */
private final LogCache deltaWALCache;
/**
* WAL out of order callback sequencer. Single thread mainWriteExecutor will ensure the memory safety.
@@ -78,8 +76,6 @@ public class S3Storage implements Storage {

private final ExecutorService mainWriteExecutor = Threads.newFixedThreadPoolWithMonitor(1,
"s3-storage-main-write", false, LOGGER);
private final ExecutorService mainReadExecutor = Threads.newFixedThreadPoolWithMonitor(1,
"s3-storage-main-read", false, LOGGER);
private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);
private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPoolWithMonitor(
@@ -225,7 +221,6 @@ public void shutdown() {
}
deltaWAL.shutdownGracefully();
backgroundExecutor.shutdown();
mainReadExecutor.shutdown();
mainWriteExecutor.shutdown();
}

@@ -317,7 +312,7 @@ private void tryDrainBackoffRecords() {
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
mainReadExecutor.execute(() -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf));
FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf);
cf.whenComplete((nil, ex) -> OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
return cf;
}
@@ -346,7 +341,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
}
continuousCheck(rst);
return new ReadDataBlock(rst, readDataBlock.getCacheAccessType());
}, mainReadExecutor).whenComplete((rst, ex) -> {
}).whenComplete((rst, ex) -> {
if (ex != null) {
logCacheRecords.forEach(StreamRecordBatch::release);
}
@@ -380,7 +375,7 @@ public synchronized CompletableFuture<Void> forceUpload(long streamId) {
}
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
mainWriteExecutor.execute(() -> callbackSequencer.tryFree(streamId));
}, mainReadExecutor);
});
return cf;
}

@@ -396,16 +391,14 @@ private void handleAppendCallback0(WalWriteRequest request) {
List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
long walConfirmOffset = callbackSequencer.getWALConfirmOffset();
waitingAckRequests.forEach(r -> r.record.retain());
mainReadExecutor.execute(() -> {
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
if (deltaWALCache.put(waitingAckRequest.record)) {
// cache block is full, trigger WAL upload.
deltaWALCache.setConfirmOffset(walConfirmOffset);
LogCache.LogCacheBlock logCacheBlock = deltaWALCache.archiveCurrentBlock();
uploadDeltaWAL(logCacheBlock);
}
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
if (deltaWALCache.put(waitingAckRequest.record)) {
// cache block is full, trigger WAL upload.
deltaWALCache.setConfirmOffset(walConfirmOffset);
LogCache.LogCacheBlock logCacheBlock = deltaWALCache.archiveCurrentBlock();
uploadDeltaWAL(logCacheBlock);
}
});
}
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
@@ -495,7 +488,7 @@ private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
}

private void freeCache(LogCache.LogCacheBlock cacheBlock) {
mainReadExecutor.execute(() -> deltaWALCache.markFree(cacheBlock));
deltaWALCache.markFree(cacheBlock);
}

/**
@@ -581,7 +574,7 @@ class LogCacheEvictOOMHandler implements DirectByteBufAlloc.OOMHandler {
public int handle(int memoryRequired) {
try {
CompletableFuture<Integer> cf = new CompletableFuture<>();
mainReadExecutor.submit(() -> FutureUtil.exec(() -> cf.complete(deltaWALCache.forceFree(memoryRequired)), cf, LOGGER, "handleOOM"));
FutureUtil.exec(() -> cf.complete(deltaWALCache.forceFree(memoryRequired)), cf, LOGGER, "handleOOM");
return cf.get();
} catch (Throwable e) {
return 0;
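
With the single-threaded mainReadExecutor removed, read(), forceUpload(), freeCache() and the OOM handler now call into the (now thread-safe) LogCache directly and simply wire the resulting futures together. The sketch below shows the future-propagation style these call sites rely on; copyResult is a hypothetical stand-in for FutureUtil.propagate, whose actual signature is not part of this diff.

import java.util.concurrent.CompletableFuture;

public class PropagateSketch {
    // Hypothetical stand-in for FutureUtil.propagate: complete the destination
    // future with the source future's result or failure.
    static <T> void copyResult(CompletableFuture<T> source, CompletableFuture<T> dest) {
        source.whenComplete((value, error) -> {
            if (error != null) {
                dest.completeExceptionally(error);
            } else {
                dest.complete(value);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> inner = CompletableFuture.supplyAsync(() -> "cache hit");
        CompletableFuture<String> exposed = new CompletableFuture<>();
        // No executor hop: the caller-facing future is wired directly to the inner read.
        copyResult(inner, exposed);
        System.out.println(exposed.join());
    }
}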
154 changes: 103 additions & 51 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -25,14 +25,15 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET;
@@ -51,6 +52,11 @@ public class LogCache {
private final AtomicLong size = new AtomicLong();
private final Consumer<LogCacheBlock> blockFreeListener;

// read write lock which guards the <code>LogCache.blocks</code>
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCount, Consumer<LogCacheBlock> blockFreeListener) {
this.capacity = capacity;
this.cacheBlockMaxSize = cacheBlockMaxSize;
@@ -103,8 +109,15 @@ public boolean put(StreamRecordBatch recordBatch) {
*/
public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
List<StreamRecordBatch> records = get0(streamId, startOffset, endOffset, maxBytes);
records.forEach(StreamRecordBatch::retain);
List<StreamRecordBatch> records;
readLock.lock();
try {
records = get0(streamId, startOffset, endOffset, maxBytes);
records.forEach(StreamRecordBatch::retain);
} finally {
readLock.unlock();
}

if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE).inc();
} else {
@@ -119,6 +132,7 @@ public List<StreamRecordBatch> get0(long streamId, long startOffset, long endOff
long nextStartOffset = startOffset;
int nextMaxBytes = maxBytes;
boolean fulfill = false;
List<LogCacheBlock> blocks = this.blocks;
for (LogCacheBlock archiveBlock : blocks) {
List<StreamRecordBatch> records = archiveBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes);
if (records.isEmpty()) {
@@ -158,11 +172,16 @@ public List<StreamRecordBatch> get0(long streamId, long startOffset, long endOff
}

public LogCacheBlock archiveCurrentBlock() {
LogCacheBlock block = activeBlock;
block.confirmOffset = confirmOffset;
activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
blocks.add(activeBlock);
return block;
writeLock.lock();
try {
LogCacheBlock block = activeBlock;
block.confirmOffset = confirmOffset;
activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
blocks.add(activeBlock);
return block;
} finally {
writeLock.unlock();
}
}

public Optional<LogCacheBlock> archiveCurrentBlockIfContains(long streamId) {
@@ -191,30 +210,48 @@ private void tryRealFree() {
if (size.get() <= capacity * 0.9) {
return;
}
blocks.removeIf(b -> {
if (size.get() <= capacity * 0.9) {
return false;
}
if (b.free) {
size.addAndGet(-b.size);
blockFreeListener.accept(b);
b.free();
}
return b.free;
List<LogCacheBlock> removed = new ArrayList<>();
writeLock.lock();
try {
blocks.removeIf(b -> {
if (size.get() <= capacity * 0.9) {
return false;
}
if (b.free) {
size.addAndGet(-b.size);
removed.add(b);
}
return b.free;
});
} finally {
writeLock.unlock();
}
removed.forEach(b -> {
blockFreeListener.accept(b);
b.free();
});
}

public int forceFree(int required) {
AtomicInteger freedBytes = new AtomicInteger();
blocks.removeIf(block -> {
if (!block.free || freedBytes.get() >= required) {
return false;
}
size.addAndGet(-block.size);
freedBytes.addAndGet((int) block.size);
blockFreeListener.accept(block);
block.free();
return true;
List<LogCacheBlock> removed = new ArrayList<>();
writeLock.lock();
try {
blocks.removeIf(block -> {
if (!block.free || freedBytes.get() >= required) {
return false;
}
size.addAndGet(-block.size);
freedBytes.addAndGet((int) block.size);
removed.add(block);
return true;
});
} finally {
writeLock.unlock();
}
removed.forEach(b -> {
blockFreeListener.accept(b);
b.free();
});
return freedBytes.get();
}
@@ -232,10 +269,10 @@ public static class LogCacheBlock {
private final long blockId;
private final long maxSize;
private final int maxStreamCount;
private final Map<Long, List<StreamRecordBatch>> map = new HashMap<>();
private final Map<Long, List<StreamRecordBatch>> map = new ConcurrentHashMap<>();
private long size = 0;
private long confirmOffset;
boolean free;
volatile boolean free;

public LogCacheBlock(long maxSize, int maxStreamCount) {
this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
@@ -253,8 +290,15 @@ public long blockId() {
}

public boolean put(StreamRecordBatch recordBatch) {
List<StreamRecordBatch> streamCache = map.computeIfAbsent(recordBatch.getStreamId(), id -> new ArrayList<>());
streamCache.add(recordBatch);
map.compute(recordBatch.getStreamId(), (id, records) -> {
if (records == null) {
records = new ArrayList<>();
}
synchronized (records) {
records.add(recordBatch);
}
return records;
});
int recordSize = recordBatch.size();
size += recordSize;
return size >= maxSize || map.size() >= maxStreamCount;
@@ -265,34 +309,42 @@ public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffs
if (streamRecords == null) {
return Collections.emptyList();
}
if (streamRecords.get(0).getBaseOffset() > startOffset || streamRecords.get(streamRecords.size() - 1).getLastOffset() <= startOffset) {
return Collections.emptyList();
}
StreamRecordBatchList records = new StreamRecordBatchList(streamRecords);
int startIndex = records.search(startOffset);
if (startIndex == -1) {
// mismatched
return Collections.emptyList();
}
int endIndex = -1;
int remainingBytesSize = maxBytes;
for (int i = startIndex; i < streamRecords.size(); i++) {
StreamRecordBatch record = streamRecords.get(i);
endIndex = i + 1;
remainingBytesSize -= Math.min(remainingBytesSize, record.size());
if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
break;

synchronized (streamRecords) {
if (streamRecords.get(0).getBaseOffset() > startOffset || streamRecords.get(streamRecords.size() - 1).getLastOffset() <= startOffset) {
return Collections.emptyList();
}
StreamRecordBatchList records = new StreamRecordBatchList(streamRecords);
int startIndex = records.search(startOffset);
if (startIndex == -1) {
// mismatched
return Collections.emptyList();
}
int endIndex = -1;
int remainingBytesSize = maxBytes;
for (int i = startIndex; i < streamRecords.size(); i++) {
StreamRecordBatch record = streamRecords.get(i);
endIndex = i + 1;
remainingBytesSize -= Math.min(remainingBytesSize, record.size());
if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
break;
}
}
return new ArrayList<>(streamRecords.subList(startIndex, endIndex));
}
return streamRecords.subList(startIndex, endIndex);
}

StreamRange getStreamRange(long streamId) {
List<StreamRecordBatch> streamRecords = map.get(streamId);
if (streamRecords == null || streamRecords.isEmpty()) {
if (streamRecords == null) {
return new StreamRange(NOOP_OFFSET, NOOP_OFFSET);
} else {
return new StreamRange(streamRecords.get(0).getBaseOffset(), streamRecords.get(streamRecords.size() - 1).getLastOffset());
synchronized (streamRecords) {
if (streamRecords.isEmpty()) {
return new StreamRange(NOOP_OFFSET, NOOP_OFFSET);
}
return new StreamRange(streamRecords.get(0).getBaseOffset(), streamRecords.get(streamRecords.size() - 1).getLastOffset());
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion StreamRecordBatchList.java
Expand Up @@ -24,14 +24,16 @@
public class StreamRecordBatchList extends AbstractOrderedCollection<Long> {

private final List<ComparableStreamRecordBatch> records;
private final int size;

public StreamRecordBatchList(List<StreamRecordBatch> records) {
this.records = records.stream().map(ComparableStreamRecordBatch::new).toList();
this.size = records.size();
}

@Override
int size() {
return records.size();
return size;
}

@Override
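
StreamRecordBatchList now caches the record count at construction instead of re-reading it from the list on every size() call, so the search view presents a fixed length once created. A tiny illustration of that snapshot-at-construction idea, with made-up names (FixedSizeView is not from the repository):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Snapshot-at-construction: the view's size is fixed when it is created,
// so a later append to the backing list cannot change what the view covers.
class FixedSizeView {
    private final List<Integer> backing;
    private final int size;

    FixedSizeView(List<Integer> backing) {
        this.backing = backing;
        this.size = backing.size();
    }

    int size() {
        return size;
    }

    int get(int index) {
        if (index >= size) {
            throw new IndexOutOfBoundsException();
        }
        return backing.get(index);
    }

    public static void main(String[] args) {
        List<Integer> records = new CopyOnWriteArrayList<>(List.of(1, 2, 3));
        FixedSizeView view = new FixedSizeView(records);
        records.add(4);                     // grows after the view was taken
        System.out.println(view.size());    // still 3
    }
}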
