Skip to content

Commit

Permalink
feat(s3stream): support filter operation status by label (#853)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Dec 25, 2023
1 parent 504ed6d commit c591925
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,22 @@ public static void recordOperationNum(long value, S3Operation operation) {
}

public static void recordOperationLatency(long value, S3Operation operation) {
recordOperationLatency(value, operation, 0);
recordOperationLatency(value, operation, 0, true);
}

public static void recordOperationLatency(long value, S3Operation operation, boolean isSuccess) {
recordOperationLatency(value, operation, 0, isSuccess);
}

public static void recordOperationLatency(long value, S3Operation operation, long size) {
recordOperationLatency(value, operation, size, true);
}

public static void recordOperationLatency(long value, S3Operation operation, long size, boolean isSuccess) {
AttributesBuilder attributesBuilder = newAttributesBuilder()
.put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, operation.getType().getName())
.put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName());
.put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName())
.put(S3StreamMetricsConstant.LABEL_STATUS, isSuccess ? "success" : "failed");
if (operation == S3Operation.GET_OBJECT || operation == S3Operation.PUT_OBJECT || operation == S3Operation.UPLOAD_PART) {
attributesBuilder.put(S3StreamMetricsConstant.LABEL_SIZE_NAME, getObjectBucketLabel(size));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,13 @@ public enum S3Operation {

/* S3 request operations start */
GET_OBJECT(S3MetricsType.S3Request, "get_object"),
GET_OBJECT_FAIL(S3MetricsType.S3Request, "get_object_fail"),
PUT_OBJECT(S3MetricsType.S3Request, "put_object"),
PUT_OBJECT_FAIL(S3MetricsType.S3Request, "put_object_fail"),
DELETE_OBJECT(S3MetricsType.S3Request, "delete_object"),
DELETE_OBJECT_FAIL(S3MetricsType.S3Request, "delete_object_fail"),
DELETE_OBJECTS(S3MetricsType.S3Request, "delete_objects"),
DELETE_OBJECTS_FAIL(S3MetricsType.S3Request, "delete_objects_fail"),
CREATE_MULTI_PART_UPLOAD(S3MetricsType.S3Request, "create_multi_part_upload"),
CREATE_MULTI_PART_UPLOAD_FAIL(S3MetricsType.S3Request, "create_multi_part_upload_fail"),
UPLOAD_PART(S3MetricsType.S3Request, "upload_part"),
UPLOAD_PART_FAIL(S3MetricsType.S3Request, "upload_part_fail"),
UPLOAD_PART_COPY(S3MetricsType.S3Request, "upload_part_copy"),
UPLOAD_PART_COPY_FAIL(S3MetricsType.S3Request, "upload_part_copy_fail"),
COMPLETE_MULTI_PART_UPLOAD(S3MetricsType.S3Request, "complete_multi_part_upload"),
COMPLETE_MULTI_PART_UPLOAD_FAIL(S3MetricsType.S3Request, "complete_multi_part_upload_fail"),
/* S3 request operations end */

/* S3 object operations start */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
});
})
.exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT_FAIL, size);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT, size, false);
if (isUnrecoverable(ex)) {
LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -344,7 +344,7 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
cf.complete(null);
data.release();
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT_FAIL, objectSize);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT, objectSize, false);
if (isUnrecoverable(ex)) {
LOGGER.error("PutObject for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand All @@ -370,7 +370,7 @@ public CompletableFuture<Void> delete(String path) {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT);
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT, false);
LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return null;
});
Expand All @@ -394,7 +394,7 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS, false);
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
Expand All @@ -418,7 +418,7 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD);
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD, false);
if (isUnrecoverable(ex)) {
LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -465,7 +465,7 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_FAIL, size);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART, size, false);
if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -499,7 +499,7 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
.eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY, false);
if (isUnrecoverable(ex)) {
LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -531,7 +531,7 @@ public void completeMultipartUpload0(String path, String uploadId, List<Complete
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD);
cf.complete(null);
}).exceptionally(ex -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD_FAIL);
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD, false);
if (isUnrecoverable(ex)) {
LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand Down

0 comments on commit c591925

Please sign in to comment.