Skip to content

Commit

Permalink
Limit the number of merge pipelines that can be spawned concurrently (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Feb 13, 2024
1 parent 4ae0d97 commit 652453a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
14 changes: 9 additions & 5 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,15 +312,19 @@ impl IndexingPipeline {
let _spawn_pipeline_permit = ctx
.protect_future(SPAWN_PIPELINE_SEMAPHORE.acquire())
.await
.expect("The semaphore should not be closed.");
.expect("semaphore should not be closed");

self.statistics.num_spawn_attempts += 1;
let index_id = &self.params.pipeline_id.index_uid.index_id;
let source_id = self.params.pipeline_id.source_id.as_str();
self.kill_switch = ctx.kill_switch().child();

let index_id = &self.params.pipeline_id.index_uid.index_id;
let source_id = &self.params.pipeline_id.source_id;

info!(
index_id=%index_id,
source_id=%source_id,
index_id,
source_id,
pipeline_uid=%self.params.pipeline_id.pipeline_uid,
root_dir=%self.params.indexing_directory.path().display(),
"spawning indexing pipeline",
);
let (source_mailbox, source_inbox) = ctx
Expand Down
13 changes: 12 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use quickwit_proto::metastore::{
ListSplitsRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
};
use time::OffsetDateTime;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

use super::MergeSchedulerService;
Expand All @@ -49,6 +50,11 @@ use crate::merge_policy::MergePolicy;
use crate::models::MergeStatistics;
use crate::split_store::IndexingSplitStore;

/// Limits how many merge pipelines can be spawned concurrently.
///
/// Spawning a merge pipeline puts a lot of pressure on the metastore, so
/// `MergePipeline::spawn_pipeline` acquires a permit from this semaphore before it starts
/// spawning. The semaphore is created with 10 permits, so at most 10 spawns hold a permit at
/// any given time.
static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);

#[derive(Debug)]
struct ObserveLoop;

Expand Down Expand Up @@ -208,6 +214,11 @@ impl MergePipeline {
// TODO: Should return an error saying whether we can retry or not.
#[instrument(name="spawn_merge_pipeline", level="info", skip_all, fields(index=self.params.pipeline_id.index_uid.index_id, gen=self.generation()))]
async fn spawn_pipeline(&mut self, ctx: &ActorContext<Self>) -> anyhow::Result<()> {
let _spawn_pipeline_permit = ctx
.protect_future(SPAWN_PIPELINE_SEMAPHORE.acquire())
.await
.expect("semaphore should not be closed");

self.statistics.num_spawn_attempts += 1;
self.kill_switch = ctx.kill_switch().child();

Expand All @@ -217,7 +228,7 @@ impl MergePipeline {
pipeline_uid=%self.params.pipeline_id.pipeline_uid,
root_dir=%self.params.indexing_directory.path().display(),
merge_policy=?self.params.merge_policy,
"spawn merge pipeline",
"spawning merge pipeline",
);
let query = ListSplitsQuery::for_index(self.params.pipeline_id.index_uid.clone())
.with_split_state(SplitState::Published)
Expand Down

0 comments on commit 652453a

Please sign in to comment.