diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index f139fdca5..9bb393d1f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -22,6 +22,8 @@ import com.automq.stream.s3.metrics.operations.S3Operation; import com.automq.stream.s3.model.StreamRecordBatch; import com.automq.stream.utils.biniarysearch.StreamRecordBatchList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; @@ -41,6 +43,7 @@ import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET; public class LogCache { + private static final Logger LOGGER = LoggerFactory.getLogger(LogCache.class); public static final long MATCH_ALL_STREAMS = -1L; private static final int DEFAULT_MAX_BLOCK_STREAM_COUNT = 10000; private static final Consumer DEFAULT_BLOCK_FREE_LISTENER = block -> { @@ -84,7 +87,13 @@ public boolean put(StreamRecordBatch recordBatch) { TimerUtil timerUtil = new TimerUtil(); tryRealFree(); size.addAndGet(recordBatch.size()); - boolean full = activeBlock.put(recordBatch); + readLock.lock(); + boolean full; + try { + full = activeBlock.put(recordBatch); + } finally { + readLock.unlock(); + } S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_LOG_CACHE); return full; } @@ -379,6 +388,11 @@ static class StreamCache { Map offsetIndexMap = new HashMap<>(); synchronized void add(StreamRecordBatch recordBatch) { + if (recordBatch.getBaseOffset() != endOffset && endOffset != NOOP_OFFSET) { + RuntimeException ex = new IllegalArgumentException(String.format("streamId=%s record batch base offset mismatch, expect %s, actual %s", + recordBatch.getStreamId(), endOffset, recordBatch.getBaseOffset())); + LOGGER.error("[FATAL]", ex); + } records.add(recordBatch); if (startOffset == NOOP_OFFSET) { startOffset = recordBatch.getBaseOffset();