Skip to content

Commit

Permalink
fix: run purge jobs in another scheduler (#3621)
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag authored Apr 1, 2024
1 parent 0eb023b commit bfd3257
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 11 deletions.
10 changes: 5 additions & 5 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub(crate) struct RegionOpener {
memtable_builder_provider: MemtableBuilderProvider,
object_store_manager: ObjectStoreManagerRef,
region_dir: String,
scheduler: SchedulerRef,
purge_scheduler: SchedulerRef,
options: Option<RegionOptions>,
cache_manager: Option<CacheManagerRef>,
skip_wal_replay: bool,
Expand All @@ -71,7 +71,7 @@ impl RegionOpener {
region_dir: &str,
memtable_builder_provider: MemtableBuilderProvider,
object_store_manager: ObjectStoreManagerRef,
scheduler: SchedulerRef,
purge_scheduler: SchedulerRef,
intermediate_manager: IntermediateManager,
) -> RegionOpener {
RegionOpener {
Expand All @@ -80,7 +80,7 @@ impl RegionOpener {
memtable_builder_provider,
object_store_manager,
region_dir: normalize_dir(region_dir),
scheduler,
purge_scheduler,
options: None,
cache_manager: None,
skip_wal_replay: false,
Expand Down Expand Up @@ -204,7 +204,7 @@ impl RegionOpener {
access_layer: access_layer.clone(),
manifest_manager,
file_purger: Arc::new(LocalFilePurger::new(
self.scheduler,
self.purge_scheduler,
access_layer,
self.cache_manager,
)),
Expand Down Expand Up @@ -277,7 +277,7 @@ impl RegionOpener {
self.intermediate_manager.clone(),
));
let file_purger = Arc::new(LocalFilePurger::new(
self.scheduler.clone(),
self.purge_scheduler.clone(),
access_layer.clone(),
self.cache_manager.clone(),
));
Expand Down
19 changes: 16 additions & 3 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ pub(crate) struct WorkerGroup {
workers: Vec<RegionWorker>,
/// Global background job scheduelr.
scheduler: SchedulerRef,
/// Scheduler for file purgers.
purge_scheduler: SchedulerRef,
/// Cache.
cache_manager: CacheManagerRef,
}
Expand All @@ -131,6 +133,9 @@ impl WorkerGroup {
.await?
.with_buffer_size(Some(config.inverted_index.write_buffer_size.as_bytes() as _));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
// We use another scheduler to avoid purge jobs blocking other jobs.
// A purge job is cheaper than other background jobs so they share the same job limit.
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
Expand All @@ -156,6 +161,7 @@ impl WorkerGroup {
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
Expand All @@ -168,6 +174,7 @@ impl WorkerGroup {
Ok(WorkerGroup {
workers,
scheduler,
purge_scheduler,
cache_manager,
})
}
Expand All @@ -178,6 +185,8 @@ impl WorkerGroup {

// Stops the scheduler gracefully.
self.scheduler.stop(true).await?;
// Stops the purge scheduler gracefully.
self.purge_scheduler.stop(true).await?;

try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

Expand Down Expand Up @@ -238,6 +247,7 @@ impl WorkerGroup {
))
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let intermediate_manager =
IntermediateManager::init_fs(&config.inverted_index.intermediate_path)
.await?
Expand Down Expand Up @@ -265,6 +275,7 @@ impl WorkerGroup {
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
Expand All @@ -277,6 +288,7 @@ impl WorkerGroup {
Ok(WorkerGroup {
workers,
scheduler,
purge_scheduler,
cache_manager,
})
}
Expand Down Expand Up @@ -323,6 +335,7 @@ struct WorkerStarter<S> {
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: WriteBufferManagerRef,
scheduler: SchedulerRef,
purge_scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
intermediate_manager: IntermediateManager,
Expand Down Expand Up @@ -351,7 +364,7 @@ impl<S: LogStore> WorkerStarter<S> {
Some(self.write_buffer_manager.clone()),
self.config,
),
scheduler: self.scheduler.clone(),
purge_scheduler: self.purge_scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
compaction_scheduler: CompactionScheduler::new(
Expand Down Expand Up @@ -507,8 +520,8 @@ struct RegionWorkerLoop<S> {
running: Arc<AtomicBool>,
/// Memtable builder provider for each region.
memtable_builder_provider: MemtableBuilderProvider,
/// Background job scheduler.
scheduler: SchedulerRef,
/// Background purge job scheduler.
purge_scheduler: SchedulerRef,
/// Engine write buffer manager.
write_buffer_manager: WriteBufferManagerRef,
/// Schedules background flush requests.
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region.region_dir(),
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.scheduler.clone(),
self.purge_scheduler.clone(),
self.intermediate_manager.clone(),
)
.cache(Some(self.cache_manager.clone()))
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&request.region_dir,
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.scheduler.clone(),
self.purge_scheduler.clone(),
self.intermediate_manager.clone(),
)
.metadata(metadata)
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&request.region_dir,
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.scheduler.clone(),
self.purge_scheduler.clone(),
self.intermediate_manager.clone(),
)
.skip_wal_replay(request.skip_wal_replay)
Expand Down

0 comments on commit bfd3257

Please sign in to comment.