Skip to content

Commit

Permalink
added display control plane
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jan 12, 2024
1 parent bf42e51 commit 2edd1e2
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 3 deletions.
2 changes: 2 additions & 0 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ impl Handler<ControlPlanLoop> for ControlPlane {
_message: ControlPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.indexing_scheduler.control_running_plan(&self.model);
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-control-plane/src/indexing_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ impl PhysicalIndexingPlan {
}

pub fn normalize(&mut self) {
self.indexing_tasks_per_indexer_id.retain(|_indexer_id, tasks| {
!tasks.is_empty()
});
for tasks in self.indexing_tasks_per_indexer_id.values_mut() {
tasks.sort_by(|left, right| {
left.index_uid
Expand Down
37 changes: 36 additions & 1 deletion quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
};
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::{NodeId, ShardId};
use quickwit_proto::types::{IndexUid, NodeId, ShardId};
use scheduling::{SourceToSchedule, SourceToScheduleType};
use serde::Serialize;
use tracing::{debug, error, info, warn};
use ulid::Ulid;

use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::scheduling::build_physical_indexing_plan;
Expand Down Expand Up @@ -224,13 +225,24 @@ impl IndexingScheduler {
&indexer_id_to_cpu_capacities,
self.state.last_applied_physical_plan.as_ref(),
);
println!("\n\n\n\n\n\n\n===============");
println!("# CURRENT PLAN");
println!("---------------");
display_plan(&new_physical_plan);
println!("# PREVIOUS PLAN");
println!("---------------");
if let Some(last_applied_plan) = self.state.last_applied_physical_plan.as_ref() {
display_plan(last_applied_plan);
}
println!("===============");
if let Some(last_applied_plan) = &self.state.last_applied_physical_plan {
let plans_diff = get_indexing_plans_diff(
last_applied_plan.indexing_tasks_per_indexer(),
new_physical_plan.indexing_tasks_per_indexer(),
);
// No need to apply the new plan as it is the same as the old one.
if plans_diff.is_empty() {
info!("no difference");
return;
}
}
Expand Down Expand Up @@ -324,6 +336,29 @@ impl IndexingScheduler {
}
}

fn display_plan(plan: &PhysicalIndexingPlan) {
for (node, tasks) in plan.indexing_tasks_per_indexer() {
println!("{node}");
for task in tasks {
if task.source_id == "_ingest-source" {
if task.shard_ids.is_empty() {
continue;
}
let index_uid = IndexUid::parse(&task.index_uid).unwrap();
println!(
" {:.5}:{} => {:?}",
task.pipeline_uid
.as_ref()
.map(|pipeline_uid| pipeline_uid.0)
.unwrap_or(Ulid::new()),
index_uid.index_id(),
task.shard_ids
);
}
}
}
}

struct IndexingPlansDiff<'a> {
pub missing_node_ids: FnvHashSet<&'a str>,
pub unplanned_node_ids: FnvHashSet<&'a str>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,6 @@ fn convert_scheduling_solution_to_physical_plan(
);

new_physical_plan.normalize();

new_physical_plan
}

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-proto/src/types/pipeline_uid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const ULID_SIZE: usize = 16;

/// A pipeline uid identify an indexing pipeline and an indexing task.
#[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct PipelineUid(Ulid);
pub struct PipelineUid(pub Ulid);

impl std::fmt::Debug for PipelineUid {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
Expand Down

0 comments on commit 2edd1e2

Please sign in to comment.