feat(s3stream): optimize cache performance by adding offset index #731

Merged · 2 commits · Nov 25, 2023
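What the patch does: LogCacheBlock previously kept a bare List<StreamRecordBatch> per stream and resolved every read's start position with a binary search (StreamRecordBatchList.search). The per-stream state is now wrapped in a new StreamCache that additionally maintains an offsetIndexMap: each get() records the end offset and end index of the range it returned, so a subsequent read that starts exactly where the previous one ended resolves its start index with an O(1) map hit, falling back to the binary-search slow path only on a miss. Sequential tailing reads, the common case, thus skip the search entirely.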
162 changes: 118 additions & 44 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -25,6 +25,7 @@

 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;

 import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET;

@@ -46,7 +48,7 @@ public class LogCache {
     private final long capacity;
     private final long cacheBlockMaxSize;
     private final int maxCacheBlockStreamCount;
-    private final List<LogCacheBlock> blocks = new ArrayList<>();
+    final List<LogCacheBlock> blocks = new ArrayList<>();
     private LogCacheBlock activeBlock;
     private long confirmOffset;
     private final AtomicLong size = new AtomicLong();
@@ -284,7 +286,7 @@ public static class LogCacheBlock {
         private final long blockId;
         private final long maxSize;
         private final int maxStreamCount;
-        private final Map<Long, List<StreamRecordBatch>> map = new ConcurrentHashMap<>();
+        final Map<Long, StreamCache> map = new ConcurrentHashMap<>();
         private final AtomicLong size = new AtomicLong();
         private long confirmOffset;
         volatile boolean free;
@@ -305,65 +307,38 @@ public long blockId() {
         }

         public boolean put(StreamRecordBatch recordBatch) {
-            map.compute(recordBatch.getStreamId(), (id, records) -> {
-                if (records == null) {
-                    records = new ArrayList<>();
+            map.compute(recordBatch.getStreamId(), (id, cache) -> {
+                if (cache == null) {
+                    cache = new StreamCache();
                 }
-                synchronized (records) {
-                    records.add(recordBatch);
-                }
-                return records;
+                cache.add(recordBatch);
+                return cache;
             });
             int recordSize = recordBatch.size();
             return size.addAndGet(recordSize) >= maxSize || map.size() >= maxStreamCount;
         }

         public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
-            List<StreamRecordBatch> streamRecords = map.get(streamId);
-            if (streamRecords == null) {
+            StreamCache cache = map.get(streamId);
+            if (cache == null) {
                 return Collections.emptyList();
             }
-
-            synchronized (streamRecords) {
-                if (streamRecords.get(0).getBaseOffset() > startOffset || streamRecords.get(streamRecords.size() - 1).getLastOffset() <= startOffset) {
-                    return Collections.emptyList();
-                }
-                StreamRecordBatchList records = new StreamRecordBatchList(streamRecords);
-                int startIndex = records.search(startOffset);
-                if (startIndex == -1) {
-                    // mismatched
-                    return Collections.emptyList();
-                }
-                int endIndex = -1;
-                int remainingBytesSize = maxBytes;
-                for (int i = startIndex; i < streamRecords.size(); i++) {
-                    StreamRecordBatch record = streamRecords.get(i);
-                    endIndex = i + 1;
-                    remainingBytesSize -= Math.min(remainingBytesSize, record.size());
-                    if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
-                        break;
-                    }
-                }
-                return new ArrayList<>(streamRecords.subList(startIndex, endIndex));
-            }
+            return cache.get(startOffset, endOffset, maxBytes);
         }

         StreamRange getStreamRange(long streamId) {
-            List<StreamRecordBatch> streamRecords = map.get(streamId);
-            if (streamRecords == null) {
+            StreamCache streamCache = map.get(streamId);
+            if (streamCache == null) {
                 return new StreamRange(NOOP_OFFSET, NOOP_OFFSET);
             } else {
-                synchronized (streamRecords) {
-                    if (streamRecords.isEmpty()) {
-                        return new StreamRange(NOOP_OFFSET, NOOP_OFFSET);
-                    }
-                    return new StreamRange(streamRecords.get(0).getBaseOffset(), streamRecords.get(streamRecords.size() - 1).getLastOffset());
-                }
+                return streamCache.range();
             }
         }

         public Map<Long, List<StreamRecordBatch>> records() {
-            return map;
+            return map.entrySet().stream()
+                    .map(e -> Map.entry(e.getKey(), e.getValue().records))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
         }

         public long confirmOffset() {
@@ -379,7 +354,7 @@ public long size() {
         }

         public void free() {
-            map.forEach((streamId, records) -> records.forEach(StreamRecordBatch::release));
+            map.forEach((streamId, records) -> records.free());
             map.clear();
         }
     }
@@ -394,4 +369,103 @@ public StreamRange(long startOffset, long endOffset) {
             this.endOffset = endOffset;
         }
     }
+
+    static class StreamCache {
+        List<StreamRecordBatch> records = new ArrayList<>();
+        long startOffset = NOOP_OFFSET;
+        long endOffset = NOOP_OFFSET;
+        Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>();
+
+        synchronized void add(StreamRecordBatch recordBatch) {
+            records.add(recordBatch);
+            if (startOffset == NOOP_OFFSET) {
+                startOffset = recordBatch.getBaseOffset();
+            }
+            endOffset = recordBatch.getLastOffset();
+        }
+
+        synchronized List<StreamRecordBatch> get(long startOffset, long endOffset, int maxBytes) {
+            if (this.startOffset > startOffset || this.endOffset <= startOffset) {
+                return Collections.emptyList();
+            }
+            int startIndex = searchStartIndex(startOffset);
+            if (startIndex == -1) {
+                // mismatched
+                return Collections.emptyList();
+            }
+            int endIndex = -1;
+            int remainingBytesSize = maxBytes;
+            long rstEndOffset = NOOP_OFFSET;
+            for (int i = startIndex; i < records.size(); i++) {
+                StreamRecordBatch record = records.get(i);
+                endIndex = i + 1;
+                remainingBytesSize -= Math.min(remainingBytesSize, record.size());
+                rstEndOffset = record.getLastOffset();
+                if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
+                    break;
+                }
+            }
+            if (rstEndOffset != NOOP_OFFSET) {
+                map(rstEndOffset, endIndex);
+            }
+            return new ArrayList<>(records.subList(startIndex, endIndex));
+        }
+
+        int searchStartIndex(long startOffset) {
+            IndexAndCount indexAndCount = offsetIndexMap.get(startOffset);
+            if (indexAndCount != null) {
+                unmap(startOffset, indexAndCount);
+                return indexAndCount.index;
+            } else {
+                // slow path
+                StreamRecordBatchList search = new StreamRecordBatchList(records);
+                return search.search(startOffset);
+            }
+        }
+
+        final void map(long offset, int index) {
+            offsetIndexMap.compute(offset, (k, v) -> {
+                if (v == null) {
+                    return new IndexAndCount(index);
+                } else {
+                    v.inc();
+                    return v;
+                }
+            });
+        }
+
+        final void unmap(long startOffset, IndexAndCount indexAndCount) {
+            if (indexAndCount.dec() == 0) {
+                offsetIndexMap.remove(startOffset);
+            }
+        }
+
+        synchronized StreamRange range() {
+            return new StreamRange(startOffset, endOffset);
+        }
+
+        synchronized void free() {
+            records.forEach(StreamRecordBatch::release);
+            records.clear();
+        }
+    }
+
+    static class IndexAndCount {
+        int index;
+        int count;
+
+        public IndexAndCount(int index) {
+            this.index = index;
+            this.count = 1;
+        }
+
+        public void inc() {
+            count++;
+        }
+
+        public int dec() {
+            return --count;
+        }
+    }
 }
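The heart of the patch is StreamCache.searchStartIndex with its map/unmap bookkeeping. The following minimal, runnable Java sketch distills that technique; OffsetIndexSketch, its Long records, and the offset-equals-index simplification are illustrative stand-ins rather than the patch's real StreamCache and StreamRecordBatch types.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the offset-index fast path, assuming record i covers
// offsets [i, i + 1) so offsets and list indices coincide.
class OffsetIndexSketch {
    static class IndexAndCount {
        int index;      // position in records where the next read should start
        int count = 1;  // number of readers expected to resume at this offset

        IndexAndCount(int index) {
            this.index = index;
        }
    }

    final List<Long> records = new ArrayList<>();
    final Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>();

    synchronized List<Long> get(long startOffset, long endOffset) {
        int startIndex = searchStartIndex(startOffset);
        int endIndex = (int) Math.min(endOffset, records.size());
        if (startIndex < 0 || startIndex >= endIndex) {
            return Collections.emptyList();
        }
        // Leave a breadcrumb for the next sequential reader: a read starting
        // at offset == endIndex can begin at list index endIndex directly.
        offsetIndexMap.compute((long) endIndex, (k, v) -> {
            if (v == null) {
                return new IndexAndCount(endIndex);
            }
            v.count++;
            return v;
        });
        return new ArrayList<>(records.subList(startIndex, endIndex));
    }

    synchronized int searchStartIndex(long startOffset) {
        IndexAndCount hit = offsetIndexMap.get(startOffset);
        if (hit != null) {
            // Fast path: O(1). The entry is reference-counted and removed
            // once every reader that installed it has consumed it.
            if (--hit.count == 0) {
                offsetIndexMap.remove(startOffset);
            }
            return hit.index;
        }
        // Slow path: the real code binary-searches via StreamRecordBatchList;
        // with the offset == index simplification a range check suffices.
        return (startOffset >= 0 && startOffset < records.size()) ? (int) startOffset : -1;
    }

    public static void main(String[] args) {
        OffsetIndexSketch cache = new OffsetIndexSketch();
        for (long i = 0; i < 10; i++) {
            cache.records.add(i);
        }
        cache.get(0, 4); // slow path; installs an index entry for offset 4
        cache.get(4, 8); // fast path; consumes that entry, installs one for 8
        System.out.println(cache.offsetIndexMap.keySet()); // prints [8]
    }
}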
s3stream/src/main/java/com/automq/stream/s3/cache/StreamRecordBatchList.java
@@ -32,7 +32,7 @@ public StreamRecordBatchList(List<StreamRecordBatch> records) {
     }

     @Override
-    int size() {
+    public int size() {
         return size;
     }

s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java
@@ -22,6 +22,7 @@
 import org.junit.jupiter.api.Test;

 import java.util.List;
+import java.util.Map;

 import static org.junit.jupiter.api.Assertions.assertEquals;

@@ -60,4 +61,22 @@ public void testPutGet() {
         assertEquals(0, records.size());
     }

+    @Test
+    public void testOffsetIndex() {
+        LogCache cache = new LogCache(Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+        for (int i = 0; i < 100000; i++) {
+            cache.put(new StreamRecordBatch(233L, 0L, i, 1, TestUtils.random(1)));
+        }
+
+        long start = System.nanoTime();
+        for (int i = 0; i < 100000; i++) {
+            cache.get(233L, i, i + 1, 1000);
+        }
+        System.out.println("cost: " + (System.nanoTime() - start) / 1000 + "us");
+        Map<Long, LogCache.IndexAndCount> offsetIndexMap = cache.blocks.get(0).map.get(233L).offsetIndexMap;
+        assertEquals(1, offsetIndexMap.size());
+        assertEquals(100000, offsetIndexMap.get(100000L).index);
+    }
+
 }
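The new test issues 100,000 strictly sequential single-record reads. Each get() consumes the index entry installed by the previous read (unmap) and installs one for the next expected start offset (map), so exactly one entry should survive the loop: the one keyed by offset 100000 and pointing at list index 100000, which is what the two assertions verify.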