Commit be41cb6

Adding flags to disable docmapper validation and shard load in control plane.

fulmicoton committed Jun 28, 2024
1 parent 0dc4135 commit be41cb6

Showing 2 changed files with 35 additions and 11 deletions.
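Both files use the same kill-switch pattern: read an environment variable once, cache the result in a once_cell::sync::OnceCell<bool>, and consult the cached flag on every call. The snippet below is a minimal, self-contained sketch of that pattern; it reads the variable with std::env::var instead of Quickwit's quickwit_common::get_from_env helper (whose exact parsing behavior is not shown in this diff), so treat it as an illustration rather than the exact implementation.

use once_cell::sync::OnceCell;

/// Returns true unless the kill-switch environment variable is set to "true".
/// The environment is read once; the result is cached for the lifetime of the process.
fn feature_enabled(env_var: &str, cell: &'static OnceCell<bool>) -> bool {
    *cell.get_or_init(|| {
        let disabled = std::env::var(env_var)
            .map(|value| value.trim().eq_ignore_ascii_case("true"))
            .unwrap_or(false);
        !disabled
    })
}

fn variable_shard_load_enabled() -> bool {
    static CELL: OnceCell<bool> = OnceCell::new();
    feature_enabled("QW_DISABLE_VARIABLE_SHARD_LOAD", &CELL)
}

fn main() {
    // With QW_DISABLE_VARIABLE_SHARD_LOAD=true in the environment, this prints false.
    println!("variable shard load: {}", variable_shard_load_enabled());
}

Because the result lives in a OnceCell, the flag cannot be flipped at runtime: the first call pins the value for the rest of the process.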
23 changes: 18 additions & 5 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

@@ -28,6 +28,7 @@ use std::time::{Duration, Instant};
 
 use fnv::{FnvHashMap, FnvHashSet};
 use itertools::Itertools;
+use once_cell::sync::OnceCell;
 use quickwit_proto::control_plane::{RebuildPlanRequest, RebuildPlanResponse};
 use quickwit_proto::indexing::{
     ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
@@ -121,6 +122,14 @@ impl fmt::Debug for IndexingScheduler {
     }
 }
 
+
+fn enable_variable_shard_load() -> bool {
+    static ENABLE_SHARD_LOAD_CP: OnceCell<bool> = OnceCell::new();
+    *ENABLE_SHARD_LOAD_CP
+        .get_or_init(|| !quickwit_common::get_from_env("QW_DISABLE_VARIABLE_SHARD_LOAD", false))
+}
+
+
 /// Computes the CPU load associated to a single shard of a given index.
 ///
 /// The array passed contains all of the data we have about the shards of the index.
@@ -132,15 +141,19 @@ impl fmt::Debug for IndexingScheduler {
 /// It does not take into account the variation that could arise from the different
 /// doc mapping / nature of the data, etc.
 fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
-    let num_shards = shard_entries.len().max(1) as u32;
-    let average_throughput_per_shard: u32 = shard_entries
+    if enable_variable_shard_load() {
+        let num_shards = shard_entries.len().max(1) as u32;
+        let average_throughput_per_shard: u32 = shard_entries
         .iter()
         .map(|shard_entry| u32::from(shard_entry.ingestion_rate.0))
         .sum::<u32>()
         .div_ceil(num_shards);
-    let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() * average_throughput_per_shard) / 20;
-    const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
-    NonZeroU32::new(num_cpu_millis.max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
+        let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() * average_throughput_per_shard) / 20;
+        const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
+        NonZeroU32::new(num_cpu_millis.max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
+    } else {
+        NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4).unwrap()
+    }
 }
 
 fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
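To make the arithmetic in compute_load_per_shard above concrete, here is a rough standalone sketch of the variable-load branch. The diff does not show the value of PIPELINE_FULL_CAPACITY or the unit of shard_entry.ingestion_rate, so the sketch assumes 4,000 CPU millis per full pipeline and ingestion rates in MiB/s purely for illustration; the division by 20 then reads as "a full pipeline absorbs roughly 20 MiB/s".

use std::num::NonZeroU32;

// Assumed values, for illustration only: the real constants live in
// quickwit_proto::indexing and in the compute_load_per_shard function shown above.
const PIPELINE_FULL_CAPACITY_CPU_MILLIS: u32 = 4_000;
const MIN_CPU_LOAD_PER_SHARD: u32 = 50;

/// Mirrors the variable-load branch: average the per-shard throughput, scale it
/// against a full pipeline, and clamp the result to a small floor.
fn load_per_shard(ingestion_rates_mib_per_sec: &[u32]) -> NonZeroU32 {
    let num_shards = ingestion_rates_mib_per_sec.len().max(1) as u32;
    let average_throughput: u32 = ingestion_rates_mib_per_sec
        .iter()
        .sum::<u32>()
        .div_ceil(num_shards);
    let num_cpu_millis = (PIPELINE_FULL_CAPACITY_CPU_MILLIS * average_throughput) / 20;
    NonZeroU32::new(num_cpu_millis.max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
}

fn main() {
    // Three shards ingesting 5, 7 and 0 MiB/s: average 4 MiB/s -> 800 CPU millis per shard.
    println!("cpu millis per shard: {}", load_per_shard(&[5, 7, 0]));
    // An idle index still gets the 50 CPU millis floor.
    println!("cpu millis per shard: {}", load_per_shard(&[0]));
}

When the new flag disables this branch (QW_DISABLE_VARIABLE_SHARD_LOAD=true), the function instead returns a flat PIPELINE_FULL_CAPACITY.cpu_millis() / 4, i.e. every shard is costed at a quarter of a pipeline regardless of its ingestion rate.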
23 changes: 17 additions & 6 deletions quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs

@@ -21,6 +21,7 @@ use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::{Arc, Weak};
 
+use once_cell::sync::OnceCell;
 use quickwit_common::thread_pool::run_cpu_intensive;
 use quickwit_config::{build_doc_mapper, DocMapping, SearchSettings};
 use quickwit_doc_mapper::DocMapper;
@@ -107,18 +108,28 @@ fn validate_doc_batch_impl(
     (doc_batch, parse_failures)
 }
 
+fn document_validation_enabled() -> bool {
+    static ENABLE_DOCUMENT_VALIDATION: OnceCell<bool> = OnceCell::new();
+    *ENABLE_DOCUMENT_VALIDATION
+        .get_or_init(|| !quickwit_common::get_from_env("QW_DISABLE_DOCUMENT_VALIDATION", false))
+}
+
 /// Parses the JSON documents contained in the batch and applies the doc mapper. Returns the
 /// original batch and a list of parse failures.
 pub(super) async fn validate_doc_batch(
     doc_batch: DocBatchV2,
     doc_mapper: Arc<dyn DocMapper>,
 ) -> IngestV2Result<(DocBatchV2, Vec<ParseFailure>)> {
-    run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
-        .await
-        .map_err(|error| {
-            let message = format!("failed to validate documents: {error}");
-            IngestV2Error::Internal(message)
-        })
+    if document_validation_enabled() {
+        run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
+            .await
+            .map_err(|error| {
+                let message = format!("failed to validate documents: {error}");
+                IngestV2Error::Internal(message)
+            })
+    } else {
+        Ok((doc_batch, Vec::new()))
+    }
 }
 
 #[cfg(test)]
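The effect of the second flag is that, with QW_DISABLE_DOCUMENT_VALIDATION set, validate_doc_batch no longer parses anything: the batch comes back untouched together with an empty parse-failure list, so malformed documents are presumably only caught later in the indexing pipeline instead of being reported in the ingest response. Below is a minimal sketch of that pass-through behaviour, using plain strings in place of DocBatchV2 and ParseFailure and a boolean parameter in place of the cached environment flag.

/// Simplified stand-in for validate_doc_batch: `docs` plays the role of DocBatchV2
/// and the returned strings play the role of ParseFailure.
fn validate(docs: Vec<String>, validation_enabled: bool) -> (Vec<String>, Vec<String>) {
    if !validation_enabled {
        // Gate added by this commit: skip parsing entirely and report no failures.
        return (docs, Vec::new());
    }
    // Stand-in check; the real implementation applies the index's doc mapper on a
    // CPU-intensive thread pool (run_cpu_intensive).
    let failures: Vec<String> = docs
        .iter()
        .filter(|doc| !doc.trim_start().starts_with('{'))
        .map(|doc| format!("not a JSON object: {doc}"))
        .collect();
    (docs, failures)
}

fn main() {
    let docs = vec![r#"{"id": 1}"#.to_string(), "not json".to_string()];

    let (_, failures) = validate(docs.clone(), true);
    assert_eq!(failures.len(), 1); // the malformed document is reported

    let (returned, failures) = validate(docs.clone(), false);
    assert_eq!(returned, docs);
    assert!(failures.is_empty()); // validation skipped: the malformed document passes through
}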
