Skip to content

Commit

Permalink
CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Nov 9, 2023
1 parent d1eb143 commit 7185fb1
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 19 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ mod tests {
}

#[test]
fn test_index_config_default_values() {
fn test_indexer_config_default_values() {
let default_index_root_uri = Uri::for_test("s3://defaultbucket/");
{
let index_config_filepath = get_index_config_filepath("minimal-hdfs-logs.yaml");
Expand Down
17 changes: 6 additions & 11 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,14 @@ impl IndexerConfig {
ByteSize::gb(100)
}

/// Default capacity expressed in milli-"number of pipelines".
/// 4_000 means 4 pipelines at full capacity.
// TODO add some validation.
fn default_cpu_capacity() -> CpuCapacity {
CpuCapacity::one_cpu_thread() * (num_cpus::get() as u32)
}

/// Default maximum number of splits retained in the local split store cache.
pub fn default_split_store_max_num_splits() -> usize {
    const DEFAULT_MAX_NUM_SPLITS: usize = 1_000;
    DEFAULT_MAX_NUM_SPLITS
}

/// Default indexing CPU capacity: one full indexing thread per CPU detected
/// on the host (as reported by `num_cpus::get()`).
fn default_cpu_capacity() -> CpuCapacity {
    let num_threads = num_cpus::get() as u32;
    CpuCapacity::one_cpu_thread() * num_threads
}

#[cfg(any(test, feature = "testsuite"))]
pub fn for_test() -> anyhow::Result<Self> {
use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY;
Expand Down Expand Up @@ -393,10 +390,8 @@ mod tests {
{
let indexer_config: IndexerConfig = serde_json::from_str(r#"{}"#).unwrap();
assert_eq!(&indexer_config, &IndexerConfig::default());
assert_eq!(
indexer_config.cpu_capacity,
CpuCapacity::from_cpu_millis(8000)
);
assert!(indexer_config.cpu_capacity.cpu_millis() > 0);
assert_eq!(indexer_config.cpu_capacity.cpu_millis() % 1_000, 0);
}
{
let indexer_config: IndexerConfig =
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl IndexingScheduler {

let sources = get_sources_to_schedule(model);

let indexer_cpu_capacity: FnvHashMap<String, CpuCapacity> = indexers
let indexer_id_to_cpu_capacities: FnvHashMap<String, CpuCapacity> = indexers
.iter()
.map(|(indexer_id, indexer_node_info)| {
(indexer_id.to_string(), indexer_node_info.indexing_capacity)
Expand All @@ -202,7 +202,7 @@ impl IndexingScheduler {

let new_physical_plan = build_physical_indexing_plan(
&sources,
&indexer_cpu_capacity,
&indexer_id_to_cpu_capacities,
self.state.last_applied_physical_plan.as_ref(),
);
if let Some(last_applied_plan) = &self.state.last_applied_physical_plan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,14 @@ fn group_shards_into_pipelines(
source_uid: &SourceUid,
shard_ids: &[ShardId],
previous_indexing_tasks: &[IndexingTask],
cpu_per_shard: CpuCapacity,
cpu_load_per_shard: CpuCapacity,
) -> Vec<IndexingTask> {
let num_shards = shard_ids.len() as u32;
if num_shards == 0 {
return Vec::new();
}
let max_num_shards_per_pipeline: NonZeroU32 =
NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / cpu_per_shard.cpu_millis())
NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / cpu_load_per_shard.cpu_millis())
.unwrap_or_else(|| {
// We throttle shard at ingestion to ensure that a shard does not
// exceed 5MB/s.
Expand All @@ -278,7 +278,7 @@ fn group_shards_into_pipelines(
//
// This is a transitory state, and not a problem per se.
warn!("load per shard is higher than `MAX_LOAD_PER_PIPELINE`");
NonZeroU32::new(1).unwrap()
NonZeroU32::MIN // also colloquially known as `1`
});

// We compute the number of pipelines we will create, cooking in some hysteresis effect here.
Expand All @@ -287,7 +287,7 @@ fn group_shards_into_pipelines(
(num_shards + max_num_shards_per_pipeline.get() - 1) / max_num_shards_per_pipeline;
assert!(min_num_pipelines > 0);
let max_num_pipelines: u32 = min_num_pipelines.max(
num_shards * cpu_per_shard.cpu_millis() / CPU_PER_PIPELINE_LOAD_THRESHOLD.cpu_millis(),
num_shards * cpu_load_per_shard.cpu_millis() / CPU_PER_PIPELINE_LOAD_THRESHOLD.cpu_millis(),
);
let previous_num_pipelines = previous_indexing_tasks.len() as u32;
let num_pipelines: u32 = if previous_num_pipelines > min_num_pipelines {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ mod split_store;
#[cfg(any(test, feature = "testsuite"))]
mod test_utils;

use quickwit_proto::indexing::CpuCapacity;
#[cfg(any(test, feature = "testsuite"))]
pub use test_utils::{mock_split, mock_split_meta, MockSplitBuilder, TestSandbox};

use self::merge_policy::MergePolicy;
pub use self::source::check_source_connectivity;

#[derive(utoipa::OpenApi)]
#[openapi(components(schemas(IndexingStatistics, PipelineMetrics)))]
#[openapi(components(schemas(IndexingStatistics, PipelineMetrics, CpuCapacity)))]
/// Schemas used for the OpenAPI generation that are a part of this crate.
pub struct IndexingApiSchemas;

Expand Down
31 changes: 31 additions & 0 deletions quickwit/quickwit-proto/src/indexing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,37 @@ impl From<CpuCapacity> for CpuCapacityForSerialization {
mod tests {
use super::*;

#[test]
fn test_cpu_capacity_serialization() {
    // Parsing from the "<millis>m" string form, and direct construction.
    assert_eq!(CpuCapacity::from_str("2000m").unwrap(), mcpu(2000));
    assert_eq!(CpuCapacity::from_cpu_millis(2500), mcpu(2500));

    // A bare float without the trailing 'm' is rejected with a helpful message.
    let parse_err = CpuCapacity::from_str("2.5").unwrap_err();
    assert_eq!(
        parse_err,
        "invalid cpu capacity: `2.5`. String format expects a trailing 'm'."
    );

    // JSON deserialization accepts a "<millis>m" string...
    let from_json =
        |value: serde_json::Value| serde_json::from_value::<CpuCapacity>(value).unwrap();
    assert_eq!(
        from_json(serde_json::Value::String("1200m".to_string())),
        mcpu(1200)
    );
    // ...a float number of CPUs...
    let float_value =
        serde_json::Value::Number(serde_json::Number::from_f64(1.2f64).unwrap());
    assert_eq!(from_json(float_value), mcpu(1200));
    // ...and an integer number of CPUs.
    let int_value = serde_json::Value::Number(serde_json::Number::from(1u32));
    assert_eq!(from_json(int_value), mcpu(1000));

    // Display and JSON serialization both emit the "<millis>m" form.
    assert_eq!(CpuCapacity::from_cpu_millis(2500).to_string(), "2500m");
    assert_eq!(serde_json::to_string(&mcpu(2500)).unwrap(), "\"2500m\"");
}

#[test]
fn test_indexing_task_serialization() {
let original = IndexingTask {
Expand Down

0 comments on commit 7185fb1

Please sign in to comment.