From 3ec6a07606660a71dd54f077615004e7495cb5bd Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 12 Dec 2024 16:07:34 +0100 Subject: [PATCH] Limit and monitor warmup memory usage (#5568) * Measure and log the amount of memory taken by a split search, and log this. * Limit search memory usage associated with warmup. Due to tantivy limitations, searching a split requires downloading all of the required data, and keep them in memory. We call this phase warmup. Before this PR, the only thing that curbed memory usage was the search permits: only N split search may happen concurrently. Unfortunately, the amount of data required here varies vastly. We need a mechanism to measure and avoid running more split search when memory is tight. Just using a semaphore is however not an option. We do not know beforehands how much memory will be required by a split search, so it could easily lead to a dead lock. Instead, this commit builds upon the search permit provider. The search permit provider is in charge of managing a configurable memory budget for this warmup memory. We introduce here a configurable "warmup_single_split_initial_allocation". A new leaf split search cannot be started if this memory is not available. This initial allocation is meant to be greater than what will be actually needed most of the time. The split search then holds this allocation until the end of warmup. After warmup, we can get the actual memory usage by interrogating the warmup cache. We can then update the amount of memory held. (most of the time, this should mean releasing some memory) In addition, in this PR, at this point, we also release the warmup search permit: We still have to perform the actual task of searching, but the thread pool will take care of limiting the number of concurrent task. Closes #5355 * Bring some clarifications and remove single permit getter * Make search permit provider into an actor. Also attach the permit to the actual memory cache to ensure memory is freed at the right moment. * Revert weird cargo lock update * Improve separation of concern by using wrapping instead of nesting Adding an extra generic field into the cache to optionally allow permit tracking is weird. Instead, we make the directory generic on the type of cache and use a wrapped cache when tracking is necessary. * Fix clippy * Fix undefined incremental resource stat * Add tests to permit provider * Improve and test stats merging utils * Fix minor typos * Add test for permit resizing * Increase default warmup memory * Increase default warmup memory * Add warmup cache metric * Limit permit memory size with split size * Also use num_docs to estimate init cache size * Restore sort on HotCache file list * Minor closure renaming * Add minimum allocation size * Increase default warmup memory to limit its effect --------- Co-authored-by: Paul Masurel --- .../quickwit-config/src/node_config/mod.rs | 14 +- .../src/node_config/serialize.rs | 4 +- .../src/caching_directory.rs | 24 +- .../quickwit-directories/src/hot_directory.rs | 32 +- .../protos/quickwit/search.proto | 10 + .../src/codegen/quickwit/quickwit.search.rs | 17 + .../quickwit-search/src/cluster_client.rs | 7 +- quickwit/quickwit-search/src/collector.rs | 47 +- quickwit/quickwit-search/src/fetch_docs.rs | 4 +- quickwit/quickwit-search/src/leaf.rs | 266 ++++++++-- quickwit/quickwit-search/src/leaf_cache.rs | 5 +- quickwit/quickwit-search/src/lib.rs | 127 ++++- quickwit/quickwit-search/src/list_terms.rs | 35 +- quickwit/quickwit-search/src/metrics.rs | 20 + quickwit/quickwit-search/src/root.rs | 57 ++- .../src/search_permit_provider.rs | 460 ++++++++++++++---- .../quickwit-search/src/search_stream/leaf.rs | 9 +- quickwit/quickwit-search/src/service.rs | 6 +- .../src/cache/byte_range_cache.rs | 52 +- 19 files changed, 972 insertions(+), 224 deletions(-) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 3eef1f10428..822fe86cb91 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -226,6 +226,8 @@ pub struct SearcherConfig { #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] pub storage_timeout_policy: Option, + pub warmup_memory_budget: ByteSize, + pub warmup_single_split_initial_allocation: ByteSize, } /// Configuration controlling how fast a searcher should timeout a `get_slice` @@ -263,7 +265,7 @@ impl StorageTimeoutPolicy { impl Default for SearcherConfig { fn default() -> Self { - Self { + SearcherConfig { fast_field_cache_capacity: ByteSize::gb(1), split_footer_cache_capacity: ByteSize::mb(500), partial_request_cache_capacity: ByteSize::mb(64), @@ -274,6 +276,8 @@ impl Default for SearcherConfig { split_cache: None, request_timeout_secs: Self::default_request_timeout_secs(), storage_timeout_policy: None, + warmup_memory_budget: ByteSize::gb(100), + warmup_single_split_initial_allocation: ByteSize::gb(1), } } } @@ -308,6 +312,14 @@ impl SearcherConfig { split_cache_limits.max_file_descriptors ); } + if self.warmup_single_split_initial_allocation > self.warmup_memory_budget { + anyhow::bail!( + "warmup_single_split_initial_allocation ({}) must be lower or equal to \ + warmup_memory_budget ({})", + self.warmup_single_split_initial_allocation, + self.warmup_memory_budget + ); + } } Ok(()) } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 8a1337636cf..b208309af4c 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -616,7 +616,9 @@ mod tests { min_throughtput_bytes_per_secs: 100_000, timeout_millis: 2_000, max_num_retries: 2 - }) + }), + warmup_memory_budget: ByteSize::gb(100), + warmup_single_split_initial_allocation: ByteSize::gb(1), } ); assert_eq!( diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs index b90f444d062..58d5ffd8028 100644 --- a/quickwit/quickwit-directories/src/caching_directory.rs +++ b/quickwit/quickwit-directories/src/caching_directory.rs @@ -33,21 +33,27 @@ use tantivy::{Directory, HasLen}; pub struct CachingDirectory { underlying: Arc, // TODO fixme: that's a pretty ugly cache we have here. - cache: Arc, + cache: ByteRangeCache, } impl CachingDirectory { /// Creates a new CachingDirectory. /// - /// Warming: The resulting CacheDirectory will cache all information without ever + /// Warning: The resulting CacheDirectory will cache all information without ever /// removing any item from the cache. pub fn new_unbounded(underlying: Arc) -> CachingDirectory { - CachingDirectory { - underlying, - cache: Arc::new(ByteRangeCache::with_infinite_capacity( - &quickwit_storage::STORAGE_METRICS.shortlived_cache, - )), - } + let byte_range_cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + ); + CachingDirectory::new(underlying, byte_range_cache) + } + + /// Creates a new CachingDirectory. + /// + /// Warning: The resulting CacheDirectory will cache all information without ever + /// removing any item from the cache. + pub fn new(underlying: Arc, cache: ByteRangeCache) -> CachingDirectory { + CachingDirectory { underlying, cache } } } @@ -59,7 +65,7 @@ impl fmt::Debug for CachingDirectory { struct CachingFileHandle { path: PathBuf, - cache: Arc, + cache: ByteRangeCache, underlying_filehandle: Arc, } diff --git a/quickwit/quickwit-directories/src/hot_directory.rs b/quickwit/quickwit-directories/src/hot_directory.rs index d217ac29851..a388ea75b51 100644 --- a/quickwit/quickwit-directories/src/hot_directory.rs +++ b/quickwit/quickwit-directories/src/hot_directory.rs @@ -205,14 +205,12 @@ impl StaticDirectoryCache { self.file_lengths.get(path).copied() } - /// return the files and their cached lengths - pub fn get_stats(&self) -> Vec<(PathBuf, usize)> { + pub fn get_file_lengths(&self) -> Vec<(PathBuf, u64)> { let mut entries = self - .slices + .file_lengths .iter() - .map(|(path, cache)| (path.to_owned(), cache.len())) + .map(|(path, len)| (path.clone(), *len)) .collect::>(); - entries.sort_by_key(|el| el.0.to_owned()); entries } @@ -265,10 +263,6 @@ impl StaticSliceCache { } None } - - pub fn len(&self) -> usize { - self.bytes.len() - } } struct StaticSliceCacheBuilder { @@ -376,12 +370,12 @@ impl HotDirectory { }), }) } - /// Get files and their cached sizes. - pub fn get_stats_per_file( - hot_cache_bytes: OwnedBytes, - ) -> anyhow::Result> { - let static_cache = StaticDirectoryCache::open(hot_cache_bytes)?; - Ok(static_cache.get_stats()) + + /// Get all the files in the directory and their sizes. + /// + /// The actual cached data is a very small fraction of this length. + pub fn get_file_lengths(&self) -> Vec<(PathBuf, u64)> { + self.inner.cache.get_file_lengths() } } @@ -704,10 +698,10 @@ mod tests { assert_eq!(directory_cache.get_file_length(three_path), Some(300)); assert_eq!(directory_cache.get_file_length(four_path), None); - let stats = directory_cache.get_stats(); - assert_eq!(stats[0], (one_path.to_owned(), 8)); - assert_eq!(stats[1], (three_path.to_owned(), 0)); - assert_eq!(stats[2], (two_path.to_owned(), 7)); + let file_lengths = directory_cache.get_file_lengths(); + assert_eq!(file_lengths[0], (one_path.to_owned(), 100)); + assert_eq!(file_lengths[1], (three_path.to_owned(), 300)); + assert_eq!(file_lengths[2], (two_path.to_owned(), 200)); assert_eq!( directory_cache diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 60671239ecc..1213ce2040e 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -347,6 +347,14 @@ message LeafSearchRequest { repeated string index_uris = 9; } +message ResourceStats { + uint64 short_lived_cache_num_bytes = 1; + uint64 split_num_docs = 2; + uint64 warmup_microsecs = 3; + uint64 cpu_thread_pool_wait_microsecs = 4; + uint64 cpu_microsecs = 5; +} + /// LeafRequestRef references data in LeafSearchRequest to deduplicate data. message LeafRequestRef { // The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers` @@ -479,6 +487,8 @@ message LeafSearchResponse { // postcard serialized intermediate aggregation_result. optional bytes intermediate_aggregation_result = 6; + + ResourceStats resource_stats = 8; } message SnippetRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 3fc4d5bdcaa..e29cae37fec 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -286,6 +286,21 @@ pub struct LeafSearchRequest { #[prost(string, repeated, tag = "9")] pub index_uris: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ResourceStats { + #[prost(uint64, tag = "1")] + pub short_lived_cache_num_bytes: u64, + #[prost(uint64, tag = "2")] + pub split_num_docs: u64, + #[prost(uint64, tag = "3")] + pub warmup_microsecs: u64, + #[prost(uint64, tag = "4")] + pub cpu_thread_pool_wait_microsecs: u64, + #[prost(uint64, tag = "5")] + pub cpu_microsecs: u64, +} /// / LeafRequestRef references data in LeafSearchRequest to deduplicate data. #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -457,6 +472,8 @@ pub struct LeafSearchResponse { pub intermediate_aggregation_result: ::core::option::Option< ::prost::alloc::vec::Vec, >, + #[prost(message, optional, tag = "8")] + pub resource_stats: ::core::option::Option, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index d32ad92327c..32f375ca06c 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn}; use crate::retry::search::LeafSearchRetryPolicy; use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds}; use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy}; -use crate::{SearchError, SearchJobPlacer, SearchServiceClient}; +use crate::{merge_resource_stats_it, SearchError, SearchJobPlacer, SearchServiceClient}; /// Maximum number of put requests emitted to perform a replicated given PUT KV. const MAX_PUT_KV_ATTEMPTS: usize = 6; @@ -317,6 +317,10 @@ fn merge_original_with_retry_leaf_search_response( (Some(left), None) => Some(left), (None, None) => None, }; + let resource_stats = merge_resource_stats_it([ + &original_response.resource_stats, + &retry_response.resource_stats, + ]); Ok(LeafSearchResponse { intermediate_aggregation_result, num_hits: original_response.num_hits + retry_response.num_hits, @@ -326,6 +330,7 @@ fn merge_original_with_retry_leaf_search_response( partial_hits: original_response.partial_hits, num_successful_splits: original_response.num_successful_splits + retry_response.num_successful_splits, + resource_stats, }) } diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 4b69348ecde..67beb8090cb 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -25,8 +25,8 @@ use itertools::Itertools; use quickwit_common::binary_heap::{SortKeyMapper, TopK}; use quickwit_doc_mapper::WarmupInfo; use quickwit_proto::search::{ - LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortOrder, SortValue, - SplitSearchError, + LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortOrder, + SortValue, SplitSearchError, }; use quickwit_proto::types::SplitId; use serde::Deserialize; @@ -40,7 +40,7 @@ use tantivy::{DateTime, DocId, Score, SegmentOrdinal, SegmentReader, TantivyErro use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span}; use crate::top_k_collector::{specialized_top_k_segment_collector, QuickwitSegmentTopKCollector}; -use crate::GlobalDocAddress; +use crate::{merge_resource_stats, merge_resource_stats_it, GlobalDocAddress}; #[derive(Clone, Debug)] pub(crate) enum SortByComponent { @@ -587,6 +587,7 @@ impl SegmentCollector for QuickwitSegmentCollector { } None => None, }; + Ok(LeafSearchResponse { intermediate_aggregation_result, num_hits: self.num_hits, @@ -594,6 +595,7 @@ impl SegmentCollector for QuickwitSegmentCollector { failed_splits: Vec::new(), num_attempted_splits: 1, num_successful_splits: 1, + resource_stats: None, }) } } @@ -919,6 +921,11 @@ fn merge_leaf_responses( return Ok(leaf_responses.pop().unwrap()); } + let resource_stats_it = leaf_responses + .iter() + .map(|leaf_response| &leaf_response.resource_stats); + let merged_resource_stats = merge_resource_stats_it(resource_stats_it); + let merged_intermediate_aggregation_result: Option> = merge_intermediate_aggregation_result( aggregations_opt, @@ -960,6 +967,7 @@ fn merge_leaf_responses( failed_splits, num_attempted_splits, num_successful_splits, + resource_stats: merged_resource_stats, }) } @@ -1183,6 +1191,7 @@ pub(crate) struct IncrementalCollector { num_attempted_splits: u64, num_successful_splits: u64, start_offset: usize, + resource_stats: Option, } impl IncrementalCollector { @@ -1203,6 +1212,7 @@ impl IncrementalCollector { failed_splits: Vec::new(), num_attempted_splits: 0, num_successful_splits: 0, + resource_stats: None, } } @@ -1215,8 +1225,11 @@ impl IncrementalCollector { num_attempted_splits, intermediate_aggregation_result, num_successful_splits, + resource_stats, } = leaf_response; + merge_resource_stats(&resource_stats, &mut self.resource_stats); + self.num_hits += num_hits; self.top_k_hits.add_entries(partial_hits.into_iter()); self.failed_splits.extend(failed_splits); @@ -1266,6 +1279,7 @@ impl IncrementalCollector { num_attempted_splits: self.num_attempted_splits, num_successful_splits: self.num_successful_splits, intermediate_aggregation_result, + resource_stats: self.resource_stats, }) } } @@ -1275,8 +1289,8 @@ mod tests { use std::cmp::Ordering; use quickwit_proto::search::{ - LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortField, SortOrder, - SortValue, SplitSearchError, + LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortField, + SortOrder, SortValue, SplitSearchError, }; use tantivy::collector::Collector; use tantivy::TantivyDocument; @@ -1772,6 +1786,7 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: None, }], ); @@ -1789,7 +1804,8 @@ mod tests { failed_splits: Vec::new(), num_attempted_splits: 3, num_successful_splits: 3, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); @@ -1828,6 +1844,7 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: None, }, LeafSearchResponse { num_hits: 10, @@ -1846,6 +1863,7 @@ mod tests { num_attempted_splits: 2, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: None, }, ], ); @@ -1877,7 +1895,8 @@ mod tests { }], num_attempted_splits: 5, num_successful_splits: 4, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); @@ -1917,6 +1936,10 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: Some(ResourceStats { + cpu_microsecs: 100, + ..Default::default() + }), }, LeafSearchResponse { num_hits: 10, @@ -1935,6 +1958,10 @@ mod tests { num_attempted_splits: 2, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: Some(ResourceStats { + cpu_microsecs: 50, + ..Default::default() + }), }, ], ); @@ -1966,7 +1993,11 @@ mod tests { }], num_attempted_splits: 5, num_successful_splits: 4, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: Some(ResourceStats { + cpu_microsecs: 150, + ..Default::default() + }), } ); // TODO would be nice to test aggregation too. diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 9c326764539..d75f7efff0c 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -174,12 +174,12 @@ async fn fetch_docs_in_split( global_doc_addrs.sort_by_key(|doc| doc.doc_addr); // Opens the index without the ephemeral unbounded cache, this cache is indeed not useful // when fetching docs as we will fetch them only once. - let mut index = open_index_with_caches( + let (mut index, _) = open_index_with_caches( &searcher_context, index_storage, split, Some(doc_mapper.tokenizer_manager()), - false, + None, ) .await .context("open-index-for-split")?; diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 5ad92f63aa2..236149ca038 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -22,35 +22,37 @@ use std::ops::Bound; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant}; use anyhow::Context; +use bytesize::ByteSize; use futures::future::try_join_all; use quickwit_common::pretty::PrettySample; use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory}; use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo}; use quickwit_proto::search::{ - CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, - SortValue, SplitIdAndFooterOffsets, SplitSearchError, + CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, + SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError, }; use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery}; use quickwit_query::tokenizers::TokenizerManager; use quickwit_storage::{ - wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage, - StorageResolver, TimeoutAndRetryStorage, + wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes, + SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage, }; use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations}; use tantivy::aggregation::AggregationLimitsGuard; use tantivy::directory::FileSlice; use tantivy::fastfield::FastFieldReaders; use tantivy::schema::Field; -use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term}; +use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term}; use tokio::task::JoinError; use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; -use crate::search_permit_provider::SearchPermit; +use crate::search_permit_provider::{compute_initial_memory_allocation, SearchPermit}; use crate::service::{deserialize_doc_mapper, SearcherContext}; use crate::{QuickwitAggregations, SearchError}; @@ -124,33 +126,39 @@ pub(crate) async fn open_split_bundle( Ok((hotcache_bytes, bundle_storage)) } +/// Add a storage proxy to retry `get_slice` requests if they are taking too long, +/// if configured in the searcher config. +/// +/// The goal here is too ensure a low latency. +fn configure_storage_retries( + searcher_context: &SearcherContext, + index_storage: Arc, +) -> Arc { + if let Some(storage_timeout_policy) = &searcher_context.searcher_config.storage_timeout_policy { + Arc::new(TimeoutAndRetryStorage::new( + index_storage, + storage_timeout_policy.clone(), + )) + } else { + index_storage + } +} + /// Opens a `tantivy::Index` for the given split with several cache layers: /// - A split footer cache given by `SearcherContext.split_footer_cache`. /// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. -/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`. +/// - An ephemeral unbounded cache directory (whose lifetime is tied to the +/// returned `Index` if no `ByteRangeCache` is provided). #[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] pub(crate) async fn open_index_with_caches( searcher_context: &SearcherContext, index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: bool, -) -> anyhow::Result { - // Let's add a storage proxy to retry `get_slice` requests if they are taking too long, - // if configured in the searcher config. - // - // The goal here is too ensure a low latency. - - let index_storage_with_retry_on_timeout = if let Some(storage_timeout_policy) = - &searcher_context.searcher_config.storage_timeout_policy - { - Arc::new(TimeoutAndRetryStorage::new( - index_storage, - storage_timeout_policy.clone(), - )) - } else { - index_storage - }; + ephemeral_unbounded_cache: Option, +) -> anyhow::Result<(Index, HotDirectory)> { + let index_storage_with_retry_on_timeout = + configure_storage_retries(searcher_context, index_storage); let (hotcache_bytes, bundle_storage) = open_split_bundle( searcher_context, @@ -166,14 +174,14 @@ pub(crate) async fn open_index_with_caches( let directory = StorageDirectory::new(bundle_storage_with_cache); - let hot_directory = if ephemeral_unbounded_cache { - let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory)); + let hot_directory = if let Some(cache) = ephemeral_unbounded_cache { + let caching_directory = CachingDirectory::new(Arc::new(directory), cache); HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)? } else { HotDirectory::open(directory, hotcache_bytes.read_bytes()?)? }; - let mut index = Index::open(hot_directory)?; + let mut index = Index::open(hot_directory.clone())?; if let Some(tokenizer_manager) = tokenizer_manager { index.set_tokenizers(tokenizer_manager.tantivy_manager().clone()); } @@ -182,7 +190,7 @@ pub(crate) async fn open_index_with_caches( .tantivy_manager() .clone(), ); - Ok(index) + Ok((index, hot_directory)) } /// Tantivy search does not make it possible to fetch data asynchronously during @@ -363,10 +371,23 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { num_attempted_splits: 1, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: None, } } +/// Compute the size of the index, store excluded. +fn compute_index_size(hot_directory: &HotDirectory) -> ByteSize { + let size_bytes = hot_directory + .get_file_lengths() + .iter() + .filter(|(path, _)| !path.to_string_lossy().ends_with("store")) + .map(|(_, size)| *size) + .sum(); + ByteSize(size_bytes) +} + /// Apply a leaf search on a single split. +#[allow(clippy::too_many_arguments)] async fn leaf_search_single_split( searcher_context: &SearcherContext, mut search_request: SearchRequest, @@ -375,6 +396,7 @@ async fn leaf_search_single_split( doc_mapper: Arc, split_filter: Arc>, aggregations_limits: AggregationLimitsGuard, + search_permit: &mut SearchPermit, ) -> crate::Result { rewrite_request( &mut search_request, @@ -400,15 +422,21 @@ async fn leaf_search_single_split( } let split_id = split.split_id.to_string(); - let index = open_index_with_caches( + let byte_range_cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let (index, hot_directory) = open_index_with_caches( searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - true, + Some(byte_range_cache.clone()), ) .await?; - let split_schema = index.schema(); + + let index_size = compute_index_size(&hot_directory); + if index_size < search_permit.memory_allocation() { + search_permit.update_memory_usage(index_size); + } let reader = index .reader_builder() @@ -419,13 +447,33 @@ async fn leaf_search_single_split( let mut collector = make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?; + let split_schema = index.schema(); let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?; let collector_warmup_info = collector.warmup_info(); warmup_info.merge(collector_warmup_info); warmup_info.simplify(); + let warmup_start = Instant::now(); warmup(&searcher, &warmup_info).await?; + let warmup_end = Instant::now(); + let warmup_duration: Duration = warmup_end.duration_since(warmup_start); + let warmup_size = ByteSize(byte_range_cache.get_num_bytes()); + if warmup_size > search_permit.memory_allocation() { + warn!( + memory_usage = ?warmup_size, + memory_allocation = ?search_permit.memory_allocation(), + "current leaf search is consuming more memory than the initial allocation" + ); + } + crate::SEARCH_METRICS + .leaf_search_single_split_warmup_num_bytes + .observe(warmup_size.as_u64() as f64); + search_permit.update_memory_usage(warmup_size); + search_permit.free_warmup_slot(); + + let split_num_docs = split.num_docs; + let span = info_span!("tantivy_search"); let (search_request, leaf_search_response) = { @@ -433,25 +481,31 @@ async fn leaf_search_single_split( crate::search_thread_pool() .run_cpu_intensive(move || { + let cpu_start = Instant::now(); + let cpu_thread_pool_wait_microsecs = cpu_start.duration_since(warmup_end); let _span_guard = span.enter(); // Our search execution has been scheduled, let's check if we can improve the // request based on the results of the preceding searches check_optimize_search_request(&mut search_request, &split, &split_filter); collector.update_search_param(&search_request); - if is_metadata_count_request_with_ast(&query_ast, &search_request) { - return Ok(( - search_request, - get_leaf_resp_from_count(searcher.num_docs() as u64), - )); - } - if collector.is_count_only() { - let count = query.count(&searcher)? as u64; - Ok((search_request, get_leaf_resp_from_count(count))) - } else { - searcher - .search(&query, &collector) - .map(|resp| (search_request, resp)) - } + let mut leaf_search_response: LeafSearchResponse = + if is_metadata_count_request_with_ast(&query_ast, &search_request) { + get_leaf_resp_from_count(searcher.num_docs()) + } else if collector.is_count_only() { + let count = query.count(&searcher)? as u64; + get_leaf_resp_from_count(count) + } else { + searcher.search(&query, &collector)? + }; + leaf_search_response.resource_stats = Some(ResourceStats { + cpu_microsecs: cpu_start.elapsed().as_micros() as u64, + short_lived_cache_num_bytes: warmup_size.as_u64(), + split_num_docs, + warmup_microsecs: warmup_duration.as_micros() as u64, + cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros() + as u64, + }); + Result::<_, TantivyError>::Ok((search_request, leaf_search_response)) }) .await .map_err(|_| { @@ -1261,17 +1315,25 @@ pub async fn leaf_search( // We acquire all of the leaf search permits to make sure our single split search tasks // do no interleave with other leaf search requests. + let permit_sizes = split_with_req.iter().map(|(split, _)| { + compute_initial_memory_allocation( + split, + searcher_context + .searcher_config + .warmup_single_split_initial_allocation, + ) + }); let permit_futures = searcher_context .search_permit_provider - .get_permits(split_with_req.len()); + .get_permits(permit_sizes) + .await; for ((split, mut request), permit_fut) in split_with_req.into_iter().zip(permit_futures.into_iter()) { let leaf_split_search_permit = permit_fut .instrument(info_span!("waiting_for_leaf_search_split_semaphore")) - .await - .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); + .await; let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter); if !can_be_better && !run_all_splits { @@ -1361,7 +1423,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - search_permit: SearchPermit, + mut search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); @@ -1376,10 +1438,12 @@ async fn leaf_search_single_split_wrapper( doc_mapper, split_filter.clone(), aggregations_limits, + &mut search_permit, ) .await; - // We explicitly drop it, to highlight it to the reader + // Explicitly drop the permit for readability. + // This should always happen after the ephemeral search cache is dropped. std::mem::drop(search_permit); if leaf_search_single_split_res.is_ok() { @@ -1417,6 +1481,15 @@ async fn leaf_search_single_split_wrapper( mod tests { use std::ops::Bound; + use bytes::BufMut; + use quickwit_directories::write_hotcache; + use rand::{thread_rng, Rng}; + use tantivy::directory::RamDirectory; + use tantivy::schema::{ + BytesOptions, FieldEntry, Schema, TextFieldIndexing, TextOptions, Value, + }; + use tantivy::TantivyDocument; + use super::*; fn bool_filter(ast: impl Into) -> QueryAst { @@ -1852,4 +1925,97 @@ mod tests { assert_eq!(rewrote_bounds_agg, no_bounds_agg); } } + + fn create_tantivy_dir_with_hotcache<'a, V>( + field_entry: FieldEntry, + field_value: V, + ) -> (HotDirectory, usize) + where + V: Value<'a>, + { + let field_name = field_entry.name().to_string(); + let mut schema_builder = Schema::builder(); + schema_builder.add_field(field_entry); + let schema = schema_builder.build(); + + let ram_directory = RamDirectory::create(); + let index = Index::open_or_create(ram_directory.clone(), schema.clone()).unwrap(); + + let mut index_writer = index.writer(15_000_000).unwrap(); + let field = schema.get_field(&field_name).unwrap(); + let mut new_doc = TantivyDocument::default(); + new_doc.add_field_value(field, field_value); + index_writer.add_document(new_doc).unwrap(); + index_writer.commit().unwrap(); + + let mut hotcache_bytes_writer = Vec::new().writer(); + write_hotcache(ram_directory.clone(), &mut hotcache_bytes_writer).unwrap(); + let hotcache_bytes = OwnedBytes::new(hotcache_bytes_writer.into_inner()); + let hot_directory = HotDirectory::open(ram_directory.clone(), hotcache_bytes).unwrap(); + (hot_directory, ram_directory.total_mem_usage()) + } + + #[test] + fn test_compute_index_size_without_store() { + // We don't want to make assertions on absolute index sizes (it might + // change in future Tantivy versions), but rather verify that the store + // is properly excluded from the computed size. + + // We use random bytes so that the store can't compress them + let mut payload = vec![0u8; 1024]; + thread_rng().fill(&mut payload[..]); + + let (hotcache_directory_stored_payload, directory_size_stored_payload) = + create_tantivy_dir_with_hotcache( + FieldEntry::new_bytes("payload".to_string(), BytesOptions::default().set_stored()), + &payload, + ); + let size_with_stored_payload = + compute_index_size(&hotcache_directory_stored_payload).as_u64(); + + let (hotcache_directory_index_only, directory_size_index_only) = + create_tantivy_dir_with_hotcache( + FieldEntry::new_bytes("payload".to_string(), BytesOptions::default()), + &payload, + ); + let size_index_only = compute_index_size(&hotcache_directory_index_only).as_u64(); + + assert!(directory_size_stored_payload > directory_size_index_only + 1000); + assert!(size_with_stored_payload.abs_diff(size_index_only) < 10); + } + + #[test] + fn test_compute_index_size_varies_with_data() { + // We don't want to make assertions on absolute index sizes (it might + // change in future Tantivy versions), but rather verify that an index + // with more data is indeed bigger. + + let indexing_options = + TextOptions::default().set_indexing_options(TextFieldIndexing::default()); + + let (hotcache_directory_larger, directory_size_larger) = create_tantivy_dir_with_hotcache( + FieldEntry::new_text("text".to_string(), indexing_options.clone()), + "Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium \ + doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore \ + veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam \ + voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur \ + magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, \ + qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non \ + numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat \ + voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis \ + suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum \ + iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, \ + vel illum qui dolorem eum fugiat quo voluptas nulla pariatur?", + ); + let larger_size = compute_index_size(&hotcache_directory_larger).as_u64(); + + let (hotcache_directory_smaller, directory_size_smaller) = create_tantivy_dir_with_hotcache( + FieldEntry::new_text("text".to_string(), indexing_options), + "hi", + ); + let smaller_size = compute_index_size(&hotcache_directory_smaller).as_u64(); + + assert!(directory_size_larger > directory_size_smaller + 100); + assert!(larger_size > smaller_size + 100); + } } diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index 491f66f3aee..016cdd5b00f 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -192,7 +192,8 @@ impl std::ops::RangeBounds for Range { #[cfg(test)] mod tests { use quickwit_proto::search::{ - LeafSearchResponse, PartialHit, SearchRequest, SortValue, SplitIdAndFooterOffsets, + LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortValue, + SplitIdAndFooterOffsets, }; use super::LeafSearchCache; @@ -252,6 +253,7 @@ mod tests { sort_value2: None, split_id: "split_1".to_string(), }], + resource_stats: None, }; assert!(cache.get(split_1.clone(), query_1.clone()).is_none()); @@ -342,6 +344,7 @@ mod tests { sort_value2: None, split_id: "split_1".to_string(), }], + resource_stats: Some(ResourceStats::default()), }; // for split_1, 1 and 1bis cover different timestamp ranges diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index a81a974d75d..b7c03a0c5ea 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -72,7 +72,9 @@ use quickwit_metastore::{ IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState, }; -use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets}; +use quickwit_proto::search::{ + PartialHit, ResourceStats, SearchRequest, SearchResponse, SplitIdAndFooterOffsets, +}; use quickwit_proto::types::IndexUid; use quickwit_storage::StorageResolver; pub use service::SearcherContext; @@ -340,3 +342,126 @@ pub fn searcher_pool_for_test( }), ) } + +pub(crate) fn merge_resource_stats_it<'a>( + stats_it: impl IntoIterator>, +) -> Option { + let mut acc_stats: Option = None; + for new_stats in stats_it { + merge_resource_stats(new_stats, &mut acc_stats); + } + acc_stats +} + +fn merge_resource_stats( + new_stats_opt: &Option, + stat_accs_opt: &mut Option, +) { + if let Some(new_stats) = new_stats_opt { + if let Some(stat_accs) = stat_accs_opt { + stat_accs.short_lived_cache_num_bytes += new_stats.short_lived_cache_num_bytes; + stat_accs.split_num_docs += new_stats.split_num_docs; + stat_accs.warmup_microsecs += new_stats.warmup_microsecs; + stat_accs.cpu_thread_pool_wait_microsecs += new_stats.cpu_thread_pool_wait_microsecs; + stat_accs.cpu_microsecs += new_stats.cpu_microsecs; + } else { + *stat_accs_opt = Some(new_stats.clone()); + } + } +} +#[cfg(test)] +mod stats_merge_tests { + use super::*; + + #[test] + fn test_merge_resource_stats() { + let mut acc_stats = None; + + merge_resource_stats(&None, &mut acc_stats); + + assert_eq!(acc_stats, None); + + let stats = Some(ResourceStats { + short_lived_cache_num_bytes: 100, + split_num_docs: 200, + warmup_microsecs: 300, + cpu_thread_pool_wait_microsecs: 400, + cpu_microsecs: 500, + }); + + merge_resource_stats(&stats, &mut acc_stats); + + assert_eq!(acc_stats, stats); + + let new_stats = Some(ResourceStats { + short_lived_cache_num_bytes: 50, + split_num_docs: 100, + warmup_microsecs: 150, + cpu_thread_pool_wait_microsecs: 200, + cpu_microsecs: 250, + }); + + merge_resource_stats(&new_stats, &mut acc_stats); + + let stats_plus_new_stats = Some(ResourceStats { + short_lived_cache_num_bytes: 150, + split_num_docs: 300, + warmup_microsecs: 450, + cpu_thread_pool_wait_microsecs: 600, + cpu_microsecs: 750, + }); + + assert_eq!(acc_stats, stats_plus_new_stats); + + merge_resource_stats(&None, &mut acc_stats); + + assert_eq!(acc_stats, stats_plus_new_stats); + } + + #[test] + fn test_merge_resource_stats_it() { + let merged_stats = merge_resource_stats_it(Vec::<&Option>::new()); + assert_eq!(merged_stats, None); + + let stats1 = Some(ResourceStats { + short_lived_cache_num_bytes: 100, + split_num_docs: 200, + warmup_microsecs: 300, + cpu_thread_pool_wait_microsecs: 400, + cpu_microsecs: 500, + }); + + let merged_stats = merge_resource_stats_it(vec![&None, &stats1, &None]); + + assert_eq!(merged_stats, stats1); + + let stats2 = Some(ResourceStats { + short_lived_cache_num_bytes: 50, + split_num_docs: 100, + warmup_microsecs: 150, + cpu_thread_pool_wait_microsecs: 200, + cpu_microsecs: 250, + }); + + let stats3 = Some(ResourceStats { + short_lived_cache_num_bytes: 25, + split_num_docs: 50, + warmup_microsecs: 75, + cpu_thread_pool_wait_microsecs: 100, + cpu_microsecs: 125, + }); + + let merged_stats = merge_resource_stats_it(vec![&stats1, &stats2, &stats3]); + + assert_eq!( + merged_stats, + Some(ResourceStats { + short_lived_cache_num_bytes: 175, + split_num_docs: 350, + warmup_microsecs: 525, + cpu_thread_pool_wait_microsecs: 700, + cpu_microsecs: 875, + }) + ); + } +} diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 765203438d1..f796252c125 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -33,13 +33,14 @@ use quickwit_proto::search::{ SplitIdAndFooterOffsets, SplitSearchError, }; use quickwit_proto::types::IndexUid; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::schema::{Field, FieldType}; use tantivy::{ReloadPolicy, Term}; use tracing::{debug, error, info, instrument}; use crate::leaf::open_index_with_caches; use crate::search_job_placer::group_jobs_by_index_id; +use crate::search_permit_provider::compute_initial_memory_allocation; use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext}; /// Performs a distributed list terms. @@ -216,7 +217,10 @@ async fn leaf_list_terms_single_split( storage: Arc, split: SplitIdAndFooterOffsets, ) -> crate::Result { - let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?; + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let (index, _) = + open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?; let split_schema = index.schema(); let reader = index .reader_builder() @@ -325,18 +329,26 @@ pub async fn leaf_list_terms( splits: &[SplitIdAndFooterOffsets], ) -> Result { info!(split_offsets = ?PrettySample::new(splits, 5)); + let permit_sizes = splits.iter().map(|split| { + compute_initial_memory_allocation( + split, + searcher_context + .searcher_config + .warmup_single_split_initial_allocation, + ) + }); + let permits = searcher_context + .search_permit_provider + .get_permits(permit_sizes) + .await; let leaf_search_single_split_futures: Vec<_> = splits .iter() - .map(|split| { + .zip(permits.into_iter()) + .map(|(split, search_permit_recv)| { let index_storage_clone = index_storage.clone(); let searcher_context_clone = searcher_context.clone(); async move { - let _leaf_split_search_permit = searcher_context_clone - .search_permit_provider - .get_permit() - .await - .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); - + let leaf_split_search_permit = search_permit_recv.await; // TODO dedicated counter and timer? crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); let timer = crate::SEARCH_METRICS @@ -350,6 +362,11 @@ pub async fn leaf_list_terms( ) .await; timer.observe_duration(); + + // Explicitly drop the permit for readability. + // This should always happen after the ephemeral search cache is dropped. + std::mem::drop(leaf_split_search_permit); + leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err)) } }) diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index 35b7d3115c5..55bff88a565 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -19,6 +19,7 @@ // See https://prometheus.io/docs/practices/naming/ +use bytesize::ByteSize; use once_cell::sync::Lazy; use quickwit_common::metrics::{ exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge_vec, @@ -37,6 +38,7 @@ pub struct SearchMetrics { pub job_assigned_total: IntCounterVec<1>, pub leaf_search_single_split_tasks_pending: IntGauge, pub leaf_search_single_split_tasks_ongoing: IntGauge, + pub leaf_search_single_split_warmup_num_bytes: Histogram, } impl Default for SearchMetrics { @@ -52,6 +54,18 @@ impl Default for SearchMetrics { .copied() .collect(); + let pseudo_exponential_bytes_buckets = vec![ + ByteSize::mb(10).as_u64() as f64, + ByteSize::mb(20).as_u64() as f64, + ByteSize::mb(50).as_u64() as f64, + ByteSize::mb(100).as_u64() as f64, + ByteSize::mb(200).as_u64() as f64, + ByteSize::mb(500).as_u64() as f64, + ByteSize::gb(1).as_u64() as f64, + ByteSize::gb(2).as_u64() as f64, + ByteSize::gb(5).as_u64() as f64, + ]; + let leaf_search_single_split_tasks = new_gauge_vec::<1>( "leaf_search_single_split_tasks", "Number of single split search tasks pending or ongoing", @@ -124,6 +138,12 @@ impl Default for SearchMetrics { .with_label_values(["ongoing"]), leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks .with_label_values(["pending"]), + leaf_search_single_split_warmup_num_bytes: new_histogram( + "leaf_search_single_split_warmup_num_bytes", + "Size of the short lived cache for a single split once the warmup is done.", + "search", + pseudo_exponential_bytes_buckets, + ), job_assigned_total: new_counter_vec( "job_assigned_total", "Number of job assigned to searchers, per affinity rank.", diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 608bc87e479..724687148f2 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::OnceLock; use std::time::Duration; @@ -49,7 +50,7 @@ use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResult use tantivy::collector::Collector; use tantivy::schema::{Field, FieldEntry, FieldType, Schema}; use tantivy::TantivyError; -use tracing::{debug, info, info_span, instrument}; +use tracing::{debug, info_span, instrument}; use crate::cluster_client::ClusterClient; use crate::collector::{make_merge_collector, QuickwitAggregations}; @@ -683,10 +684,46 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec bool { + // It is not worth considering small splits for this. + if split_num_docs < 100_000 { + return false; + } + // We multiply those figure by 1_000 for accuracy. + const PERCENTILE: u64 = 95; + const PRIOR_NUM_BYTES_PER_DOC: u64 = 3 * 1_000; + static NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR: AtomicU64 = + AtomicU64::new(PRIOR_NUM_BYTES_PER_DOC); + let num_bits_per_docs = num_bytes * 1_000 / split_num_docs; + let current_estimator = NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR.load(Ordering::Relaxed); + let is_memory_intensive = num_bits_per_docs > current_estimator; + let new_estimator: u64 = if is_memory_intensive { + current_estimator.saturating_add(PRIOR_NUM_BYTES_PER_DOC * PERCENTILE / 100) + } else { + current_estimator.saturating_sub(PRIOR_NUM_BYTES_PER_DOC * (100 - PERCENTILE) / 100) + }; + // We do not use fetch_add / fetch_sub directly as they wrap around. + // Concurrency could lead to different results here, but really we don't care. + // + // This is just ignoring some gradient updates. + NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR.store(new_estimator, Ordering::Relaxed); + is_memory_intensive +} + /// If this method fails for some splits, a partial search response is returned, with the list of /// faulty splits in the failed_splits field. #[instrument(level = "debug", skip_all)] @@ -744,9 +781,21 @@ pub(crate) async fn search_partial_hits_phase( has_intermediate_aggregation_result = leaf_search_response.intermediate_aggregation_result.is_some(), "Merged leaf search response." ); + + if let Some(resource_stats) = &leaf_search_response.resource_stats { + if is_top_5pct_memory_intensive( + resource_stats.short_lived_cache_num_bytes, + resource_stats.split_num_docs, + ) { + // We log at most 5 times per minute. + quickwit_common::rate_limited_info!(limit_per_min=5, split_num_docs=resource_stats.split_num_docs, %search_request.query_ast, short_lived_cached_num_bytes=resource_stats.short_lived_cache_num_bytes, query=%search_request.query_ast, "memory intensive query"); + } + } + if !leaf_search_response.failed_splits.is_empty() { quickwit_common::rate_limited_error!(limit_per_min=6, failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split"); } + Ok(leaf_search_response) } @@ -1114,7 +1163,6 @@ pub async fn root_search( mut metastore: MetastoreServiceClient, cluster_client: &ClusterClient, ) -> crate::Result { - info!(searcher_context = ?searcher_context, search_request = ?search_request); let start_instant = tokio::time::Instant::now(); let list_indexes_metadatas_request = ListIndexesMetadataRequest { index_id_patterns: search_request.index_id_patterns.clone(), @@ -1169,9 +1217,12 @@ pub async fn root_search( ) .await; + let elapsed = start_instant.elapsed(); + if let Ok(search_response) = &mut search_response_result { - search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64; + search_response.elapsed_time_micros = elapsed.as_micros() as u64; } + let label_values = if search_response_result.is_ok() { ["success"] } else { diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index f6883efb34b..64bc36ff3a6 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -18,109 +18,221 @@ // along with this program. If not, see . use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use bytesize::ByteSize; use quickwit_common::metrics::GaugeGuard; -use tokio::sync::oneshot; +use quickwit_proto::search::SplitIdAndFooterOffsets; +#[cfg(test)] +use tokio::sync::watch; +use tokio::sync::{mpsc, oneshot}; -/// `SearchPermitProvider` is a distributor of permits to perform single split -/// search operation. +/// Distributor of permits to perform split search operation. /// -/// Requests are served in order. +/// Requests are served in order. Each permit initially reserves a slot for the +/// warmup (limit concurrent downloads) and a pessimistic amount of memory. Once +/// the warmup is completed, the actual memory usage is set and the warmup slot +/// is released. Once the search is completed and the permit is dropped, the +/// remaining memory is also released. #[derive(Clone)] pub struct SearchPermitProvider { - inner_arc: Arc>, + message_sender: mpsc::UnboundedSender, + #[cfg(test)] + actor_stopped: watch::Receiver, +} + +#[derive(Debug)] +pub enum SearchPermitMessage { + Request { + permit_sender: oneshot::Sender>, + permit_sizes: Vec, + }, + UpdateMemory { + memory_delta: i64, + }, + FreeWarmupSlot, + Drop { + memory_size: u64, + warmup_slot_freed: bool, + }, +} + +/// Makes very pessimistic estimate of the memory allocation required for a split search +/// +/// This is refined later on when more data is available about the split. +pub fn compute_initial_memory_allocation( + split: &SplitIdAndFooterOffsets, + warmup_single_split_initial_allocation: ByteSize, +) -> ByteSize { + let split_size = split.split_footer_start; + // we consider the configured initial allocation to be set for a large split with 10M docs + const LARGE_SPLIT_NUM_DOCS: u64 = 10_000_000; + let proportional_allocation = + warmup_single_split_initial_allocation.as_u64() * split.num_docs / LARGE_SPLIT_NUM_DOCS; + let size_bytes = [ + split_size, + proportional_allocation, + warmup_single_split_initial_allocation.as_u64(), + ] + .into_iter() + .min() + .unwrap(); + const MINIMUM_ALLOCATION_BYTES: u64 = 10_000_000; + ByteSize(size_bytes.max(MINIMUM_ALLOCATION_BYTES)) } impl SearchPermitProvider { - pub fn new(num_permits: usize) -> SearchPermitProvider { - SearchPermitProvider { - inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider { - num_permits_available: num_permits, - permits_requests: VecDeque::new(), - })), + pub fn new(num_download_slots: usize, memory_budget: ByteSize) -> Self { + let (message_sender, message_receiver) = mpsc::unbounded_channel(); + #[cfg(test)] + let (state_sender, state_receiver) = watch::channel(false); + let actor = SearchPermitActor { + msg_receiver: message_receiver, + msg_sender: message_sender.downgrade(), + num_warmup_slots_available: num_download_slots, + total_memory_budget: memory_budget.as_u64(), + permits_requests: VecDeque::new(), + total_memory_allocated: 0u64, + #[cfg(test)] + stopped: state_sender, + }; + tokio::spawn(actor.run()); + Self { + message_sender, + #[cfg(test)] + actor_stopped: state_receiver, } } - /// Returns a future permit in the form of a oneshot Receiver channel. + /// Returns one permit future for each provided split metadata. /// - /// At this point the permit is not acquired yet. - #[must_use] - pub fn get_permit(&self) -> oneshot::Receiver { - let mut permits_lock = self.inner_arc.lock().unwrap(); - permits_lock.get_permit(&self.inner_arc) - } - - /// Returns a list of future permits in the form of oneshot Receiver channels. + /// The permits returned are guaranteed to be resolved in order. In + /// addition, the permits are guaranteed to be resolved before permits + /// returned by subsequent calls to this function. /// - /// The permits returned are guaranteed to be resolved in order. - /// In addition, the permits are guaranteed to be resolved before permits returned by - /// subsequent calls to this function (or `get_permit`). - #[must_use] - pub fn get_permits(&self, num_permits: usize) -> Vec> { - let mut permits_lock = self.inner_arc.lock().unwrap(); - permits_lock.get_permits(num_permits, &self.inner_arc) + /// The permit memory size is capped by per_permit_initial_memory_allocation. + pub async fn get_permits( + &self, + splits: impl IntoIterator, + ) -> Vec { + let (permit_sender, permit_receiver) = oneshot::channel(); + let permit_sizes = splits.into_iter().map(|size| size.as_u64()).collect(); + self.message_sender + .send(SearchPermitMessage::Request { + permit_sender, + permit_sizes, + }) + .expect("Receiver lives longer than sender"); + permit_receiver + .await + .expect("Receiver lives longer than sender") } } -struct InnerSearchPermitProvider { - num_permits_available: usize, - permits_requests: VecDeque>, +struct SearchPermitActor { + msg_receiver: mpsc::UnboundedReceiver, + msg_sender: mpsc::WeakUnboundedSender, + num_warmup_slots_available: usize, + /// Note it is possible for memory_allocated to exceed memory_budget temporarily, + /// if and only if a split leaf search task ended up using more than `initial_allocation`. + /// When it happens, new permits will not be assigned until the memory is freed. + total_memory_budget: u64, + total_memory_allocated: u64, + permits_requests: VecDeque<(oneshot::Sender, u64)>, + #[cfg(test)] + stopped: watch::Sender, } -impl InnerSearchPermitProvider { - fn get_permit( - &mut self, - inner_arc: &Arc>, - ) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back(tx); - self.assign_available_permits(inner_arc); - rx - } - - fn get_permits( - &mut self, - num_permits: usize, - inner_arc: &Arc>, - ) -> Vec> { - let mut permits = Vec::with_capacity(num_permits); - for _ in 0..num_permits { - let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back(tx); - permits.push(rx); +impl SearchPermitActor { + async fn run(mut self) { + // Stops when the last clone of SearchPermitProvider is dropped. + while let Some(msg) = self.msg_receiver.recv().await { + self.handle_message(msg); + } + #[cfg(test)] + self.stopped.send(true).ok(); + } + + fn handle_message(&mut self, msg: SearchPermitMessage) { + match msg { + SearchPermitMessage::Request { + permit_sizes, + permit_sender, + } => { + let mut permits = Vec::with_capacity(permit_sizes.len()); + for permit_size in permit_sizes { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back((tx, permit_size)); + permits.push(SearchPermitFuture(rx)); + } + self.assign_available_permits(); + permit_sender + .send(permits) + // This is a request response pattern, so we can safely ignore the error. + .expect("Receiver lives longer than sender"); + } + SearchPermitMessage::UpdateMemory { memory_delta } => { + if self.total_memory_allocated as i64 + memory_delta < 0 { + panic!("More memory released than allocated, should never happen.") + } + self.total_memory_allocated = + (self.total_memory_allocated as i64 + memory_delta) as u64; + self.assign_available_permits(); + } + SearchPermitMessage::FreeWarmupSlot => { + self.num_warmup_slots_available += 1; + self.assign_available_permits(); + } + SearchPermitMessage::Drop { + memory_size, + warmup_slot_freed, + } => { + if !warmup_slot_freed { + self.num_warmup_slots_available += 1; + } + self.total_memory_allocated = self + .total_memory_allocated + .checked_sub(memory_size) + .expect("More memory released than allocated, should never happen."); + self.assign_available_permits(); + } } - self.assign_available_permits(inner_arc); - permits } - fn recycle_permit(&mut self, inner_arc: &Arc>) { - self.num_permits_available += 1; - self.assign_available_permits(inner_arc); + fn pop_next_request_if_serviceable(&mut self) -> Option<(oneshot::Sender, u64)> { + if self.num_warmup_slots_available == 0 { + return None; + } + if let Some((_, next_permit_size)) = self.permits_requests.front() { + if self.total_memory_allocated + next_permit_size <= self.total_memory_budget { + return self.permits_requests.pop_front(); + } + } + None } - fn assign_available_permits(&mut self, inner_arc: &Arc>) { - while self.num_permits_available > 0 { - let Some(sender) = self.permits_requests.pop_front() else { - break; - }; + fn assign_available_permits(&mut self) { + while let Some((permit_requester_tx, next_permit_size)) = + self.pop_next_request_if_serviceable() + { let mut ongoing_gauge_guard = GaugeGuard::from_gauge( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); - let send_res = sender.send(SearchPermit { - _ongoing_gauge_guard: ongoing_gauge_guard, - inner_arc: inner_arc.clone(), - recycle_on_drop: true, - }); - match send_res { - Ok(()) => { - self.num_permits_available -= 1; - } - Err(search_permit) => { - search_permit.drop_without_recycling_permit(); - } - } + self.total_memory_allocated += next_permit_size; + self.num_warmup_slots_available -= 1; + permit_requester_tx + .send(SearchPermit { + _ongoing_gauge_guard: ongoing_gauge_guard, + msg_sender: self.msg_sender.clone(), + memory_allocation: next_permit_size, + warmup_slot_freed: false, + }) + // if the requester dropped its receiver, we drop the newly + // created SearchPermit which releases the resources + .ok(); } crate::SEARCH_METRICS .leaf_search_single_split_tasks_pending @@ -128,41 +240,93 @@ impl InnerSearchPermitProvider { } } +#[derive(Debug)] pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, - inner_arc: Arc>, - recycle_on_drop: bool, + msg_sender: mpsc::WeakUnboundedSender, + memory_allocation: u64, + warmup_slot_freed: bool, } impl SearchPermit { - fn drop_without_recycling_permit(mut self) { - self.recycle_on_drop = false; - drop(self); + /// Update the memory usage attached to this permit. + /// + /// This will increase or decrease the available memory in the [`SearchPermitProvider`]. + pub fn update_memory_usage(&mut self, new_memory_usage: ByteSize) { + let new_usage_bytes = new_memory_usage.as_u64(); + let memory_delta = new_usage_bytes as i64 - self.memory_allocation as i64; + self.memory_allocation = new_usage_bytes; + self.send_if_still_running(SearchPermitMessage::UpdateMemory { memory_delta }); + } + + /// Drop the warmup permit, allowing more downloads to be started. Only one + /// slot is attached to each permit so calling this again has no effect. + pub fn free_warmup_slot(&mut self) { + if self.warmup_slot_freed { + return; + } + self.warmup_slot_freed = true; + self.send_if_still_running(SearchPermitMessage::FreeWarmupSlot); + } + + pub fn memory_allocation(&self) -> ByteSize { + ByteSize(self.memory_allocation) + } + + fn send_if_still_running(&self, msg: SearchPermitMessage) { + if let Some(sender) = self.msg_sender.upgrade() { + sender + .send(msg) + // Receiver instance in the event loop is never dropped or + // closed as long as there is a strong sender reference. + .expect("Receiver should live longer than sender"); + } } } impl Drop for SearchPermit { fn drop(&mut self) { - if !self.recycle_on_drop { - return; + self.send_if_still_running(SearchPermitMessage::Drop { + memory_size: self.memory_allocation, + warmup_slot_freed: self.warmup_slot_freed, + }); + } +} + +#[derive(Debug)] +pub struct SearchPermitFuture(oneshot::Receiver); + +impl Future for SearchPermitFuture { + type Output = SearchPermit; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let receiver = Pin::new(&mut self.get_mut().0); + match receiver.poll(cx) { + Poll::Ready(Ok(search_permit)) => Poll::Ready(search_permit), + Poll::Ready(Err(_)) => panic!("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."), + Poll::Pending => Poll::Pending, } - let mut inner_guard = self.inner_arc.lock().unwrap(); - inner_guard.recycle_permit(&self.inner_arc.clone()); } } #[cfg(test)] mod tests { + use std::iter::repeat; + use std::time::Duration; + + use futures::StreamExt; + use rand::seq::SliceRandom; use tokio::task::JoinSet; use super::*; #[tokio::test] - async fn test_search_permits_get_permits_future() { - // We test here that `get_permits_futures` does not interleave - let search_permits = SearchPermitProvider::new(1); + async fn test_search_permit_order() { + let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100)); let mut all_futures = Vec::new(); - let first_batch_of_permits = search_permits.get_permits(10); + let first_batch_of_permits = permit_provider + .get_permits(repeat(ByteSize::mb(10)).take(10)) + .await; assert_eq!(first_batch_of_permits.len(), 10); all_futures.extend( first_batch_of_permits @@ -171,7 +335,9 @@ mod tests { .map(move |(i, fut)| ((1, i), fut)), ); - let second_batch_of_permits = search_permits.get_permits(10); + let second_batch_of_permits = permit_provider + .get_permits(repeat(ByteSize::mb(10)).take(10)) + .await; assert_eq!(second_batch_of_permits.len(), 10); all_futures.extend( second_batch_of_permits @@ -180,7 +346,6 @@ mod tests { .map(move |(i, fut)| ((2, i), fut)), ); - use rand::seq::SliceRandom; // not super useful, considering what join set does, but still a tiny bit more sound. all_futures.shuffle(&mut rand::thread_rng()); @@ -206,15 +371,110 @@ mod tests { } #[tokio::test] - async fn test_search_permits_receiver_race_condition() { - // Here we test that we don't have a problem if the Receiver is dropped. - // In particular, we want to check that there is not a race condition where drop attempts to - // lock the mutex. - let search_permits = SearchPermitProvider::new(1); - let permit_rx = search_permits.get_permit(); - let permit_rx2 = search_permits.get_permit(); - drop(permit_rx2); - drop(permit_rx); - let _permit_rx = search_permits.get_permit(); + async fn test_search_permit_early_drops() { + let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100)); + let permit_fut1 = permit_provider + .get_permits(vec![ByteSize::mb(10)]) + .await + .into_iter() + .next() + .unwrap(); + let permit_fut2 = permit_provider + .get_permits([ByteSize::mb(10)]) + .await + .into_iter() + .next() + .unwrap(); + drop(permit_fut1); + let permit = permit_fut2.await; + assert_eq!(permit.memory_allocation, ByteSize::mb(10).as_u64()); + assert_eq!(*permit_provider.actor_stopped.borrow(), false); + + let _permit_fut3 = permit_provider + .get_permits([ByteSize::mb(10)]) + .await + .into_iter() + .next() + .unwrap(); + let mut actor_stopped = permit_provider.actor_stopped.clone(); + drop(permit_provider); + { + actor_stopped.changed().await.unwrap(); + assert!(*actor_stopped.borrow()); + } + } + + /// Tries to wait for a permit + async fn try_get(permit_fut: SearchPermitFuture) -> anyhow::Result { + // using a short timeout is a bit flaky, but it should be enough for these tests + let permit = tokio::time::timeout(Duration::from_millis(20), permit_fut).await?; + Ok(permit) + } + + #[tokio::test] + async fn test_memory_budget() { + let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100)); + let mut permit_futs = permit_provider + .get_permits(repeat(ByteSize::mb(10)).take(14)) + .await; + let mut remaining_permit_futs = permit_futs.split_off(10).into_iter(); + assert_eq!(remaining_permit_futs.len(), 4); + // we should be able to obtain 10 permits right away (100MB / 10MB) + let mut permits: Vec = futures::stream::iter(permit_futs.into_iter()) + .buffered(1) + .collect() + .await; + // the next permit is blocked by the memory budget + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // if we drop one of the permits, we can get a new one + permits.drain(0..1); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + let _new_permit = try_get(next_permit_fut).await.unwrap(); + // the next permit is blocked again by the memory budget + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // by setting a more accurate memory usage after a completed warmup, we can get more permits + permits[0].update_memory_usage(ByteSize::mb(4)); + permits[1].update_memory_usage(ByteSize::mb(6)); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_permit_fut).await.unwrap(); + } + + #[tokio::test] + async fn test_warmup_slot() { + let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100)); + let mut permit_futs = permit_provider + .get_permits(repeat(ByteSize::mb(1)).take(16)) + .await; + let mut remaining_permit_futs = permit_futs.split_off(10).into_iter(); + assert_eq!(remaining_permit_futs.len(), 6); + // we should be able to obtain 10 permits right away + let mut permits: Vec = futures::stream::iter(permit_futs.into_iter()) + .buffered(1) + .collect() + .await; + // the next permit is blocked by the warmup slots + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // if we drop one of the permits, we can get a new one + permits.drain(0..1); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + permits.push(try_get(next_permit_fut).await.unwrap()); + // the next permit is blocked again by the warmup slots + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // we can explicitly free the warmup slot on a permit + permits[0].free_warmup_slot(); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + permits.push(try_get(next_permit_fut).await.unwrap()); + // dropping that same permit does not free up another slot + permits.drain(0..1); + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // but dropping a permit for which the slot wasn't explicitly free does free up a slot + permits.drain(0..1); + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + permits.push(try_get(next_blocked_permit_fut).await.unwrap()); } } diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 941e0d12612..0659965b40d 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -29,7 +29,7 @@ use quickwit_proto::search::{ LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest, SplitIdAndFooterOffsets, }; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType}; use tantivy::fastfield::Column; use tantivy::query::Query; @@ -116,6 +116,7 @@ async fn leaf_search_stream_single_split( mut stream_request: SearchStreamRequest, storage: Arc, ) -> crate::Result { + // TODO: Should we track the memory here using the SearchPermitProvider? let _leaf_split_stream_permit = searcher_context .split_stream_semaphore .acquire() @@ -127,12 +128,14 @@ async fn leaf_search_stream_single_split( &split, ); - let index = open_index_with_caches( + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let (index, _) = open_index_with_caches( &searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - true, + Some(cache), ) .await?; let split_schema = index.schema(); diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 0029f4dd3a7..d566463b42e 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -488,8 +488,10 @@ impl SearcherContext { capacity_in_bytes, &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); - let leaf_search_split_semaphore = - SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches); + let leaf_search_split_semaphore = SearchPermitProvider::new( + searcher_config.max_num_concurrent_split_searches, + searcher_config.warmup_memory_budget, + ); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize; diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs index 9ef3b7f523f..425e4f9a043 100644 --- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs +++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs @@ -21,7 +21,8 @@ use std::borrow::{Borrow, Cow}; use std::collections::BTreeMap; use std::ops::Range; use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; use tantivy::directory::OwnedBytes; @@ -344,31 +345,54 @@ impl Drop for NeedMutByteRangeCache { /// cached data, the changes may or may not get recorded. /// /// At the moment this is hardly a cache as it features no eviction policy. +#[derive(Clone)] pub struct ByteRangeCache { - inner: Mutex>, + inner_arc: Arc, +} + +struct Inner { + num_stored_bytes: AtomicU64, + need_mut_byte_range_cache: Mutex>, } impl ByteRangeCache { /// Creates a slice cache that never removes any entry. pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self { + let need_mut_byte_range_cache = + NeedMutByteRangeCache::with_infinite_capacity(cache_counters); + let inner = Inner { + num_stored_bytes: AtomicU64::default(), + need_mut_byte_range_cache: Mutex::new(need_mut_byte_range_cache), + }; ByteRangeCache { - inner: Mutex::new(NeedMutByteRangeCache::with_infinite_capacity( - cache_counters, - )), + inner_arc: Arc::new(inner), } } + /// Overall amount of bytes stored in the cache. + pub fn get_num_bytes(&self) -> u64 { + self.inner_arc.num_stored_bytes.load(Ordering::Relaxed) + } + /// If available, returns the cached view of the slice. pub fn get_slice(&self, path: &Path, byte_range: Range) -> Option { - self.inner.lock().unwrap().get_slice(path, byte_range) + self.inner_arc + .need_mut_byte_range_cache + .lock() + .unwrap() + .get_slice(path, byte_range) } /// Put the given amount of data in the cache. pub fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes) { - self.inner - .lock() - .unwrap() - .put_slice(path, byte_range, bytes) + let mut need_mut_byte_range_cache_locked = + self.inner_arc.need_mut_byte_range_cache.lock().unwrap(); + need_mut_byte_range_cache_locked.put_slice(path, byte_range, bytes); + let num_bytes = need_mut_byte_range_cache_locked.num_bytes; + drop(need_mut_byte_range_cache_locked); + self.inner_arc + .num_stored_bytes + .store(num_bytes, Ordering::Relaxed); } } @@ -446,13 +470,13 @@ mod tests { .sum(); // in some case we have ranges touching each other, count_items count them // as only one, but cache count them as 2. - assert!(cache.inner.lock().unwrap().num_items >= expected_item_count as u64); + assert!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_items >= expected_item_count as u64); let expected_byte_count = state.values() .flatten() .filter(|stored| **stored) .count(); - assert_eq!(cache.inner.lock().unwrap().num_bytes, expected_byte_count as u64); + assert_eq!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_bytes, expected_byte_count as u64); } Operation::Get { range, @@ -519,7 +543,7 @@ mod tests { ); { - let mutable_cache = cache.inner.lock().unwrap(); + let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); assert_eq!(mutable_cache.cache.len(), 4); assert_eq!(mutable_cache.num_items, 4); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4); @@ -531,7 +555,7 @@ mod tests { { // now they should've been merged, except the last one - let mutable_cache = cache.inner.lock().unwrap(); + let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); assert_eq!(mutable_cache.cache.len(), 2); assert_eq!(mutable_cache.num_items, 2); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2);