From 5a2e73a883b6881bccda01a248c7fe45c912eb62 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Tue, 6 Feb 2024 19:37:14 +0800 Subject: [PATCH] fix(s3stream): prevent erorr logging when compaction is shutdown Signed-off-by: Shichao Nie --- .../automq/stream/s3/compact/CompactionManager.java | 2 +- .../automq/stream/s3/compact/CompactionUploader.java | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 07311f015..80dc72683 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -138,7 +138,7 @@ private void scheduleNextCompaction(long delayMillis) { public void shutdown() { this.compactScheduledExecutor.shutdown(); this.bucketCallbackScheduledExecutor.shutdown(); - this.uploader.stop(); + this.uploader.shutdown(); } public CompletableFuture compact() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java index 744719608..c702dfe47 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java @@ -38,6 +38,7 @@ public class CompactionUploader { private CompletableFuture streamSetObjectIdCf = null; private DataBlockWriter streamSetObjectWriter = null; private volatile boolean isAborted = false; + private volatile boolean isShutdown = false; public CompactionUploader(ObjectManager objectManager, S3Operator s3Operator, Config config) { this.objectManager = objectManager; @@ -49,7 +50,8 @@ public CompactionUploader(ObjectManager objectManager, S3Operator s3Operator, Co ThreadUtils.createThreadFactory("compaction-stream-set-object-uploader-%d", true), LOGGER); } - public void stop() { + public void shutdown() { + this.isShutdown = true; this.streamSetObjectUploadPool.shutdown(); this.streamObjectUploadPool.shutdown(); } @@ -106,7 +108,12 @@ public CompletableFuture writeStreamObject(CompactedObject compact return streamObject; }).whenComplete((ret, ex) -> { if (ex != null) { - LOGGER.error("write to stream object {} failed", objectId, ex); + if (isShutdown) { + // TODO: remove this when we're able to abort object uploading gracefully + LOGGER.warn("write to stream object {} failed", objectId, ex); + } else { + LOGGER.error("write to stream object {} failed", objectId, ex); + } dataBlockWriter.release(); compactedObject.streamDataBlocks().forEach(StreamDataBlock::release); }