diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs
index 3eef1f10428..bd855ec0395 100644
--- a/quickwit/quickwit-config/src/node_config/mod.rs
+++ b/quickwit/quickwit-config/src/node_config/mod.rs
@@ -226,6 +226,8 @@ pub struct SearcherConfig {
     #[serde(default)]
     #[serde(skip_serializing_if = "Option::is_none")]
     pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
+    pub warmup_memory_budget: ByteSize,
+    pub warmup_single_split_initial_allocation: ByteSize,
 }
 
 /// Configuration controlling how fast a searcher should timeout a `get_slice`
@@ -263,7 +265,7 @@ impl StorageTimeoutPolicy {
 impl Default for SearcherConfig {
     fn default() -> Self {
-        Self {
+        SearcherConfig {
             fast_field_cache_capacity: ByteSize::gb(1),
             split_footer_cache_capacity: ByteSize::mb(500),
             partial_request_cache_capacity: ByteSize::mb(64),
@@ -274,6 +276,8 @@ impl Default for SearcherConfig {
             split_cache: None,
             request_timeout_secs: Self::default_request_timeout_secs(),
             storage_timeout_policy: None,
+            warmup_memory_budget: ByteSize::gb(1),
+            warmup_single_split_initial_allocation: ByteSize::mb(50),
         }
     }
 }
@@ -308,6 +312,14 @@ impl SearcherConfig {
                 split_cache_limits.max_file_descriptors
             );
         }
+        if self.warmup_single_split_initial_allocation > self.warmup_memory_budget {
+            anyhow::bail!(
+                "warmup_single_split_initial_allocation ({}) must be lower or equal to \
+                 warmup_memory_budget ({})",
+                self.warmup_single_split_initial_allocation,
+                self.warmup_memory_budget
+            );
+        }
         Ok(())
     }
 }
diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs
index 8a1337636cf..457cf5c5fae 100644
--- a/quickwit/quickwit-config/src/node_config/serialize.rs
+++ b/quickwit/quickwit-config/src/node_config/serialize.rs
@@ -616,7 +616,9 @@ mod tests {
                 min_throughtput_bytes_per_secs: 100_000,
                 timeout_millis: 2_000,
                 max_num_retries: 2
-            })
+            }),
+            warmup_memory_budget: ByteSize::gb(1),
+            warmup_single_split_initial_allocation: ByteSize::mb(50),
         }
     );
     assert_eq!(
diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs
index b90f444d062..5ee8b63777a 100644
--- a/quickwit/quickwit-directories/src/caching_directory.rs
+++ b/quickwit/quickwit-directories/src/caching_directory.rs
@@ -30,40 +30,44 @@ use tantivy::{Directory, HasLen};
 
 /// The caching directory is a simple cache that wraps another directory.
 #[derive(Clone)]
-pub struct CachingDirectory {
+pub struct CachingDirectory<C: DirectoryCache> {
     underlying: Arc<dyn Directory>,
-    // TODO fixme: that's a pretty ugly cache we have here.
-    cache: Arc<ByteRangeCache>,
+    cache: C,
 }
 
-impl CachingDirectory {
-    /// Creates a new CachingDirectory.
+impl CachingDirectory<Arc<ByteRangeCache>> {
+    /// Creates a new CachingDirectory with a default cache.
     ///
     /// Warming: The resulting CacheDirectory will cache all information without ever
     /// removing any item from the cache.
-    pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
-        CachingDirectory {
-            underlying,
-            cache: Arc::new(ByteRangeCache::with_infinite_capacity(
-                &quickwit_storage::STORAGE_METRICS.shortlived_cache,
-            )),
-        }
+    pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory<Arc<ByteRangeCache>> {
+        let byte_range_cache = ByteRangeCache::with_infinite_capacity(
+            &quickwit_storage::STORAGE_METRICS.shortlived_cache,
+        );
+        CachingDirectory::new(underlying, Arc::new(byte_range_cache))
+    }
+}
+
+impl<C: DirectoryCache> CachingDirectory<C> {
+    /// Creates a new CachingDirectory with an existing cache.
+    pub fn new(underlying: Arc<dyn Directory>, cache: C) -> CachingDirectory<C> {
+        CachingDirectory { underlying, cache }
     }
 }
 
-impl fmt::Debug for CachingDirectory {
+impl<C: DirectoryCache> fmt::Debug for CachingDirectory<C> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, "CachingDirectory({:?})", self.underlying)
     }
 }
 
-struct CachingFileHandle {
+struct CachingFileHandle<C: DirectoryCache> {
     path: PathBuf,
-    cache: Arc<ByteRangeCache>,
+    cache: C,
     underlying_filehandle: Arc<dyn FileHandle>,
 }
 
-impl fmt::Debug for CachingFileHandle {
+impl<C: DirectoryCache> fmt::Debug for CachingFileHandle<C> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(
             f,
@@ -75,7 +79,7 @@ impl fmt::Debug for CachingFileHandle {
 }
 
 #[async_trait]
-impl FileHandle for CachingFileHandle {
+impl<C: DirectoryCache> FileHandle for CachingFileHandle<C> {
     fn read_bytes(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
         if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) {
             return Ok(bytes);
@@ -100,13 +104,13 @@ impl FileHandle for CachingFileHandle {
     }
 }
 
-impl HasLen for CachingFileHandle {
+impl<C: DirectoryCache> HasLen for CachingFileHandle<C> {
     fn len(&self) -> usize {
         self.underlying_filehandle.len()
     }
 }
 
-impl Directory for CachingDirectory {
+impl<C: DirectoryCache> Directory for CachingDirectory<C> {
     fn exists(&self, path: &Path) -> std::result::Result<bool, OpenReadError> {
         self.underlying.exists(path)
     }
@@ -136,6 +140,25 @@ impl Directory for CachingDirectory {
     crate::read_only_directory!();
 }
 
+/// A byte range cache to be used in front of the directory.
+pub trait DirectoryCache: Clone + Send + Sync + 'static {
+    /// If available, returns the cached view of the slice.
+    fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes>;
+
+    /// Put the given amount of data in the cache.
+    fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes);
+}
+
+impl DirectoryCache for Arc<ByteRangeCache> {
+    fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
+        ByteRangeCache::get_slice(self, path, byte_range)
+    }
+
+    fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes) {
+        ByteRangeCache::put_slice(self, path, byte_range, bytes)
+    }
+}
+
 #[cfg(test)]
 mod tests {
diff --git a/quickwit/quickwit-directories/src/lib.rs b/quickwit/quickwit-directories/src/lib.rs
index 4df4f2799ec..d8cd9e02fa3 100644
--- a/quickwit/quickwit-directories/src/lib.rs
+++ b/quickwit/quickwit-directories/src/lib.rs
@@ -37,7 +37,7 @@ mod storage_directory;
 mod union_directory;
 
 pub use self::bundle_directory::{get_hotcache_from_split, read_split_footer, BundleDirectory};
-pub use self::caching_directory::CachingDirectory;
+pub use self::caching_directory::{CachingDirectory, DirectoryCache};
 pub use self::debug_proxy_directory::{DebugProxyDirectory, ReadOperation};
 pub use self::hot_directory::{write_hotcache, HotDirectory};
 pub use self::storage_directory::StorageDirectory;
diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto
index 60671239ecc..1213ce2040e 100644
--- a/quickwit/quickwit-proto/protos/quickwit/search.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -347,6 +347,14 @@ message LeafSearchRequest {
   repeated string index_uris = 9;
 }
 
+message ResourceStats {
+  uint64 short_lived_cache_num_bytes = 1;
+  uint64 split_num_docs = 2;
+  uint64 warmup_microsecs = 3;
+  uint64 cpu_thread_pool_wait_microsecs = 4;
+  uint64 cpu_microsecs = 5;
+}
+
 /// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
 message LeafRequestRef {
   // The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
@@ -479,6 +487,8 @@ message LeafSearchResponse {
 
   // postcard serialized intermediate aggregation_result.
   optional bytes intermediate_aggregation_result = 6;
+
+  ResourceStats resource_stats = 8;
 }
 
 message SnippetRequest {
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
index 3fc4d5bdcaa..e29cae37fec 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
@@ -286,6 +286,21 @@ pub struct LeafSearchRequest {
     #[prost(string, repeated, tag = "9")]
     pub index_uris: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
+#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ResourceStats {
+    #[prost(uint64, tag = "1")]
+    pub short_lived_cache_num_bytes: u64,
+    #[prost(uint64, tag = "2")]
+    pub split_num_docs: u64,
+    #[prost(uint64, tag = "3")]
+    pub warmup_microsecs: u64,
+    #[prost(uint64, tag = "4")]
+    pub cpu_thread_pool_wait_microsecs: u64,
+    #[prost(uint64, tag = "5")]
+    pub cpu_microsecs: u64,
+}
 /// / LeafRequestRef references data in LeafSearchRequest to deduplicate data.
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -457,6 +472,8 @@ pub struct LeafSearchResponse {
     pub intermediate_aggregation_result: ::core::option::Option<
         ::prost::alloc::vec::Vec<u8>,
     >,
+    #[prost(message, optional, tag = "8")]
+    pub resource_stats: ::core::option::Option<ResourceStats>,
 }
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs
index b8042f03fb7..8f2b08487fc 100644
--- a/quickwit/quickwit-search/src/cluster_client.rs
+++ b/quickwit/quickwit-search/src/cluster_client.rs
@@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn};
 use crate::retry::search::LeafSearchRetryPolicy;
 use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds};
 use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy};
-use crate::{SearchError, SearchJobPlacer, SearchServiceClient};
+use crate::{merge_resource_stats_it, SearchError, SearchJobPlacer, SearchServiceClient};
 
 /// Maximum number of put requests emitted to perform a replicated given PUT KV.
 const MAX_PUT_KV_ATTEMPTS: usize = 6;
@@ -317,6 +317,10 @@ fn merge_original_with_retry_leaf_search_response(
         (Some(left), None) => Some(left),
         (None, None) => None,
     };
+    let resource_stats = merge_resource_stats_it([
+        &original_response.resource_stats,
+        &retry_response.resource_stats,
+    ]);
     Ok(LeafSearchResponse {
         intermediate_aggregation_result,
         num_hits: original_response.num_hits + retry_response.num_hits,
@@ -326,6 +330,7 @@ fn merge_original_with_retry_leaf_search_response(
         partial_hits: original_response.partial_hits,
         num_successful_splits: original_response.num_successful_splits
             + retry_response.num_successful_splits,
+        resource_stats,
     })
 }
diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs
index 4b69348ecde..67beb8090cb 100644
--- a/quickwit/quickwit-search/src/collector.rs
+++ b/quickwit/quickwit-search/src/collector.rs
@@ -25,8 +25,8 @@ use itertools::Itertools;
 use quickwit_common::binary_heap::{SortKeyMapper, TopK};
 use quickwit_doc_mapper::WarmupInfo;
 use quickwit_proto::search::{
-    LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortOrder, SortValue,
-    SplitSearchError,
+    LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortOrder,
+    SortValue, SplitSearchError,
 };
 use quickwit_proto::types::SplitId;
 use serde::Deserialize;
@@ -40,7 +40,7 @@ use tantivy::{DateTime, DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};
 
 use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span};
 use crate::top_k_collector::{specialized_top_k_segment_collector, QuickwitSegmentTopKCollector};
-use crate::GlobalDocAddress;
+use crate::{merge_resource_stats, merge_resource_stats_it, GlobalDocAddress};
 
 #[derive(Clone, Debug)]
 pub(crate) enum SortByComponent {
@@ -587,6 +587,7 @@ impl SegmentCollector for QuickwitSegmentCollector {
             }
             None => None,
         };
+
         Ok(LeafSearchResponse {
             intermediate_aggregation_result,
             num_hits: self.num_hits,
@@ -594,6 +595,7 @@ impl SegmentCollector for QuickwitSegmentCollector {
             failed_splits: Vec::new(),
             num_attempted_splits: 1,
             num_successful_splits: 1,
+            resource_stats: None,
         })
     }
 }
@@ -919,6 +921,11 @@ fn merge_leaf_responses(
         return Ok(leaf_responses.pop().unwrap());
     }
 
+    let resource_stats_it = leaf_responses
+        .iter()
+        .map(|leaf_response| &leaf_response.resource_stats);
+    let merged_resource_stats = merge_resource_stats_it(resource_stats_it);
+
     let merged_intermediate_aggregation_result: Option<Vec<u8>> =
         merge_intermediate_aggregation_result(
             aggregations_opt,
@@ -960,6 +967,7 @@ fn merge_leaf_responses(
         failed_splits,
         num_attempted_splits,
         num_successful_splits,
+        resource_stats: merged_resource_stats,
     })
 }
@@ -1183,6 +1191,7 @@ pub(crate) struct IncrementalCollector {
     num_attempted_splits: u64,
     num_successful_splits: u64,
     start_offset: usize,
+    resource_stats: Option<ResourceStats>,
 }
 
 impl IncrementalCollector {
@@ -1203,6 +1212,7 @@ impl IncrementalCollector {
             failed_splits: Vec::new(),
             num_attempted_splits: 0,
             num_successful_splits: 0,
+            resource_stats: None,
         }
     }
 
@@ -1215,8 +1225,11 @@ impl IncrementalCollector {
             num_attempted_splits,
             intermediate_aggregation_result,
             num_successful_splits,
+            resource_stats,
         } = leaf_response;
 
+        merge_resource_stats(&resource_stats, &mut self.resource_stats);
+
         self.num_hits += num_hits;
         self.top_k_hits.add_entries(partial_hits.into_iter());
         self.failed_splits.extend(failed_splits);
@@ -1266,6 +1279,7 @@ impl IncrementalCollector {
             num_attempted_splits: self.num_attempted_splits,
             num_successful_splits:
self.num_successful_splits, intermediate_aggregation_result, + resource_stats: self.resource_stats, }) } } @@ -1275,8 +1289,8 @@ mod tests { use std::cmp::Ordering; use quickwit_proto::search::{ - LeafSearchResponse, PartialHit, SearchRequest, SortByValue, SortField, SortOrder, - SortValue, SplitSearchError, + LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortByValue, SortField, + SortOrder, SortValue, SplitSearchError, }; use tantivy::collector::Collector; use tantivy::TantivyDocument; @@ -1772,6 +1786,7 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: None, }], ); @@ -1789,7 +1804,8 @@ mod tests { failed_splits: Vec::new(), num_attempted_splits: 3, num_successful_splits: 3, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); @@ -1828,6 +1844,7 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: None, }, LeafSearchResponse { num_hits: 10, @@ -1846,6 +1863,7 @@ mod tests { num_attempted_splits: 2, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: None, }, ], ); @@ -1877,7 +1895,8 @@ mod tests { }], num_attempted_splits: 5, num_successful_splits: 4, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: None, } ); @@ -1917,6 +1936,10 @@ mod tests { num_attempted_splits: 3, num_successful_splits: 3, intermediate_aggregation_result: None, + resource_stats: Some(ResourceStats { + cpu_microsecs: 100, + ..Default::default() + }), }, LeafSearchResponse { num_hits: 10, @@ -1935,6 +1958,10 @@ mod tests { num_attempted_splits: 2, num_successful_splits: 1, intermediate_aggregation_result: None, + resource_stats: Some(ResourceStats { + cpu_microsecs: 50, + ..Default::default() + }), }, ], ); @@ -1966,7 +1993,11 @@ mod tests { }], num_attempted_splits: 5, num_successful_splits: 4, - intermediate_aggregation_result: None + intermediate_aggregation_result: None, + resource_stats: Some(ResourceStats { + cpu_microsecs: 150, + ..Default::default() + }), } ); // TODO would be nice to test aggregation too. 
diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs
index 9c326764539..9240d5399ae 100644
--- a/quickwit/quickwit-search/src/fetch_docs.rs
+++ b/quickwit/quickwit-search/src/fetch_docs.rs
@@ -27,7 +27,7 @@ use quickwit_doc_mapper::DocMapper;
 use quickwit_proto::search::{
     FetchDocsResponse, PartialHit, SnippetRequest, SplitIdAndFooterOffsets,
 };
-use quickwit_storage::Storage;
+use quickwit_storage::{ByteRangeCache, Storage};
 use tantivy::query::Query;
 use tantivy::schema::document::CompactDocValue;
 use tantivy::schema::{Document as DocumentTrait, Field, TantivyDocument, Value};
@@ -179,7 +179,7 @@ async fn fetch_docs_in_split(
         index_storage,
         split,
         Some(doc_mapper.tokenizer_manager()),
-        false,
+        Option::<Arc<ByteRangeCache>>::None,
     )
     .await
     .context("open-index-for-split")?;
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index 5ad92f63aa2..9d67445f174 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -22,28 +22,29 @@ use std::ops::Bound;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::{Arc, Mutex, RwLock};
+use std::time::{Duration, Instant};
 
 use anyhow::Context;
 use futures::future::try_join_all;
 use quickwit_common::pretty::PrettySample;
-use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
+use quickwit_directories::{CachingDirectory, DirectoryCache, HotDirectory, StorageDirectory};
 use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
 use quickwit_proto::search::{
-    CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder,
-    SortValue, SplitIdAndFooterOffsets, SplitSearchError,
+    CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, ResourceStats, SearchRequest,
+    SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError,
 };
 use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
 use quickwit_query::tokenizers::TokenizerManager;
 use quickwit_storage::{
-    wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
-    StorageResolver, TimeoutAndRetryStorage,
+    wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes,
+    SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage,
 };
 use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
 use tantivy::aggregation::AggregationLimitsGuard;
 use tantivy::directory::FileSlice;
 use tantivy::fastfield::FastFieldReaders;
 use tantivy::schema::Field;
-use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
+use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term};
 use tokio::task::JoinError;
 use tracing::*;
 
@@ -52,6 +53,7 @@ use crate::metrics::SEARCH_METRICS;
 use crate::root::is_metadata_count_request_with_ast;
 use crate::search_permit_provider::SearchPermit;
 use crate::service::{deserialize_doc_mapper, SearcherContext};
+use crate::tracked_cache::TrackedByteRangeCache;
 use crate::{QuickwitAggregations, SearchError};
 
 #[instrument(skip_all)]
@@ -127,14 +129,18 @@ pub(crate) async fn open_split_bundle(
 /// Opens a `tantivy::Index` for the given split with several cache layers:
 /// - A split footer cache given by `SearcherContext.split_footer_cache`.
 /// - A fast fields cache given by `SearcherContext.storage_long_term_cache`.
-/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`.
+/// - An ephemeral unbounded cache directory whose lifetime is tied to the
+///   returned `Index`.
+///
+/// TODO: the generic parameter C should be forced to a SearchPermit-tracking
+/// cache, but this requires the search stream to also request permits.
 #[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))]
-pub(crate) async fn open_index_with_caches(
+pub(crate) async fn open_index_with_caches<C: DirectoryCache>(
     searcher_context: &SearcherContext,
     index_storage: Arc<dyn Storage>,
     split_and_footer_offsets: &SplitIdAndFooterOffsets,
     tokenizer_manager: Option<&TokenizerManager>,
-    ephemeral_unbounded_cache: bool,
+    ephemeral_unbounded_cache: Option<C>,
 ) -> anyhow::Result<Index> {
     // Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
     // if configured in the searcher config.
@@ -166,8 +172,8 @@ pub(crate) async fn open_index_with_caches(
 
     let directory = StorageDirectory::new(bundle_storage_with_cache);
 
-    let hot_directory = if ephemeral_unbounded_cache {
-        let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
+    let hot_directory = if let Some(cache) = ephemeral_unbounded_cache {
+        let caching_directory = CachingDirectory::new(Arc::new(directory), cache);
         HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)?
     } else {
         HotDirectory::open(directory, hotcache_bytes.read_bytes()?)?
@@ -363,10 +369,12 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
         num_attempted_splits: 1,
         num_successful_splits: 1,
         intermediate_aggregation_result: None,
+        resource_stats: None,
     }
 }
 
 /// Apply a leaf search on a single split.
+#[allow(clippy::too_many_arguments)]
 async fn leaf_search_single_split(
     searcher_context: &SearcherContext,
     mut search_request: SearchRequest,
@@ -375,6 +383,7 @@ async fn leaf_search_single_split(
     doc_mapper: Arc<DocMapper>,
     split_filter: Arc<RwLock<CanSplitDoBetter>>,
     aggregations_limits: AggregationLimitsGuard,
+    search_permit: SearchPermit,
 ) -> crate::Result<LeafSearchResponse> {
     rewrite_request(
         &mut search_request,
@@ -400,12 +409,15 @@ async fn leaf_search_single_split(
     }
 
     let split_id = split.split_id.to_string();
+    let byte_range_cache =
+        ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
+    let tracked_cache = TrackedByteRangeCache::new(byte_range_cache, search_permit);
     let index = open_index_with_caches(
         searcher_context,
         storage,
         &split,
         Some(doc_mapper.tokenizer_manager()),
-        true,
+        Some(tracked_cache.clone()),
     )
     .await?;
     let split_schema = index.schema();
@@ -425,7 +437,14 @@ async fn leaf_search_single_split(
     warmup_info.merge(collector_warmup_info);
     warmup_info.simplify();
 
+    let warmup_start = Instant::now();
     warmup(&searcher, &warmup_info).await?;
+    let warmup_end = Instant::now();
+    let warmup_duration: Duration = warmup_end.duration_since(warmup_start);
+
+    tracked_cache.warmup_completed();
+    let short_lived_cache_num_bytes = tracked_cache.get_num_bytes();
+    let split_num_docs = split.num_docs;
+
     let span = info_span!("tantivy_search");
 
     let (search_request, leaf_search_response) = {
@@ -433,25 +452,31 @@ async fn leaf_search_single_split(
 
         crate::search_thread_pool()
             .run_cpu_intensive(move || {
+                let cpu_start = Instant::now();
+                let cpu_thread_pool_wait_microsecs = cpu_start.duration_since(warmup_end);
                 let _span_guard = span.enter();
                 // Our search execution has been scheduled, let's check if we can improve the
                 // request based on the results of the preceding searches
                 check_optimize_search_request(&mut search_request, &split, &split_filter);
collector.update_search_param(&search_request); - if is_metadata_count_request_with_ast(&query_ast, &search_request) { - return Ok(( - search_request, - get_leaf_resp_from_count(searcher.num_docs() as u64), - )); - } - if collector.is_count_only() { - let count = query.count(&searcher)? as u64; - Ok((search_request, get_leaf_resp_from_count(count))) - } else { - searcher - .search(&query, &collector) - .map(|resp| (search_request, resp)) - } + let mut leaf_search_response: LeafSearchResponse = + if is_metadata_count_request_with_ast(&query_ast, &search_request) { + get_leaf_resp_from_count(searcher.num_docs() as u64) + } else if collector.is_count_only() { + let count = query.count(&searcher)? as u64; + get_leaf_resp_from_count(count) + } else { + searcher.search(&query, &collector)? + }; + leaf_search_response.resource_stats = Some(ResourceStats { + cpu_microsecs: cpu_start.elapsed().as_micros() as u64, + short_lived_cache_num_bytes, + split_num_docs, + warmup_microsecs: warmup_duration.as_micros() as u64, + cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros() + as u64, + }); + Result::<_, TantivyError>::Ok((search_request, leaf_search_response)) }) .await .map_err(|_| { @@ -1263,15 +1288,15 @@ pub async fn leaf_search( // do no interleave with other leaf search requests. let permit_futures = searcher_context .search_permit_provider - .get_permits(split_with_req.len()); + .get_permits(split_with_req.len()) + .await; for ((split, mut request), permit_fut) in split_with_req.into_iter().zip(permit_futures.into_iter()) { let leaf_split_search_permit = permit_fut .instrument(info_span!("waiting_for_leaf_search_split_semaphore")) - .await - .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); + .await; let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter); if !can_be_better && !run_all_splits { @@ -1376,12 +1401,10 @@ async fn leaf_search_single_split_wrapper( doc_mapper, split_filter.clone(), aggregations_limits, + search_permit, ) .await; - // We explicitly drop it, to highlight it to the reader - std::mem::drop(search_permit); - if leaf_search_single_split_res.is_ok() { timer.observe_duration(); } diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index 491f66f3aee..016cdd5b00f 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -192,7 +192,8 @@ impl std::ops::RangeBounds for Range { #[cfg(test)] mod tests { use quickwit_proto::search::{ - LeafSearchResponse, PartialHit, SearchRequest, SortValue, SplitIdAndFooterOffsets, + LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, SortValue, + SplitIdAndFooterOffsets, }; use super::LeafSearchCache; @@ -252,6 +253,7 @@ mod tests { sort_value2: None, split_id: "split_1".to_string(), }], + resource_stats: None, }; assert!(cache.get(split_1.clone(), query_1.clone()).is_none()); @@ -342,6 +344,7 @@ mod tests { sort_value2: None, split_id: "split_1".to_string(), }], + resource_stats: Some(ResourceStats::default()), }; // for split_1, 1 and 1bis cover different timestamp ranges diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index a81a974d75d..c9031865c52 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -42,6 +42,7 @@ mod search_response_rest; mod search_stream; mod service; pub(crate) mod top_k_collector; +mod tracked_cache; 
 mod metrics;
 mod search_permit_provider;
@@ -72,7 +73,9 @@ use quickwit_metastore::{
     IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
     MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState,
 };
-use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets};
+use quickwit_proto::search::{
+    PartialHit, ResourceStats, SearchRequest, SearchResponse, SplitIdAndFooterOffsets,
+};
 use quickwit_proto::types::IndexUid;
 use quickwit_storage::StorageResolver;
 pub use service::SearcherContext;
@@ -340,3 +343,126 @@ pub fn searcher_pool_for_test(
         }),
     )
 }
+
+pub(crate) fn merge_resource_stats_it<'a>(
+    stats_it: impl IntoIterator<Item = &'a Option<ResourceStats>>,
+) -> Option<ResourceStats> {
+    let mut acc_stats: Option<ResourceStats> = None;
+    for new_stats in stats_it {
+        merge_resource_stats(new_stats, &mut acc_stats);
+    }
+    acc_stats
+}
+
+fn merge_resource_stats(
+    new_stats_opt: &Option<ResourceStats>,
+    stat_accs_opt: &mut Option<ResourceStats>,
+) {
+    if let Some(new_stats) = new_stats_opt {
+        if let Some(stat_accs) = stat_accs_opt {
+            stat_accs.short_lived_cache_num_bytes += new_stats.short_lived_cache_num_bytes;
+            stat_accs.split_num_docs += new_stats.split_num_docs;
+            stat_accs.warmup_microsecs += new_stats.warmup_microsecs;
+            stat_accs.cpu_thread_pool_wait_microsecs += new_stats.cpu_thread_pool_wait_microsecs;
+            stat_accs.cpu_microsecs += new_stats.cpu_microsecs;
+        } else {
+            *stat_accs_opt = Some(new_stats.clone());
+        }
+    }
+}
+
+#[cfg(test)]
+mod stats_merge_tests {
+    use super::*;
+
+    #[test]
+    fn test_merge_resource_stats() {
+        let mut acc_stats = None;
+
+        merge_resource_stats(&None, &mut acc_stats);
+
+        assert_eq!(acc_stats, None);
+
+        let stats = Some(ResourceStats {
+            short_lived_cache_num_bytes: 100,
+            split_num_docs: 200,
+            warmup_microsecs: 300,
+            cpu_thread_pool_wait_microsecs: 400,
+            cpu_microsecs: 500,
+        });
+
+        merge_resource_stats(&stats, &mut acc_stats);
+
+        assert_eq!(acc_stats, stats);
+
+        let new_stats = Some(ResourceStats {
+            short_lived_cache_num_bytes: 50,
+            split_num_docs: 100,
+            warmup_microsecs: 150,
+            cpu_thread_pool_wait_microsecs: 200,
+            cpu_microsecs: 250,
+        });
+
+        merge_resource_stats(&new_stats, &mut acc_stats);
+
+        let stats_plus_new_stats = Some(ResourceStats {
+            short_lived_cache_num_bytes: 150,
+            split_num_docs: 300,
+            warmup_microsecs: 450,
+            cpu_thread_pool_wait_microsecs: 600,
+            cpu_microsecs: 750,
+        });
+
+        assert_eq!(acc_stats, stats_plus_new_stats);
+
+        merge_resource_stats(&None, &mut acc_stats);
+
+        assert_eq!(acc_stats, stats_plus_new_stats);
+    }
+
+    #[test]
+    fn test_merge_resource_stats_it() {
+        let merged_stats = merge_resource_stats_it(Vec::<&Option<ResourceStats>>::new());
+        assert_eq!(merged_stats, None);
+
+        let stats1 = Some(ResourceStats {
+            short_lived_cache_num_bytes: 100,
+            split_num_docs: 200,
+            warmup_microsecs: 300,
+            cpu_thread_pool_wait_microsecs: 400,
+            cpu_microsecs: 500,
+        });
+
+        let merged_stats = merge_resource_stats_it(vec![&None, &stats1, &None]);
+
+        assert_eq!(merged_stats, stats1);
+
+        let stats2 = Some(ResourceStats {
+            short_lived_cache_num_bytes: 50,
+            split_num_docs: 100,
+            warmup_microsecs: 150,
+            cpu_thread_pool_wait_microsecs: 200,
+            cpu_microsecs: 250,
+        });
+
+        let stats3 = Some(ResourceStats {
+            short_lived_cache_num_bytes: 25,
+            split_num_docs: 50,
+            warmup_microsecs: 75,
+            cpu_thread_pool_wait_microsecs: 100,
+            cpu_microsecs: 125,
+        });
+
+        let merged_stats = merge_resource_stats_it(vec![&stats1, &stats2, &stats3]);
+
+        assert_eq!(
+            merged_stats,
+            Some(ResourceStats {
+                short_lived_cache_num_bytes: 175,
+                split_num_docs: 350,
+                warmup_microsecs: 525,
+                cpu_thread_pool_wait_microsecs: 700,
+                cpu_microsecs: 875,
+            })
+        );
+    }
+}
diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs
index 765203438d1..06136b0fefb 100644
--- a/quickwit/quickwit-search/src/list_terms.rs
+++ b/quickwit/quickwit-search/src/list_terms.rs
@@ -33,13 +33,15 @@ use quickwit_proto::search::{
     SplitIdAndFooterOffsets, SplitSearchError,
 };
 use quickwit_proto::types::IndexUid;
-use quickwit_storage::Storage;
+use quickwit_storage::{ByteRangeCache, Storage};
 use tantivy::schema::{Field, FieldType};
 use tantivy::{ReloadPolicy, Term};
 use tracing::{debug, error, info, instrument};
 
 use crate::leaf::open_index_with_caches;
 use crate::search_job_placer::group_jobs_by_index_id;
+use crate::search_permit_provider::SearchPermit;
+use crate::tracked_cache::TrackedByteRangeCache;
 use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};
 
 /// Performs a distributed list terms.
@@ -215,8 +217,14 @@ async fn leaf_list_terms_single_split(
     search_request: &ListTermsRequest,
     storage: Arc<dyn Storage>,
     split: SplitIdAndFooterOffsets,
+    search_permit: SearchPermit,
 ) -> crate::Result<LeafListTermsResponse> {
-    let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
+    let cache =
+        ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
+    let tracked_cache = TrackedByteRangeCache::new(cache, search_permit);
+    let index =
+        open_index_with_caches(searcher_context, storage, &split, None, Some(tracked_cache))
+            .await?;
     let split_schema = index.schema();
     let reader = index
         .reader_builder()
@@ -325,18 +333,18 @@ pub async fn leaf_list_terms(
     splits: &[SplitIdAndFooterOffsets],
 ) -> Result<LeafListTermsResponse, SearchError> {
     info!(split_offsets = ?PrettySample::new(splits, 5));
+    let permits = searcher_context
+        .search_permit_provider
+        .get_permits(splits.len())
+        .await;
     let leaf_search_single_split_futures: Vec<_> = splits
         .iter()
-        .map(|split| {
+        .zip(permits.into_iter())
+        .map(|(split, search_permit_recv)| {
             let index_storage_clone = index_storage.clone();
             let searcher_context_clone = searcher_context.clone();
             async move {
-                let _leaf_split_search_permit = searcher_context_clone
-                    .search_permit_provider
-                    .get_permit()
-                    .await
-                    .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
-
+                let leaf_split_search_permit = search_permit_recv.await;
                 // TODO dedicated counter and timer?
                 crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
                 let timer = crate::SEARCH_METRICS
@@ -347,6 +355,7 @@ pub async fn leaf_list_terms(
                     request,
                     index_storage_clone,
                     split.clone(),
+                    leaf_split_search_permit,
                 )
                 .await;
                 timer.observe_duration();
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index 608bc87e479..7cf9fe16bbd 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -18,6 +18,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 use std::collections::{HashMap, HashSet};
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::OnceLock;
 use std::time::Duration;
 
@@ -49,7 +50,7 @@ use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
 use tantivy::collector::Collector;
 use tantivy::schema::{Field, FieldEntry, FieldType, Schema};
 use tantivy::TantivyError;
-use tracing::{debug, info, info_span, instrument};
+use tracing::{debug, info_span, instrument};
 
 use crate::cluster_client::ClusterClient;
 use crate::collector::{make_merge_collector, QuickwitAggregations};
@@ -683,10 +684,46 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
+fn is_top_5pct_memory_intensive(num_bytes: u64, split_num_docs: u64) -> bool {
+    // It is not worth considering small splits for this.
+    if split_num_docs < 100_000 {
+        return false;
+    }
+    // We multiply those figures by 1_000 for accuracy.
+    const PERCENTILE: u64 = 95;
+    const PRIOR_NUM_BYTES_PER_DOC: u64 = 3 * 1_000;
+    static NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR: AtomicU64 =
+        AtomicU64::new(PRIOR_NUM_BYTES_PER_DOC);
+    let num_bytes_per_doc = num_bytes * 1_000 / split_num_docs;
+    let current_estimator = NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR.load(Ordering::Relaxed);
+    let is_memory_intensive = num_bytes_per_doc > current_estimator;
+    // Asymmetric 95/5 update steps: at equilibrium, roughly 5% of observations
+    // exceed the estimator, making it a 95th percentile estimate.
+    let new_estimator: u64 = if is_memory_intensive {
+        current_estimator.saturating_add(PRIOR_NUM_BYTES_PER_DOC * PERCENTILE / 100)
+    } else {
+        current_estimator.saturating_sub(PRIOR_NUM_BYTES_PER_DOC * (100 - PERCENTILE) / 100)
+    };
+    // We do not use fetch_add / fetch_sub directly as they wrap around.
+    // Concurrency could lead to different results here, but really we don't care.
+    //
+    // This is just ignoring some gradient updates.
+    NUM_BYTES_PER_DOC_95_PERCENTILE_ESTIMATOR.store(new_estimator, Ordering::Relaxed);
+    is_memory_intensive
+}
+
 /// If this method fails for some splits, a partial search response is returned, with the list of
 /// faulty splits in the failed_splits field.
 #[instrument(level = "debug", skip_all)]
@@ -744,9 +781,21 @@ pub(crate) async fn search_partial_hits_phase(
             has_intermediate_aggregation_result =
                 leaf_search_response.intermediate_aggregation_result.is_some(),
             "Merged leaf search response."
         );
+
+        if let Some(resource_stats) = &leaf_search_response.resource_stats {
+            if is_top_5pct_memory_intensive(
+                resource_stats.short_lived_cache_num_bytes,
+                resource_stats.split_num_docs,
+            ) {
+                // We log at most 5 times per minute.
+                quickwit_common::rate_limited_info!(limit_per_min=5, split_num_docs=resource_stats.split_num_docs, short_lived_cache_num_bytes=resource_stats.short_lived_cache_num_bytes, query=%search_request.query_ast, "memory intensive query");
+            }
+        }
+
         if !leaf_search_response.failed_splits.is_empty() {
             quickwit_common::rate_limited_error!(limit_per_min=6, failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split");
         }
+
         Ok(leaf_search_response)
 }
@@ -1107,14 +1156,13 @@ async fn refine_and_list_matches(
 /// 2. Merges the search results.
 /// 3. Sends fetch docs requests to multiple leaf nodes.
 /// 4. Builds the response with docs and returns.
-#[instrument(skip_all)]
+#[instrument(skip_all, fields(request_id))]
 pub async fn root_search(
     searcher_context: &SearcherContext,
     mut search_request: SearchRequest,
     mut metastore: MetastoreServiceClient,
     cluster_client: &ClusterClient,
 ) -> crate::Result<SearchResponse> {
-    info!(searcher_context = ?searcher_context, search_request = ?search_request);
     let start_instant = tokio::time::Instant::now();
     let list_indexes_metadatas_request = ListIndexesMetadataRequest {
         index_id_patterns: search_request.index_id_patterns.clone(),
@@ -1169,9 +1217,12 @@ pub async fn root_search(
     )
     .await;
 
+    let elapsed = start_instant.elapsed();
+
     if let Ok(search_response) = &mut search_response_result {
-        search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
+        search_response.elapsed_time_micros = elapsed.as_micros() as u64;
     }
+
     let label_values = if search_response_result.is_ok() {
         ["success"]
     } else {
diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs
index f6883efb34b..f396c42dd01 100644
--- a/quickwit/quickwit-search/src/search_permit_provider.rs
+++ b/quickwit/quickwit-search/src/search_permit_provider.rs
@@ -18,109 +18,187 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
+use bytesize::ByteSize;
 use quickwit_common::metrics::GaugeGuard;
-use tokio::sync::oneshot;
+#[cfg(test)]
+use tokio::sync::watch;
+use tokio::sync::{mpsc, oneshot};
+use tracing::warn;
 
-/// `SearchPermitProvider` is a distributor of permits to perform single split
-/// search operation.
+/// Distributor of permits to perform split search operation.
 ///
-/// Requests are served in order.
+/// Requests are served in order. Each permit initially reserves a slot for the
+/// warmup (limit concurrent downloads) and a pessimistic amount of memory. Once
+/// the warmup is completed, the actual memory usage is set and the warmup slot
+/// is released. Once the search is completed and the permit is dropped, the
+/// remaining memory is also released.
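+///
+/// A sketch of the intended lifecycle (illustration only, not a doctest; setup
+/// and error handling elided):
+///
+/// ```ignore
+/// let provider = SearchPermitProvider::new(10, ByteSize::gb(1), ByteSize::mb(50));
+/// for permit_fut in provider.get_permits(2).await {
+///     let mut permit = permit_fut.await;
+///     // ... download and warm up one split ...
+///     permit.warmup_completed(ByteSize::mb(12)); // record actual usage, free the warmup slot
+///     // ... run the search; the remaining memory is released when `permit` drops ...
+/// }
+/// ```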
 #[derive(Clone)]
 pub struct SearchPermitProvider {
-    inner_arc: Arc<Mutex<InnerSearchPermitProvider>>,
+    message_sender: mpsc::UnboundedSender<SearchPermitMessage>,
+    #[cfg(test)]
+    actor_stopped: watch::Receiver<bool>,
+}
+
+#[derive(Debug)]
+pub enum SearchPermitMessage {
+    Request {
+        permit_sender: oneshot::Sender<Vec<SearchPermitFuture>>,
+        num_permits: usize,
+    },
+    WarmupCompleted {
+        memory_delta: i64,
+    },
+    Drop {
+        memory_size: u64,
+        warmup_permit_held: bool,
+    },
 }
 
 impl SearchPermitProvider {
-    pub fn new(num_permits: usize) -> SearchPermitProvider {
-        SearchPermitProvider {
-            inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider {
-                num_permits_available: num_permits,
-                permits_requests: VecDeque::new(),
-            })),
+    pub fn new(
+        num_download_slots: usize,
+        memory_budget: ByteSize,
+        initial_allocation: ByteSize,
+    ) -> Self {
+        let (message_sender, message_receiver) = mpsc::unbounded_channel();
+        #[cfg(test)]
+        let (state_sender, state_receiver) = watch::channel(false);
+        let actor = SearchPermitActor {
+            msg_receiver: message_receiver,
+            msg_sender: message_sender.downgrade(),
+            num_warmup_slots_available: num_download_slots,
+            total_memory_budget: memory_budget.as_u64(),
+            permits_requests: VecDeque::new(),
+            total_memory_allocated: 0u64,
+            per_permit_initial_memory_allocation: initial_allocation.as_u64(),
+            #[cfg(test)]
+            stopped: state_sender,
+        };
+        tokio::spawn(actor.run());
+        Self {
+            message_sender,
+            #[cfg(test)]
+            actor_stopped: state_receiver,
         }
     }
 
-    /// Returns a future permit in the form of a oneshot Receiver channel.
-    ///
-    /// At this point the permit is not acquired yet.
-    #[must_use]
-    pub fn get_permit(&self) -> oneshot::Receiver<SearchPermit> {
-        let mut permits_lock = self.inner_arc.lock().unwrap();
-        permits_lock.get_permit(&self.inner_arc)
-    }
-
-    /// Returns a list of future permits in the form of oneshot Receiver channels.
+    /// Returns `num_permits` futures that complete once enough resources are
+    /// available.
     ///
-    /// The permits returned are guaranteed to be resolved in order.
-    /// In addition, the permits are guaranteed to be resolved before permits returned by
-    /// subsequent calls to this function (or `get_permit`).
-    #[must_use]
-    pub fn get_permits(&self, num_permits: usize) -> Vec<oneshot::Receiver<SearchPermit>> {
-        let mut permits_lock = self.inner_arc.lock().unwrap();
-        permits_lock.get_permits(num_permits, &self.inner_arc)
+    /// The permits returned are guaranteed to be resolved in order. In
+    /// addition, the permits are guaranteed to be resolved before permits
+    /// returned by subsequent calls to this function.
+    pub async fn get_permits(&self, num_permits: usize) -> Vec<SearchPermitFuture> {
+        let (permit_sender, permit_receiver) = oneshot::channel();
+        self.message_sender
+            .send(SearchPermitMessage::Request {
+                permit_sender,
+                num_permits,
+            })
+            .expect("Receiver lives longer than sender");
+        permit_receiver
+            .await
+            .expect("Receiver lives longer than sender")
+    }
 }
 
-struct InnerSearchPermitProvider {
-    num_permits_available: usize,
+struct SearchPermitActor {
+    msg_receiver: mpsc::UnboundedReceiver<SearchPermitMessage>,
+    msg_sender: mpsc::WeakUnboundedSender<SearchPermitMessage>,
+    num_warmup_slots_available: usize,
+    /// Note it is possible for memory_allocated to exceed memory_budget temporarily,
+    /// if and only if a split leaf search task ended up using more than `initial_allocation`.
+    /// When it happens, new permits will not be assigned until the memory is freed.
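+    /// For example (hypothetical numbers): with a 100MB budget and 10MB initial
+    /// allocations, ten permits can be outstanding at once; if one warmup then
+    /// reports 25MB of actual usage, the total allocation reaches 115MB and no
+    /// new permit is assigned until enough memory is released.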
+    total_memory_budget: u64,
+    total_memory_allocated: u64,
+    per_permit_initial_memory_allocation: u64,
     permits_requests: VecDeque<oneshot::Sender<SearchPermit>>,
+    #[cfg(test)]
+    stopped: watch::Sender<bool>,
 }
 
-impl InnerSearchPermitProvider {
-    fn get_permit(
-        &mut self,
-        inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>,
-    ) -> oneshot::Receiver<SearchPermit> {
-        let (tx, rx) = oneshot::channel();
-        self.permits_requests.push_back(tx);
-        self.assign_available_permits(inner_arc);
-        rx
-    }
-
-    fn get_permits(
-        &mut self,
-        num_permits: usize,
-        inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>,
-    ) -> Vec<oneshot::Receiver<SearchPermit>> {
-        let mut permits = Vec::with_capacity(num_permits);
-        for _ in 0..num_permits {
-            let (tx, rx) = oneshot::channel();
-            self.permits_requests.push_back(tx);
-            permits.push(rx);
+impl SearchPermitActor {
+    async fn run(mut self) {
+        // Stops when the last clone of SearchPermitProvider is dropped.
+        while let Some(msg) = self.msg_receiver.recv().await {
+            self.handle_message(msg);
         }
-        self.assign_available_permits(inner_arc);
-        permits
+        #[cfg(test)]
+        self.stopped.send(true).ok();
     }
 
-    fn recycle_permit(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
-        self.num_permits_available += 1;
-        self.assign_available_permits(inner_arc);
+    fn handle_message(&mut self, msg: SearchPermitMessage) {
+        match msg {
+            SearchPermitMessage::Request {
+                num_permits,
+                permit_sender,
+            } => {
+                let mut permits = Vec::with_capacity(num_permits);
+                for _ in 0..num_permits {
+                    let (tx, rx) = oneshot::channel();
+                    self.permits_requests.push_back(tx);
+                    permits.push(SearchPermitFuture(rx));
+                }
+                self.assign_available_permits();
+                permit_sender
+                    .send(permits)
+                    // This is a request response pattern, so we can safely ignore the error.
+                    .ok();
+            }
+            SearchPermitMessage::WarmupCompleted { memory_delta } => {
+                self.num_warmup_slots_available += 1;
+                if self.total_memory_allocated as i64 + memory_delta < 0 {
+                    panic!("More memory released than allocated, should never happen.")
+                }
+                self.total_memory_allocated =
+                    (self.total_memory_allocated as i64 + memory_delta) as u64;
+                self.assign_available_permits();
+            }
+            SearchPermitMessage::Drop {
+                memory_size,
+                warmup_permit_held,
+            } => {
+                if warmup_permit_held {
+                    self.num_warmup_slots_available += 1;
+                }
+                self.total_memory_allocated = self
+                    .total_memory_allocated
+                    .checked_sub(memory_size)
+                    .expect("More memory released than allocated, should never happen.");
+                self.assign_available_permits();
+            }
+        }
     }
 
-    fn assign_available_permits(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
-        while self.num_permits_available > 0 {
-            let Some(sender) = self.permits_requests.pop_front() else {
+    fn assign_available_permits(&mut self) {
+        while self.num_warmup_slots_available > 0
+            && self.total_memory_allocated + self.per_permit_initial_memory_allocation
+                <= self.total_memory_budget
+        {
+            let Some(permit_requester_tx) = self.permits_requests.pop_front() else {
                 break;
             };
             let mut ongoing_gauge_guard = GaugeGuard::from_gauge(
                 &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
             );
             ongoing_gauge_guard.add(1);
-            let send_res = sender.send(SearchPermit {
-                _ongoing_gauge_guard: ongoing_gauge_guard,
-                inner_arc: inner_arc.clone(),
-                recycle_on_drop: true,
-            });
-            match send_res {
-                Ok(()) => {
-                    self.num_permits_available -= 1;
-                }
-                Err(search_permit) => {
-                    search_permit.drop_without_recycling_permit();
-                }
-            }
+            self.total_memory_allocated += self.per_permit_initial_memory_allocation;
+            self.num_warmup_slots_available -= 1;
+            permit_requester_tx
+                .send(SearchPermit {
+                    _ongoing_gauge_guard: ongoing_gauge_guard,
+                    msg_sender: self.msg_sender.clone(),
+                    memory_allocation: self.per_permit_initial_memory_allocation,
+                    warmup_permit_held: true,
+                })
+                // if the requester dropped its receiver, we drop the newly
+                // created SearchPermit which releases the resources
+                .ok();
         }
         crate::SEARCH_METRICS
             .leaf_search_single_split_tasks_pending
@@ -128,41 +206,84 @@ impl InnerSearchPermitProvider {
     }
 }
 
+#[derive(Debug)]
 pub struct SearchPermit {
     _ongoing_gauge_guard: GaugeGuard<'static>,
-    inner_arc: Arc<Mutex<InnerSearchPermitProvider>>,
-    recycle_on_drop: bool,
+    msg_sender: mpsc::WeakUnboundedSender<SearchPermitMessage>,
+    memory_allocation: u64,
+    warmup_permit_held: bool,
 }
 
 impl SearchPermit {
-    fn drop_without_recycling_permit(mut self) {
-        self.recycle_on_drop = false;
-        drop(self);
+    /// After warm up, we have a proper estimate of the memory usage of a single
+    /// split leaf search. We can thus set the actual memory usage and release
+    /// the warmup slot.
+    pub fn warmup_completed(&mut self, new_memory_usage: ByteSize) {
+        let new_usage_bytes = new_memory_usage.as_u64();
+        if new_usage_bytes > self.memory_allocation {
+            warn!(
+                memory_usage = new_usage_bytes,
+                memory_allocation = self.memory_allocation,
+                "current leaf search is consuming more memory than the initial allocation"
+            );
+        }
+        let memory_delta = new_usage_bytes as i64 - self.memory_allocation as i64;
+        self.warmup_permit_held = false;
+        self.memory_allocation = new_usage_bytes;
+        self.send_if_still_running(SearchPermitMessage::WarmupCompleted { memory_delta });
+    }
+
+    fn send_if_still_running(&self, msg: SearchPermitMessage) {
+        if let Some(sender) = self.msg_sender.upgrade() {
+            sender
+                .send(msg)
+                // Receiver instance in the event loop is never dropped or
+                // closed as long as there is a strong sender reference.
+                .expect("Receiver should live longer than sender");
+        }
     }
 }
 
 impl Drop for SearchPermit {
     fn drop(&mut self) {
-        if !self.recycle_on_drop {
-            return;
+        self.send_if_still_running(SearchPermitMessage::Drop {
+            memory_size: self.memory_allocation,
+            warmup_permit_held: self.warmup_permit_held,
+        });
+    }
+}
+
+#[derive(Debug)]
+pub struct SearchPermitFuture(oneshot::Receiver<SearchPermit>);
+
+impl Future for SearchPermitFuture {
+    type Output = SearchPermit;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let receiver = Pin::new(&mut self.get_mut().0);
+        match receiver.poll(cx) {
+            Poll::Ready(Ok(search_permit)) => Poll::Ready(search_permit),
+            Poll::Ready(Err(_)) => panic!(
+                "Failed to acquire permit. This should never happen! \
+                 Please, report on https://github.com/quickwit-oss/quickwit/issues."
+            ),
+            Poll::Pending => Poll::Pending,
         }
-        let mut inner_guard = self.inner_arc.lock().unwrap();
-        inner_guard.recycle_permit(&self.inner_arc.clone());
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
+
+    use futures::StreamExt;
+    use rand::seq::SliceRandom;
     use tokio::task::JoinSet;
 
     use super::*;
 
     #[tokio::test]
-    async fn test_search_permits_get_permits_future() {
-        // We test here that `get_permits_futures` does not interleave
-        let search_permits = SearchPermitProvider::new(1);
+    async fn test_search_permit_order() {
+        let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), ByteSize::mb(10));
         let mut all_futures = Vec::new();
-        let first_batch_of_permits = search_permits.get_permits(10);
+        let first_batch_of_permits = permit_provider.get_permits(10).await;
         assert_eq!(first_batch_of_permits.len(), 10);
         all_futures.extend(
             first_batch_of_permits
                 .into_iter()
                 .enumerate()
                 .map(move |(i, fut)| ((1, i), fut)),
         );
 
-        let second_batch_of_permits = search_permits.get_permits(10);
+        let second_batch_of_permits = permit_provider.get_permits(10).await;
         assert_eq!(second_batch_of_permits.len(), 10);
         all_futures.extend(
             second_batch_of_permits
                 .into_iter()
                 .enumerate()
                 .map(move |(i, fut)| ((2, i), fut)),
         );
 
-        use rand::seq::SliceRandom;
         // not super useful, considering what join set does, but still a tiny bit more sound.
         all_futures.shuffle(&mut rand::thread_rng());
 
@@ -206,15 +326,71 @@ impl InnerSearchPermitProvider {
     }
 
     #[tokio::test]
-    async fn test_search_permits_receiver_race_condition() {
-        // Here we test that we don't have a problem if the Receiver is dropped.
-        // In particular, we want to check that there is not a race condition where drop attempts to
-        // lock the mutex.
-        let search_permits = SearchPermitProvider::new(1);
-        let permit_rx = search_permits.get_permit();
-        let permit_rx2 = search_permits.get_permit();
-        drop(permit_rx2);
-        drop(permit_rx);
-        let _permit_rx = search_permits.get_permit();
+    async fn test_search_permit_early_drops() {
+        let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), ByteSize::mb(10));
+        let permit_fut1 = permit_provider
+            .get_permits(1)
+            .await
+            .into_iter()
+            .next()
+            .unwrap();
+        let permit_fut2 = permit_provider
+            .get_permits(1)
+            .await
+            .into_iter()
+            .next()
+            .unwrap();
+        drop(permit_fut1);
+        let permit = permit_fut2.await;
+        assert_eq!(permit.memory_allocation, ByteSize::mb(10).as_u64());
+        assert_eq!(*permit_provider.actor_stopped.borrow(), false);
+
+        let _permit_fut3 = permit_provider
+            .get_permits(1)
+            .await
+            .into_iter()
+            .next()
+            .unwrap();
+        let mut actor_stopped = permit_provider.actor_stopped.clone();
+        drop(permit_provider);
+        {
+            actor_stopped.changed().await.unwrap();
+            assert!(*actor_stopped.borrow());
+        }
+    }
+
+    /// Tries to wait for a permit
+    async fn try_get(permit_fut: SearchPermitFuture) -> anyhow::Result<SearchPermit> {
+        // using a short timeout is a bit flaky, but it should be enough for these tests
+        let permit = tokio::time::timeout(Duration::from_millis(20), permit_fut).await?;
+        Ok(permit)
+    }
+
+    #[tokio::test]
+    async fn test_memory_permit() {
+        let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100), ByteSize::mb(10));
+        let mut permit_futs = permit_provider.get_permits(14).await;
+        let mut remaining_permit_futs = permit_futs.split_off(10).into_iter();
+        assert_eq!(remaining_permit_futs.len(), 4);
+        // we should be able to obtain 10 permits right away (100MB / 10MB)
+        let mut permits: Vec<SearchPermit> =
futures::stream::iter(permit_futs.into_iter()) + .buffered(1) + .collect() + .await; + // the next permit is blocked by the memory budget + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // if we drop one of the permits, we can get a new one + permits.drain(0..1); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + let _new_permit = try_get(next_permit_fut).await.unwrap(); + // the next permit is blocked again by the memory budget + let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_blocked_permit_fut).await.unwrap_err(); + // by setting a more accurate memory usage after a completed warmup, we can get more permits + permits[0].warmup_completed(ByteSize::mb(4)); + permits[1].warmup_completed(ByteSize::mb(6)); + let next_permit_fut = remaining_permit_futs.next().unwrap(); + try_get(next_permit_fut).await.unwrap(); } } diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 941e0d12612..80a3509ea0e 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -29,7 +29,7 @@ use quickwit_proto::search::{ LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest, SplitIdAndFooterOffsets, }; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType}; use tantivy::fastfield::Column; use tantivy::query::Query; @@ -127,12 +127,15 @@ async fn leaf_search_stream_single_split( &split, ); + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + // TODO should create a SearchPermit and wrap ByteRangeCache with TrackedByteRangeCache here? let index = open_index_with_caches( &searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - true, + Some(Arc::new(cache)), ) .await?; let split_schema = index.schema(); diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 0029f4dd3a7..e6d1238e644 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -488,8 +488,11 @@ impl SearcherContext { capacity_in_bytes, &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); - let leaf_search_split_semaphore = - SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches); + let leaf_search_split_semaphore = SearchPermitProvider::new( + searcher_config.max_num_concurrent_split_searches, + searcher_config.warmup_memory_budget, + searcher_config.warmup_single_split_initial_allocation, + ); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize; diff --git a/quickwit/quickwit-search/src/tracked_cache.rs b/quickwit/quickwit-search/src/tracked_cache.rs new file mode 100644 index 00000000000..6529f38332b --- /dev/null +++ b/quickwit/quickwit-search/src/tracked_cache.rs @@ -0,0 +1,78 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. 
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::sync::{Arc, Mutex};
+
+use bytesize::ByteSize;
+use quickwit_directories::DirectoryCache;
+use quickwit_storage::ByteRangeCache;
+
+use crate::search_permit_provider::SearchPermit;
+
+/// A [`ByteRangeCache`] tied to a [`SearchPermit`].
+#[derive(Clone)]
+pub struct TrackedByteRangeCache {
+    inner: Arc<Inner>,
+}
+
+struct Inner {
+    cache: ByteRangeCache,
+    search_permit: Mutex<SearchPermit>,
+}
+
+impl TrackedByteRangeCache {
+    pub fn new(cache: ByteRangeCache, search_permit: SearchPermit) -> TrackedByteRangeCache {
+        TrackedByteRangeCache {
+            inner: Arc::new(Inner {
+                cache,
+                search_permit: Mutex::new(search_permit),
+            }),
+        }
+    }
+
+    pub fn warmup_completed(&self) {
+        self.inner
+            .search_permit
+            .lock()
+            .unwrap()
+            .warmup_completed(ByteSize(self.get_num_bytes()));
+    }
+
+    pub fn get_num_bytes(&self) -> u64 {
+        self.inner.cache.get_num_bytes()
+    }
+}
+
+impl DirectoryCache for TrackedByteRangeCache {
+    fn get_slice(
+        &self,
+        path: &std::path::Path,
+        byte_range: std::ops::Range<usize>,
+    ) -> Option<quickwit_storage::OwnedBytes> {
+        self.inner.cache.get_slice(path, byte_range)
+    }
+
+    fn put_slice(
+        &self,
+        path: std::path::PathBuf,
+        byte_range: std::ops::Range<usize>,
+        bytes: quickwit_storage::OwnedBytes,
+    ) {
+        self.inner.cache.put_slice(path, byte_range, bytes)
+    }
+}
diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
index 9ef3b7f523f..fbe0031f291 100644
--- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
+++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs
@@ -21,6 +21,7 @@ use std::borrow::{Borrow, Cow};
 use std::collections::BTreeMap;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Mutex;
 
 use tantivy::directory::OwnedBytes;
@@ -345,19 +346,26 @@ impl Drop for NeedMutByteRangeCache {
 ///
 /// At the moment this is hardly a cache as it features no eviction policy.
 pub struct ByteRangeCache {
+    num_stored_bytes: AtomicU64,
     inner: Mutex<NeedMutByteRangeCache<PathBuf>>,
 }
 
 impl ByteRangeCache {
     /// Creates a slice cache that never removes any entry.
     pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self {
+        let need_mut_byte_range_cache =
+            NeedMutByteRangeCache::with_infinite_capacity(cache_counters);
         ByteRangeCache {
-            inner: Mutex::new(NeedMutByteRangeCache::with_infinite_capacity(
-                cache_counters,
-            )),
+            num_stored_bytes: AtomicU64::default(),
+            inner: Mutex::new(need_mut_byte_range_cache),
         }
     }
 
+    /// Overall amount of bytes stored in the cache.
+    pub fn get_num_bytes(&self) -> u64 {
+        self.num_stored_bytes.load(Ordering::Relaxed)
+    }
+
     /// If available, returns the cached view of the slice.
     pub fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
         self.inner.lock().unwrap().get_slice(path, byte_range)
     }
 
     /// Put the given amount of data in the cache.
     pub fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes) {
-        self.inner
-            .lock()
-            .unwrap()
-            .put_slice(path, byte_range, bytes)
+        let mut need_mut_byte_range_cache_locked = self.inner.lock().unwrap();
+        need_mut_byte_range_cache_locked.put_slice(path, byte_range, bytes);
+        let num_bytes = need_mut_byte_range_cache_locked.num_bytes;
+        drop(need_mut_byte_range_cache_locked);
+        self.num_stored_bytes.store(num_bytes, Ordering::Relaxed);
     }
 }
 
@@ -519,7 +528,7 @@ mod tests {
         );
 
         {
-            let mutable_cache = cache.inner.lock().unwrap();
+            let mutable_cache = &cache.inner.lock().unwrap();
             assert_eq!(mutable_cache.cache.len(), 4);
             assert_eq!(mutable_cache.num_items, 4);
             assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4);
@@ -531,7 +540,7 @@ mod tests {
 
         {
             // now they should've been merged, except the last one
-            let mutable_cache = cache.inner.lock().unwrap();
+            let mutable_cache = &cache.inner.lock().unwrap();
            assert_eq!(mutable_cache.cache.len(), 2);
            assert_eq!(mutable_cache.num_items, 2);
            assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2);
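For reference, the two new settings validated in `SearcherConfig::validate` above map to the node configuration roughly as follows. This is a sketch showing the default values; the exact YAML layout is assumed from the existing `searcher` settings such as `fast_field_cache_capacity`:

searcher:
  warmup_memory_budget: 1GB
  warmup_single_split_initial_allocation: 50MB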