From 764827cec484f85c08e749ccc6fa8e02fb67a680 Mon Sep 17 00:00:00 2001
From: Shichao Nie
Date: Tue, 2 Jan 2024 15:20:04 +0800
Subject: [PATCH] feat(s3stream): support record metrics by level for s3stream

Signed-off-by: Shichao Nie
---
 pom.xml                                            |   2 +-
 s3stream/pom.xml                                   |   2 +-
 .../automq/stream/s3/DirectByteBufAlloc.java       |   3 +-
 .../java/com/automq/stream/s3/S3Storage.java       |  22 +-
 .../java/com/automq/stream/s3/S3Stream.java        |  11 +-
 .../com/automq/stream/s3/S3StreamClient.java       |   5 +-
 .../stream/s3/cache/DefaultS3BlockCache.java       |   3 +-
 .../com/automq/stream/s3/cache/LogCache.java       |   5 +-
 .../stream/s3/cache/ReadAheadAgent.java            |   3 +-
 .../automq/stream/s3/cache/StreamReader.java       |   5 +-
 .../s3/compact/operator/DataBlockReader.java       |   3 +-
 .../s3/compact/operator/DataBlockWriter.java       |   3 +-
 .../stream/s3/metrics/MetricsLevel.java            |  27 +++
 .../s3/metrics/S3StreamMetricsManager.java         | 195 +++++++++++++-----
 .../network/AsyncNetworkBandwidthLimiter.java      |   5 +-
 .../stream/s3/operator/DefaultS3Operator.java      |  45 ++--
 .../stream/s3/operator/MultiPartWriter.java        |  11 +-
 .../com/automq/stream/s3/wal/BlockImpl.java        |   7 +-
 .../automq/stream/s3/wal/BlockWALService.java      |   7 +-
 .../stream/s3/wal/SlidingWindowService.java        |   7 +-
 20 files changed, 251 insertions(+), 120 deletions(-)
 create mode 100644 s3stream/src/main/java/com/automq/stream/s3/metrics/MetricsLevel.java

diff --git a/pom.xml b/pom.xml
index e98dbd606..de88d9316 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
         32.1.3-jre
         2.0.9
         2.2
-        0.11.0-SNAPSHOT
+        0.12.0-SNAPSHOT
         23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index a3e59df50..49b7c9e54 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
     4.0.0
     com.automq.elasticstream
     s3stream
-    0.11.0-SNAPSHOT
+    0.12.0-SNAPSHOT
     5.5.0
     5.10.0
diff --git a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
index 647126057..44af1eff2 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
@@ -17,6 +17,7 @@
 
 package com.automq.stream.s3;
 
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.utils.Threads;
 import io.netty.buffer.ByteBuf;
@@ -45,7 +46,7 @@ public static ByteBuf byteBuffer(int initCapacity) {
     public static ByteBuf byteBuffer(int initCapacity, String name) {
         try {
             if (name != null) {
-                S3StreamMetricsManager.recordAllocateByteBufSize(initCapacity, name);
+                S3StreamMetricsManager.recordAllocateByteBufSize(MetricsLevel.DEBUG, initCapacity, name);
             }
             return ALLOC.directBuffer(initCapacity);
         } catch (OutOfMemoryError e) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index 34097635f..dd32878c0 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -25,6 +25,7 @@
 import com.automq.stream.s3.context.AppendContext;
 import com.automq.stream.s3.context.FetchContext;
 import com.automq.stream.s3.metadata.StreamMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -274,7 +275,7 @@ public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch s
         append0(context, writeRequest, false);
         cf.whenComplete((nil, ex) -> {
             streamRecord.release();
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
         });
         return cf;
     }
@@ -294,7 +295,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
             if (!fromBackoff) {
                 backoffRecords.offer(request);
             }
-            S3StreamMetricsManager.recordOperationLatency(0L, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, 0L, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
             if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
                 LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
                 lastLogTimestamp = System.currentTimeMillis();
@@ -370,7 +371,7 @@ public CompletableFuture<ReadDataBlock> read(FetchContext context,
         TimerUtil timerUtil = new TimerUtil();
         CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
         FutureUtil.propagate(read0(context, streamId, startOffset, endOffset, maxBytes), cf);
-        cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
+        cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
         return cf;
     }
@@ -448,7 +449,7 @@ public CompletableFuture<Void> forceUpload(long streamId) {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         // Wait for a while to group force upload tasks.
         forceUploadTicker.tick().whenComplete((nil, ex) -> {
-            S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT);
+            S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT);
             uploadDeltaWAL(streamId, true);
             // Wait for all tasks contains streamId complete.
             List<CompletableFuture<Void>> tasksContainsStream = this.inflightWALUploadTasks.stream()
@@ -460,7 +461,7 @@ public CompletableFuture<Void> forceUpload(long streamId) {
                 callbackSequencer.tryFree(streamId);
             }
         });
-        cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_COMPLETE));
+        cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_COMPLETE));
         return cf;
     }
@@ -492,7 +493,7 @@ private void handleAppendCallback0(WalWriteRequest request) {
         for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
             waitingAckRequest.cf.complete(null);
         }
-        S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
+        S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
     }
 
     private Lock getStreamCallbackLock(long streamId) {
@@ -537,7 +538,7 @@ CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
         inflightWALUploadTasks.add(context);
         backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
         cf.whenComplete((nil, ex) -> {
-            S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE);
+            S3StreamMetricsManager.recordStageLatency(MetricsLevel.INFO, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE);
             inflightWALUploadTasks.remove(context);
             if (ex != null) {
                 LOGGER.error("upload delta WAL fail", ex);
@@ -579,10 +580,11 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {
 
     private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
         context.task.prepare().thenAcceptAsync(nil -> {
-            S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE);
+            S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE);
             // 1. poll out current task and trigger upload.
             DeltaWALUploadTaskContext peek = walPrepareQueue.poll();
-            Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD));
+            Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> S3StreamMetricsManager.recordStageLatency(
+                MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD));
             // 2. add task to commit queue.
             boolean walObjectCommitQueueEmpty = walCommitQueue.isEmpty();
             walCommitQueue.add(peek);
@@ -599,7 +601,7 @@ private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
 
     private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
         context.task.commit().thenAcceptAsync(nil -> {
-            S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT);
+            S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT);
             // 1. poll out current task
             walCommitQueue.poll();
             if (context.cache.confirmOffset() != 0) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
index 4bf4ae6b5..1ad1e03bb 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -30,6 +30,7 @@
 import com.automq.stream.s3.cache.CacheAccessType;
 import com.automq.stream.s3.context.AppendContext;
 import com.automq.stream.s3.context.FetchContext;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -146,7 +147,7 @@ public long nextOffset() {
     public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
         TimerUtil timerUtil = new TimerUtil();
         writeLock.lock();
-        S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
+        S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
         try {
             CompletableFuture<AppendResult> cf = exec(() -> {
                 if (networkInboundLimiter != null) {
@@ -156,7 +157,7 @@ public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch
             }, LOGGER, "append");
             pendingAppends.add(cf);
             cf.whenComplete((nil, ex) -> {
-                S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM);
+                S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM);
                 pendingAppends.remove(cf);
             });
             return cf;
@@ -198,12 +199,12 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
                                                 @SpanAttribute int maxBytes) {
         TimerUtil timerUtil = new TimerUtil();
         readLock.lock();
-        S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
+        S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
         try {
             CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
             pendingFetches.add(cf);
             cf.whenComplete((rs, ex) -> {
-                S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
+                S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
                 if (ex != null) {
                     Throwable cause = FutureUtil.cause(ex);
                     if (!(cause instanceof FastReadFailFastException)) {
@@ -266,7 +267,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
             lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
             this.lastPendingTrim = cf;
             cf.whenComplete((nil, ex) -> {
-                S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.TRIM_STREAM);
+                S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.TRIM_STREAM);
             });
             return cf;
         }, LOGGER, "trim");
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
index e550b4b42..629a87c38 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -21,6 +21,7 @@
 import com.automq.stream.api.OpenStreamOptions;
 import com.automq.stream.api.Stream;
 import com.automq.stream.api.StreamClient;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -83,7 +84,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
     public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
         TimerUtil timerUtil = new TimerUtil();
         return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_STREAM);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_STREAM);
             return openStream0(streamId, options.epoch());
         }), LOGGER, "createAndOpenStream");
     }
@@ -138,7 +139,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
         TimerUtil timerUtil = new TimerUtil();
         return streamManager.openStream(streamId, epoch).
             thenApply(metadata -> {
-                S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
+                S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
                 StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
                     .compactedStreamObjectMaxSizeInBytes(config.streamObjectCompactionMaxSizeBytes())
                     .eligibleStreamObjectLivingTimeInMs(config.streamObjectCompactionLivingTimeMinutes() * 60L * 1000)
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
index 1d198862c..dff3e1466 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -18,6 +18,7 @@
 package com.automq.stream.s3.cache;
 
 import com.automq.stream.s3.Config;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -112,7 +113,7 @@ public CompletableFuture<ReadDataBlock> read(TraceContext traceContext,
             long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
             boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
             Span.fromContext(finalTraceContext.currentContext()).setAttribute("cache_hit", isCacheHit);
-            S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE, isCacheHit);
+            S3StreamMetricsManager.recordReadCacheLatency(MetricsLevel.INFO, timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE, isCacheHit);
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {}",
                     ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index d52aba729..1854391f3 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -17,6 +17,7 @@
 
 package com.automq.stream.s3.cache;
 
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -99,7 +100,7 @@ public boolean put(StreamRecordBatch recordBatch) {
         } finally {
             readLock.unlock();
         }
-        S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_LOG_CACHE);
+        S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_LOG_CACHE);
         return full;
     }
@@ -150,7 +151,7 @@ public List<StreamRecordBatch> get(TraceContext context,
 
         long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
         boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset;
-        S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE, isCacheHit);
+        S3StreamMetricsManager.recordReadCacheLatency(MetricsLevel.INFO, timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE, isCacheHit);
         return records;
     }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
index d6b10448e..88be764e7 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
@@ -17,6 +17,7 @@
 
 package com.automq.stream.s3.cache;
 
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.utils.LogContext;
@@ -99,7 +100,7 @@ public void updateReadAheadResult(long readAheadEndOffset, int readAheadSize) {
         lock.lock();
         this.readAheadEndOffset = readAheadEndOffset;
         this.lastReadAheadSize = readAheadSize;
-        S3StreamMetricsManager.recordReadAheadSize(readAheadSize);
+        S3StreamMetricsManager.recordReadAheadSize(MetricsLevel.DEBUG, readAheadSize);
         if (logger.isDebugEnabled()) {
             logger.debug("update read ahead offset {}, size: {}, lastReadOffset: {}", readAheadEndOffset, readAheadSize, lastReadOffset);
         }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
index 8981fdc83..4ec9b4105 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
@@ -20,6 +20,7 @@
 import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.StreamDataBlock;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -136,7 +137,7 @@ public CompletableFuture<List<StreamRecordBatch>> syncReadAhead(TraceContext tra
                 completeInflightTask0(key, ex);
             }
             context.taskKeySet.clear();
-            S3StreamMetricsManager.recordReadAheadLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, true);
+            S3StreamMetricsManager.recordReadAheadLatency(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, true);
         });
     }
@@ -298,7 +299,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int
                 completeInflightTask0(key, ex);
             }
             context.taskKeySet.clear();
-            S3StreamMetricsManager.recordReadAheadLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, false);
+            S3StreamMetricsManager.recordReadAheadLatency(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, false);
         });
     }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
index 90407f245..b595ca1f8 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
@@ -20,6 +20,7 @@
 import com.automq.stream.s3.DirectByteBufAlloc;
 import com.automq.stream.s3.StreamDataBlock;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.s3.operator.S3Operator;
@@ -188,7 +189,7 @@ private void readContinuousBlocks0(List<StreamDataBlock> streamDataBlocks) {
 
     private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
         return rangeRead0(start, end).whenComplete((ret, ex) ->
-            S3StreamMetricsManager.recordCompactionReadSizeIn(ret.readableBytes()));
+            S3StreamMetricsManager.recordCompactionReadSizeIn(MetricsLevel.INFO, ret.readableBytes()));
     }
 
     private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
index 72a010e13..a4c230648 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
@@ -20,6 +20,7 @@
 import com.automq.stream.s3.DirectByteBufAlloc;
 import com.automq.stream.s3.StreamDataBlock;
 import com.automq.stream.s3.metadata.ObjectUtils;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.s3.operator.S3Operator;
@@ -63,7 +64,7 @@ public long getObjectId() {
     public void write(StreamDataBlock dataBlock) {
         CompletableFuture<Void> cf = new CompletableFuture<>();
-        cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordCompactionWriteSize(dataBlock.getBlockSize()));
+        cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordCompactionWriteSize(MetricsLevel.INFO, dataBlock.getBlockSize()));
         waitingUploadBlockCfs.put(dataBlock, cf);
         waitingUploadBlocks.add(dataBlock);
         long waitingUploadSize = waitingUploadBlocks.stream().mapToLong(StreamDataBlock::getBlockSize).sum();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/MetricsLevel.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/MetricsLevel.java
new file mode 100644
index 000000000..a7085cba6
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/MetricsLevel.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.metrics;
+
+public enum MetricsLevel {
+    INFO,
+    DEBUG;
+
+    public boolean isWithin(MetricsLevel level) {
+        return this.ordinal() <= level.ordinal();
+    }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
index 3cf44471e..f2477f9ab 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
@@ -69,6 +69,7 @@ public class S3StreamMetricsManager {
     private static Supplier<Integer> availableInflightS3ReadQuotaSupplier = () -> 0;
     private static Supplier<Integer> availableInflightS3WriteQuotaSupplier = () -> 0;
     private static Supplier<Integer> inflightWALUploadTasksCountSupplier = () -> 0;
+    private static MetricsLevel metricsLevel = MetricsLevel.INFO;
 
     public static void initAttributesBuilder(Supplier<AttributesBuilder> attributesBuilderSupplier) {
         AttributesCache.INSTANCE.setDefaultAttributes(attributesBuilderSupplier.get().build());
@@ -78,6 +79,10 @@ public static void initMetrics(Meter meter) {
         initMetrics(meter, "");
     }
 
+    public static void setMetricsLevel(MetricsLevel level) {
+        metricsLevel = level;
+    }
+
     public static void initMetrics(Meter meter, String prefix) {
         s3DownloadSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.DOWNLOAD_SIZE_METRIC_NAME)
             .setDescription("S3 download size")
@@ -124,20 +129,36 @@ public static void initMetrics(Meter meter, String prefix) {
         networkInboundAvailableBandwidth = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME)
             .setDescription("Network inbound available bandwidth")
             .setUnit("bytes")
             .ofLongs()
-            .buildWithCallback(result -> result.record(networkInboundAvailableBandwidthSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.INFO.isWithin(metricsLevel)) {
+                    result.record(networkInboundAvailableBandwidthSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         networkOutboundAvailableBandwidth = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME)
             .setDescription("Network outbound available bandwidth")
             .setUnit("bytes")
             .ofLongs()
-            .buildWithCallback(result -> result.record(networkOutboundAvailableBandwidthSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.INFO.isWithin(metricsLevel)) {
+                    result.record(networkOutboundAvailableBandwidthSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         networkInboundLimiterQueueSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME)
             .setDescription("Network inbound limiter queue size")
             .ofLongs()
-            .buildWithCallback(result -> result.record((long) networkInboundLimiterQueueSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record((long) networkInboundLimiterQueueSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         networkOutboundLimiterQueueSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME)
             .setDescription("Network outbound limiter queue size")
             .ofLongs()
-            .buildWithCallback(result -> result.record((long) networkOutboundLimiterQueueSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record((long) networkOutboundLimiterQueueSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         networkInboundLimiterQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME)
             .setDescription("Network inbound limiter queue time")
             .setUnit("nanoseconds")
@@ -163,38 +184,70 @@ public static void initMetrics(Meter meter, String prefix) {
         deltaWalStartOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_START_OFFSET)
             .setDescription("Delta WAL start offset")
             .ofLongs()
-            .buildWithCallback(result -> result.record(deltaWalStartOffsetSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record(deltaWalStartOffsetSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         deltaWalTrimmedOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_TRIMMED_OFFSET)
             .setDescription("Delta WAL trimmed offset")
             .ofLongs()
-            .buildWithCallback(result -> result.record(deltaWalTrimmedOffsetSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record(deltaWalTrimmedOffsetSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         deltaWalCacheSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.DELTA_WAL_CACHE_SIZE)
             .setDescription("Delta WAL cache size")
             .setUnit("bytes")
             .ofLongs()
-            .buildWithCallback(result -> result.record(deltaWALCacheSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record(deltaWALCacheSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         blockCacheSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BLOCK_CACHE_SIZE)
             .setDescription("Block cache size")
             .setUnit("bytes")
             .ofLongs()
-            .buildWithCallback(result -> result.record(blockCacheSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record(blockCacheSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         availableInflightReadAheadSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.AVAILABLE_INFLIGHT_READ_AHEAD_SIZE_METRIC_NAME)
             .setDescription("Available inflight read ahead size")
             .setUnit("bytes")
             .ofLongs()
-            .buildWithCallback(result -> result.record((long) availableInflightReadAheadSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record((long) availableInflightReadAheadSizeSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         availableInflightS3ReadQuota = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.AVAILABLE_S3_INFLIGHT_READ_QUOTA_METRIC_NAME)
             .setDescription("Available inflight S3 read quota")
             .ofLongs()
-            .buildWithCallback(result -> result.record((long) availableInflightS3ReadQuotaSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record((long) availableInflightS3ReadQuotaSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         availableInflightS3WriteQuota = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.AVAILABLE_S3_INFLIGHT_WRITE_QUOTA_METRIC_NAME)
             .setDescription("Available inflight S3 write quota")
             .ofLongs()
-            .buildWithCallback(result -> result.record((long) availableInflightS3WriteQuotaSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record((long) availableInflightS3WriteQuotaSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         inflightWALUploadTasksCount = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME)
             .setDescription("Inflight upload WAL tasks count")
             .ofLongs()
-            .buildWithCallback(result -> result.record((long) inflightWALUploadTasksCountSupplier.get(), AttributesCache.INSTANCE.defaultAttributes()));
+            .buildWithCallback(result -> {
+                if (MetricsLevel.DEBUG.isWithin(metricsLevel)) {
+                    result.record((long) inflightWALUploadTasksCountSupplier.get(), AttributesCache.INSTANCE.defaultAttributes());
+                }
+            });
         compactionReadSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.COMPACTION_READ_SIZE_METRIC_NAME)
             .setDescription("Compaction read size")
             .setUnit("bytes")
@@ -250,86 +303,120 @@ public static void registerInflightWALUploadTasksCountSupplier(Supplier
         S3StreamMetricsManager.inflightWALUploadTasksCountSupplier = inflightWALUploadTasksCountSupplier;
     }
 
-    public static void recordS3UploadSize(long value) {
-        s3UploadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+    public static void recordS3UploadSize(MetricsLevel level, long value) {
+        if (level.isWithin(metricsLevel)) {
+            s3UploadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+        }
     }
 
-    public static void recordS3DownloadSize(long value) {
-        s3DownloadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+    public static void recordS3DownloadSize(MetricsLevel level, long value) {
+        if (level.isWithin(metricsLevel)) {
+            s3DownloadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+        }
     }
 
-    public static void recordOperationLatency(long value, S3Operation operation) {
-        recordOperationLatency(value, operation, 0, true);
+    public static void recordOperationLatency(MetricsLevel level, long value, S3Operation operation) {
+        recordOperationLatency(level, value, operation, 0, true);
     }
 
-    public static void recordOperationLatency(long value, S3Operation operation, boolean isSuccess) {
-        recordOperationLatency(value, operation, 0, isSuccess);
+    public static void recordOperationLatency(MetricsLevel level, long value, S3Operation operation, boolean isSuccess) {
+        recordOperationLatency(level, value, operation, 0, isSuccess);
     }
 
-    public static void recordOperationLatency(long value, S3Operation operation, long size) {
-        recordOperationLatency(value, operation, size, true);
+    public static void recordOperationLatency(MetricsLevel level, long value, S3Operation operation, long size) {
+        recordOperationLatency(level, value, operation, size, true);
     }
 
-    public static void recordOperationLatency(long value, S3Operation operation, long size, boolean isSuccess) {
-        operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, size, isSuccess));
+    public static void recordOperationLatency(MetricsLevel level, long value, S3Operation operation, long size, boolean isSuccess) {
+        if (level.isWithin(metricsLevel)) {
+            operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, size, isSuccess));
+        }
     }
 
-    public static void recordStageLatency(long value, S3Stage stage) {
-        operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(stage));
+    public static void recordStageLatency(MetricsLevel level, long value, S3Stage stage) {
+        if (level.isWithin(metricsLevel)) {
+            operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(stage));
+        }
     }
 
-    public static void recordReadCacheLatency(long value, S3Operation operation, boolean isCacheHit) {
-        operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, isCacheHit ? "hit" : "miss"));
+    public static void recordReadCacheLatency(MetricsLevel level, long value, S3Operation operation, boolean isCacheHit) {
+        if (level.isWithin(metricsLevel)) {
+            operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, isCacheHit ? "hit" : "miss"));
+        }
     }
 
-    public static void recordReadAheadLatency(long value, S3Operation operation, boolean isSync) {
-        operationLatency.record(value, AttributesCache.INSTANCE.getAttributes(operation, isSync ? "sync" : "async"));
+    public static void recordReadAheadLatency(MetricsLevel level, long value, S3Operation operation, boolean isSync) {
+        if (level.isWithin(metricsLevel)) {
"sync" : "async")); + } } - public static void recordObjectNum(long value) { - objectNumInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes()); + public static void recordObjectNum(MetricsLevel level, long value) { + if (level.isWithin(metricsLevel)) { + objectNumInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes()); + } } - public static void recordObjectStageCost(long value, S3ObjectStage stage) { - objectStageCost.record(value, AttributesCache.INSTANCE.getAttributes(stage)); + public static void recordObjectStageCost(MetricsLevel level, long value, S3ObjectStage stage) { + if (level.isWithin(metricsLevel)) { + objectStageCost.record(value, AttributesCache.INSTANCE.getAttributes(stage)); + } } - public static void recordObjectUploadSize(long value) { - objectUploadSize.record(value, AttributesCache.INSTANCE.defaultAttributes()); + public static void recordObjectUploadSize(MetricsLevel level, long value) { + if (level.isWithin(metricsLevel)) { + objectUploadSize.record(value, AttributesCache.INSTANCE.defaultAttributes()); + } } - public static void recordObjectDownloadSize(long value) { - objectDownloadSize.record(value, AttributesCache.INSTANCE.defaultAttributes()); + public static void recordObjectDownloadSize(MetricsLevel level, long value) { + if (level.isWithin(metricsLevel)) { + objectDownloadSize.record(value, AttributesCache.INSTANCE.defaultAttributes()); + } } - public static void recordNetworkInboundUsage(long value) { - networkInboundUsageInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes()); + public static void recordNetworkInboundUsage(MetricsLevel level, long value) { + if (level.isWithin(metricsLevel)) { + networkInboundUsageInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes()); + } } - public static void recordNetworkOutboundUsage(long value) { - networkOutboundUsageInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes()); + public static void recordNetworkOutboundUsage(MetricsLevel level, long value) { + if (level.isWithin(metricsLevel)) { + networkOutboundUsageInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes()); + } } - public static void recordNetworkLimiterQueueTime(long value, AsyncNetworkBandwidthLimiter.Type type) { - switch (type) { - case INBOUND -> networkInboundLimiterQueueTime.record(value, AttributesCache.INSTANCE.defaultAttributes()); - case OUTBOUND -> networkOutboundLimiterQueueTime.record(value, AttributesCache.INSTANCE.defaultAttributes()); + public static void recordNetworkLimiterQueueTime(MetricsLevel level, long value, AsyncNetworkBandwidthLimiter.Type type) { + if (level.isWithin(metricsLevel)) { + switch (type) { + case INBOUND -> networkInboundLimiterQueueTime.record(value, AttributesCache.INSTANCE.defaultAttributes()); + case OUTBOUND -> networkOutboundLimiterQueueTime.record(value, AttributesCache.INSTANCE.defaultAttributes()); + } } } - public static void recordAllocateByteBufSize(long value, String source) { - allocateByteBufSize.record(value, AttributesCache.INSTANCE.getAttributes(source)); + public static void recordAllocateByteBufSize(MetricsLevel level, long value, String source) { + if (level.isWithin(metricsLevel)) { + allocateByteBufSize.record(value, AttributesCache.INSTANCE.getAttributes(source)); + } } - public static void recordReadAheadSize(long value) { - readAheadSize.record(value, AttributesCache.INSTANCE.defaultAttributes()); + public static void recordReadAheadSize(MetricsLevel level, long value) { + if (level.isWithin(metricsLevel)) { + readAheadSize.record(value, 
+            readAheadSize.record(value, AttributesCache.INSTANCE.defaultAttributes());
+        }
     }
 
-    public static void recordCompactionReadSizeIn(long value) {
-        compactionReadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+    public static void recordCompactionReadSizeIn(MetricsLevel level, long value) {
+        if (level.isWithin(metricsLevel)) {
+            compactionReadSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+        }
     }
 
-    public static void recordCompactionWriteSize(long value) {
-        compactionWriteSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+    public static void recordCompactionWriteSize(MetricsLevel level, long value) {
+        if (level.isWithin(metricsLevel)) {
+            compactionWriteSizeInTotal.add(value, AttributesCache.INSTANCE.defaultAttributes());
+        }
     }
 }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
index 057c30adc..96e6e20fd 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
@@ -17,6 +17,7 @@
 
 package com.automq.stream.s3.network;
 
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
@@ -149,9 +150,9 @@ private CompletableFuture<Void> consume(int priority, long size) {
 
     private void logMetrics(long size) {
         if (type == Type.INBOUND) {
-            S3StreamMetricsManager.recordNetworkInboundUsage(size);
+            S3StreamMetricsManager.recordNetworkInboundUsage(MetricsLevel.INFO, size);
         } else {
-            S3StreamMetricsManager.recordNetworkOutboundUsage(size);
+            S3StreamMetricsManager.recordNetworkOutboundUsage(MetricsLevel.INFO, size);
         }
     }
 
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index 90eb748b5..abe18d40d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -18,6 +18,7 @@
 package com.automq.stream.s3.operator;
 
 import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -193,7 +194,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
         if (networkInboundBandwidthLimiter != null) {
             TimerUtil timerUtil = new TimerUtil();
             networkInboundBandwidthLimiter.consume(throttleStrategy, end - start).whenCompleteAsync((v, ex) -> {
-                S3StreamMetricsManager.recordNetworkLimiterQueueTime(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.INBOUND);
+                S3StreamMetricsManager.recordNetworkLimiterQueueTime(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.INBOUND);
                 if (ex != null) {
                     cf.completeExceptionally(ex);
                 } else {
@@ -288,7 +289,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture
             .thenAccept(responsePublisher -> {
-                S3StreamMetricsManager.recordObjectDownloadSize(size);
+                S3StreamMetricsManager.recordObjectDownloadSize(MetricsLevel.INFO, size);
                 CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
                 responsePublisher.subscribe((bytes) -> {
                     // the aws client will copy DefaultHttpContent to heap ByteBuffer
@@ -298,13 +299,13 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture
             }).exceptionally(ex -> {
-                S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT, size, false);
+                S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT, size, false);
                 if (isUnrecoverable(ex)) {
                     LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
                     cf.completeExceptionally(ex);
@@ -326,7 +327,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
         if (networkOutboundBandwidthLimiter != null) {
             TimerUtil timerUtil = new TimerUtil();
             networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
-                S3StreamMetricsManager.recordNetworkLimiterQueueTime(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
+                S3StreamMetricsManager.recordNetworkLimiterQueueTime(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
                 if (ex != null) {
                     cf.completeExceptionally(ex);
                 } else {
@@ -345,13 +346,13 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
         PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
         AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
         writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
-            S3StreamMetricsManager.recordS3UploadSize(objectSize);
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize);
+            S3StreamMetricsManager.recordS3UploadSize(MetricsLevel.INFO, objectSize);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize);
             LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             cf.complete(null);
             data.release();
         }).exceptionally(ex -> {
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize, false);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize, false);
             if (isUnrecoverable(ex)) {
                 LOGGER.error("PutObject for object {} fail", path, ex);
                 cf.completeExceptionally(ex);
@@ -374,10 +375,10 @@ public CompletableFuture<Void> delete(String path) {
         TimerUtil timerUtil = new TimerUtil();
         DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
         return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT);
             LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
         }).exceptionally(ex -> {
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT, false);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT, false);
             LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}",
                 path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
             return null;
         });
@@ -397,11 +398,11 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
             .build();
         // TODO: handle not exist object, should we regard it as deleted or ignore it.
         return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS);
             LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
         }).exceptionally(ex -> {
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS, false);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS, false);
             LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
             return Collections.emptyList();
         });
@@ -422,10 +423,10 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {
         TimerUtil timerUtil = new TimerUtil();
         CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
         writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD);
             cf.complete(createMultipartUploadResponse.uploadId());
         }).exceptionally(ex -> {
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD, false);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD, false);
             if (isUnrecoverable(ex)) {
                 LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
                 cf.completeExceptionally(ex);
@@ -469,12 +470,12 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
             .partNumber(partNumber).build();
         CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
         uploadPartCf.thenAccept(uploadPartResponse -> {
-            S3StreamMetricsManager.recordS3UploadSize(size);
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size);
+            S3StreamMetricsManager.recordS3UploadSize(MetricsLevel.INFO, size);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size);
             CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
             cf.complete(completedPart);
         }).exceptionally(ex -> {
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size, false);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size, false);
             if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex); cf.completeExceptionally(ex); @@ -505,12 +506,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath) .destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build(); writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY); + S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY); CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber) .eTag(uploadPartCopyResponse.copyPartResult().eTag()).build(); cf.complete(completedPart); }).exceptionally(ex -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY, false); + S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY, false); if (isUnrecoverable(ex)) { LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex); cf.completeExceptionally(ex); @@ -540,10 +541,10 @@ public void completeMultipartUpload0(String path, String uploadId, List { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD); + S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD); cf.complete(null); }).exceptionally(ex -> { - S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD, false); + S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD, false); if (isUnrecoverable(ex)) { LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex); cf.completeExceptionally(ex); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java index b375fd0a6..e9dc3a119 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java @@ -18,6 +18,7 @@ package com.automq.stream.s3.operator; import com.automq.stream.s3.DirectByteBufAlloc; +import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.operations.S3ObjectStage; @@ -140,14 +141,14 @@ public CompletableFuture close() { objectPart = null; } - S3StreamMetricsManager.recordObjectStageCost(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.READY_CLOSE); + S3StreamMetricsManager.recordObjectStageCost(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.READY_CLOSE); closeCf = new CompletableFuture<>(); CompletableFuture uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0]))); FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, 
             genCompleteParts())), closeCf);
         closeCf.whenComplete((nil, ex) -> {
-            S3StreamMetricsManager.recordObjectStageCost(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.TOTAL);
-            S3StreamMetricsManager.recordObjectNum(1);
-            S3StreamMetricsManager.recordObjectUploadSize(totalWriteSize.get());
+            S3StreamMetricsManager.recordObjectStageCost(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.TOTAL);
+            S3StreamMetricsManager.recordObjectNum(MetricsLevel.DEBUG, 1);
+            S3StreamMetricsManager.recordObjectUploadSize(MetricsLevel.DEBUG, totalWriteSize.get());
         });
         return closeCf;
     }
@@ -225,7 +226,7 @@ private void upload0() {
             TimerUtil timerUtil = new TimerUtil();
             FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
             partCf.whenComplete((nil, ex) -> {
-                S3StreamMetricsManager.recordObjectStageCost(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.UPLOAD_PART);
+                S3StreamMetricsManager.recordObjectStageCost(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.UPLOAD_PART);
             });
         }
 
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
index 6082e3b7e..1f96fd62e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
@@ -18,6 +18,7 @@
 package com.automq.stream.s3.wal;
 
 import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Stage;
@@ -112,10 +113,10 @@ public ByteBuf data() {
         data = DirectByteBufAlloc.compositeByteBuffer();
         for (Supplier<ByteBuf> supplier : records) {
             ByteBuf record = supplier.get();
-            S3StreamMetricsManager.recordAllocateByteBufSize(record.readableBytes(), "wal_record");
+            S3StreamMetricsManager.recordAllocateByteBufSize(MetricsLevel.DEBUG, record.readableBytes(), "wal_record");
             data.addComponent(true, record);
         }
-        S3StreamMetricsManager.recordAllocateByteBufSize(data.readableBytes(), "wal_block");
+        S3StreamMetricsManager.recordAllocateByteBufSize(MetricsLevel.DEBUG, data.readableBytes(), "wal_block");
         return data;
     }
 
@@ -126,6 +127,6 @@ public long size() {
 
     @Override
     public void polled() {
-        S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BLOCK_POLLED);
+        S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BLOCK_POLLED);
     }
 }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
index 6e2ccfbdb..5c856f98a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
@@ -19,6 +19,7 @@
 
 import com.automq.stream.s3.Config;
 import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -392,7 +393,7 @@ public AppendResult append(TraceContext context, ByteBuf buf, int crc) throws Ov
             return result;
         } catch (OverCapacityException ex) {
             buf.release();
-            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_WAL_FULL);
+            S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_WAL_FULL);
             TraceUtils.endSpan(scope, ex);
             throw ex;
         }
@@ -424,8 +425,8 @@ private AppendResult append0(ByteBuf body, int crc) throws OverCapacityException
         slidingWindowService.tryWriteBlock();
 
         final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture);
-        appendResult.future().whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_COMPLETE));
-        S3StreamMetricsManager.recordStageLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BEFORE);
+        appendResult.future().whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_COMPLETE));
+        S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_BEFORE);
         return appendResult;
     }
 
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
index 9590e04a0..f7c798e6d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
@@ -18,6 +18,7 @@
 package com.automq.stream.s3.wal;
 
 import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Stage;
@@ -343,7 +344,7 @@ private void writeBlockData(BlockBatch blocks) throws IOException {
             walChannel.write(block.data(), position);
         }
         walChannel.flush();
-        S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_WRITE);
+        S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_WRITE);
     }
 
     private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException {
@@ -532,7 +533,7 @@ public WriteBlockProcessor(BlockBatch blocks) {
 
         @Override
         public void run() {
-            S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AWAIT);
+            S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AWAIT);
             writeBlock(this.blocks);
         }
 
@@ -556,7 +557,7 @@ public String toString() {
                         return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}';
                     }
                 });
-                S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AFTER);
+                S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_AFTER);
             } catch (Exception e) {
                 FutureUtil.completeExceptionally(blocks.futures(), e);
                 LOGGER.error(String.format("failed to write blocks, startOffset: %s", blocks.startOffset()), e);
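
All gating in this patch reduces to MetricsLevel.isWithin checked against the statically configured level, which defaults to INFO. A minimal, self-contained sketch of the level-gating semantics — the MetricsLevelDemo class and printed expectations are illustrative; only MetricsLevel, its isWithin method, and S3StreamMetricsManager.setMetricsLevel come from this patch:

    import com.automq.stream.s3.metrics.MetricsLevel;
    import com.automq.stream.s3.metrics.S3StreamMetricsManager;

    public class MetricsLevelDemo {
        public static void main(String[] args) {
            // INFO.ordinal() == 0, DEBUG.ordinal() == 1. A metric recorded at level L
            // is emitted only when L.isWithin(configured), i.e. L.ordinal() <= configured.ordinal().
            System.out.println(MetricsLevel.INFO.isWithin(MetricsLevel.INFO));   // true:  INFO metrics pass at INFO
            System.out.println(MetricsLevel.DEBUG.isWithin(MetricsLevel.INFO));  // false: DEBUG metrics are dropped at INFO
            System.out.println(MetricsLevel.INFO.isWithin(MetricsLevel.DEBUG));  // true:  everything passes at DEBUG
            System.out.println(MetricsLevel.DEBUG.isWithin(MetricsLevel.DEBUG)); // true

            // Raising the configured level enables all DEBUG-tagged histograms,
            // gauges, and counters guarded throughout this patch.
            S3StreamMetricsManager.setMetricsLevel(MetricsLevel.DEBUG);
        }
    }

Note that the asynchronous gauges (buildWithCallback) re-check the level on every collection, so setMetricsLevel takes effect at the next scrape without re-registering instruments.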