Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(s3stream): fix operation latency unit #892

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
pendingAppends.remove(cf);
});
return cf;
Expand Down Expand Up @@ -196,7 +196,7 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
Throwable cause = FutureUtil.cause(ex);
if (!(cause instanceof FastReadFailFastException)) {
Expand Down Expand Up @@ -260,7 +260,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
});
return cf;
}, LOGGER, "trim");
Expand Down Expand Up @@ -310,10 +310,10 @@ public CompletableFuture<Void> close() {
closeCf.whenComplete((nil, ex) -> {
if (ex != null) {
LOGGER.error("{} close fail", logIdent, ex);
StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
} else {
LOGGER.info("{} closed", logIdent);
StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
TimerUtil timerUtil = new TimerUtil();
networkInboundBandwidthLimiter.consume(throttleStrategy, end - start).whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND)
.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
} else {
Expand Down Expand Up @@ -316,12 +316,12 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
path, start, end, size, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
}
S3OperationStats.getInstance().downloadSizeTotalStats.add(MetricsLevel.INFO, size);
S3OperationStats.getInstance().getObjectStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().getObjectStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(buf);
});
})
.exceptionally(ex -> {
S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
cf.completeExceptionally(ex);
Expand All @@ -344,7 +344,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
TimerUtil timerUtil = new TimerUtil();
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
} else {
Expand All @@ -364,12 +364,12 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize);
S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(null);
data.release();
}).exceptionally(ex -> {
S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("PutObject for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand All @@ -392,10 +392,10 @@ public CompletableFuture<Void> delete(String path) {
TimerUtil timerUtil = new TimerUtil();
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
S3OperationStats.getInstance().deleteObjectStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().deleteObjectStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}).exceptionally(ex -> {
S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return null;
});
Expand All @@ -415,11 +415,11 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
.build();
// TODO: handle not exist object, should we regard it as deleted or ignore it.
return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
S3OperationStats.getInstance().deleteObjectsStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().deleteObjectsStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
Expand All @@ -440,10 +440,10 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
S3OperationStats.getInstance().createMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().createMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -488,11 +488,11 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size);
S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -523,12 +523,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
S3OperationStats.getInstance().uploadPartCopyStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().uploadPartCopyStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber)
.eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3OperationStats.getInstance().uploadPartCopyStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().uploadPartCopyStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -558,10 +558,10 @@ public void completeMultipartUpload0(String path, String uploadId, List<Complete
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(null);
}).exceptionally(ex -> {
S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand Down