diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 190e5b44295..bd855ec0395 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -226,11 +226,7 @@ pub struct SearcherConfig { #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] pub storage_timeout_policy: Option, - - // TODO validate that `warmup_memory_budget` is greater than `warmup_single_split_initial_allocation` - // TODO set serde default pub warmup_memory_budget: ByteSize, - // TODO set serde default pub warmup_single_split_initial_allocation: ByteSize, } @@ -280,9 +276,7 @@ impl Default for SearcherConfig { split_cache: None, request_timeout_secs: Self::default_request_timeout_secs(), storage_timeout_policy: None, - // TODO change this to the method used for serde default. warmup_memory_budget: ByteSize::gb(1), - // TODO change this to the method used for serde default. warmup_single_split_initial_allocation: ByteSize::mb(50), } } @@ -318,6 +312,14 @@ impl SearcherConfig { split_cache_limits.max_file_descriptors ); } + if self.warmup_single_split_initial_allocation > self.warmup_memory_budget { + anyhow::bail!( + "warmup_single_split_initial_allocation ({}) must be lower or equal to \ + warmup_memory_budget ({})", + self.warmup_single_split_initial_allocation, + self.warmup_memory_budget + ); + } } Ok(()) } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 8a1337636cf..457cf5c5fae 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -616,7 +616,9 @@ mod tests { min_throughtput_bytes_per_secs: 100_000, timeout_millis: 2_000, max_num_retries: 2 - }) + }), + warmup_memory_budget: ByteSize::gb(1), + warmup_single_split_initial_allocation: ByteSize::mb(50), } ); assert_eq!( diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs index e55c49f7fad..b5093680458 100644 --- a/quickwit/quickwit-directories/src/caching_directory.rs +++ b/quickwit/quickwit-directories/src/caching_directory.rs @@ -29,11 +29,19 @@ use tantivy::directory::{FileHandle, OwnedBytes}; use tantivy::{Directory, HasLen}; /// The caching directory is a simple cache that wraps another directory. -#[derive(Clone)] -pub struct CachingDirectory { +pub struct CachingDirectory { underlying: Arc, // TODO fixme: that's a pretty ugly cache we have here. - cache: ByteRangeCache, + cache: ByteRangeCache, +} + +impl Clone for CachingDirectory { + fn clone(&self) -> Self { + CachingDirectory { + underlying: self.underlying.clone(), + cache: self.cache.clone(), + } + } } impl CachingDirectory { @@ -44,32 +52,35 @@ impl CachingDirectory { pub fn new_unbounded(underlying: Arc) -> CachingDirectory { let byte_range_cache = ByteRangeCache::with_infinite_capacity( &quickwit_storage::STORAGE_METRICS.shortlived_cache, + (), ); CachingDirectory::new(underlying, byte_range_cache) } +} +impl CachingDirectory { /// Creates a new CachingDirectory. /// /// Warming: The resulting CacheDirectory will cache all information without ever /// removing any item from the cache. - pub fn new(underlying: Arc, cache: ByteRangeCache) -> CachingDirectory { + pub fn new(underlying: Arc, cache: ByteRangeCache) -> CachingDirectory { CachingDirectory { underlying, cache } } } -impl fmt::Debug for CachingDirectory { +impl fmt::Debug for CachingDirectory { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "CachingDirectory({:?})", self.underlying) } } -struct CachingFileHandle { +struct CachingFileHandle { path: PathBuf, - cache: ByteRangeCache, + cache: ByteRangeCache, underlying_filehandle: Arc, } -impl fmt::Debug for CachingFileHandle { +impl fmt::Debug for CachingFileHandle { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, @@ -81,7 +92,7 @@ impl fmt::Debug for CachingFileHandle { } #[async_trait] -impl FileHandle for CachingFileHandle { +impl FileHandle for CachingFileHandle { fn read_bytes(&self, byte_range: Range) -> io::Result { if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) { return Ok(bytes); @@ -106,13 +117,13 @@ impl FileHandle for CachingFileHandle { } } -impl HasLen for CachingFileHandle { +impl HasLen for CachingFileHandle { fn len(&self) -> usize { self.underlying_filehandle.len() } } -impl Directory for CachingDirectory { +impl Directory for CachingDirectory { fn exists(&self, path: &Path) -> std::result::Result { self.underlying.exists(path) } diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 5e8df94527c..ab69c8711b4 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -1805,7 +1805,8 @@ mod tests { failed_splits: Vec::new(), num_attempted_splits: 3, num_successful_splits: 3, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); @@ -1844,6 +1845,7 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: None, }, LeafSearchResponse { num_hits: 10, @@ -1862,6 +1864,7 @@ mod tests { num_attempted_splits: 2, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: None, }, ], ); @@ -1893,7 +1896,8 @@ mod tests { }], num_attempted_splits: 5, num_successful_splits: 4, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); @@ -1933,6 +1937,7 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: None, }, LeafSearchResponse { num_hits: 10, @@ -1951,6 +1956,7 @@ mod tests { num_attempted_splits: 2, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: None, }, ], ); @@ -1982,7 +1988,8 @@ mod tests { }], num_attempted_splits: 5, num_successful_splits: 4, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); // TODO would be nice to test aggregation too. diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index e8cf5c3a1fb..070109d1114 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -174,7 +174,7 @@ async fn fetch_docs_in_split( global_doc_addrs.sort_by_key(|doc| doc.doc_addr); // Opens the index without the ephemeral unbounded cache, this cache is indeed not useful // when fetching docs as we will fetch them only once. - let mut index = open_index_with_caches( + let mut index = open_index_with_caches::<()>( &searcher_context, index_storage, split, diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 78dfc706b7c..41a1fad0465 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -30,7 +30,8 @@ use quickwit_common::pretty::PrettySample; use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory}; use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo}; use quickwit_proto::search::{ - CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError + CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, + SortValue, SplitIdAndFooterOffsets, SplitSearchError, }; use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery}; use quickwit_query::tokenizers::TokenizerManager; @@ -127,14 +128,18 @@ pub(crate) async fn open_split_bundle( /// Opens a `tantivy::Index` for the given split with several cache layers: /// - A split footer cache given by `SearcherContext.split_footer_cache`. /// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. -/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`. +/// - An ephemeral unbounded cache directory whose lifetime is tied to the +/// returned `Index`. +/// +/// TODO: generic T should be forced to SearcherPermit, but this requires the +/// search stream to also request permits. #[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] -pub(crate) async fn open_index_with_caches( +pub(crate) async fn open_index_with_caches( searcher_context: &SearcherContext, index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: Option, + ephemeral_unbounded_cache: Option>, ) -> anyhow::Result { // Let's add a storage proxy to retry `get_slice` requests if they are taking too long, // if configured in the searcher config. @@ -368,6 +373,7 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { } /// Apply a leaf search on a single split. +#[allow(clippy::too_many_arguments)] async fn leaf_search_single_split( searcher_context: &SearcherContext, mut search_request: SearchRequest, @@ -376,7 +382,7 @@ async fn leaf_search_single_split( doc_mapper: Arc, split_filter: Arc>, aggregations_limits: AggregationLimitsGuard, - search_permit: &mut SearchPermit, + search_permit: SearchPermit, ) -> crate::Result { rewrite_request( &mut search_request, @@ -402,8 +408,10 @@ async fn leaf_search_single_split( } let split_id = split.split_id.to_string(); - let byte_range_cache = - ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let byte_range_cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + search_permit, + ); let index = open_index_with_caches( searcher_context, storage, @@ -433,9 +441,8 @@ async fn leaf_search_single_split( warmup(&searcher, &warmup_info).await?; let warmup_end = Instant::now(); let warmup_duration: Duration = warmup_end.duration_since(warmup_start); - - let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes(); - search_permit.set_actual_memory_usage_and_release_permit_after(short_lived_cache_num_bytes); + let short_lived_cache_num_bytes = byte_range_cache.get_num_bytes(); + byte_range_cache.track(|permit| permit.warmup_completed(short_lived_cache_num_bytes)); let split_num_docs = split.num_docs; let span = info_span!("tantivy_search"); @@ -1281,15 +1288,15 @@ pub async fn leaf_search( // do no interleave with other leaf search requests. let permit_futures = searcher_context .search_permit_provider - .get_permits(split_with_req.len()); + .get_permits(split_with_req.len()) + .await; for ((split, mut request), permit_fut) in split_with_req.into_iter().zip(permit_futures.into_iter()) { let leaf_split_search_permit = permit_fut .instrument(info_span!("waiting_for_leaf_search_split_semaphore")) - .await - .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); + .await; let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter); if !can_be_better && !run_all_splits { @@ -1379,7 +1386,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - mut search_permit: SearchPermit, + search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); @@ -1394,13 +1401,10 @@ async fn leaf_search_single_split_wrapper( doc_mapper, split_filter.clone(), aggregations_limits, - &mut search_permit, + search_permit, ) .await; - // We explicitly drop it, to highlight it to the reader - std::mem::drop(search_permit); - if leaf_search_single_split_res.is_ok() { timer.observe_duration(); } diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index 491f66f3aee..e2601081cd5 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -252,6 +252,7 @@ mod tests { sort_value2: None, split_id: "split_1".to_string(), }], + resource_stats: None, }; assert!(cache.get(split_1.clone(), query_1.clone()).is_none()); @@ -342,6 +343,7 @@ mod tests { sort_value2: None, split_id: "split_1".to_string(), }], + resource_stats: None, }; // for split_1, 1 and 1bis cover different timestamp ranges diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 4b80b3aed83..5bb03168120 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -40,6 +40,7 @@ use tracing::{debug, error, info, instrument}; use crate::leaf::open_index_with_caches; use crate::search_job_placer::group_jobs_by_index_id; +use crate::search_permit_provider::SearchPermit; use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext}; /// Performs a distributed list terms. @@ -215,9 +216,12 @@ async fn leaf_list_terms_single_split( search_request: &ListTermsRequest, storage: Arc, split: SplitIdAndFooterOffsets, + search_permit: SearchPermit, ) -> crate::Result { - let cache = - ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + search_permit, + ); let index = open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?; let split_schema = index.schema(); @@ -330,7 +334,8 @@ pub async fn leaf_list_terms( info!(split_offsets = ?PrettySample::new(splits, 5)); let permits = searcher_context .search_permit_provider - .get_permits(splits.len()); + .get_permits(splits.len()) + .await; let leaf_search_single_split_futures: Vec<_> = splits .iter() .zip(permits.into_iter()) @@ -338,7 +343,7 @@ pub async fn leaf_list_terms( let index_storage_clone = index_storage.clone(); let searcher_context_clone = searcher_context.clone(); async move { - let _leaf_split_search_permit = search_permit_recv.await; + let leaf_split_search_permit = search_permit_recv.await; // TODO dedicated counter and timer? crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); let timer = crate::SEARCH_METRICS @@ -349,6 +354,7 @@ pub async fn leaf_list_terms( request, index_storage_clone, split.clone(), + leaf_split_search_permit, ) .await; timer.observe_duration(); diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 15980acf4a8..2b86b5191b5 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -20,22 +20,37 @@ use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use bytesize::ByteSize; use quickwit_common::metrics::GaugeGuard; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use tracing::warn; -/// `SearchPermitProvider` is a distributor of permits to perform single split -/// search operation. +/// Distributor of permits to perform split search operation. /// -/// - Two types of resources are managed: memory allocations and download slots. -/// - Requests are served in order. +/// Requests are served in order. Each permit initially reserves a slot for the +/// warmup (limit concurrent downloads) and a pessimistic amount of memory. Once +/// the warmup is completed, the actual memory usage is set and the warmup slot +/// is released. Once the search is completed and the permit is dropped, the +/// remaining memory is also released. #[derive(Clone)] pub struct SearchPermitProvider { - inner_arc: Arc>, + sender: mpsc::UnboundedSender, +} + +pub enum SearchPermitMessage { + Request { + permit_sender: oneshot::Sender>, + num_permits: usize, + }, + WarmupCompleted { + memory_delta: i64, + }, + Drop { + memory_size: u64, + warmup_permit_held: bool, + }, } impl SearchPermitProvider { @@ -43,66 +58,110 @@ impl SearchPermitProvider { num_download_slots: usize, memory_budget: ByteSize, initial_allocation: ByteSize, - ) -> SearchPermitProvider { - SearchPermitProvider { - inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider { - num_download_slots_available: num_download_slots, - memory_budget: memory_budget.as_u64(), - permits_requests: VecDeque::new(), - memory_allocated: 0u64, - initial_allocation: initial_allocation.as_u64(), - })), - } + ) -> Self { + let (sender, receiver) = mpsc::unbounded_channel(); + let mut actor = SearchPermitActor { + msg_receiver: receiver, + msg_sender: sender.downgrade(), + num_warmup_slots_available: num_download_slots, + total_memory_budget: memory_budget.as_u64(), + permits_requests: VecDeque::new(), + total_memory_allocated: 0u64, + per_permit_initial_memory_allocation: initial_allocation.as_u64(), + }; + tokio::spawn(async move { actor.run().await }); + Self { sender } } - /// Returns a list of future permits in the form of awaitable futures. + /// Returns `num_permits` futures that complete once enough resources are + /// available. /// - /// The permits returned are guaranteed to be resolved in order. - /// In addition, the permits are guaranteed to be resolved before permits returned by - /// subsequent calls to this function (or `get_permit`). - #[must_use] - pub fn get_permits(&self, num_permits: usize) -> Vec { - let mut permits_lock = self.inner_arc.lock().unwrap(); - permits_lock.get_permits(num_permits, &self.inner_arc) + /// The permits returned are guaranteed to be resolved in order. In + /// addition, the permits are guaranteed to be resolved before permits + /// returned by subsequent calls to this function. + pub async fn get_permits(&self, num_permits: usize) -> Vec { + let (permit_sender, permit_receiver) = oneshot::channel(); + self.sender + .send(SearchPermitMessage::Request { + permit_sender, + num_permits, + }) + .expect("Receiver lives longer than sender"); + permit_receiver + .await + .expect("Receiver lives longer than sender") } } -struct InnerSearchPermitProvider { - num_download_slots_available: usize, - - // Note it is possible for memory_allocated to exceed memory_budget temporarily, - // if and only if a split leaf search task ended up using more than `initial_allocation`. - // - // When it happens, new permits will not be assigned until the memory is freed. - memory_budget: u64, - memory_allocated: u64, - initial_allocation: u64, +struct SearchPermitActor { + msg_receiver: mpsc::UnboundedReceiver, + msg_sender: mpsc::WeakUnboundedSender, + num_warmup_slots_available: usize, + /// Note it is possible for memory_allocated to exceed memory_budget temporarily, + /// if and only if a split leaf search task ended up using more than `initial_allocation`. + /// When it happens, new permits will not be assigned until the memory is freed. + total_memory_budget: u64, + total_memory_allocated: u64, + per_permit_initial_memory_allocation: u64, permits_requests: VecDeque>, } -impl InnerSearchPermitProvider { - fn get_permits( - &mut self, - num_permits: usize, - inner_arc: &Arc>, - ) -> Vec { - let mut permits = Vec::with_capacity(num_permits); - for _ in 0..num_permits { - let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back(tx); - permits.push(SearchPermitFuture(rx)); +impl SearchPermitActor { + async fn run(&mut self) { + // Stops when the last clone of SearchPermitProvider is dropped. + while let Some(msg) = self.msg_receiver.recv().await { + self.handle_message(msg); } - self.assign_available_permits(inner_arc); - permits } - /// Called each time a permit is requested or released - /// - /// Calling lock on `inner_arc` inside this method will cause a deadlock as - /// `&mut self` and `inner_arc` reference the same instance. - fn assign_available_permits(&mut self, inner_arc: &Arc>) { - while self.num_download_slots_available > 0 - && self.memory_allocated + self.initial_allocation <= self.memory_budget + fn handle_message(&mut self, msg: SearchPermitMessage) { + match msg { + SearchPermitMessage::Request { + num_permits, + permit_sender, + } => { + let mut permits = Vec::with_capacity(num_permits); + for _ in 0..num_permits { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back(tx); + permits.push(SearchPermitFuture(rx)); + } + self.assign_available_permits(); + permit_sender + .send(permits) + .ok() + // This is a request response pattern, so we can safely ignore the error. + .expect("Receiver lives longer than sender"); + } + SearchPermitMessage::WarmupCompleted { memory_delta } => { + self.num_warmup_slots_available += 1; + if self.total_memory_allocated as i64 + memory_delta < 0 { + panic!("More memory released than allocated, should never happen.") + } + self.total_memory_allocated = + (self.total_memory_allocated as i64 + memory_delta) as u64; + self.assign_available_permits(); + } + SearchPermitMessage::Drop { + memory_size, + warmup_permit_held, + } => { + if warmup_permit_held { + self.num_warmup_slots_available += 1; + } + self.total_memory_allocated = self + .total_memory_allocated + .checked_sub(memory_size) + .expect("More memory released than allocated, should never happen."); + self.assign_available_permits(); + } + } + } + + fn assign_available_permits(&mut self) { + while self.num_warmup_slots_available > 0 + && self.total_memory_allocated + self.per_permit_initial_memory_allocation + <= self.total_memory_budget { let Some(permit_requester_tx) = self.permits_requests.pop_front() else { break; @@ -111,23 +170,17 @@ impl InnerSearchPermitProvider { &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); - let send_res = permit_requester_tx.send(SearchPermit { - _ongoing_gauge_guard: ongoing_gauge_guard, - inner_arc: inner_arc.clone(), - warmup_permit_held: true, - memory_allocation: self.initial_allocation, - }); - match send_res { - Ok(()) => { - self.num_download_slots_available -= 1; - self.memory_allocated += self.initial_allocation; - } - Err(search_permit) => { - // We cannot just decrease the num_permits_available in all case and rely on - // the drop logic here: it would cause a dead lock on the inner_arc Mutex. - search_permit.drop_without_recycling_permit(); - } - } + self.total_memory_allocated += self.per_permit_initial_memory_allocation; + permit_requester_tx + .send(SearchPermit { + _ongoing_gauge_guard: ongoing_gauge_guard, + msg_sender: self.msg_sender.clone(), + memory_allocation: self.per_permit_initial_memory_allocation, + warmup_permit_held: true, + }) + // if the requester dropped its receiver, we drop the newly + // created SearchPermit which releases the resources + .ok(); } crate::SEARCH_METRICS .leaf_search_single_split_tasks_pending @@ -137,16 +190,16 @@ impl InnerSearchPermitProvider { pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, - inner_arc: Arc>, - warmup_permit_held: bool, + msg_sender: mpsc::WeakUnboundedSender, memory_allocation: u64, + warmup_permit_held: bool, } impl SearchPermit { - /// After warm up, we have a proper estimate of the memory usage of a single split leaf search. - /// - /// We can then set the actual memory usage. - pub fn set_actual_memory_usage_and_release_permit_after(&mut self, new_memory_usage: u64) { + /// After warm up, we have a proper estimate of the memory usage of a single + /// split leaf search. We can thus set the actual memory usage and release + /// the warmup slot. + pub fn warmup_completed(&mut self, new_memory_usage: u64) { if new_memory_usage > self.memory_allocation { warn!( memory_usage = new_memory_usage, @@ -154,53 +207,40 @@ impl SearchPermit { "current leaf search is consuming more memory than the initial allocation" ); } - let mut inner_guard = self.inner_arc.lock().unwrap(); - let delta = new_memory_usage as i64 - inner_guard.initial_allocation as i64; - inner_guard.memory_allocated += delta as u64; - inner_guard.num_download_slots_available += 1; - if inner_guard.memory_allocated > inner_guard.memory_budget { - warn!( - memory_allocated = inner_guard.memory_allocated, - memory_budget = inner_guard.memory_budget, - "memory allocated exceeds memory budget" - ); - } - self.memory_allocation = new_memory_usage; - inner_guard.assign_available_permits(&self.inner_arc); + let memory_delta = new_memory_usage as i64 - self.memory_allocation as i64; + self.warmup_permit_held = false; + self.send_if_still_running(SearchPermitMessage::WarmupCompleted { memory_delta }); } - fn drop_without_recycling_permit(mut self) { - self.warmup_permit_held = false; - self.memory_allocation = 0u64; - drop(self); + fn send_if_still_running(&self, msg: SearchPermitMessage) { + if let Some(sender) = self.msg_sender.upgrade() { + sender + .send(msg) + // Receiver instance in the event loop is never dropped or + // closed as long as there is a strong sender reference. + .expect("Receiver should live longer than sender"); + } } } impl Drop for SearchPermit { fn drop(&mut self) { - // This is not just an optimization. This is necessary to avoid a dead lock when the - // permit requester dropped its receiver channel. - if !self.warmup_permit_held && self.memory_allocation == 0 { - return; - } - let mut inner_guard = self.inner_arc.lock().unwrap(); - if self.warmup_permit_held { - inner_guard.num_download_slots_available += 1; - } - inner_guard.memory_allocated -= self.memory_allocation; - inner_guard.assign_available_permits(&self.inner_arc); + self.send_if_still_running(SearchPermitMessage::Drop { + memory_size: self.memory_allocation, + warmup_permit_held: self.warmup_permit_held, + }); } } pub struct SearchPermitFuture(oneshot::Receiver); impl Future for SearchPermitFuture { - type Output = Option; + type Output = SearchPermit; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let receiver = Pin::new(&mut self.get_mut().0); match receiver.poll(cx) { - Poll::Ready(Ok(search_permit)) => Poll::Ready(Some(search_permit)), + Poll::Ready(Ok(search_permit)) => Poll::Ready(search_permit), Poll::Ready(Err(_)) => panic!("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."), Poll::Pending => Poll::Pending, } diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index ed7d39965a9..22d63853b4e 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -127,8 +127,11 @@ async fn leaf_search_stream_single_split( &split, ); - let cache = - ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + // should we not track the memory with a SearcherPermit? + (), + ); let index = open_index_with_caches( &searcher_context, diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 734344f2c74..e6d1238e644 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -488,8 +488,11 @@ impl SearcherContext { capacity_in_bytes, &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); - let leaf_search_split_semaphore = - SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches, searcher_config.warmup_memory_budget, searcher_config.warmup_single_split_initial_allocation); + let leaf_search_split_semaphore = SearchPermitProvider::new( + searcher_config.max_num_concurrent_split_searches, + searcher_config.warmup_memory_budget, + searcher_config.warmup_single_split_initial_allocation, + ); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize; diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs index c9f261c347b..076d64a22d1 100644 --- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs +++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs @@ -21,7 +21,6 @@ use std::borrow::{Borrow, Cow}; use std::collections::BTreeMap; use std::ops::Range; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use tantivy::directory::OwnedBytes; @@ -345,54 +344,69 @@ impl Drop for NeedMutByteRangeCache { /// cached data, the changes may or may not get recorded. /// /// At the moment this is hardly a cache as it features no eviction policy. -#[derive(Clone)] -pub struct ByteRangeCache { - inner_arc: Arc, +pub struct ByteRangeCache { + inner_arc: Arc>>, } -struct Inner { - num_stored_bytes: AtomicU64, - need_mut_byte_range_cache: Mutex>, +struct Inner { + need_mut_byte_range_cache: NeedMutByteRangeCache, + tracker: T, } -impl ByteRangeCache { +impl Clone for ByteRangeCache { + fn clone(&self) -> Self { + ByteRangeCache { + inner_arc: self.inner_arc.clone(), + } + } +} + +impl ByteRangeCache { /// Creates a slice cache that never removes any entry. - pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self { + pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics, tracker: T) -> Self { let need_mut_byte_range_cache = NeedMutByteRangeCache::with_infinite_capacity(cache_counters); let inner = Inner { - num_stored_bytes: AtomicU64::default(), - need_mut_byte_range_cache: Mutex::new(need_mut_byte_range_cache), + need_mut_byte_range_cache, + tracker, }; ByteRangeCache { - inner_arc: Arc::new(inner), + inner_arc: Arc::new(Mutex::new(inner)), } } /// Overall amount of bytes stored in the cache. pub fn get_num_bytes(&self) -> u64 { - self.inner_arc.num_stored_bytes.load(Ordering::Relaxed) + self.inner_arc + .lock() + .unwrap() + .need_mut_byte_range_cache + .num_bytes } /// If available, returns the cached view of the slice. pub fn get_slice(&self, path: &Path, byte_range: Range) -> Option { self.inner_arc - .need_mut_byte_range_cache .lock() .unwrap() + .need_mut_byte_range_cache .get_slice(path, byte_range) } /// Put the given amount of data in the cache. pub fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes) { - let mut need_mut_byte_range_cache_locked = - self.inner_arc.need_mut_byte_range_cache.lock().unwrap(); - need_mut_byte_range_cache_locked.put_slice(path, byte_range, bytes); - let num_bytes = need_mut_byte_range_cache_locked.num_bytes; - drop(need_mut_byte_range_cache_locked); - self.inner_arc - .num_stored_bytes - .store(num_bytes as u64, Ordering::Relaxed); + let mut inner = self.inner_arc.lock().unwrap(); + inner + .need_mut_byte_range_cache + .put_slice(path, byte_range, bytes); + } + + /// Apply the provided action on the tracker. + /// + /// The action should not block for long. + pub fn track(&self, action: impl FnOnce(&mut T)) { + let mut inner = self.inner_arc.lock().unwrap(); + action(&mut inner.tracker); } } @@ -449,7 +463,7 @@ mod tests { state.insert("path1", vec![false; 12]); state.insert("path2", vec![false; 12]); - let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS); + let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS, ()); for op in ops { match op { @@ -470,13 +484,13 @@ mod tests { .sum(); // in some case we have ranges touching each other, count_items count them // as only one, but cache count them as 2. - assert!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_items >= expected_item_count as u64); + assert!(cache.inner_arc.lock().unwrap().need_mut_byte_range_cache.num_items >= expected_item_count as u64); let expected_byte_count = state.values() .flatten() .filter(|stored| **stored) .count(); - assert_eq!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_bytes, expected_byte_count as u64); + assert_eq!(cache.inner_arc.lock().unwrap().need_mut_byte_range_cache.num_bytes, expected_byte_count as u64); } Operation::Get { range, @@ -517,7 +531,7 @@ mod tests { static METRICS: Lazy = Lazy::new(|| CacheMetrics::for_component("byterange_cache_test")); - let cache = ByteRangeCache::with_infinite_capacity(&METRICS); + let cache = ByteRangeCache::with_infinite_capacity(&METRICS, ()); let key: std::path::PathBuf = "key".into(); @@ -543,7 +557,7 @@ mod tests { ); { - let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); + let mutable_cache = &cache.inner_arc.lock().unwrap().need_mut_byte_range_cache; assert_eq!(mutable_cache.cache.len(), 4); assert_eq!(mutable_cache.num_items, 4); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4); @@ -555,7 +569,7 @@ mod tests { { // now they should've been merged, except the last one - let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); + let mutable_cache = &cache.inner_arc.lock().unwrap().need_mut_byte_range_cache; assert_eq!(mutable_cache.cache.len(), 2); assert_eq!(mutable_cache.num_items, 2); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2);