diff --git a/pom.xml b/pom.xml
index a1b7a318c..7d1d1c137 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
32.0.1-jre
2.0.9
2.2
- <s3stream.version>0.1.20-SNAPSHOT</s3stream.version>
+ <s3stream.version>0.1.21-SNAPSHOT</s3stream.version>
23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index 18dab23d8..cb4242be1 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
- <version>0.1.20-SNAPSHOT</version>
+ <version>0.1.21-SNAPSHOT</version>
5.5.0
5.10.0
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index 8e9c6669a..bd8063ce9 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -227,8 +227,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
append0(writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE).update(timerUtil.elapsed());
});
return cf;
}
@@ -247,7 +246,7 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
if (!fromBackoff) {
backoffRecords.offer(request);
}
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).operationCount.inc();
+ OperationMetricsStats.getCounter(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).inc();
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", logCache.size(), maxWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
@@ -305,8 +304,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
mainReadExecutor.execute(() -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf));
cf.whenComplete((nil, ex) -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsed());
});
return cf;
}
@@ -409,8 +407,7 @@ CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
inflightWALUploadTasks.add(cf);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadWALObject0(logCacheBlock, cf), cf, LOGGER, "uploadWALObject"));
cf.whenComplete((nil, ex) -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_STORAGE_WAL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_STORAGE_WAL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.UPLOAD_STORAGE_WAL).update(timerUtil.elapsed());
inflightWALUploadTasks.remove(cf);
if (ex != null) {
LOGGER.error("upload WAL object fail", ex);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
index 892c4dc0c..5b10782d9 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -141,8 +141,7 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STREAM).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STREAM).operationTime.update(System.currentTimeMillis() - start);
+ OperationMetricsStats.getHistogram(S3Operation.APPEND_STREAM).update(System.currentTimeMillis() - start);
pendingAppends.remove(cf);
});
return cf;
@@ -183,8 +182,7 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
CompletableFuture<FetchResult> cf = exec(() -> fetch0(startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.FETCH_STREAM).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.FETCH_STREAM).operationTime.update(System.currentTimeMillis() - start);
+ OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM).update(System.currentTimeMillis() - start);
if (ex != null) {
LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex);
} else if (networkOutboundLimiter != null) {
@@ -229,8 +227,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.TRIM_STREAM).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.TRIM_STREAM).operationTime.update(System.currentTimeMillis() - start);
+ OperationMetricsStats.getHistogram(S3Operation.TRIM_STREAM).update(System.currentTimeMillis() - start);
});
return cf;
}, LOGGER, "trim");
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
index 7c2773f0b..37b2052e1 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -84,8 +84,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_STREAM).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_STREAM).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.CREATE_STREAM).update(timerUtil.elapsed());
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
}
@@ -140,8 +139,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.OPEN_STREAM).update(timerUtil.elapsed());
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.compactedStreamObjectMaxSizeInBytes(config.s3StreamObjectCompactionMaxSizeBytes())
.eligibleStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeMinutes() * 60L * 1000)
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
index 916543b74..4b99054ee 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -92,11 +92,11 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
}
if (ret.isCacheHit()) {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_BLOCK_CACHE).operationCount.inc();
+ OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE).inc();
} else {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).operationCount.inc();
+ OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).inc();
}
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_BLOCK_CACHE).operationTime.update(timerUtil.elapsedAndReset());
+ OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_BLOCK_CACHE).update(timerUtil.elapsedAndReset());
});
return readCf;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index 1dbc25e26..c4590db9a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -70,8 +70,7 @@ public boolean put(StreamRecordBatch recordBatch) {
tryRealFree();
size.addAndGet(recordBatch.size());
boolean full = activeBlock.put(recordBatch);
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_LOG_CACHE).update(timerUtil.elapsed());
return full;
}
@@ -104,11 +103,11 @@ public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
List<StreamRecordBatch> records = get0(streamId, startOffset, endOffset, maxBytes);
records.forEach(StreamRecordBatch::retain);
if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_LOG_CACHE).operationCount.inc();
+ OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE).inc();
} else {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_LOG_CACHE_MISS).operationCount.inc();
+ OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE_MISS).inc();
}
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_LOG_CACHE).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_LOG_CACHE).update(timerUtil.elapsed());
return records;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java
index 5bf97fff2..740c4ff3d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java
@@ -26,24 +26,30 @@
import java.util.concurrent.ConcurrentHashMap;
public class OperationMetricsStats {
- private static final Map<String, OperationMetrics> OPERATION_METRICS_MAP = new ConcurrentHashMap<>();
+ private static final Map<String, Counter> OPERATION_COUNTER_MAP = new ConcurrentHashMap<>();
+ private static final Map<String, Histogram> OPERATION_HIST_MAP = new ConcurrentHashMap<>();
- public static OperationMetrics getOrCreateOperationMetrics(S3Operation s3Operation) {
- return OPERATION_METRICS_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id ->
- new OperationMetrics(s3Operation));
+ public static Counter getCounter(S3Operation s3Operation) {
+ return getOrCreateCounterMetrics(s3Operation);
}
- public static class OperationMetrics {
- public final Counter operationCount;
- public final Histogram operationTime;
+ public static Histogram getHistogram(S3Operation s3Operation) {
+ return getOrCreateHistMetrics(s3Operation);
+ }
+
+ private static Counter getOrCreateCounterMetrics(S3Operation s3Operation) {
+ return OPERATION_COUNTER_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
+ .newCounter("operation_count" + Counter.SUFFIX, tags(s3Operation)));
+ }
- public OperationMetrics(S3Operation s3Operation) {
- Map<String, String> tags = Map.of(
- "operation", s3Operation.getName(),
- "op_type", s3Operation.getType().getName());
- operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter("operation_count" + Counter.SUFFIX, tags);
- operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("operation_time", tags);
- }
+ private static Histogram getOrCreateHistMetrics(S3Operation s3Operation) {
+ return OPERATION_HIST_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
+ .newHistogram("operation_time", tags(s3Operation)));
+ }
+ private static Map<String, String> tags(S3Operation s3Operation) {
+ return Map.of(
+ "operation", s3Operation.getName(),
+ "op_type", s3Operation.getType().getName());
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java
index 8a4d72a27..ab676f4cd 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java
@@ -32,7 +32,7 @@ public class S3ObjectMetricsStats {
public static final Histogram S3_OBJECT_UPLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap());
public static final Histogram S3_OBJECT_DOWNLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap());
- public static Histogram getOrCreateS3ObjectMetrics(S3ObjectStage stage) {
+ public static Histogram getHistogram(S3ObjectStage stage) {
return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> {
Map<String, String> tags = Map.of("stage", stage.getName());
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_stage_time", tags);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index ed63f43a6..d3b7482c7 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -248,16 +248,14 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
.thenAccept(responsePublisher -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.GET_OBJECT).update(timerUtil.elapsed());
long size = end - start + 1;
S3ObjectMetricsStats.S3_OBJECT_DOWNLOAD_SIZE.update(size);
ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) size, "merge_read");
responsePublisher.subscribe(buf::writeBytes).thenAccept(v -> cf.complete(buf));
})
.exceptionally(ex -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT_FAIL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT_FAIL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.GET_OBJECT_FAIL).update(timerUtil.elapsed());
if (isUnrecoverable(ex)) {
LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
cf.completeExceptionally(ex);
@@ -296,14 +294,12 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.PUT_OBJECT).update(timerUtil.elapsed());
LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsed());
cf.complete(null);
data.release();
}).exceptionally(ex -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT_FAIL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.PUT_OBJECT_FAIL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.PUT_OBJECT_FAIL).update(timerUtil.elapsed());
if (isUnrecoverable(ex)) {
LOGGER.error("PutObject for object {} fail", path, ex);
cf.completeExceptionally(ex);
@@ -326,12 +322,10 @@ public CompletableFuture<Void> delete(String path) {
TimerUtil timerUtil = new TimerUtil();
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECT).update(timerUtil.elapsed());
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsed());
}).exceptionally(ex -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT_FAIL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECT_FAIL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECT_FAIL).update(timerUtil.elapsed());
LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsed(), ex.getMessage());
return null;
});
@@ -351,13 +345,11 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
.build();
// TODO: handle not exist object, should we regard it as deleted or ignore it.
return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECTS).update(timerUtil.elapsed());
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsed());
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS_FAIL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.DELETE_OBJECTS_FAIL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECTS_FAIL).update(timerUtil.elapsed());
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsed(), ex.getMessage());
return Collections.emptyList();
});
@@ -378,12 +370,10 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.CREATE_MULTI_PART_UPLOAD).update(timerUtil.elapsed());
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD_FAIL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_MULTI_PART_UPLOAD_FAIL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.CREATE_MULTI_PART_UPLOAD_FAIL).update(timerUtil.elapsed());
if (isUnrecoverable(ex)) {
LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
@@ -424,13 +414,11 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
.partNumber(partNumber).build();
CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.UPLOAD_PART).update(timerUtil.elapsed());
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_FAIL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_FAIL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.UPLOAD_PART_FAIL).update(timerUtil.elapsed());
if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
@@ -459,14 +447,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.UPLOAD_PART_COPY).update(timerUtil.elapsed());
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber)
.eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY_FAIL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_PART_COPY_FAIL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.UPLOAD_PART_COPY_FAIL).update(timerUtil.elapsed());
if (isUnrecoverable(ex)) {
LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
@@ -495,12 +481,10 @@ public void completeMultipartUpload0(String path, String uploadId, List<CompletedPart> parts, CompletableFuture<Void> cf) {
writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.COMPLETE_MULTI_PART_UPLOAD).update(timerUtil.elapsed());
cf.complete(null);
}).exceptionally(ex -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD_FAIL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.COMPLETE_MULTI_PART_UPLOAD_FAIL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.COMPLETE_MULTI_PART_UPLOAD_FAIL).update(timerUtil.elapsed());
if (isUnrecoverable(ex)) {
LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
index 6b0e74edb..28b72a744 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
@@ -139,12 +139,12 @@ public CompletableFuture<Void> close() {
objectPart = null;
}
- S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.READY_CLOSE).update(timerUtil.elapsed());
+ S3ObjectMetricsStats.getHistogram(S3ObjectStage.READY_CLOSE).update(timerUtil.elapsed());
closeCf = new CompletableFuture<>();
CompletableFuture<Void> uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])));
FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf);
closeCf.whenComplete((nil, ex) -> {
- S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.TOTAL).update(timerUtil.elapsed());
+ S3ObjectMetricsStats.getHistogram(S3ObjectStage.TOTAL).update(timerUtil.elapsed());
S3ObjectMetricsStats.S3_OBJECT_COUNT.inc();
S3ObjectMetricsStats.S3_OBJECT_UPLOAD_SIZE.update(totalWriteSize.get());
});
@@ -213,7 +213,7 @@ public void upload() {
private void upload0() {
TimerUtil timerUtil = new TimerUtil();
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
- partCf.whenComplete((nil, ex) -> S3ObjectMetricsStats.getOrCreateS3ObjectMetrics(S3ObjectStage.UPLOAD_PART).update(timerUtil.elapsed()));
+ partCf.whenComplete((nil, ex) -> S3ObjectMetricsStats.getHistogram(S3ObjectStage.UPLOAD_PART).update(timerUtil.elapsed()));
}
public long size() {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
index 2770686a2..f951a35d0 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
@@ -431,7 +431,7 @@ public AppendResult append(ByteBuf buf, int crc) throws OverCapacityException {
return append0(buf, crc);
} catch (OverCapacityException ex) {
buf.release();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_WAL_FULL).operationCount.inc();
+ OperationMetricsStats.getCounter(S3Operation.APPEND_STORAGE_WAL_FULL).inc();
throw ex;
}
}
@@ -461,8 +461,7 @@ public AppendResult append0(ByteBuf body, int crc) throws OverCapacityException
final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture);
appendResult.future().whenComplete((nil, ex) -> {
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_WAL).operationCount.inc();
- OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_WAL).operationTime.update(timerUtil.elapsed());
+ OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL).update(timerUtil.elapsed());
});
return appendResult;
}