diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs index b90f444d062..0879a512940 100644 --- a/quickwit/quickwit-directories/src/caching_directory.rs +++ b/quickwit/quickwit-directories/src/caching_directory.rs @@ -42,12 +42,18 @@ impl CachingDirectory { /// Warming: The resulting CacheDirectory will cache all information without ever /// removing any item from the cache. pub fn new_unbounded(underlying: Arc) -> CachingDirectory { - CachingDirectory { - underlying, - cache: Arc::new(ByteRangeCache::with_infinite_capacity( - &quickwit_storage::STORAGE_METRICS.shortlived_cache, - )), - } + let byte_range_cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + ); + CachingDirectory::new(underlying, Arc::new(byte_range_cache)) + } + + /// Creates a new CachingDirectory. + /// + /// Warming: The resulting CacheDirectory will cache all information without ever + /// removing any item from the cache. + pub fn new(underlying: Arc, cache: Arc) -> CachingDirectory { + CachingDirectory { underlying, cache } } } diff --git a/quickwit/quickwit-directories/src/hot_directory.rs b/quickwit/quickwit-directories/src/hot_directory.rs index d217ac29851..dc2161e38a5 100644 --- a/quickwit/quickwit-directories/src/hot_directory.rs +++ b/quickwit/quickwit-directories/src/hot_directory.rs @@ -491,6 +491,7 @@ pub fn write_hotcache( // We use the caching directory here in order to defensively ensure that // the content of the directory that will be written in the hotcache is precisely // the same that was read on the first pass. + let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory)); let debug_proxy_directory = DebugProxyDirectory::wrap(caching_directory); let index = Index::open(debug_proxy_directory.clone())?; diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 60671239ecc..8219d65f838 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -191,6 +191,10 @@ message ListFields { // -- Search ------------------- message SearchRequest { + // id used for logging/debugging purpose. + // If left empty, the root node will generate one. + string request_id = 18; + // Index ID patterns repeated string index_id_patterns = 1; diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 473f1ec42f1..31ffd5e2fc5 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -147,6 +147,10 @@ pub struct ListFields { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SearchRequest { + /// id used for logging/debugging purpose. + /// If left empty, the root node will generate one. + #[prost(string, tag = "18")] + pub request_id: ::prost::alloc::string::String, /// Index ID patterns #[prost(string, repeated, tag = "1")] pub index_id_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, @@ -371,7 +375,6 @@ pub struct Hit { pub index_id: ::prost::alloc::string::String, } /// A partial hit, is a hit for which we have not fetch the content yet. -/// /// Instead, it holds a document_uri which is enough information to /// go and fetch the actual document data, by performing a `get_doc(...)` /// request. diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 9c326764539..e8cf5c3a1fb 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -179,7 +179,7 @@ async fn fetch_docs_in_split( index_storage, split, Some(doc_mapper.tokenizer_manager()), - false, + None, ) .await .context("open-index-for-split")?; diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 550556a0dd0..4bd5a6338ab 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -35,8 +35,8 @@ use quickwit_proto::search::{ use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery}; use quickwit_query::tokenizers::TokenizerManager; use quickwit_storage::{ - wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage, - StorageResolver, TimeoutAndRetryStorage, + wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes, + SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage, }; use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations}; use tantivy::aggregation::AggregationLimitsGuard; @@ -133,7 +133,7 @@ pub(crate) async fn open_index_with_caches( index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: bool, + ephemeral_unbounded_cache: Option>, ) -> anyhow::Result { // Let's add a storage proxy to retry `get_slice` requests if they are taking too long, // if configured in the searcher config. @@ -165,8 +165,8 @@ pub(crate) async fn open_index_with_caches( let directory = StorageDirectory::new(bundle_storage_with_cache); - let hot_directory = if ephemeral_unbounded_cache { - let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory)); + let hot_directory = if let Some(cache) = ephemeral_unbounded_cache { + let caching_directory = CachingDirectory::new(Arc::new(directory), cache); HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)? } else { HotDirectory::open(directory, hotcache_bytes.read_bytes()?)? @@ -399,12 +399,15 @@ async fn leaf_search_single_split( } let split_id = split.split_id.to_string(); + let byte_range_cache = Arc::new(ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + )); let index = open_index_with_caches( searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - true, + Some(byte_range_cache.clone()), ) .await?; let split_schema = index.schema(); @@ -425,6 +428,9 @@ async fn leaf_search_single_split( warmup_info.simplify(); warmup(&searcher, &warmup_info).await?; + + info!(request=%search_request.request_id, input_data=byte_range_cache.get_num_bytes(), "split search input data memory"); + let span = info_span!("tantivy_search"); let (search_request, leaf_search_response) = { diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 0a781355162..85702414428 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -33,7 +33,7 @@ use quickwit_proto::search::{ SplitIdAndFooterOffsets, SplitSearchError, }; use quickwit_proto::types::IndexUid; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::schema::{Field, FieldType}; use tantivy::{ReloadPolicy, Term}; use tracing::{debug, error, info, instrument}; @@ -216,7 +216,16 @@ async fn leaf_list_terms_single_split( storage: Arc, split: SplitIdAndFooterOffsets, ) -> crate::Result { - let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?; + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let index = open_index_with_caches( + searcher_context, + storage, + &split, + None, + Some(Arc::new(cache)), + ) + .await?; let split_schema = index.schema(); let reader = index .reader_builder() diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 608bc87e479..a6a51d04c27 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -50,6 +50,7 @@ use tantivy::collector::Collector; use tantivy::schema::{Field, FieldEntry, FieldType, Schema}; use tantivy::TantivyError; use tracing::{debug, info, info_span, instrument}; +use ulid::Ulid; use crate::cluster_client::ClusterClient; use crate::collector::{make_merge_collector, QuickwitAggregations}; @@ -352,6 +353,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result< // We do not mutate Ok(SearchRequest { + request_id: req.request_id.clone(), index_id_patterns: req.index_id_patterns.clone(), query_ast: req.query_ast.clone(), start_timestamp: req.start_timestamp, @@ -1107,14 +1109,21 @@ async fn refine_and_list_matches( /// 2. Merges the search results. /// 3. Sends fetch docs requests to multiple leaf nodes. /// 4. Builds the response with docs and returns. -#[instrument(skip_all)] +#[instrument(skip_all, fields(request_id))] pub async fn root_search( searcher_context: &SearcherContext, mut search_request: SearchRequest, mut metastore: MetastoreServiceClient, cluster_client: &ClusterClient, ) -> crate::Result { - info!(searcher_context = ?searcher_context, search_request = ?search_request); + if search_request.request_id.is_empty() { + search_request.request_id = Ulid::new().to_string(); + } + + let request_id = search_request.request_id.clone(); + tracing::Span::current().record("request_id", request_id.as_str()); + + info!(search_request = ?search_request); let start_instant = tokio::time::Instant::now(); let list_indexes_metadatas_request = ListIndexesMetadataRequest { index_id_patterns: search_request.index_id_patterns.clone(), @@ -1169,9 +1178,14 @@ pub async fn root_search( ) .await; + let elapsed = start_instant.elapsed(); + + info!(request_id=%request_id.as_str(), num_docs=num_docs, num_splits=num_splits, elapsed_time_millis=%elapsed.as_millis(), "search completed"); + if let Ok(search_response) = &mut search_response_result { - search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64; + search_response.elapsed_time_micros = elapsed.as_micros() as u64; } + let label_values = if search_response_result.is_ok() { ["success"] } else { diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 941e0d12612..88c04be233c 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -29,7 +29,7 @@ use quickwit_proto::search::{ LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest, SplitIdAndFooterOffsets, }; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType}; use tantivy::fastfield::Column; use tantivy::query::Query; @@ -127,12 +127,15 @@ async fn leaf_search_stream_single_split( &split, ); + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let index = open_index_with_caches( &searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - true, + Some(Arc::new(cache)), ) .await?; let split_schema = index.schema(); diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 4bca30add33..5726a1e6cc1 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -336,6 +336,7 @@ fn build_request_for_es_api( Ok(( quickwit_proto::search::SearchRequest { + request_id: String::default(), index_id_patterns, query_ast: serde_json::to_string(&query_ast).expect("Failed to serialize QueryAst"), max_hits, diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index 42a9a0ff44b..fa24a3bcd2e 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -283,6 +283,7 @@ pub fn search_request_from_api_request( let query_ast = query_ast_from_user_text(&search_request.query, search_request.search_fields); let query_ast_json = serde_json::to_string(&query_ast)?; let search_request = quickwit_proto::search::SearchRequest { + request_id: String::default(), index_id_patterns, query_ast: query_ast_json, snippet_fields: search_request.snippet_fields.unwrap_or_default(), diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs index 9068e23657e..ea66b341189 100644 --- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs +++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs @@ -356,6 +356,11 @@ impl ByteRangeCache { } } + /// Overall amount of bytes stored in the cache. + pub fn get_num_bytes(&self) -> u64 { + self.inner.lock().unwrap().num_bytes + } + /// If available, returns the cached view of the slice. pub fn get_slice(&self, path: &Path, byte_range: Range) -> Option { self.inner.lock().unwrap().get_slice(path, byte_range)