Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid creating sharded pipelines with no shards. #4393

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
source_type: SourceToScheduleType::Sharded {
shard_ids,
// FIXME
load_per_shard: NonZeroU32::new(250u32).unwrap(),
load_per_shard: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4)
.unwrap(),
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,25 +197,30 @@ fn compute_max_num_shards_per_pipeline(source_type: &SourceToScheduleType) -> No
}
}

// This converts a scheduling solution for a given node and a given source.
// Major quirk however:
// For sharded function, this function only partially performs this conversion.
// In the resulting function some of the shards may not be allocated.
// The remaining shards will be added in postprocessing pass.
fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
mut remaining_num_shards_to_schedule_on_node: u32,
// Specific to the source.
mut previous_tasks: &[&IndexingTask],
previous_tasks: &[&IndexingTask],
source: &SourceToSchedule,
) -> Vec<IndexingTask> {
match &source.source_type {
SourceToScheduleType::Sharded {
shard_ids,
load_per_shard,
} => {
if remaining_num_shards_to_schedule_on_node == 0 {
return Vec::new();
}
// For the moment we do something voluntarily suboptimal.
let max_num_pipelines = quickwit_common::div_ceil_u32(
shard_ids.len() as u32 * load_per_shard.get(),
remaining_num_shards_to_schedule_on_node * load_per_shard.get(),
CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD.cpu_millis(),
);
if previous_tasks.len() > max_num_pipelines as usize {
previous_tasks = &previous_tasks[..max_num_pipelines as usize];
}
let max_num_shards_per_pipeline: NonZeroU32 =
compute_max_num_shards_per_pipeline(&source.source_type);
let mut new_tasks = Vec::new();
Expand All @@ -239,6 +244,9 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
shard_ids,
};
new_tasks.push(new_task);
if new_tasks.len() >= max_num_pipelines as usize {
break;
}
if remaining_num_shards_to_schedule_on_node == 0 {
break;
}
Expand Down Expand Up @@ -586,7 +594,11 @@ mod tests {
use quickwit_proto::indexing::{mcpu, CpuCapacity, IndexingTask};
use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid};

use super::{build_physical_indexing_plan, SourceToSchedule, SourceToScheduleType};
use super::{
build_physical_indexing_plan,
convert_scheduling_solution_to_physical_plan_single_node_single_source, SourceToSchedule,
SourceToScheduleType,
};
use crate::indexing_plan::PhysicalIndexingPlan;

fn source_id() -> SourceUid {
Expand Down Expand Up @@ -905,4 +917,158 @@ mod tests {
capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000));
build_physical_indexing_plan(&sources_to_schedule, &capacities, None);
}

#[test]
fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_sharded() {
    let source_uid = SourceUid {
        index_uid: IndexUid::new_with_random_ulid("testindex"),
        source_id: "testsource".to_string(),
    };
    // Builds a pre-existing indexing task for `source_uid` holding the given shards.
    let make_previous_task = |shard_ids: Vec<ShardId>| IndexingTask {
        index_uid: source_uid.index_uid.to_string(),
        source_id: source_uid.source_id.to_string(),
        pipeline_uid: Some(PipelineUid::new()),
        shard_ids,
    };
    let task_with_shards_1_4_5 = make_previous_task(vec![1, 4, 5]);
    let task_with_shards_6_to_10 = make_previous_task(vec![6, 7, 8, 9, 10]);
    {
        // Heavy shards: both previous pipelines are kept, and each retains the
        // intersection of its shards with the shards to schedule.
        let sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::Sharded {
                shard_ids: vec![1, 2, 4, 6],
                load_per_shard: NonZeroU32::new(1_000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            4,
            &[&task_with_shards_1_4_5, &task_with_shards_6_to_10],
            &sharded_source,
        );
        assert_eq!(tasks.len(), 2);
        assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
        assert_eq!(&tasks[0].shard_ids, &[1, 4]);
        assert_eq!(&tasks[1].index_uid, source_uid.index_uid.as_str());
        assert_eq!(&tasks[1].shard_ids, &[6]);
    }
    {
        // Smaller shards force a merge into a single pipeline: only the first
        // previous task survives.
        let sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::Sharded {
                shard_ids: vec![1, 2, 4, 6],
                load_per_shard: NonZeroU32::new(250).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            4,
            &[&task_with_shards_1_4_5, &task_with_shards_6_to_10],
            &sharded_source,
        );
        assert_eq!(tasks.len(), 1);
        assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
        assert_eq!(&tasks[0].shard_ids, &[1, 4]);
    }
}

#[test]
fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_non_sharded() {
    let source_uid = SourceUid {
        index_uid: IndexUid::new_with_random_ulid("testindex"),
        source_id: "testsource".to_string(),
    };
    let pipeline_uid1 = PipelineUid::new();
    let pipeline_uid2 = PipelineUid::new();
    // Builds a pre-existing (shardless) indexing task bound to the given pipeline uid.
    let make_previous_task = |pipeline_uid| IndexingTask {
        index_uid: source_uid.index_uid.to_string(),
        source_id: source_uid.source_id.to_string(),
        pipeline_uid: Some(pipeline_uid),
        shard_ids: Vec::new(),
    };
    let previous_task1 = make_previous_task(pipeline_uid1);
    let previous_task2 = make_previous_task(pipeline_uid2);
    {
        // One pipeline requested: the first previous pipeline uid is reused.
        let non_sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::NonSharded {
                num_pipelines: 1,
                load_per_pipeline: NonZeroU32::new(4000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            1,
            &[&previous_task1, &previous_task2],
            &non_sharded_source,
        );
        assert_eq!(tasks.len(), 1);
        assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
        assert!(tasks[0].shard_ids.is_empty());
        assert_eq!(tasks[0].pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
    }
    {
        // Zero pipelines requested: no task is produced.
        let non_sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::NonSharded {
                num_pipelines: 0,
                load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            0,
            &[&previous_task1, &previous_task2],
            &non_sharded_source,
        );
        assert_eq!(tasks.len(), 0);
    }
    {
        // Two pipelines requested and two previous tasks: both uids are reused
        // in order.
        let non_sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::NonSharded {
                num_pipelines: 2,
                load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            2,
            &[&previous_task1, &previous_task2],
            &non_sharded_source,
        );
        assert_eq!(tasks.len(), 2);
        assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
        assert!(tasks[0].shard_ids.is_empty());
        assert_eq!(tasks[0].pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
        assert_eq!(&tasks[1].index_uid, source_uid.index_uid.as_str());
        assert!(tasks[1].shard_ids.is_empty());
        assert_eq!(tasks[1].pipeline_uid.as_ref().unwrap(), &pipeline_uid2);
    }
    {
        // Two pipelines requested but only one previous task: the first uid is
        // reused, and a fresh uid is generated for the second pipeline.
        let non_sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::NonSharded {
                num_pipelines: 2,
                load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            2,
            &[&previous_task1],
            &non_sharded_source,
        );
        assert_eq!(tasks.len(), 2);
        assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
        assert!(tasks[0].shard_ids.is_empty());
        assert_eq!(tasks[0].pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
        assert_eq!(&tasks[1].index_uid, source_uid.index_uid.as_str());
        assert!(tasks[1].shard_ids.is_empty());
        assert_ne!(tasks[1].pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
    }
}
}