diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 76ac9e38303..0bc1bb37cdc 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -257,6 +257,8 @@ impl Handler for ControlPlane { _message: ControlPlanLoop, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { + self.indexing_scheduler + .schedule_indexing_plan_if_needed(&self.model); self.indexing_scheduler.control_running_plan(&self.model); ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) .await; diff --git a/quickwit/quickwit-control-plane/src/indexing_plan.rs b/quickwit/quickwit-control-plane/src/indexing_plan.rs index 0b063623814..da41aaf8bc8 100644 --- a/quickwit/quickwit-control-plane/src/indexing_plan.rs +++ b/quickwit/quickwit-control-plane/src/indexing_plan.rs @@ -65,6 +65,9 @@ impl PhysicalIndexingPlan { } pub fn normalize(&mut self) { + self.indexing_tasks_per_indexer_id.retain(|_indexer_id, tasks| { + !tasks.is_empty() + }); for tasks in self.indexing_tasks_per_indexer_id.values_mut() { tasks.sort_by(|left, right| { left.index_uid diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 896a62bebb1..7978a5d6631 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -30,10 +30,11 @@ use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY, }; use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::{NodeId, ShardId}; +use quickwit_proto::types::{IndexUid, NodeId, ShardId}; use scheduling::{SourceToSchedule, SourceToScheduleType}; use serde::Serialize; use tracing::{debug, error, info, warn}; +use ulid::Ulid; use crate::indexing_plan::PhysicalIndexingPlan; use crate::indexing_scheduler::scheduling::build_physical_indexing_plan; @@ -224,6 +225,16 @@ impl IndexingScheduler { &indexer_id_to_cpu_capacities, self.state.last_applied_physical_plan.as_ref(), ); + println!("\n\n\n\n\n\n\n==============="); + println!("# CURRENT PLAN"); + println!("---------------"); + display_plan(&new_physical_plan); + println!("# PREVIOUS PLAN"); + println!("---------------"); + if let Some(last_applied_plan) = self.state.last_applied_physical_plan.as_ref() { + display_plan(last_applied_plan); + } + println!("==============="); if let Some(last_applied_plan) = &self.state.last_applied_physical_plan { let plans_diff = get_indexing_plans_diff( last_applied_plan.indexing_tasks_per_indexer(), @@ -231,6 +242,7 @@ impl IndexingScheduler { ); // No need to apply the new plan as it is the same as the old one. if plans_diff.is_empty() { + info!("no difference"); return; } } @@ -324,6 +336,29 @@ impl IndexingScheduler { } } +fn display_plan(plan: &PhysicalIndexingPlan) { + for (node, tasks) in plan.indexing_tasks_per_indexer() { + println!("{node}"); + for task in tasks { + if task.source_id == "_ingest-source" { + if task.shard_ids.is_empty() { + continue; + } + let index_uid = IndexUid::parse(&task.index_uid).unwrap(); + println!( + " {:.5}:{} => {:?}", + task.pipeline_uid + .as_ref() + .map(|pipeline_uid| pipeline_uid.0) + .unwrap_or(Ulid::new()), + index_uid.index_id(), + task.shard_ids + ); + } + } + } +} + struct IndexingPlansDiff<'a> { pub missing_node_ids: FnvHashSet<&'a str>, pub unplanned_node_ids: FnvHashSet<&'a str>, diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index 9b47e3abdc4..efa7b8bf179 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -412,7 +412,6 @@ fn convert_scheduling_solution_to_physical_plan( } new_physical_plan.normalize(); - new_physical_plan } diff --git a/quickwit/quickwit-proto/src/types/pipeline_uid.rs b/quickwit/quickwit-proto/src/types/pipeline_uid.rs index 11ae92c3a6d..3ebe6ee14c0 100644 --- a/quickwit/quickwit-proto/src/types/pipeline_uid.rs +++ b/quickwit/quickwit-proto/src/types/pipeline_uid.rs @@ -28,7 +28,7 @@ const ULID_SIZE: usize = 16; /// A pipeline uid identify an indexing pipeline and an indexing task. #[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)] -pub struct PipelineUid(Ulid); +pub struct PipelineUid(pub Ulid); impl std::fmt::Debug for PipelineUid { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {