From 652453a83be4b3d0ef427a87599e9773ad95b29a Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Tue, 13 Feb 2024 12:49:48 -0500 Subject: [PATCH] Limit the number of merge pipelines that can be spawned concurrently (#4574) --- .../src/actors/indexing_pipeline.rs | 14 +++++++++----- .../quickwit-indexing/src/actors/merge_pipeline.rs | 13 ++++++++++++- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index e24f53ddafa..ed010e419a7 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -312,15 +312,19 @@ impl IndexingPipeline { let _spawn_pipeline_permit = ctx .protect_future(SPAWN_PIPELINE_SEMAPHORE.acquire()) .await - .expect("The semaphore should not be closed."); + .expect("semaphore should not be closed"); + self.statistics.num_spawn_attempts += 1; - let index_id = &self.params.pipeline_id.index_uid.index_id; - let source_id = self.params.pipeline_id.source_id.as_str(); self.kill_switch = ctx.kill_switch().child(); + + let index_id = &self.params.pipeline_id.index_uid.index_id; + let source_id = &self.params.pipeline_id.source_id; + info!( - index_id=%index_id, - source_id=%source_id, + index_id, + source_id, pipeline_uid=%self.params.pipeline_id.pipeline_uid, + root_dir=%self.params.indexing_directory.path().display(), "spawning indexing pipeline", ); let (source_mailbox, source_inbox) = ctx diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index fa40f260c21..afd4a077134 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -38,6 +38,7 @@ use quickwit_proto::metastore::{ ListSplitsRequest, MetastoreError, MetastoreService, MetastoreServiceClient, }; use time::OffsetDateTime; +use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; use super::MergeSchedulerService; @@ -49,6 +50,11 @@ use crate::merge_policy::MergePolicy; use crate::models::MergeStatistics; use crate::split_store::IndexingSplitStore; +/// Spawning a merge pipeline puts a lot of pressure on the metastore so +/// we rely on this semaphore to limit the number of merge pipelines that can be spawned +/// concurrently. +static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10); + #[derive(Debug)] struct ObserveLoop; @@ -208,6 +214,11 @@ impl MergePipeline { // TODO: Should return an error saying whether we can retry or not. #[instrument(name="spawn_merge_pipeline", level="info", skip_all, fields(index=self.params.pipeline_id.index_uid.index_id, gen=self.generation()))] async fn spawn_pipeline(&mut self, ctx: &ActorContext) -> anyhow::Result<()> { + let _spawn_pipeline_permit = ctx + .protect_future(SPAWN_PIPELINE_SEMAPHORE.acquire()) + .await + .expect("semaphore should not be closed"); + self.statistics.num_spawn_attempts += 1; self.kill_switch = ctx.kill_switch().child(); @@ -217,7 +228,7 @@ impl MergePipeline { pipeline_uid=%self.params.pipeline_id.pipeline_uid, root_dir=%self.params.indexing_directory.path().display(), merge_policy=?self.params.merge_policy, - "spawn merge pipeline", + "spawning merge pipeline", ); let query = ListSplitsQuery::for_index(self.params.pipeline_id.index_uid.clone()) .with_split_state(SplitState::Published)