diff --git a/pom.xml b/pom.xml
index e98dbd606..de88d9316 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
32.1.3-jre
2.0.9
2.2
- 0.11.0-SNAPSHOT
+ 0.12.0-SNAPSHOT
23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index a3e59df50..49b7c9e54 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
- <version>0.11.0-SNAPSHOT</version>
+ <version>0.12.0-SNAPSHOT</version>
5.5.0
5.10.0
diff --git a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
index 647126057..44af1eff2 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
@@ -45,7 +46,7 @@ public static ByteBuf byteBuffer(int initCapacity) {
public static ByteBuf byteBuffer(int initCapacity, String name) {
try {
if (name != null) {
- S3StreamMetricsManager.recordAllocateByteBufSize(initCapacity, name);
+ S3StreamMetricsManager.recordAllocateByteBufSize(MetricsLevel.DEBUG, initCapacity, name);
}
return ALLOC.directBuffer(initCapacity);
} catch (OutOfMemoryError e) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index 34097635f..dd32878c0 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -25,6 +25,7 @@
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metadata.StreamMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -274,7 +275,7 @@ public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch s
append0(context, writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
});
return cf;
}
@@ -294,7 +295,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
if (!fromBackoff) {
backoffRecords.offer(request);
}
- S3StreamMetricsManager.recordOperationLatency(0L, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, 0L, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
@@ -370,7 +371,7 @@ public CompletableFuture<ReadDataBlock> read(FetchContext context,
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
FutureUtil.propagate(read0(context, streamId, startOffset, endOffset, maxBytes), cf);
- cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
+ cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
return cf;
}
@@ -448,7 +449,7 @@ public CompletableFuture<Void> forceUpload(long streamId) {
CompletableFuture<Void> cf = new CompletableFuture<>();
// Wait for a while to group force upload tasks.
forceUploadTicker.tick().whenComplete((nil, ex) -> {
- S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT);
+ S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT);
uploadDeltaWAL(streamId, true);
// Wait for all tasks containing streamId to complete.
List<CompletableFuture<Void>> tasksContainsStream = this.inflightWALUploadTasks.stream()
@@ -460,7 +461,7 @@ public CompletableFuture forceUpload(long streamId) {
callbackSequencer.tryFree(streamId);
}
});
- cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_COMPLETE));
+ cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_COMPLETE));
return cf;
}
@@ -492,7 +493,7 @@ private void handleAppendCallback0(WalWriteRequest request) {
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
- S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
}
private Lock getStreamCallbackLock(long streamId) {
@@ -537,7 +538,7 @@ CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
inflightWALUploadTasks.add(context);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
cf.whenComplete((nil, ex) -> {
- S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE);
+ S3StreamMetricsManager.recordStageLatency(MetricsLevel.INFO, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE);
inflightWALUploadTasks.remove(context);
if (ex != null) {
LOGGER.error("upload delta WAL fail", ex);
@@ -579,10 +580,11 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {
private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.prepare().thenAcceptAsync(nil -> {
- S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE);
+ S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE);
// 1. poll out current task and trigger upload.
DeltaWALUploadTaskContext peek = walPrepareQueue.poll();
- Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD));
+ Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> S3StreamMetricsManager.recordStageLatency(
+ MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD));
// 2. add task to commit queue.
boolean walObjectCommitQueueEmpty = walCommitQueue.isEmpty();
walCommitQueue.add(peek);
@@ -599,7 +601,7 @@ private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.commit().thenAcceptAsync(nil -> {
- S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT);
+ S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT);
// 1. poll out current task
walCommitQueue.poll();
if (context.cache.confirmOffset() != 0) {
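Taken together, the S3Storage changes apply one consistent rule: latencies of client-facing operations (append, read) are recorded at MetricsLevel.INFO, while most internal stage latencies (WAL upload prepare/upload/commit, append callbacks) drop to MetricsLevel.DEBUG, with the end-to-end UPLOAD_WAL_COMPLETE stage kept at INFO. A minimal sketch of the recurring pattern, using only types that appear in this diff (doAppend() is a hypothetical stand-in for the async work):

    TimerUtil timerUtil = new TimerUtil();
    CompletableFuture<Void> cf = doAppend(); // hypothetical async operation
    cf.whenComplete((nil, ex) ->
        // INFO: user-visible operation latency; with only INFO and DEBUG defined,
        // INFO-level metrics are always emitted (INFO.isWithin(...) is always true).
        S3StreamMetricsManager.recordOperationLatency(
            MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE));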
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
index 4bf4ae6b5..1ad1e03bb 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -30,6 +30,7 @@
import com.automq.stream.s3.cache.CacheAccessType;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -146,7 +147,7 @@ public long nextOffset() {
public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
writeLock.lock();
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
try {
CompletableFuture<AppendResult> cf = exec(() -> {
if (networkInboundLimiter != null) {
@@ -156,7 +157,7 @@ public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM);
pendingAppends.remove(cf);
});
return cf;
@@ -198,12 +199,12 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
@SpanAttribute int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
readLock.lock();
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
try {
CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
if (ex != null) {
Throwable cause = FutureUtil.cause(ex);
if (!(cause instanceof FastReadFailFastException)) {
@@ -266,7 +267,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.TRIM_STREAM);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.TRIM_STREAM);
});
return cf;
}, LOGGER, "trim");
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
index e550b4b42..629a87c38 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -21,6 +21,7 @@
import com.automq.stream.api.OpenStreamOptions;
import com.automq.stream.api.Stream;
import com.automq.stream.api.StreamClient;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -83,7 +84,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_STREAM);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_STREAM);
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
}
@@ -138,7 +139,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.compactedStreamObjectMaxSizeInBytes(config.streamObjectCompactionMaxSizeBytes())
.eligibleStreamObjectLivingTimeInMs(config.streamObjectCompactionLivingTimeMinutes() * 60L * 1000)
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
index 1d198862c..dff3e1466 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -18,6 +18,7 @@
package com.automq.stream.s3.cache;
import com.automq.stream.s3.Config;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -112,7 +113,7 @@ public CompletableFuture<ReadDataBlock> read(TraceContext traceContext,
long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
Span.fromContext(finalTraceContext.currentContext()).setAttribute("cache_hit", isCacheHit);
- S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE, isCacheHit);
+ S3StreamMetricsManager.recordReadCacheLatency(MetricsLevel.INFO, timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE, isCacheHit);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {}",
ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index d52aba729..1854391f3 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3.cache;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -99,7 +100,7 @@ public boolean put(StreamRecordBatch recordBatch) {
} finally {
readLock.unlock();
}
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_LOG_CACHE);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_LOG_CACHE);
return full;
}
@@ -150,7 +151,7 @@ public List<StreamRecordBatch> get(TraceContext context,
long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset;
- S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE, isCacheHit);
+ S3StreamMetricsManager.recordReadCacheLatency(MetricsLevel.INFO, timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE, isCacheHit);
return records;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
index d6b10448e..88be764e7 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3.cache;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.utils.LogContext;
@@ -99,7 +100,7 @@ public void updateReadAheadResult(long readAheadEndOffset, int readAheadSize) {
lock.lock();
this.readAheadEndOffset = readAheadEndOffset;
this.lastReadAheadSize = readAheadSize;
- S3StreamMetricsManager.recordReadAheadSize(readAheadSize);
+ S3StreamMetricsManager.recordReadAheadSize(MetricsLevel.DEBUG, readAheadSize);
if (logger.isDebugEnabled()) {
logger.debug("update read ahead offset {}, size: {}, lastReadOffset: {}", readAheadEndOffset, readAheadSize, lastReadOffset);
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
index 8981fdc83..4ec9b4105 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
@@ -20,6 +20,7 @@
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -136,7 +137,7 @@ public CompletableFuture<List<StreamRecordBatch>> syncReadAhead(TraceContext tra
completeInflightTask0(key, ex);
}
context.taskKeySet.clear();
- S3StreamMetricsManager.recordReadAheadLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, true);
+ S3StreamMetricsManager.recordReadAheadLatency(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, true);
});
}
@@ -298,7 +299,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int
completeInflightTask0(key, ex);
}
context.taskKeySet.clear();
- S3StreamMetricsManager.recordReadAheadLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, false);
+ S3StreamMetricsManager.recordReadAheadLatency(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, false);
});
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
index 90407f245..b595ca1f8 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
@@ -20,6 +20,7 @@
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.S3Operator;
@@ -188,7 +189,7 @@ private void readContinuousBlocks0(List<StreamDataBlock> streamDataBlocks) {
private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
return rangeRead0(start, end).whenComplete((ret, ex) ->
- S3StreamMetricsManager.recordCompactionReadSizeIn(ret.readableBytes()));
+ S3StreamMetricsManager.recordCompactionReadSizeIn(MetricsLevel.INFO, ret.readableBytes()));
}
private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
index 72a010e13..a4c230648 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
@@ -20,6 +20,7 @@
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.ObjectUtils;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.S3Operator;
@@ -63,7 +64,7 @@ public long getObjectId() {
public void write(StreamDataBlock dataBlock) {
CompletableFuture<Void> cf = new CompletableFuture<>();
- cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordCompactionWriteSize(dataBlock.getBlockSize()));
+ cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordCompactionWriteSize(MetricsLevel.INFO, dataBlock.getBlockSize()));
waitingUploadBlockCfs.put(dataBlock, cf);
waitingUploadBlocks.add(dataBlock);
long waitingUploadSize = waitingUploadBlocks.stream().mapToLong(StreamDataBlock::getBlockSize).sum();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/MetricsLevel.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/MetricsLevel.java
new file mode 100644
index 000000000..a7085cba6
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/MetricsLevel.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.metrics;
+
+public enum MetricsLevel {
+ INFO,
+ DEBUG;
+
+ public boolean isWithin(MetricsLevel level) {
+ return this.ordinal() <= level.ordinal();
+ }
+}
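Since isWithin compares ordinals, the declaration order above is load-bearing: INFO (ordinal 0) is within every configured level, while DEBUG (ordinal 1) is within DEBUG only. As a quick truth table in Java:

    MetricsLevel.INFO.isWithin(MetricsLevel.INFO);   // true:  INFO metrics emitted at the default level
    MetricsLevel.INFO.isWithin(MetricsLevel.DEBUG);  // true:  a DEBUG configuration also emits INFO metrics
    MetricsLevel.DEBUG.isWithin(MetricsLevel.INFO);  // false: DEBUG metrics suppressed at INFO
    MetricsLevel.DEBUG.isWithin(MetricsLevel.DEBUG); // true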
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
index 3cf44471e..f2477f9ab 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
@@ -69,6 +69,7 @@ public class S3StreamMetricsManager {
private static Supplier<Integer> availableInflightS3ReadQuotaSupplier = () -> 0;
private static Supplier<Integer> availableInflightS3WriteQuotaSupplier = () -> 0;
private static Supplier<Integer> inflightWALUploadTasksCountSupplier = () -> 0;
+ private static MetricsLevel metricsLevel = MetricsLevel.INFO;
public static void initAttributesBuilder(Supplier<AttributesBuilder> attributesBuilderSupplier) {
AttributesCache.INSTANCE.setDefaultAttributes(attributesBuilderSupplier.get().build());
@@ -78,6 +79,10 @@ public static void initMetrics(Meter meter) {
initMetrics(meter, "");
}
+ public static void setMetricsLevel(MetricsLevel level) {
+ metricsLevel = level;
+ }
+
public static void initMetrics(Meter meter, String prefix) {
s3DownloadSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.DOWNLOAD_SIZE_METRIC_NAME)
.setDescription("S3 download size")
@@ -124,20 +129,36 @@ public static void initMetrics(Meter meter, String prefix) {
.setDescription("Network inbound available bandwidth")
.setUnit("bytes")
.ofLongs()
- .buildWithCallback(result -> result.record(networkInboundAvailableBandwidthSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.INFO.isWithin(metricsLevel)) {
+ result.record(networkInboundAvailableBandwidthSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
networkOutboundAvailableBandwidth = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME)
.setDescription("Network outbound available bandwidth")
.setUnit("bytes")
.ofLongs()
- .buildWithCallback(result -> result.record(networkOutboundAvailableBandwidthSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.INFO.isWithin(metricsLevel)) {
+ result.record(networkOutboundAvailableBandwidthSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
networkInboundLimiterQueueSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME)
.setDescription("Network inbound limiter queue size")
.ofLongs()
- .buildWithCallback(result -> result.record((long) networkInboundLimiterQueueSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record((long) networkInboundLimiterQueueSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
networkOutboundLimiterQueueSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME)
.setDescription("Network outbound limiter queue size")
.ofLongs()
- .buildWithCallback(result -> result.record((long) networkOutboundLimiterQueueSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record((long) networkOutboundLimiterQueueSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
networkInboundLimiterQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME)
.setDescription("Network inbound limiter queue time")
.setUnit("nanoseconds")
@@ -163,38 +184,70 @@ public static void initMetrics(Meter meter, String prefix) {
deltaWalStartOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_START_OFFSET)
.setDescription("Delta WAL start offset")
.ofLongs()
- .buildWithCallback(result -> result.record(deltaWalStartOffsetSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record(deltaWalStartOffsetSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
deltaWalTrimmedOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_TRIMMED_OFFSET)
.setDescription("Delta WAL trimmed offset")
.ofLongs()
- .buildWithCallback(result -> result.record(deltaWalTrimmedOffsetSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record(deltaWalTrimmedOffsetSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
deltaWalCacheSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.DELTA_WAL_CACHE_SIZE)
.setDescription("Delta WAL cache size")
.setUnit("bytes")
.ofLongs()
- .buildWithCallback(result -> result.record(deltaWALCacheSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record(deltaWALCacheSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
blockCacheSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BLOCK_CACHE_SIZE)
.setDescription("Block cache size")
.setUnit("bytes")
.ofLongs()
- .buildWithCallback(result -> result.record(blockCacheSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record(blockCacheSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
availableInflightReadAheadSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.AVAILABLE_INFLIGHT_READ_AHEAD_SIZE_METRIC_NAME)
.setDescription("Available inflight read ahead size")
.setUnit("bytes")
.ofLongs()
- .buildWithCallback(result -> result.record((long) availableInflightReadAheadSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record((long) availableInflightReadAheadSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
availableInflightS3ReadQuota = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.AVAILABLE_S3_INFLIGHT_READ_QUOTA_METRIC_NAME)
.setDescription("Available inflight S3 read quota")
.ofLongs()
- .buildWithCallback(result -> result.record((long) availableInflightS3ReadQuotaSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record((long) availableInflightS3ReadQuotaSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
availableInflightS3WriteQuota = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.AVAILABLE_S3_INFLIGHT_WRITE_QUOTA_METRIC_NAME)
.setDescription("Available inflight S3 write quota")
.ofLongs()
- .buildWithCallback(result -> result.record((long) availableInflightS3WriteQuotaSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record((long) availableInflightS3WriteQuotaSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
inflightWALUploadTasksCount = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME)
.setDescription("Inflight upload WAL tasks count")
.ofLongs()
- .buildWithCallback(result -> result.record((long) inflightWALUploadTasksCountSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+ .buildWithCallback(result -> {
+ if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+ result.record((long) inflightWALUploadTasksCountSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+ }
+ });
compactionReadSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.COMPACTION_READ_SIZE_METRIC_NAME)
.setDescription("Compaction read size")
.setUnit("bytes")
@@ -250,86 +303,120 @@ public static void registerInflightWALUploadTasksCountSupplier(Supplier<Integer>
S3StreamMetricsManager.inflightWALUploadTasksCountSupplier = inflightWALUploadTasksCountSupplier;
}
- public static void recordS3UploadSize(long value) {
- s3UploadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordS3UploadSize(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ s3UploadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
- public static void recordS3DownloadSize(long value) {
- s3DownloadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordS3DownloadSize(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ s3DownloadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
- public static void recordOperationLatency(long value, S3Operation operation) {
- recordOperationLatency(value, operation, 0, true);
+ public static void recordOperationLatency(MetricsLevel level, long value, S3Operation operation) {
+ recordOperationLatency(level, value, operation, 0, true);
}
- public static void recordOperationLatency(long value, S3Operation operation, boolean isSuccess) {
- recordOperationLatency(value, operation, 0, isSuccess);
+ public static void recordOperationLatency(MetricsLevel level, long value, S3Operation operation, boolean isSuccess) {
+ recordOperationLatency(level, value, operation, 0, isSuccess);
}
- public static void recordOperationLatency(long value, S3Operation operation, long size) {
- recordOperationLatency(value, operation, size, true);
+ public static void recordOperationLatency(MetricsLevel level, long value, S3Operation operation, long size) {
+ recordOperationLatency(level, value, operation, size, true);
}
- public static void recordOperationLatency(long value, S3Operation operation, long size, boolean isSuccess) {
- operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, size, isSuccess));
+ public static void recordOperationLatency(MetricsLevel level, long value, S3Operation operation, long size, boolean isSuccess) {
+ if (level.isWithin(metricsLevel)) {
+ operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, size, isSuccess));
+ }
}
- public static void recordStageLatency(long value, S3Stage stage) {
- operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(stage));
+ public static void recordStageLatency(MetricsLevel level, long value, S3Stage stage) {
+ if (level.isWithin(metricsLevel)) {
+ operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(stage));
+ }
}
- public static void recordReadCacheLatency(long value, S3Operation operation, boolean isCacheHit) {
- operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, isCacheHit ? "hit" : "miss"));
+ public static void recordReadCacheLatency(MetricsLevel level, long value, S3Operation operation, boolean isCacheHit) {
+ if (level.isWithin(metricsLevel)) {
+ operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, isCacheHit ? "hit" : "miss"));
+ }
}
- public static void recordReadAheadLatency(long value, S3Operation operation, boolean isSync) {
- operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, isSync ? "sync" : "async"));
+ public static void recordReadAheadLatency(MetricsLevel level, long value, S3Operation operation, boolean isSync) {
+ if (level.isWithin(metricsLevel)) {
+ operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, isSync ? "sync" : "async"));
+ }
}
- public static void recordObjectNum(long value) {
- objectNumInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordObjectNum(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ objectNumInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
- public static void recordObjectStageCost(long value, S3ObjectStage stage) {
- objectStageCost.record(value, AttributesCache.INSTANCE.getAttributes(stage));
+ public static void recordObjectStageCost(MetricsLevel level, long value, S3ObjectStage stage) {
+ if (level.isWithin(metricsLevel)) {
+ objectStageCost.record(value, AttributesCache.INSTANCE.getAttributes(stage));
+ }
}
- public static void recordObjectUploadSize(long value) {
- objectUploadSize.record(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordObjectUploadSize(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ objectUploadSize.record(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
- public static void recordObjectDownloadSize(long value) {
- objectDownloadSize.record(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordObjectDownloadSize(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ objectDownloadSize.record(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
- public static void recordNetworkInboundUsage(long value) {
- networkInboundUsageInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordNetworkInboundUsage(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ networkInboundUsageInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
- public static void recordNetworkOutboundUsage(long value) {
- networkOutboundUsageInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordNetworkOutboundUsage(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ networkOutboundUsageInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
- public static void recordNetworkLimiterQueueTime(long value, AsyncNetworkBandwidthLimiter.Type type) {
- switch (type) {
- case INBOUND -> networkInboundLimiterQueueTime.record(value, AttributesCache.INSTANCE.defaultAttributes());
- case OUTBOUND -> networkOutboundLimiterQueueTime.record(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordNetworkLimiterQueueTime(MetricsLevel level, long value, AsyncNetworkBandwidthLimiter.Type type) {
+ if (level.isWithin(metricsLevel)) {
+ switch (type) {
+ case INBOUND -> networkInboundLimiterQueueTime.record(value, AttributesCache.INSTANCE.defaultAttributes());
+ case OUTBOUND -> networkOutboundLimiterQueueTime.record(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
}
- public static void recordAllocateByteBufSize(long value, String source) {
- allocateByteBufSize.record(value, AttributesCache.INSTANCE.getAttributes(source));
+ public static void recordAllocateByteBufSize(MetricsLevel level, long value, String source) {
+ if (level.isWithin(metricsLevel)) {
+ allocateByteBufSize.record(value, AttributesCache.INSTANCE.getAttributes(source));
+ }
}
- public static void recordReadAheadSize(long value) {
- readAheadSize.record(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordReadAheadSize(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ readAheadSize.record(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
- public static void recordCompactionReadSizeIn(long value) {
- compactionReadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordCompactionReadSizeIn(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ compactionReadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
- public static void recordCompactionWriteSize(long value) {
- compactionWriteSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ public static void recordCompactionWriteSize(MetricsLevel level, long value) {
+ if (level.isWithin(metricsLevel)) {
+ compactionWriteSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+ }
}
}
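Putting the two halves together, a hedged usage sketch (setMetricsLevel and the record methods are from this diff; the values are illustrative, and initMetrics(meter) is assumed to have been called so the underlying instruments exist):

    // Configure once at startup; the field initializer above defaults to INFO.
    S3StreamMetricsManager.setMetricsLevel(MetricsLevel.INFO);

    // Emitted: INFO is within the configured INFO level.
    S3StreamMetricsManager.recordS3UploadSize(MetricsLevel.INFO, 4096);

    // No-op: DEBUG is not within INFO, so the histogram is never touched.
    S3StreamMetricsManager.recordAllocateByteBufSize(MetricsLevel.DEBUG, 4096, "wal_record");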
diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
index 057c30adc..96e6e20fd 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
@@ -17,6 +17,7 @@
package com.automq.stream.s3.network;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -149,9 +150,9 @@ private CompletableFuture<Void> consume(int priority, long size) {
private void logMetrics(long size) {
if (type == Type.INBOUND) {
- S3StreamMetricsManager.recordNetworkInboundUsage(size);
+ S3StreamMetricsManager.recordNetworkInboundUsage(MetricsLevel.INFO, size);
} else {
- S3StreamMetricsManager.recordNetworkOutboundUsage(size);
+ S3StreamMetricsManager.recordNetworkOutboundUsage(MetricsLevel.INFO, size);
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index 90eb748b5..abe18d40d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -18,6 +18,7 @@
package com.automq.stream.s3.operator;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -193,7 +194,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
if (networkInboundBandwidthLimiter != null) {
TimerUtil timerUtil = new TimerUtil();
networkInboundBandwidthLimiter.consume(throttleStrategy, end - start).whenCompleteAsync((v, ex) -> {
- S3StreamMetricsManager.recordNetworkLimiterQueueTime(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.INBOUND);
+ S3StreamMetricsManager.recordNetworkLimiterQueueTime(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.INBOUND);
if (ex != null) {
cf.completeExceptionally(ex);
} else {
@@ -288,7 +289,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
- S3StreamMetricsManager.recordObjectDownloadSize(size);
+ S3StreamMetricsManager.recordObjectDownloadSize(MetricsLevel.INFO, size);
CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
responsePublisher.subscribe((bytes) -> {
// the aws client will copy DefaultHttpContent to heap ByteBuffer
@@ -298,13 +299,13 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT, size, false);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT, size, false);
if (isUnrecoverable(ex)) {
LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
cf.completeExceptionally(ex);
@@ -326,7 +327,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
if (networkOutboundBandwidthLimiter != null) {
TimerUtil timerUtil = new TimerUtil();
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
- S3StreamMetricsManager.recordNetworkLimiterQueueTime(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
+ S3StreamMetricsManager.recordNetworkLimiterQueueTime(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
if (ex != null) {
cf.completeExceptionally(ex);
} else {
@@ -345,13 +346,13 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
- S3StreamMetricsManager.recordS3UploadSize(objectSize);
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize);
+ S3StreamMetricsManager.recordS3UploadSize(MetricsLevel.INFO, objectSize);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize);
LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(null);
data.release();
}).exceptionally(ex -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize, false);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize, false);
if (isUnrecoverable(ex)) {
LOGGER.error("PutObject for object {} fail", path, ex);
cf.completeExceptionally(ex);
@@ -374,10 +375,10 @@ public CompletableFuture<Void> delete(String path) {
TimerUtil timerUtil = new TimerUtil();
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT);
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}).exceptionally(ex -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT, false);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT, false);
LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return null;
});
@@ -397,11 +398,11 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
.build();
// TODO: handle objects that do not exist: should we regard them as deleted or ignore them?
return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS);
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS, false);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS, false);
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
@@ -422,10 +423,10 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD);
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD, false);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD, false);
if (isUnrecoverable(ex)) {
LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
@@ -469,12 +470,12 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
.partNumber(partNumber).build();
CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
- S3StreamMetricsManager.recordS3UploadSize(size);
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size);
+ S3StreamMetricsManager.recordS3UploadSize(MetricsLevel.INFO, size);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size);
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size, false);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size, false);
if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
@@ -505,12 +506,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY);
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber)
.eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY, false);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY, false);
if (isUnrecoverable(ex)) {
LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
@@ -540,10 +541,10 @@ public void completeMultipartUpload0(String path, String uploadId, List<CompletedPart> parts, CompletableFuture<Void> cf) {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD);
cf.complete(null);
}).exceptionally(ex -> {
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD, false);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD, false);
if (isUnrecoverable(ex)) {
LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
index b375fd0a6..e9dc3a119 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
@@ -18,6 +18,7 @@
package com.automq.stream.s3.operator;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
@@ -140,14 +141,14 @@ public CompletableFuture<Void> close() {
objectPart = null;
}
- S3StreamMetricsManager.recordObjectStageCost(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.READY_CLOSE);
+ S3StreamMetricsManager.recordObjectStageCost(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.READY_CLOSE);
closeCf = new CompletableFuture<>();
CompletableFuture<Void> uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])));
FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf);
closeCf.whenComplete((nil, ex) -> {
- S3StreamMetricsManager.recordObjectStageCost(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.TOTAL);
- S3StreamMetricsManager.recordObjectNum(1);
- S3StreamMetricsManager.recordObjectUploadSize(totalWriteSize.get());
+ S3StreamMetricsManager.recordObjectStageCost(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.TOTAL);
+ S3StreamMetricsManager.recordObjectNum(MetricsLevel.DEBUG, 1);
+ S3StreamMetricsManager.recordObjectUploadSize(MetricsLevel.DEBUG, totalWriteSize.get());
});
return closeCf;
}
@@ -225,7 +226,7 @@ private void upload0() {
TimerUtil timerUtil = new TimerUtil();
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
partCf.whenComplete((nil, ex) -> {
- S3StreamMetricsManager.recordObjectStageCost(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.UPLOAD_PART);
+ S3StreamMetricsManager.recordObjectStageCost(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.UPLOAD_PART);
});
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
index 6082e3b7e..1f96fd62e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
@@ -18,6 +18,7 @@
package com.automq.stream.s3.wal;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Stage;
@@ -112,10 +113,10 @@ public ByteBuf data() {
data = DirectByteBufAlloc.compositeByteBuffer();
for (Supplier<ByteBuf> supplier : records) {
ByteBuf record = supplier.get();
- S3StreamMetricsManager.recordAllocateByteBufSize(record.readableBytes(), "wal_record");
+ S3StreamMetricsManager.recordAllocateByteBufSize(MetricsLevel.DEBUG, record.readableBytes(), "wal_record");
data.addComponent(true, record);
}
- S3StreamMetricsManager.recordAllocateByteBufSize(data.readableBytes(), "wal_block");
+ S3StreamMetricsManager.recordAllocateByteBufSize(MetricsLevel.DEBUG, data.readableBytes(), "wal_block");
return data;
}
@@ -126,6 +127,6 @@ public long size() {
@Override
public void polled() {
- S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BLOCK_POLLED);
+ S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BLOCK_POLLED);
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
index 6e2ccfbdb..5c856f98a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
@@ -19,6 +19,7 @@
import com.automq.stream.s3.Config;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -392,7 +393,7 @@ public AppendResult append(TraceContext context, ByteBuf buf, int crc) throws Ov
return result;
} catch (OverCapacityException ex) {
buf.release();
- S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_WAL_FULL);
+ S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_WAL_FULL);
TraceUtils.endSpan(scope, ex);
throw ex;
}
@@ -424,8 +425,8 @@ private AppendResult append0(ByteBuf body, int crc) throws OverCapacityException
slidingWindowService.tryWriteBlock();
final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture);
- appendResult.future().whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_COMPLETE));
- S3StreamMetricsManager.recordStageLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BEFORE);
+ appendResult.future().whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_COMPLETE));
+ S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BEFORE);
return appendResult;
}
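Note the split inside append0 above: the end-to-end APPEND_WAL_COMPLETE stage stays at INFO while the APPEND_WAL_BEFORE checkpoint drops to DEBUG, so at the default level only the user-visible WAL append latency is emitted:

    // With the default MetricsLevel.INFO configuration:
    // recordStageLatency(MetricsLevel.INFO,  ..., S3Stage.APPEND_WAL_COMPLETE) -> emitted
    // recordStageLatency(MetricsLevel.DEBUG, ..., S3Stage.APPEND_WAL_BEFORE)   -> suppressed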
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
index 9590e04a0..f7c798e6d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
@@ -18,6 +18,7 @@
package com.automq.stream.s3.wal;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Stage;
@@ -343,7 +344,7 @@ private void writeBlockData(BlockBatch blocks) throws IOException {
walChannel.write(block.data(), position);
}
walChannel.flush();
- S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_WRITE);
+ S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_WRITE);
}
private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException {
@@ -532,7 +533,7 @@ public WriteBlockProcessor(BlockBatch blocks) {
@Override
public void run() {
- S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AWAIT);
+ S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AWAIT);
writeBlock(this.blocks);
}
@@ -556,7 +557,7 @@ public String toString() {
return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}';
}
});
- S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AFTER);
+ S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AFTER);
} catch (Exception e) {
FutureUtil.completeExceptionally(blocks.futures(), e);
LOGGER.error(String.format("failed to write blocks, startOffset: %s", blocks.startOffset()), e);