Skip to content

Commit

Permalink
blop
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jun 24, 2024
1 parent c3c607b commit 2a7d6ea
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,26 @@ fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
else {
return;
};

// We first artificially scale down the node capacities.
//
// The node capacity is an estimate of the amount of CPU available on a given indexer node.
// It has two purpose,
// - under a lot of load, indexer will receive work proportional to their relative capacity.
// - under low load, the absolute magnitude will be used by the scheduler, to decide whether
// to prefer having a balanced workload over other criteria (all pipeline from a same index on the
// same node, indexing local shards, etc.).
//
// The default CPU capacity is detected from the OS. Using these values directly leads
// a non uniform distribution of the load which is very confusing for users. We artificially
// scale down the indexer capacities.
problem.scale_node_capacities(0.3f32);

let min_indexer_capacity = (0..problem.num_indexers())
.map(|indexer_ord| problem.indexer_cpu_capacity(indexer_ord))
.min()
.expect("At least one indexer is required");

assert_ne!(min_indexer_capacity.cpu_millis(), 0);
if min_indexer_capacity.cpu_millis() < largest_shard_load.get() {
let scaling_factor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ fn assert_remove_extraneous_shards_post_condition(
// Releave sources from the node that are exceeding their maximum load.

fn enforce_indexers_cpu_capacity(problem: &SchedulingProblem, solution: &mut SchedulingSolution) {
for indexer_assignment in solution.indexer_assignments.iter_mut() {
for indexer_assignment in &mut solution.indexer_assignments {
let indexer_cpu_capacity: CpuCapacity =
problem.indexer_cpu_capacity(indexer_assignment.indexer_ord);
enforce_indexer_cpu_capacity(problem, indexer_cpu_capacity, indexer_assignment);
Expand Down Expand Up @@ -753,6 +753,32 @@ mod tests {
assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
}

#[test]
fn test_problem_unbalanced_simple() {
let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![
CpuCapacity::from_cpu_millis(1),
CpuCapacity::from_cpu_millis(1),
]);
problem.add_source(1, NonZeroU32::new(1).unwrap());
for _ in 0..10 {
problem.add_source(1, NonZeroU32::new(1).unwrap());
}
for i in [5] {
let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![
CpuCapacity::from_cpu_millis(1),
CpuCapacity::from_cpu_millis(1),
]);
problem.add_source(i, NonZeroU32::new(1).unwrap());
for _ in 0..10 {
problem.add_source(1, NonZeroU32::new(1).unwrap());
}
dbg!(&solution);
solution = solve(problem, solution);
}
dbg!(&solution);
// assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
}

proptest! {
#[test]
fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) {
Expand Down

0 comments on commit 2a7d6ea

Please sign in to comment.