feat(s3stream): optimize cache performance by adding offset index (#731)
Signed-off-by: Robin Han <[email protected]>
superhx authored Nov 25, 2023
1 parent f9bc266 commit 69254d2
Showing 3 changed files with 138 additions and 45 deletions.
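The idea behind the optimization: each cached stream keeps, next to its record list, a small map from the offset at which a previous read ended to the list index that read reached, so a follow-up sequential read can resolve its start index with a HashMap lookup instead of binary-searching the whole list. A minimal self-contained sketch of that scheme (simplified: it drops the maxBytes limit and the reference counting that the commit's StreamCache uses, and all names here are illustrative rather than the commit's API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Simplified sketch of the offset-index idea; not the commit's code.
    class IndexedStreamCache {
        record Batch(long baseOffset, long lastOffset) { }

        private final List<Batch> records = new ArrayList<>();          // appended in offset order
        private final Map<Long, Integer> offsetIndex = new HashMap<>(); // end offset of a read -> next start index

        void add(Batch batch) {
            records.add(batch);
        }

        List<Batch> get(long startOffset, long endOffset) {
            // Fast path: a previous read ended exactly at startOffset, so reuse the index it reached.
            Integer cached = offsetIndex.remove(startOffset);
            int startIndex = cached != null ? cached : binarySearch(startOffset);
            if (startIndex < 0) {
                return new ArrayList<>();
            }
            int endIndex = startIndex;
            while (endIndex < records.size() && records.get(endIndex).baseOffset() < endOffset) {
                endIndex++;
            }
            if (endIndex > startIndex) {
                // Remember where this read stopped so a sequential follow-up read skips the search.
                offsetIndex.put(records.get(endIndex - 1).lastOffset(), endIndex);
            }
            return new ArrayList<>(records.subList(startIndex, endIndex));
        }

        // Slow path: binary search for the batch that contains startOffset.
        private int binarySearch(long startOffset) {
            int low = 0, high = records.size() - 1;
            while (low <= high) {
                int mid = (low + high) >>> 1;
                Batch b = records.get(mid);
                if (b.lastOffset() <= startOffset) {
                    low = mid + 1;
                } else if (b.baseOffset() > startOffset) {
                    high = mid - 1;
                } else {
                    return mid;
                }
            }
            return -1;
        }
    }

For a purely sequential scan this turns every lookup after the first into an O(1) map hit, which is what the new test in LogCacheTest below exercises.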
s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java (162 changes: 118 additions & 44 deletions)
@@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.stream.Collectors;

import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET;

@@ -46,7 +48,7 @@ public class LogCache {
private final long capacity;
private final long cacheBlockMaxSize;
private final int maxCacheBlockStreamCount;
-private final List<LogCacheBlock> blocks = new ArrayList<>();
+final List<LogCacheBlock> blocks = new ArrayList<>();
private LogCacheBlock activeBlock;
private long confirmOffset;
private final AtomicLong size = new AtomicLong();
@@ -284,7 +286,7 @@ public static class LogCacheBlock {
private final long blockId;
private final long maxSize;
private final int maxStreamCount;
-private final Map<Long, List<StreamRecordBatch>> map = new ConcurrentHashMap<>();
+final Map<Long, StreamCache> map = new ConcurrentHashMap<>();
private final AtomicLong size = new AtomicLong();
private long confirmOffset;
volatile boolean free;
@@ -305,65 +307,38 @@ public long blockId() {
}

public boolean put(StreamRecordBatch recordBatch) {
-map.compute(recordBatch.getStreamId(), (id, records) -> {
-if (records == null) {
-records = new ArrayList<>();
+map.compute(recordBatch.getStreamId(), (id, cache) -> {
+if (cache == null) {
+cache = new StreamCache();
}
-synchronized (records) {
-records.add(recordBatch);
-}
-return records;
+cache.add(recordBatch);
+return cache;
});
int recordSize = recordBatch.size();
return size.addAndGet(recordSize) >= maxSize || map.size() >= maxStreamCount;
}

public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
-List<StreamRecordBatch> streamRecords = map.get(streamId);
-if (streamRecords == null) {
+StreamCache cache = map.get(streamId);
+if (cache == null) {
return Collections.emptyList();
}
-
-synchronized (streamRecords) {
-if (streamRecords.get(0).getBaseOffset() > startOffset || streamRecords.get(streamRecords.size() - 1).getLastOffset() <= startOffset) {
-return Collections.emptyList();
-}
-StreamRecordBatchList records = new StreamRecordBatchList(streamRecords);
-int startIndex = records.search(startOffset);
-if (startIndex == -1) {
-// mismatched
-return Collections.emptyList();
-}
-int endIndex = -1;
-int remainingBytesSize = maxBytes;
-for (int i = startIndex; i < streamRecords.size(); i++) {
-StreamRecordBatch record = streamRecords.get(i);
-endIndex = i + 1;
-remainingBytesSize -= Math.min(remainingBytesSize, record.size());
-if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
-break;
-}
-}
-return new ArrayList<>(streamRecords.subList(startIndex, endIndex));
-}
+return cache.get(startOffset, endOffset, maxBytes);
}

StreamRange getStreamRange(long streamId) {
-List<StreamRecordBatch> streamRecords = map.get(streamId);
-if (streamRecords == null) {
+StreamCache streamCache = map.get(streamId);
+if (streamCache == null) {
return new StreamRange(NOOP_OFFSET, NOOP_OFFSET);
} else {
-synchronized (streamRecords) {
-if (streamRecords.isEmpty()) {
-return new StreamRange(NOOP_OFFSET, NOOP_OFFSET);
-}
-return new StreamRange(streamRecords.get(0).getBaseOffset(), streamRecords.get(streamRecords.size() - 1).getLastOffset());
-}
+return streamCache.range();
}
}

public Map<Long, List<StreamRecordBatch>> records() {
-return map;
+return map.entrySet().stream()
+.map(e -> Map.entry(e.getKey(), e.getValue().records))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public long confirmOffset() {
@@ -379,7 +354,7 @@ public long size() {
}

public void free() {
-map.forEach((streamId, records) -> records.forEach(StreamRecordBatch::release));
+map.forEach((streamId, records) -> records.free());
map.clear();
}
}
@@ -394,4 +369,103 @@ public StreamRange(long startOffset, long endOffset) {
this.endOffset = endOffset;
}
}

static class StreamCache {
List<StreamRecordBatch> records = new ArrayList<>();
long startOffset = NOOP_OFFSET;
long endOffset = NOOP_OFFSET;
Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>();

synchronized void add(StreamRecordBatch recordBatch) {
records.add(recordBatch);
if (startOffset == NOOP_OFFSET) {
startOffset = recordBatch.getBaseOffset();
}
endOffset = recordBatch.getLastOffset();
}

synchronized List<StreamRecordBatch> get(long startOffset, long endOffset, int maxBytes) {
if (this.startOffset > startOffset || this.endOffset <= startOffset) {
return Collections.emptyList();
}
int startIndex = searchStartIndex(startOffset);
if (startIndex == -1) {
// mismatched
return Collections.emptyList();
}
int endIndex = -1;
int remainingBytesSize = maxBytes;
long rstEndOffset = NOOP_OFFSET;
for (int i = startIndex; i < records.size(); i++) {
StreamRecordBatch record = records.get(i);
endIndex = i + 1;
remainingBytesSize -= Math.min(remainingBytesSize, record.size());
rstEndOffset = record.getLastOffset();
if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
break;
}
}
if (rstEndOffset != NOOP_OFFSET) {
map(rstEndOffset, endIndex);
}
return new ArrayList<>(records.subList(startIndex, endIndex));
}

int searchStartIndex(long startOffset) {
IndexAndCount indexAndCount = offsetIndexMap.get(startOffset);
if (indexAndCount != null) {
unmap(startOffset, indexAndCount);
return indexAndCount.index;
} else {
// slow path
StreamRecordBatchList search = new StreamRecordBatchList(records);
return search.search(startOffset);
}
}

final void map(long offset, int index) {
offsetIndexMap.compute(offset, (k, v) -> {
if (v == null) {
return new IndexAndCount(index);
} else {
v.inc();
return v;
}
});
}

final void unmap(long startOffset, IndexAndCount indexAndCount) {
if (indexAndCount.dec() == 0) {
offsetIndexMap.remove(startOffset);
}
}

synchronized StreamRange range() {
return new StreamRange(startOffset, endOffset);
}

synchronized void free() {
records.forEach(StreamRecordBatch::release);
records.clear();
}
}

static class IndexAndCount {
int index;
int count;

public IndexAndCount(int index) {
this.index = index;
this.count = 1;
}

public void inc() {
count++;
}

public int dec() {
return --count;
}

}
}
@@ -32,7 +32,7 @@ public StreamRecordBatchList(List<StreamRecordBatch> records) {
}

@Override
-int size() {
+public int size() {
return size;
}

@@ -22,6 +22,7 @@
import org.junit.jupiter.api.Test;

import java.util.List;
+import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

@@ -60,4 +61,22 @@ public void testPutGet() {
assertEquals(0, records.size());
}

@Test
public void testOffsetIndex() {
LogCache cache = new LogCache(Integer.MAX_VALUE, Integer.MAX_VALUE);

for (int i = 0; i < 100000; i++) {
cache.put(new StreamRecordBatch(233L, 0L, i, 1, TestUtils.random(1)));
}

long start = System.nanoTime();
for (int i = 0; i < 100000; i++) {
cache.get(233L, i, i + 1, 1000);
}
System.out.println("cost: " + (System.nanoTime() - start) / 1000 + "us");
Map<Long, LogCache.IndexAndCount> offsetIndexMap = cache.blocks.get(0).map.get(233L).offsetIndexMap;
assertEquals(1, offsetIndexMap.size());
assertEquals(100000, offsetIndexMap.get(100000L).index);
}

}
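For a sequential scan like the loop in testOffsetIndex, each get() consumes the index entry left by the previous read and leaves exactly one new entry behind (the offset where it stopped), which is why the map ends up holding the single entry 100000 -> 100000. An illustrative two-record walk-through, reusing only the constructors and calls already exercised by the test above (not additional code from the commit):

    LogCache cache = new LogCache(Integer.MAX_VALUE, Integer.MAX_VALUE);
    cache.put(new StreamRecordBatch(233L, 0L, 0, 1, TestUtils.random(1)));  // offsets [0, 1)
    cache.put(new StreamRecordBatch(233L, 0L, 1, 1, TestUtils.random(1)));  // offsets [1, 2)

    cache.get(233L, 0, 1, 1000);  // slow path: binary search, then remembers offset 1 -> index 1
    cache.get(233L, 1, 2, 1000);  // fast path: start index comes straight from the offset index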
