From 7eb5629e65428f8192f2d66cd1835b9bf720c687 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 5 Jul 2024 17:33:11 +0900 Subject: [PATCH] added cli flags --- .../src/indexing_scheduler/mod.rs | 40 ++++++++++++------- .../scheduling/scheduling_logic_model.rs | 2 +- .../src/ingest_v2/doc_mapper.rs | 24 ++++++++--- 3 files changed, 45 insertions(+), 21 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index ca32312e1f6..110e08a5139 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -28,6 +28,7 @@ use std::time::{Duration, Instant}; use fnv::{FnvHashMap, FnvHashSet}; use itertools::Itertools; +use once_cell::sync::OnceCell; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY, PIPELINE_THROUGHTPUT, @@ -121,6 +122,13 @@ impl fmt::Debug for IndexingScheduler { } } +fn enable_variable_shard_load() -> bool { + static IS_SHARD_LOAD_CP_ENABLED: OnceCell<bool> = OnceCell::new(); + *IS_SHARD_LOAD_CP_ENABLED.get_or_init(|| { + !quickwit_common::get_bool_from_env("QW_DISABLE_VARIABLE_SHARD_LOAD", false) + }) +} + /// Computes the CPU load associated to a single shard of a given index. /// /// The array passed contains all of data we have about the shard of the index. @@ -132,20 +140,24 @@ /// It does not take in account the variation that could raise from the different /// doc mapping / nature of the data, etc. 
fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 { - let num_shards = shard_entries.len().max(1) as u64; - let average_throughput_per_shard_bytes: u64 = shard_entries - .iter() - .map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB) - .sum::<u64>() - .div_ceil(num_shards) - // A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is enforced - // by the configuration). - .min(PIPELINE_THROUGHTPUT.as_u64()); - let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() as u64 - * average_throughput_per_shard_bytes) - / PIPELINE_THROUGHTPUT.as_u64(); - const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32; - NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap() + if enable_variable_shard_load() { + let num_shards = shard_entries.len().max(1) as u64; + let average_throughput_per_shard_bytes: u64 = shard_entries + .iter() + .map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB) + .sum::<u64>() + .div_ceil(num_shards) + // A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is + // enforced by the configuration). 
+ .min(PIPELINE_THROUGHTPUT.as_u64()); + let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() as u64 + * average_throughput_per_shard_bytes) + / PIPELINE_THROUGHTPUT.as_u64(); + const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32; + NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap() + } else { + NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap() + } } fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> { diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs index 89d47fc50b4..eee1f416638 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs @@ -79,7 +79,7 @@ impl Source { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SchedulingProblem { sources: Vec<Source>, indexer_cpu_capacities: Vec<CpuCapacity>, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs index 2748828131b..a93513a9584 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs @@ -21,6 +21,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::{Arc, Weak}; +use once_cell::sync::OnceCell; use quickwit_common::thread_pool::run_cpu_intensive; use quickwit_config::{build_doc_mapper, DocMapping, SearchSettings}; use quickwit_doc_mapper::DocMapper; @@ -107,18 +108,29 @@ fn validate_doc_batch_impl( (doc_batch, parse_failures) } +fn is_document_validation_enabled() -> bool { + static IS_DOCUMENT_VALIDATION_ENABLED: OnceCell<bool> = OnceCell::new(); + *IS_DOCUMENT_VALIDATION_ENABLED.get_or_init(|| { + !quickwit_common::get_bool_from_env("QW_DISABLE_DOCUMENT_VALIDATION", false) + }) +} + /// Parses the JSON documents contained 
in the batch and applies the doc mapper. Returns the /// original batch and a list of parse failures. pub(super) async fn validate_doc_batch( doc_batch: DocBatchV2, doc_mapper: Arc<dyn DocMapper>, ) -> IngestV2Result<(DocBatchV2, Vec<ParseFailure>)> { - run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper)) - .await - .map_err(|error| { - let message = format!("failed to validate documents: {error}"); - IngestV2Error::Internal(message) - }) + if is_document_validation_enabled() { + run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper)) + .await + .map_err(|error| { + let message = format!("failed to validate documents: {error}"); + IngestV2Error::Internal(message) + }) + } else { + Ok((doc_batch, Vec::new())) + } } #[cfg(test)]