From fdf1f81d2c180676652da702f855bbd2a4eb1bd5 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Sat, 25 Nov 2023 23:13:43 +0800 Subject: [PATCH 1/2] feat(s3stream_issues690): optimize log cache get performance by adding offset index Signed-off-by: Robin Han --- .../com/automq/stream/s3/cache/LogCache.java | 162 +++++++++++++----- .../biniarysearch/StreamRecordBatchList.java | 2 +- .../automq/stream/s3/cache/LogCacheTest.java | 20 +++ 3 files changed, 139 insertions(+), 45 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index 347861642..6f277b6e8 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.stream.Collectors; import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET; @@ -46,7 +48,7 @@ public class LogCache { private final long capacity; private final long cacheBlockMaxSize; private final int maxCacheBlockStreamCount; - private final List<LogCacheBlock> blocks = new ArrayList<>(); + final List<LogCacheBlock> blocks = new ArrayList<>(); private LogCacheBlock activeBlock; private long confirmOffset; private final AtomicLong size = new AtomicLong(); @@ -284,7 +286,7 @@ public static class LogCacheBlock { private final long blockId; private final long maxSize; private final int maxStreamCount; - private final Map<Long, List<StreamRecordBatch>> map = new ConcurrentHashMap<>(); + final Map<Long, StreamCache> map = new ConcurrentHashMap<>(); private final AtomicLong size = new AtomicLong(); private long confirmOffset; volatile boolean free; @@ -305,65 +307,38 @@ public long blockId() {
} public boolean put(StreamRecordBatch recordBatch) { - map.compute(recordBatch.getStreamId(), (id, records) -> { - if (records == null) { - records = new ArrayList<>(); + map.compute(recordBatch.getStreamId(), (id, cache) -> { + if (cache == null) { + cache = new StreamCache(); } - synchronized (records) { - records.add(recordBatch); - } - return records; + cache.add(recordBatch); + return cache; }); int recordSize = recordBatch.size(); return size.addAndGet(recordSize) >= maxSize || map.size() >= maxStreamCount; } public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) { - List<StreamRecordBatch> streamRecords = map.get(streamId); - if (streamRecords == null) { + StreamCache cache = map.get(streamId); + if (cache == null) { return Collections.emptyList(); } - - synchronized (streamRecords) { - if (streamRecords.get(0).getBaseOffset() > startOffset || streamRecords.get(streamRecords.size() - 1).getLastOffset() <= startOffset) { - return Collections.emptyList(); - } - StreamRecordBatchList records = new StreamRecordBatchList(streamRecords); - int startIndex = records.search(startOffset); - if (startIndex == -1) { - // mismatched - return Collections.emptyList(); - } - int endIndex = -1; - int remainingBytesSize = maxBytes; - for (int i = startIndex; i < streamRecords.size(); i++) { - StreamRecordBatch record = streamRecords.get(i); - endIndex = i + 1; - remainingBytesSize -= Math.min(remainingBytesSize, record.size()); - if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) { - break; - } - } - return new ArrayList<>(streamRecords.subList(startIndex, endIndex)); - } + return cache.get(startOffset, endOffset, maxBytes); } StreamRange getStreamRange(long streamId) { - List<StreamRecordBatch> streamRecords = map.get(streamId); - if (streamRecords == null) { + StreamCache streamCache = map.get(streamId); + if (streamCache == null) { return new StreamRange(NOOP_OFFSET, NOOP_OFFSET); } else { - synchronized (streamRecords) { - if (streamRecords.isEmpty()) { - return new
StreamRange(NOOP_OFFSET, NOOP_OFFSET); - } - return new StreamRange(streamRecords.get(0).getBaseOffset(), streamRecords.get(streamRecords.size() - 1).getLastOffset()); - } + return streamCache.range(); } } public Map<Long, List<StreamRecordBatch>> records() { - return map; + return map.entrySet().stream() + .map(e -> Map.entry(e.getKey(), e.getValue().records)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } public long confirmOffset() { @@ -379,7 +354,7 @@ public long size() { } public void free() { - map.forEach((streamId, records) -> records.forEach(StreamRecordBatch::release)); + map.forEach((streamId, records) -> records.free()); map.clear(); } } @@ -394,4 +369,103 @@ public StreamRange(long startOffset, long endOffset) { this.endOffset = endOffset; } } + + static class StreamCache { + List<StreamRecordBatch> records = new ArrayList<>(); + long startOffset = NOOP_OFFSET; + long endOffset = NOOP_OFFSET; + Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>(); + + synchronized void add(StreamRecordBatch recordBatch) { + records.add(recordBatch); + if (startOffset == NOOP_OFFSET) { + startOffset = recordBatch.getBaseOffset(); + } + endOffset = recordBatch.getLastOffset(); + } + + synchronized List<StreamRecordBatch> get(long startOffset, long endOffset, int maxBytes) { + if (this.startOffset > startOffset || this.endOffset <= startOffset) { + return Collections.emptyList(); + } + int startIndex = searchStartIndex(startOffset); + if (startIndex == -1) { + // mismatched + return Collections.emptyList(); + } + int endIndex = -1; + int remainingBytesSize = maxBytes; + long rstEndOffset = NOOP_OFFSET; + for (int i = startIndex; i < records.size(); i++) { + StreamRecordBatch record = records.get(i); + endIndex = i + 1; + remainingBytesSize -= Math.min(remainingBytesSize, record.size()); + rstEndOffset = record.getLastOffset(); + if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) { + break; + } + } + if (rstEndOffset != NOOP_OFFSET) { + map(rstEndOffset, endIndex); + } + return new
ArrayList<>(records.subList(startIndex, endIndex)); + } + + int searchStartIndex(long startOffset) { + IndexAndCount indexAndCount = offsetIndexMap.get(startOffset); + if (indexAndCount != null) { + unmap(startOffset, indexAndCount); + return indexAndCount.index; + } else { + // slow path + StreamRecordBatchList search = new StreamRecordBatchList(records); + return search.search(startOffset); + } + } + + final void map(long offset, int index) { + offsetIndexMap.compute(offset, (k, v) -> { + if (v == null) { + return new IndexAndCount(index); + } else { + v.inc(); + return v; + } + }); + } + + final void unmap(long startOffset, IndexAndCount indexAndCount) { + if (indexAndCount.dec() == 0) { + offsetIndexMap.remove(startOffset); + } + } + + synchronized StreamRange range() { + return new StreamRange(startOffset, endOffset); + } + + synchronized void free() { + records.forEach(StreamRecordBatch::release); + records.clear(); + } + } + + static class IndexAndCount { + int index; + int count; + + public IndexAndCount(int index) { + this.index = index; + this.count = 1; + } + + public void inc() { + count++; + } + + public int dec() { + return --count; + } + + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java index e2893afd4..b70e7d787 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java @@ -32,7 +32,7 @@ public StreamRecordBatchList(List<StreamRecordBatch> records) { } @Override - int size() { + public int size() { return size; } diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java index 3e84ab03b..dc5a8390d 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java +++
b/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java @@ -19,9 +19,11 @@ import com.automq.stream.s3.TestUtils; import com.automq.stream.s3.model.StreamRecordBatch; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List; +import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -60,4 +62,22 @@ public void testPutGet() { assertEquals(0, records.size()); } + @Test + public void testOffsetIndex() { + LogCache cache = new LogCache(Integer.MAX_VALUE, Integer.MAX_VALUE); + + for (int i = 0; i < 100000; i++) { + cache.put(new StreamRecordBatch(233L, 0L, i, 1, TestUtils.random(1))); + } + + long start = System.nanoTime(); + for (int i = 0; i < 100000; i++) { + cache.get(233L, i, i + 1, 1000); + } + System.out.println("cost: " + (System.nanoTime() - start) / 1000 + "us"); + Map<Long, LogCache.IndexAndCount> offsetIndexMap = cache.blocks.get(0).map.get(233L).offsetIndexMap; + assertEquals(1, offsetIndexMap.size()); + assertEquals(100000, offsetIndexMap.get(100000L).index); + } + } From 4edb0ca07320905a21775ddec34ccc2abcf50b2c Mon Sep 17 00:00:00 2001 From: Robin Han Date: Sat, 25 Nov 2023 23:21:59 +0800 Subject: [PATCH 2/2] fix: ci Signed-off-by: Robin Han --- .../src/test/java/com/automq/stream/s3/cache/LogCacheTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java index dc5a8390d..51f6e1946 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java @@ -19,7 +19,6 @@ import com.automq.stream.s3.TestUtils; import com.automq.stream.s3.model.StreamRecordBatch; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List;