Skip to content

Commit

Permalink
fix(store): fix bug network limiter (#574)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 6, 2023
1 parent 4fb98ce commit 6abb4c9
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,6 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store
StreamManager streamManager = new S3StreamManager(metadataService);
ObjectManager objectManager = new S3ObjectManager(metadataService);

S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey());

WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.s3WALPath(), s3Config.s3WALCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config.s3BlockCacheSize(), objectManager, defaultOperator);

// Build the s3 storage
this.storage = new S3Storage(s3Config, writeAheadLog, streamManager, objectManager, blockCache, defaultOperator);

// Build the compaction manager
S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey());
this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator);

AsyncNetworkBandwidthLimiter networkInboundLimiter = null;
AsyncNetworkBandwidthLimiter networkOutboundLimiter = null;

Expand All @@ -97,6 +83,20 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store
);
}

S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);

WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.s3WALPath(), s3Config.s3WALCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config.s3BlockCacheSize(), objectManager, defaultOperator);

// Build the s3 storage
this.storage = new S3Storage(s3Config, writeAheadLog, streamManager, objectManager, blockCache, defaultOperator);

// Build the compaction manager
S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator);

this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter);
this.storeWorkingThreadPool = ThreadPoolMonitor.createAndMonitor(
storeConfig.workingThreadPoolNums(),
Expand Down

0 comments on commit 6abb4c9

Please sign in to comment.