diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index a281201ad52..8b7da39f47f 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -225,7 +225,8 @@ impl Handler for ControlPlane { _message: ControlPlanLoop, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { - self.indexing_scheduler.schedule_indexing_plan_if_needed(&self.model); + self.indexing_scheduler + .schedule_indexing_plan_if_needed(&self.model); self.indexing_scheduler.control_running_plan(&self.model); ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) .await; diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 4914c3f4b6f..a274c3cf9f0 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -30,10 +30,11 @@ use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY, }; use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::{NodeId, ShardId}; +use quickwit_proto::types::{IndexUid, NodeId, ShardId}; use scheduling::{SourceToSchedule, SourceToScheduleType}; use serde::Serialize; use tracing::{debug, error, info, warn}; +use ulid::Ulid; use crate::indexing_plan::PhysicalIndexingPlan; use crate::indexing_scheduler::scheduling::build_physical_indexing_plan; @@ -212,6 +213,16 @@ impl IndexingScheduler { &indexer_id_to_cpu_capacities, self.state.last_applied_physical_plan.as_ref(), ); + println!("\n\n\n\n\n\n\n==============="); + println!("# CURRENT PLAN"); + println!("---------------"); + display_plan(&new_physical_plan); + println!("# PREVIOUS PLAN"); + println!("---------------"); + if let Some(last_applied_plan) = self.state.last_applied_physical_plan.as_ref() { + display_plan(last_applied_plan); + } + println!("==============="); if let Some(last_applied_plan) = &self.state.last_applied_physical_plan { let plans_diff = get_indexing_plans_diff( last_applied_plan.indexing_tasks_per_indexer(), @@ -219,6 +230,7 @@ impl IndexingScheduler { ); // No need to apply the new plan as it is the same as the old one. if plans_diff.is_empty() { + info!("no difference"); return; } } @@ -315,6 +327,26 @@ impl IndexingScheduler { } } +fn display_plan(plan: &PhysicalIndexingPlan) { + for (node, tasks) in plan.indexing_tasks_per_indexer() { + println!("{node}"); + for task in tasks { + if task.source_id == "_ingest-source" { + let index_uid = IndexUid::parse(&task.index_uid).unwrap(); + println!( + " {:.5}:{} => {:?}", + task.pipeline_uid + .as_ref() + .map(|pipeline_uid| pipeline_uid.0) + .unwrap_or(Ulid::new()), + index_uid.index_id(), + task.shard_ids + ); + } + } + } +} + struct IndexingPlansDiff<'a> { pub missing_node_ids: FnvHashSet<&'a str>, pub unplanned_node_ids: FnvHashSet<&'a str>, diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index c94be5f5d2f..8687d240049 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -396,7 +396,6 @@ fn convert_scheduling_solution_to_physical_plan( } new_physical_plan.normalize(); - new_physical_plan } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 0a9339a70da..406c17812f6 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -965,7 +965,7 @@ impl IngesterService for Ingester { info!(shard=%shard, "currently in shard hashmap"); } for queue in state_guard.mrecordlog.list_queues() { - info!(queue=queue, "currently in mrecordlog"); + info!(queue = queue, "currently in mrecordlog"); } info!(queues=?remove_queue_ids, "removing queues"); for queue_id in remove_queue_ids { diff --git a/quickwit/quickwit-proto/src/types/pipeline_uid.rs b/quickwit/quickwit-proto/src/types/pipeline_uid.rs index e571dedd9e0..4760884a958 100644 --- a/quickwit/quickwit-proto/src/types/pipeline_uid.rs +++ b/quickwit/quickwit-proto/src/types/pipeline_uid.rs @@ -28,7 +28,7 @@ const ULID_SIZE: usize = 16; /// A pipeline uid identify an indexing pipeline and an indexing task. #[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)] -pub struct PipelineUid(Ulid); +pub struct PipelineUid(pub Ulid); impl std::fmt::Debug for PipelineUid { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {