diff --git a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java index d67cf9529..9fa9df79c 100644 --- a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java +++ b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java @@ -65,20 +65,6 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store StreamManager streamManager = new S3StreamManager(metadataService); ObjectManager objectManager = new S3ObjectManager(metadataService); - S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(), - streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey()); - - WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.s3WALPath(), s3Config.s3WALCapacity()).config(s3Config).build(); - S3BlockCache blockCache = new DefaultS3BlockCache(s3Config.s3BlockCacheSize(), objectManager, defaultOperator); - - // Build the s3 storage - this.storage = new S3Storage(s3Config, writeAheadLog, streamManager, objectManager, blockCache, defaultOperator); - - // Build the compaction manager - S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(), - streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey()); - this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator); - AsyncNetworkBandwidthLimiter networkInboundLimiter = null; AsyncNetworkBandwidthLimiter networkOutboundLimiter = null; @@ -97,6 +83,20 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store ); } + S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(), + streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true); + + WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.s3WALPath(), s3Config.s3WALCapacity()).config(s3Config).build(); + S3BlockCache blockCache = new DefaultS3BlockCache(s3Config.s3BlockCacheSize(), objectManager, defaultOperator); + + // Build the s3 storage + this.storage = new S3Storage(s3Config, writeAheadLog, streamManager, objectManager, blockCache, defaultOperator); + + // Build the compaction manager + S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(), + streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true); + this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator); + this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter); this.storeWorkingThreadPool = ThreadPoolMonitor.createAndMonitor( storeConfig.workingThreadPoolNums(),