diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 07311f015..80dc72683 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -138,7 +138,7 @@ private void scheduleNextCompaction(long delayMillis) { public void shutdown() { this.compactScheduledExecutor.shutdown(); this.bucketCallbackScheduledExecutor.shutdown(); - this.uploader.stop(); + this.uploader.shutdown(); } public CompletableFuture compact() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java index 744719608..c702dfe47 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java @@ -38,6 +38,7 @@ public class CompactionUploader { private CompletableFuture streamSetObjectIdCf = null; private DataBlockWriter streamSetObjectWriter = null; private volatile boolean isAborted = false; + private volatile boolean isShutdown = false; public CompactionUploader(ObjectManager objectManager, S3Operator s3Operator, Config config) { this.objectManager = objectManager; @@ -49,7 +50,8 @@ public CompactionUploader(ObjectManager objectManager, S3Operator s3Operator, Co ThreadUtils.createThreadFactory("compaction-stream-set-object-uploader-%d", true), LOGGER); } - public void stop() { + public void shutdown() { + this.isShutdown = true; this.streamSetObjectUploadPool.shutdown(); this.streamObjectUploadPool.shutdown(); } @@ -106,7 +108,12 @@ public CompletableFuture writeStreamObject(CompactedObject compact return streamObject; }).whenComplete((ret, ex) -> { if (ex != null) { - LOGGER.error("write to stream object {} failed", objectId, ex); + if (isShutdown) { + // TODO: remove this when we're able to abort object uploading gracefully + LOGGER.warn("write to stream object {} failed", objectId, ex); + } else { + LOGGER.error("write to stream object {} failed", objectId, ex); + } dataBlockWriter.release(); compactedObject.streamDataBlocks().forEach(StreamDataBlock::release); }