diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index f5b1bada6fe..5816093bf23 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6442,6 +6442,7 @@ dependencies = [ "itertools 0.13.0", "mockall", "once_cell", + "opentelemetry", "postcard", "proptest", "prost", @@ -6464,6 +6465,7 @@ dependencies = [ "tokio-stream", "tower", "tracing", + "tracing-opentelemetry", "ttl_cache", "ulid", "utoipa", diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs index b90f444d062..0879a512940 100644 --- a/quickwit/quickwit-directories/src/caching_directory.rs +++ b/quickwit/quickwit-directories/src/caching_directory.rs @@ -42,12 +42,18 @@ impl CachingDirectory { /// Warming: The resulting CacheDirectory will cache all information without ever /// removing any item from the cache. pub fn new_unbounded(underlying: Arc) -> CachingDirectory { - CachingDirectory { - underlying, - cache: Arc::new(ByteRangeCache::with_infinite_capacity( - &quickwit_storage::STORAGE_METRICS.shortlived_cache, - )), - } + let byte_range_cache = ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + ); + CachingDirectory::new(underlying, Arc::new(byte_range_cache)) + } + + /// Creates a new CachingDirectory. + /// + /// Warning: The resulting CachingDirectory will cache all information without ever + /// removing any item from the cache. 
+ pub fn new(underlying: Arc, cache: Arc) -> CachingDirectory { + CachingDirectory { underlying, cache } } } diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml index 35c85fb7fd4..fd774f44bb3 100644 --- a/quickwit/quickwit-search/Cargo.toml +++ b/quickwit/quickwit-search/Cargo.toml @@ -22,6 +22,7 @@ http = { workspace = true } itertools = { workspace = true } mockall = { workspace = true } once_cell = { workspace = true } +opentelemetry = { workspace = true } postcard = { workspace = true } prost = { workspace = true } rayon = { workspace = true } @@ -33,6 +34,7 @@ tokio = { workspace = true } tokio-stream = { workspace = true } tower = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } ttl_cache = { workspace = true } ulid = { workspace = true } utoipa = { workspace = true } diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 9c326764539..e8cf5c3a1fb 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -179,7 +179,7 @@ async fn fetch_docs_in_split( index_storage, split, Some(doc_mapper.tokenizer_manager()), - false, + None, ) .await .context("open-index-for-split")?; diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 550556a0dd0..d33fe35cc44 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -35,8 +35,8 @@ use quickwit_proto::search::{ use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery}; use quickwit_query::tokenizers::TokenizerManager; use quickwit_storage::{ - wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage, - StorageResolver, TimeoutAndRetryStorage, + wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes, + SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage, 
 }; use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations}; use tantivy::aggregation::AggregationLimitsGuard; @@ -133,7 +133,7 @@ pub(crate) async fn open_index_with_caches( index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: bool, + ephemeral_unbounded_cache: Option<Arc<ByteRangeCache>>, ) -> anyhow::Result { // Let's add a storage proxy to retry `get_slice` requests if they are taking too long, // if configured in the searcher config. @@ -165,8 +165,8 @@ pub(crate) async fn open_index_with_caches( let directory = StorageDirectory::new(bundle_storage_with_cache); - let hot_directory = if ephemeral_unbounded_cache { - let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory)); + let hot_directory = if let Some(cache) = ephemeral_unbounded_cache { + let caching_directory = CachingDirectory::new(Arc::new(directory), cache); HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)? } else { HotDirectory::open(directory, hotcache_bytes.read_bytes()?)? 
@@ -375,6 +375,7 @@ async fn leaf_search_single_split( split_filter: Arc>, aggregations_limits: AggregationLimitsGuard, ) -> crate::Result { + rewrite_request( &mut search_request, &split, @@ -399,12 +400,15 @@ async fn leaf_search_single_split( } let split_id = split.split_id.to_string(); + let byte_range_cache = Arc::new(ByteRangeCache::with_infinite_capacity( + &quickwit_storage::STORAGE_METRICS.shortlived_cache, + )); let index = open_index_with_caches( searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - true, + Some(byte_range_cache.clone()), ) .await?; let split_schema = index.schema(); @@ -425,6 +429,10 @@ async fn leaf_search_single_split( warmup_info.simplify(); warmup(&searcher, &warmup_info).await?; + + let trace_id = crate::get_trace_id(); + info!(trace_id=%trace_id, split_id=%split.split_id.as_str(), num_docs=split.num_docs, input_data=byte_range_cache.get_num_bytes(), "split search input data memory"); + let span = info_span!("tantivy_search"); let (search_request, leaf_search_response) = { diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index bfee7da212c..0e61500e0f8 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -50,6 +50,7 @@ mod tests; pub use collector::QuickwitAggregations; use metrics::SEARCH_METRICS; +use opentelemetry::trace::TraceId; use quickwit_common::thread_pool::ThreadPool; use quickwit_common::tower::Pool; use quickwit_doc_mapper::DocMapper; @@ -339,3 +340,13 @@ pub fn searcher_pool_for_test( }), ) } + +pub(crate) fn get_trace_id() -> TraceId { + use opentelemetry::trace::TraceContextExt as _; + use tracing_opentelemetry::OpenTelemetrySpanExt as _; + tracing::Span::current() + .context() + .span() + .span_context() + .trace_id() +} diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 0a781355162..85702414428 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ 
b/quickwit/quickwit-search/src/list_terms.rs @@ -33,7 +33,7 @@ use quickwit_proto::search::{ SplitIdAndFooterOffsets, SplitSearchError, }; use quickwit_proto::types::IndexUid; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::schema::{Field, FieldType}; use tantivy::{ReloadPolicy, Term}; use tracing::{debug, error, info, instrument}; @@ -216,7 +216,16 @@ async fn leaf_list_terms_single_split( storage: Arc, split: SplitIdAndFooterOffsets, ) -> crate::Result { - let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?; + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let index = open_index_with_caches( + searcher_context, + storage, + &split, + None, + Some(Arc::new(cache)), + ) + .await?; let split_schema = index.schema(); let reader = index .reader_builder() diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 608bc87e479..9ba9988777e 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -1107,14 +1107,16 @@ async fn refine_and_list_matches( /// 2. Merges the search results. /// 3. Sends fetch docs requests to multiple leaf nodes. /// 4. Builds the response with docs and returns. 
-#[instrument(skip_all)] +#[instrument(skip_all, fields(request_id))] pub async fn root_search( searcher_context: &SearcherContext, mut search_request: SearchRequest, mut metastore: MetastoreServiceClient, cluster_client: &ClusterClient, ) -> crate::Result { - info!(searcher_context = ?searcher_context, search_request = ?search_request); + let trace_id = crate::get_trace_id(); + + info!(trace_id=%trace_id, search_request = ?search_request); let start_instant = tokio::time::Instant::now(); let list_indexes_metadatas_request = ListIndexesMetadataRequest { index_id_patterns: search_request.index_id_patterns.clone(), @@ -1169,9 +1171,14 @@ pub async fn root_search( ) .await; + let elapsed = start_instant.elapsed(); + + info!(trace_id=%trace_id, num_docs=num_docs, num_splits=num_splits, elapsed_time_millis=%elapsed.as_millis(), "search completed"); + if let Ok(search_response) = &mut search_response_result { - search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64; + search_response.elapsed_time_micros = elapsed.as_micros() as u64; } + let label_values = if search_response_result.is_ok() { ["success"] } else { diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 941e0d12612..88c04be233c 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -29,7 +29,7 @@ use quickwit_proto::search::{ LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest, SplitIdAndFooterOffsets, }; -use quickwit_storage::Storage; +use quickwit_storage::{ByteRangeCache, Storage}; use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType}; use tantivy::fastfield::Column; use tantivy::query::Query; @@ -127,12 +127,15 @@ async fn leaf_search_stream_single_split( &split, ); + let cache = + ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); + let index = open_index_with_caches( 
&searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - true, + Some(Arc::new(cache)), ) .await?; let split_schema = index.schema(); diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs index 9068e23657e..ea66b341189 100644 --- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs +++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs @@ -356,6 +356,11 @@ impl ByteRangeCache { } } + /// Overall amount of bytes stored in the cache. + pub fn get_num_bytes(&self) -> u64 { + self.inner.lock().unwrap().num_bytes + } + /// If available, returns the cached view of the slice. pub fn get_slice(&self, path: &Path, byte_range: Range) -> Option { self.inner.lock().unwrap().get_slice(path, byte_range)