From 56362daf7232e25a7936a447b4847ea0f90d5da1 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 4 Dec 2024 21:13:12 +0100 Subject: [PATCH] Also use num_docs to estimate init cache size --- quickwit/quickwit-search/src/leaf.rs | 16 ++++-- quickwit/quickwit-search/src/list_terms.rs | 16 ++++-- .../src/search_permit_provider.rs | 55 +++++++++++-------- quickwit/quickwit-search/src/service.rs | 1 - 4 files changed, 53 insertions(+), 35 deletions(-) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 71dd52a0acc..d83579f08e0 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -52,7 +52,7 @@ use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; -use crate::search_permit_provider::SearchPermit; +use crate::search_permit_provider::{compute_initial_memory_allocation, SearchPermit}; use crate::service::{deserialize_doc_mapper, SearcherContext}; use crate::{QuickwitAggregations, SearchError}; @@ -1315,13 +1315,17 @@ pub async fn leaf_search( // We acquire all of the leaf search permits to make sure our single split search tasks // do no interleave with other leaf search requests. + let permit_sizes = split_with_req.iter().map(|(split, _)| { + compute_initial_memory_allocation( + split, + searcher_context + .searcher_config + .warmup_single_split_initial_allocation, + ) + }); let permit_futures = searcher_context .search_permit_provider - .get_permits( - split_with_req - .iter() - .map(|(split, _)| ByteSize(split.split_footer_start)), - ) + .get_permits(permit_sizes) .await; for ((split, mut request), permit_fut) in diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 3b204273b4a..f796252c125 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -22,7 +22,6 @@ use std::ops::Bound; use std::sync::Arc; use anyhow::Context; -use bytesize::ByteSize; use futures::future::try_join_all; use itertools::{Either, Itertools}; use quickwit_common::pretty::PrettySample; @@ -41,6 +40,7 @@ use tracing::{debug, error, info, instrument}; use crate::leaf::open_index_with_caches; use crate::search_job_placer::group_jobs_by_index_id; +use crate::search_permit_provider::compute_initial_memory_allocation; use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext}; /// Performs a distributed list terms. @@ -329,13 +329,17 @@ pub async fn leaf_list_terms( splits: &[SplitIdAndFooterOffsets], ) -> Result { info!(split_offsets = ?PrettySample::new(splits, 5)); + let permit_sizes = splits.iter().map(|split| { + compute_initial_memory_allocation( + split, + searcher_context + .searcher_config + .warmup_single_split_initial_allocation, + ) + }); let permits = searcher_context .search_permit_provider - .get_permits( - splits - .iter() - .map(|split| ByteSize(split.split_footer_start)), - ) + .get_permits(permit_sizes) .await; let leaf_search_single_split_futures: Vec<_> = splits .iter() diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 913dc73046b..b9e6366d5e4 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -24,6 +24,7 @@ use std::task::{Context, Poll}; use bytesize::ByteSize; use quickwit_common::metrics::GaugeGuard; +use quickwit_proto::search::SplitIdAndFooterOffsets; #[cfg(test)] use tokio::sync::watch; use tokio::sync::{mpsc, oneshot}; @@ -40,7 +41,6 @@ pub struct SearchPermitProvider { message_sender: mpsc::UnboundedSender, #[cfg(test)] actor_stopped: watch::Receiver, - per_permit_initial_memory_allocation: u64, } #[derive(Debug)] @@ -59,12 +59,29 @@ pub enum SearchPermitMessage { }, } +/// Makes very pessimistic estimate of the memory allocation required for a split search +/// +/// This is refined later on when more data is available about the split. +pub fn compute_initial_memory_allocation( + split: &SplitIdAndFooterOffsets, + warmup_single_split_initial_allocation: ByteSize, +) -> ByteSize { + let split_size = split.split_footer_start; + let proportional_allocation = + warmup_single_split_initial_allocation.as_u64() * split.num_docs / 10_000_000; + let size_bytes = [ + split_size, + proportional_allocation, + warmup_single_split_initial_allocation.as_u64(), + ] + .into_iter() + .min() + .unwrap(); + ByteSize(size_bytes) +} + impl SearchPermitProvider { - pub fn new( - num_download_slots: usize, - memory_budget: ByteSize, - initial_allocation: ByteSize, - ) -> Self { + pub fn new(num_download_slots: usize, memory_budget: ByteSize) -> Self { let (message_sender, message_receiver) = mpsc::unbounded_channel(); #[cfg(test)] let (state_sender, state_receiver) = watch::channel(false); @@ -83,11 +100,10 @@ impl SearchPermitProvider { message_sender, #[cfg(test)] actor_stopped: state_receiver, - per_permit_initial_memory_allocation: initial_allocation.as_u64(), } } - /// Returns one permit future for each provided split size. + /// Returns one permit future for each provided split metadata. /// /// The permits returned are guaranteed to be resolved in order. In /// addition, the permits are guaranteed to be resolved before permits @@ -96,18 +112,14 @@ impl SearchPermitProvider { /// The permit memory size is capped by per_permit_initial_memory_allocation. pub async fn get_permits( &self, - split_sizes: impl IntoIterator, + splits: impl IntoIterator, ) -> Vec { let (permit_sender, permit_receiver) = oneshot::channel(); + let permit_sizes = splits.into_iter().map(|size| size.as_u64()).collect(); self.message_sender .send(SearchPermitMessage::Request { permit_sender, - permit_sizes: split_sizes - .into_iter() - .map(|size| { - std::cmp::min(size.as_u64(), self.per_permit_initial_memory_allocation) - }) - .collect(), + permit_sizes, }) .expect("Receiver lives longer than sender"); permit_receiver @@ -191,7 +203,7 @@ impl SearchPermitActor { return None; } if let Some((_, next_permit_size)) = self.permits_requests.front() { - if self.total_memory_allocated + next_permit_size < self.total_memory_budget { + if self.total_memory_allocated + next_permit_size <= self.total_memory_budget { return self.permits_requests.pop_front(); } } @@ -307,7 +319,7 @@ mod tests { #[tokio::test] async fn test_search_permit_order() { - let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), ByteSize::mb(10)); + let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100)); let mut all_futures = Vec::new(); let first_batch_of_permits = permit_provider .get_permits(repeat(ByteSize::mb(10)).take(10)) @@ -357,7 +369,7 @@ mod tests { #[tokio::test] async fn test_search_permit_early_drops() { - let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), ByteSize::mb(10)); + let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100)); let permit_fut1 = permit_provider .get_permits(vec![ByteSize::mb(10)]) .await @@ -398,7 +410,7 @@ mod tests { #[tokio::test] async fn test_memory_budget() { - let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100), ByteSize::mb(10)); + let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100)); let mut permit_futs = permit_provider .get_permits(repeat(ByteSize::mb(10)).take(14)) .await; @@ -428,10 +440,9 @@ mod tests { #[tokio::test] async fn test_warmup_slot() { - let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100), ByteSize::mb(1)); + let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100)); let mut permit_futs = permit_provider - // permit sizes are capped by per_permit_initial_memory_allocation - .get_permits(repeat(ByteSize::mb(100)).take(16)) + .get_permits(repeat(ByteSize::mb(1)).take(16)) .await; let mut remaining_permit_futs = permit_futs.split_off(10).into_iter(); assert_eq!(remaining_permit_futs.len(), 6); diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index e6d1238e644..d566463b42e 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -491,7 +491,6 @@ impl SearcherContext { let leaf_search_split_semaphore = SearchPermitProvider::new( searcher_config.max_num_concurrent_split_searches, searcher_config.warmup_memory_budget, - searcher_config.warmup_single_split_initial_allocation, ); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams);