Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Apr 15, 2024
1 parent 1b63e46 commit 9eeee84
Show file tree
Hide file tree
Showing 7 changed files with 401 additions and 142 deletions.
4 changes: 4 additions & 0 deletions quickwit/quickwit-control-plane/src/indexing_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ impl PhysicalIndexingPlan {
&self.indexing_tasks_per_indexer_id
}

pub fn num_indexers(&self) -> usize {
self.indexing_tasks_per_indexer_id.len()
}

/// Returns the hashmap of (indexer ID, indexing tasks).
pub fn indexing_tasks_per_indexer_mut(&mut self) -> &mut FnvHashMap<String, Vec<IndexingTask>> {
&mut self.indexing_tasks_per_indexer_id
Expand Down
33 changes: 32 additions & 1 deletion quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use tracing::{debug, info, warn};
use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier};
use crate::indexing_scheduler::scheduling::build_physical_indexing_plan;
use crate::model::ControlPlaneModel;
use crate::metrics::ShardLocalityMetrics;
use crate::model::{ControlPlaneModel, ShardLocations};
use crate::{IndexerNodeInfo, IndexerPool};

pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
Expand Down Expand Up @@ -238,6 +239,9 @@ impl IndexingScheduler {
self.state.last_applied_physical_plan.as_ref(),
&shard_locations,
);
let shard_locality_metrics =
get_shard_locality_metrics(&new_physical_plan, &shard_locations);
crate::metrics::CONTROL_PLANE_METRICS.set_shard_locality_metrics(shard_locality_metrics);
if let Some(last_applied_plan) = &self.state.last_applied_physical_plan {
let plans_diff = get_indexing_plans_diff(
last_applied_plan.indexing_tasks_per_indexer(),
Expand Down Expand Up @@ -374,6 +378,33 @@ impl<'a> IndexingPlansDiff<'a> {
}
}

fn get_shard_locality_metrics(
physical_plan: &PhysicalIndexingPlan,
shard_locations: &ShardLocations,
) -> ShardLocalityMetrics {
let mut num_local_shards = 0;
let mut num_remote_shards = 0;
for (indexer, tasks) in physical_plan.indexing_tasks_per_indexer() {
for task in tasks {
for shard_id in &task.shard_ids {
if shard_locations
.get_shard_locations(shard_id)
.iter()
.any(|node| node.as_str() == indexer)
{
num_local_shards += 1;
} else {
num_remote_shards += 1;
}
}
}
}
ShardLocalityMetrics {
num_remote_shards,
num_local_shards,
}
}

impl<'a> fmt::Debug for IndexingPlansDiff<'a> {
fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.has_same_nodes() && self.has_same_tasks() {
Expand Down
Loading

0 comments on commit 9eeee84

Please sign in to comment.