From 1d618869c1b10dc09b830ac86320153a81c541e1 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Mon, 24 Jun 2024 17:57:56 +0900
Subject: [PATCH] Using the shard throughput information in the scheduling
 logic.

---
 .../src/indexing_scheduler/mod.rs             | 44 ++++++++++++++-----
 .../src/indexing_scheduler/scheduling/mod.rs  | 16 +++++++
 .../scheduling/scheduling_logic.rs            | 31 ++++++++++++-
 .../scheduling/scheduling_logic_model.rs      |  2 +-
 4 files changed, 81 insertions(+), 12 deletions(-)

diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 06476b692c9..45543609da2 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -33,7 +33,7 @@ use quickwit_proto::indexing::{
     ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
 };
 use quickwit_proto::metastore::SourceType;
-use quickwit_proto::types::{NodeId, ShardId};
+use quickwit_proto::types::NodeId;
 use scheduling::{SourceToSchedule, SourceToScheduleType};
 use serde::Serialize;
 use tracing::{debug, info, warn};
@@ -42,7 +42,7 @@ use crate::indexing_plan::PhysicalIndexingPlan;
 use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier};
 use crate::indexing_scheduler::scheduling::build_physical_indexing_plan;
 use crate::metrics::ShardLocalityMetrics;
-use crate::model::{ControlPlaneModel, ShardLocations};
+use crate::model::{ControlPlaneModel, ShardEntry, ShardLocations};
 use crate::{IndexerNodeInfo, IndexerPool};
 
 pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
@@ -121,6 +121,28 @@ impl fmt::Debug for IndexingScheduler {
     }
 }
 
+/// Computes the CPU load associated with a single shard of a given index.
+///
+/// The array passed contains all of the data we have about the shards of the index.
+/// This function averages their statistics.
+///
+/// For the moment, this function only takes into account the measured throughput,
+/// and assumes a constant CPU usage of 4 vCPU = 20 MiB/s.
+///
+/// It does not take into account the variations that could arise from differences in
+/// doc mapping, the nature of the data, etc.
+fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
+    let num_shards = shard_entries.len().max(1) as u32;
+    let average_throughput_per_shard: u32 = shard_entries
+        .iter()
+        .map(|shard_entry| u32::from(shard_entry.ingestion_rate.0))
+        .sum::<u32>()
+        .div_ceil(num_shards);
+    let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() * average_throughput_per_shard) / 20;
+    const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
+    NonZeroU32::new(num_cpu_millis.max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
+}
+
 fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
     let mut sources = Vec::new();
 
@@ -147,22 +169,24 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
                 // Expect: the source should exist since we just read it from `get_source_configs`.
                 // Note that we keep all shards, including Closed shards:
                 // A closed shard still needs to be indexed.
-                let shard_ids: Vec<ShardId> = model
+                let shard_entries: Vec<&ShardEntry> = model
                     .get_shards_for_source(&source_uid)
                     .expect("source should exist")
-                    .keys()
-                    .cloned()
+                    .values()
                     .collect();
-                if shard_ids.is_empty() {
+                if shard_entries.is_empty() {
                     continue;
                 }
+                let shard_ids = shard_entries
+                    .iter()
+                    .map(|shard_entry| shard_entry.shard_id().clone())
+                    .collect();
+                let load_per_shard = compute_load_per_shard(&shard_entries[..]);
                 sources.push(SourceToSchedule {
                     source_uid,
                     source_type: SourceToScheduleType::Sharded {
                         shard_ids,
-                        // FIXME
-                        load_per_shard: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4)
-                            .unwrap(),
+                        load_per_shard,
                     },
                 });
             }
@@ -562,7 +586,7 @@ mod tests {
     use proptest::{prop_compose, proptest};
     use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams};
     use quickwit_metastore::IndexMetadata;
-    use quickwit_proto::types::{IndexUid, PipelineUid, SourceUid};
+    use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid};
 
     use super::*;
     use crate::model::ShardLocations;
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
index 19b9a76d5d4..c2bbc5a4ee5 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -566,10 +566,26 @@ fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
     else {
         return;
     };
+
+    // We first artificially scale down the node capacities.
+    //
+    // The node capacity is an estimate of the amount of CPU available on a given indexer node.
+    // It has two purposes:
+    // - under a lot of load, indexers will receive work proportional to their relative capacities.
+    // - under low load, the absolute magnitude is used by the scheduler to decide whether to
+    //   prefer a balanced workload over other criteria (all pipelines from the same index on
+    //   the same node, indexing local shards, etc.).
+    //
+    // The default CPU capacity is detected from the OS. Using these values directly leads to
+    // a non-uniform distribution of the load, which is very confusing for users. We therefore
+    // artificially scale down the indexer capacities.
+    problem.scale_node_capacities(0.3f32);
+
     let min_indexer_capacity = (0..problem.num_indexers())
         .map(|indexer_ord| problem.indexer_cpu_capacity(indexer_ord))
         .min()
         .expect("At least one indexer is required");
+    assert_ne!(min_indexer_capacity.cpu_millis(), 0);
 
     if min_indexer_capacity.cpu_millis() < largest_shard_load.get() {
         let scaling_factor =
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
index 15feb765b72..d339d8e5459 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs
@@ -163,7 +163,7 @@ fn assert_remove_extraneous_shards_post_condition(
 
 // Relieve sources from the nodes that are exceeding their maximum load.
 fn enforce_indexers_cpu_capacity(problem: &SchedulingProblem, solution: &mut SchedulingSolution) {
-    for indexer_assignment in solution.indexer_assignments.iter_mut() {
+    for indexer_assignment in &mut solution.indexer_assignments {
         let indexer_cpu_capacity: CpuCapacity =
             problem.indexer_cpu_capacity(indexer_assignment.indexer_ord);
         enforce_indexer_cpu_capacity(problem, indexer_cpu_capacity, indexer_assignment);
@@ -753,6 +753,35 @@ mod tests {
         assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
     }
 
+    #[test]
+    fn test_problem_unbalanced_simple() {
+        let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![
+            CpuCapacity::from_cpu_millis(1),
+            CpuCapacity::from_cpu_millis(1),
+        ]);
+        problem.add_source(1, NonZeroU32::new(10).unwrap());
+        for _ in 0..10 {
+            problem.add_source(1, NonZeroU32::new(1).unwrap());
+        }
+        let previous_solution = problem.new_solution();
+        let solution = solve(problem.clone(), previous_solution);
+        let available_capacities: Vec<u32> = solution
+            .indexer_assignments
+            .iter()
+            .map(|indexer_assignment: &IndexerAssignment| {
+                indexer_assignment.total_cpu_load(&problem)
+            })
+            .collect();
+        assert_eq!(available_capacities.len(), 2);
+        let (min, max) = available_capacities
+            .into_iter()
+            .minmax()
+            .into_option()
+            .unwrap();
+        assert_eq!(min, 10);
+        assert_eq!(max, 10);
+    }
+
     proptest! {
         #[test]
         fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) {
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
index 0e8c64406d5..b3b68578dcd 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
@@ -80,7 +80,7 @@ impl Source {
     }
 }
 
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 pub struct SchedulingProblem {
     sources: Vec<Source>,
     indexer_cpu_capacities: Vec<CpuCapacity>,
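
To make the new load model concrete, here is a minimal standalone sketch of the arithmetic performed by `compute_load_per_shard`. It assumes, consistently with the patch's constants, that `PIPELINE_FULL_CAPACITY` is 4_000 cpu millis (4 vCPU) and that a full pipeline sustains 20 MiB/s, so each MiB/s of average shard throughput is billed at 200 cpu millis, floored at 50 cpu millis. The constant and helper names below are illustrative, not the crate's API.

use std::num::NonZeroU32;

// Assumption: mirrors PIPELINE_FULL_CAPACITY, i.e. 4_000 cpu millis (4 vCPU).
const PIPELINE_FULL_CAPACITY_MILLIS: u32 = 4_000;
// Floor applied by the patch so an idle shard still reserves some CPU.
const MIN_CPU_LOAD_PER_SHARD: u32 = 50;

// Illustrative re-implementation: average shard throughput (MiB/s) -> cpu millis.
fn load_per_shard(avg_throughput_mib_per_sec: u32) -> NonZeroU32 {
    let cpu_millis = (PIPELINE_FULL_CAPACITY_MILLIS * avg_throughput_mib_per_sec) / 20;
    NonZeroU32::new(cpu_millis.max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
}

fn main() {
    assert_eq!(load_per_shard(0).get(), 50); // idle shard: the 50 cpu-millis floor
    assert_eq!(load_per_shard(1).get(), 200); // 4_000 * 1 / 20
    assert_eq!(load_per_shard(20).get(), 4_000); // 20 MiB/s saturates one pipeline
}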
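
The capacity adjustment in `inflate_node_capacities_if_necessary` can be sketched the same way: capacities are first scaled down to 30%, and only inflated back up if the smallest node could no longer host the largest shard. The free function below is a stand-in for the crate's `SchedulingProblem` methods, and the exact inflation formula is an assumption; it is only meant to show the two-step arithmetic.

// Stand-in for the per-indexer capacities, in cpu millis.
fn adjust_capacities(mut capacities: Vec<u32>, largest_shard_load: u32) -> Vec<u32> {
    // Step 1: scale down to 30% so OS-detected capacities do not skew the distribution.
    for capacity in &mut capacities {
        *capacity = (*capacity as f32 * 0.3f32) as u32;
    }
    let min_capacity = *capacities.iter().min().expect("at least one indexer");
    assert_ne!(min_capacity, 0);
    // Step 2: inflate uniformly if the smallest node can no longer fit the largest shard.
    if min_capacity < largest_shard_load {
        let scaling_factor = largest_shard_load as f32 / min_capacity as f32;
        for capacity in &mut capacities {
            *capacity = (*capacity as f32 * scaling_factor).ceil() as u32;
        }
    }
    capacities
}

fn main() {
    // Two 8-vCPU nodes (8_000 cpu millis) are scaled down to 2_400 each; a shard
    // weighing 3_000 cpu millis then forces an inflation back up to 3_000.
    let adjusted = adjust_capacities(vec![8_000, 8_000], 3_000);
    assert_eq!(adjusted, vec![3_000, 3_000]);
}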