Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): support filter operation status by label #853

Merged
merged 1 commit into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,22 @@ public static void recordOperationNum(long value, S3Operation operation) {
}

public static void recordOperationLatency(long value, S3Operation operation) {
recordOperationLatency(value, operation, 0);
recordOperationLatency(value, operation, 0, true);
}

public static void recordOperationLatency(long value, S3Operation operation, boolean isSuccess) {
recordOperationLatency(value, operation, 0, isSuccess);
}

public static void recordOperationLatency(long value, S3Operation operation, long size) {
recordOperationLatency(value, operation, size, true);
}

public static void recordOperationLatency(long value, S3Operation operation, long size, boolean isSuccess) {
AttributesBuilder attributesBuilder = newAttributesBuilder()
.put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, operation.getType().getName())
.put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName());
.put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName())
.put(S3StreamMetricsConstant.LABEL_STATUS, isSuccess ? "success" : "failed");
if (operation == S3Operation.GET_OBJECT || operation == S3Operation.PUT_OBJECT || operation == S3Operation.UPLOAD_PART) {
attributesBuilder.put(S3StreamMetricsConstant.LABEL_SIZE_NAME, getObjectBucketLabel(size));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,13 @@ public enum S3Operation {

/* S3 request operations start */
GET_OBJECT(S3MetricsType.S3Request, "get_object"),
GET_OBJECT_FAIL(S3MetricsType.S3Request, "get_object_fail"),
PUT_OBJECT(S3MetricsType.S3Request, "put_object"),
PUT_OBJECT_FAIL(S3MetricsType.S3Request, "put_object_fail"),
DELETE_OBJECT(S3MetricsType.S3Request, "delete_object"),
DELETE_OBJECT_FAIL(S3MetricsType.S3Request, "delete_object_fail"),
DELETE_OBJECTS(S3MetricsType.S3Request, "delete_objects"),
DELETE_OBJECTS_FAIL(S3MetricsType.S3Request, "delete_objects_fail"),
CREATE_MULTI_PART_UPLOAD(S3MetricsType.S3Request, "create_multi_part_upload"),
CREATE_MULTI_PART_UPLOAD_FAIL(S3MetricsType.S3Request, "create_multi_part_upload_fail"),
UPLOAD_PART(S3MetricsType.S3Request, "upload_part"),
UPLOAD_PART_FAIL(S3MetricsType.S3Request, "upload_part_fail"),
UPLOAD_PART_COPY(S3MetricsType.S3Request, "upload_part_copy"),
UPLOAD_PART_COPY_FAIL(S3MetricsType.S3Request, "upload_part_copy_fail"),
COMPLETE_MULTI_PART_UPLOAD(S3MetricsType.S3Request, "complete_multi_part_upload"),
COMPLETE_MULTI_PART_UPLOAD_FAIL(S3MetricsType.S3Request, "complete_multi_part_upload_fail"),
/* S3 request operations end */

/* S3 object operations start */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
});
})
.exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT_FAIL, size);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT, size, false);
if (isUnrecoverable(ex)) {
LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -344,7 +344,7 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
cf.complete(null);
data.release();
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT_FAIL, objectSize);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize, false);
if (isUnrecoverable(ex)) {
LOGGER.error("PutObject for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand All @@ -370,7 +370,7 @@ public CompletableFuture<Void> delete(String path) {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT);
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT, false);
LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return null;
});
Expand All @@ -394,7 +394,7 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS, false);
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
Expand All @@ -418,7 +418,7 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD);
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD, false);
if (isUnrecoverable(ex)) {
LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -465,7 +465,7 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_FAIL, size);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size, false);
if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -499,7 +499,7 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
.eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY, false);
if (isUnrecoverable(ex)) {
LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -531,7 +531,7 @@ public void completeMultipartUpload0(String path, String uploadId, List<Complete
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD);
cf.complete(null);
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD, false);
if (isUnrecoverable(ex)) {
LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand Down
Loading