From 44d9b59c4f6c29d4c94750d75e4bc410a03c652f Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Wed, 10 Jan 2024 11:19:56 +0800 Subject: [PATCH] fix(s3stream): fix operation latency unit Signed-off-by: Shichao Nie --- .../java/com/automq/stream/s3/S3Stream.java | 10 +++--- .../stream/s3/operator/DefaultS3Operator.java | 36 +++++++++---------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 7a695a4ee..fd45e8a0b 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -150,7 +150,7 @@ public CompletableFuture append(AppendContext context, RecordBatch }, LOGGER, "append"); pendingAppends.add(cf); cf.whenComplete((nil, ex) -> { - StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); pendingAppends.remove(cf); }); return cf; @@ -196,7 +196,7 @@ public CompletableFuture fetch(FetchContext context, CompletableFuture cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch"); pendingFetches.add(cf); cf.whenComplete((rs, ex) -> { - StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (ex != null) { Throwable cause = FutureUtil.cause(ex); if (!(cause instanceof FastReadFailFastException)) { @@ -260,7 +260,7 @@ public CompletableFuture trim(long newStartOffset) { lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf)); this.lastPendingTrim = cf; cf.whenComplete((nil, ex) -> { - StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); }); return cf; }, LOGGER, "trim"); @@ -310,10 +310,10 @@ public CompletableFuture close() { closeCf.whenComplete((nil, ex) -> { if (ex != null) { LOGGER.error("{} close fail", logIdent, ex); - StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); } else { LOGGER.info("{} closed", logIdent); - StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); } }); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index ce3358c2b..f0f2961a8 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -210,7 +210,7 @@ public CompletableFuture rangeRead(String path, long start, long end, T TimerUtil timerUtil = new TimerUtil(); networkInboundBandwidthLimiter.consume(throttleStrategy, end - start).whenCompleteAsync((v, ex) -> { NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND) - .record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + .record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (ex != null) { cf.completeExceptionally(ex); } else { @@ -316,12 +316,12 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture { - S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex); cf.completeExceptionally(ex); @@ -344,7 +344,7 @@ public CompletableFuture write(String path, ByteBuf data, ThrottleStrategy TimerUtil timerUtil = new TimerUtil(); networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> { NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND) - .record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + .record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (ex != null) { cf.completeExceptionally(ex); } else { @@ -364,12 +364,12 @@ private void write0(String path, ByteBuf data, CompletableFuture cf) { AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers()); writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize); - S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); cf.complete(null); data.release(); }).exceptionally(ex -> { - S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("PutObject for object {} fail", path, ex); cf.completeExceptionally(ex); @@ -392,10 +392,10 @@ public CompletableFuture delete(String path) { TimerUtil timerUtil = new TimerUtil(); DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build(); return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> { - S3OperationStats.getInstance().deleteObjectStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().deleteObjectStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); }).exceptionally(ex -> { - S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage()); return null; }); @@ -415,11 +415,11 @@ public CompletableFuture> delete(List objectKeys) { .build(); // TODO: handle not exist object, should we regard it as deleted or ignore it. return this.writeS3Client.deleteObjects(request).thenApply(resp -> { - S3OperationStats.getInstance().deleteObjectsStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().deleteObjectsStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList()); }).exceptionally(ex -> { - S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage()); return Collections.emptyList(); }); @@ -440,10 +440,10 @@ void createMultipartUpload0(String path, CompletableFuture cf) { TimerUtil timerUtil = new TimerUtil(); CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build(); writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> { - S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); cf.complete(createMultipartUploadResponse.uploadId()); }).exceptionally(ex -> { - S3OperationStats.getInstance().createMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().createMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("CreateMultipartUpload for object {} fail", path, ex); cf.completeExceptionally(ex); @@ -488,11 +488,11 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p CompletableFuture uploadPartCf = writeS3Client.uploadPart(request, body); uploadPartCf.thenAccept(uploadPartResponse -> { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size); - S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build(); cf.complete(completedPart); }).exceptionally(ex -> { - S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex); cf.completeExceptionally(ex); @@ -523,12 +523,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath) .destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build(); writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> { - S3OperationStats.getInstance().uploadPartCopyStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().uploadPartCopyStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber) .eTag(uploadPartCopyResponse.copyPartResult().eTag()).build(); cf.complete(completedPart); }).exceptionally(ex -> { - S3OperationStats.getInstance().uploadPartCopyStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().uploadPartCopyStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex); cf.completeExceptionally(ex); @@ -558,10 +558,10 @@ public void completeMultipartUpload0(String path, String uploadId, List { - S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); cf.complete(null); }).exceptionally(ex -> { - S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); + S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex); cf.completeExceptionally(ex);