From a3c38c0aba6efafa6995dc5d8e7f527a5beab017 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 12 Jan 2024 17:02:35 +0900 Subject: [PATCH] Removing sources from scheduling if there are no shards. (#4376) --- .../src/indexing_scheduler/mod.rs | 33 +++++++++++++++++-- .../src/indexing_scheduler/scheduling/mod.rs | 7 ++++ quickwit/quickwit-serve/src/debugging_api.rs | 2 +- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index aa0c69b8b8d..725967bba0b 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -139,12 +139,16 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { } SourceType::IngestV2 => { // Expect: the source should exist since we just read it from `get_source_configs`. + // Note that we keep all shards, including Closed shards: + // A closed shard still needs to be indexed. let shard_ids: Vec = model .list_shards_for_source(&source_uid) .expect("source should exist") .map(|shard| shard.shard_id) .collect(); - + if shard_ids.is_empty() { + continue; + } sources.push(SourceToSchedule { source_uid, source_type: SourceToScheduleType::Sharded { @@ -695,7 +699,23 @@ mod tests { max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), desired_num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, - // ingest v1 + // ingest v2 + source_params: SourceParams::Ingest, + transform_config: None, + input_format: Default::default(), + }, + ) + .unwrap(); + // ingest v2 without any shard is skipped.
+ model .add_source( &index_uid, SourceConfig { source_id: "ingest_v2_without_shard".to_string(), max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), desired_num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, // ingest v2 source_params: SourceParams::Ingest, transform_config: None, input_format: Default::default(), @@ -717,6 +737,14 @@ mod tests { }, ) .unwrap(); + let shard = Shard { + index_uid: index_uid.to_string(), + source_id: "ingest_v2".to_string(), + shard_id: 17, + shard_state: ShardState::Open as i32, + ..Default::default() + }; + model.insert_newly_opened_shards(&index_uid, &"ingest_v2".to_string(), vec![shard], 18); let shards: Vec = get_sources_to_schedule(&model); assert_eq!(shards.len(), 3); } @@ -816,6 +844,7 @@ mod tests { use quickwit_config::SourceInputFormat; use quickwit_proto::indexing::mcpu; + use quickwit_proto::ingest::{Shard, ShardState}; fn kafka_source_params_for_test() -> SourceParams { SourceParams::Kafka(KafkaSourceParams { diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index b05810ac929..031783e5539 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -452,11 +452,18 @@ fn add_shard_to_indexer( /// - 4) convert the new scheduling solution back to the real world by reallocating the shard ids. /// /// TODO cut into pipelines. +/// Panics if any source has no shards. pub fn build_physical_indexing_plan( sources: &[SourceToSchedule], indexer_id_to_cpu_capacities: &FnvHashMap, previous_plan_opt: Option<&PhysicalIndexingPlan>, ) -> PhysicalIndexingPlan { + for source in sources { + if let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type { + assert!(!shard_ids.is_empty()) + } + } + // Convert our problem to a scheduling problem.
let mut id_to_ord_map = IdToOrdMap::default(); diff --git a/quickwit/quickwit-serve/src/debugging_api.rs b/quickwit/quickwit-serve/src/debugging_api.rs index c6616c37be5..ce139e69c6d 100644 --- a/quickwit/quickwit-serve/src/debugging_api.rs +++ b/quickwit/quickwit-serve/src/debugging_api.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2023 Quickwit, Inc. +// Copyright (C) 2024 Quickwit, Inc. // // Quickwit is offered under the AGPL v3.0 and as commercial software. // For commercial licensing, contact us at hello@quickwit.io.