Skip to content

Commit

Permalink
Adds a concept of request_id for logging/correlation purpose.
Browse files Browse the repository at this point in the history
We also measure the amount of memory taken by a split search, and log
this.
  • Loading branch information
fulmicoton committed Oct 25, 2024
1 parent 02a5b6a commit 1e08a77
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 22 deletions.
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 12 additions & 6 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ impl CachingDirectory {
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, Arc::new(byte_range_cache))
}

/// Creates a new CachingDirectory.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new(underlying: Arc<dyn Directory>, cache: Arc<ByteRangeCache>) -> CachingDirectory {
CachingDirectory { underlying, cache }
}
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions quickwit/quickwit-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ http = { workspace = true }
itertools = { workspace = true }
mockall = { workspace = true }
once_cell = { workspace = true }
opentelemetry = { workspace = true }
postcard = { workspace = true }
prost = { workspace = true }
rayon = { workspace = true }
Expand All @@ -33,6 +34,7 @@ tokio = { workspace = true }
tokio-stream = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
ttl_cache = { workspace = true }
ulid = { workspace = true }
utoipa = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn fetch_docs_in_split(
index_storage,
split,
Some(doc_mapper.tokenizer_manager()),
false,
None,
)
.await
.context("open-index-for-split")?;
Expand Down
21 changes: 15 additions & 6 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use quickwit_proto::search::{
use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
use quickwit_query::tokenizers::TokenizerManager;
use quickwit_storage::{
wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
StorageResolver, TimeoutAndRetryStorage,
wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes,
SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage,
};
use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::aggregation::AggregationLimitsGuard;
Expand Down Expand Up @@ -133,7 +133,7 @@ pub(crate) async fn open_index_with_caches(
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: bool,
ephemeral_unbounded_cache: Option<Arc<ByteRangeCache>>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
Expand Down Expand Up @@ -165,8 +165,8 @@ pub(crate) async fn open_index_with_caches(

let directory = StorageDirectory::new(bundle_storage_with_cache);

let hot_directory = if ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
let hot_directory = if let Some(cache) = ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new(Arc::new(directory), cache);
HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)?
} else {
HotDirectory::open(directory, hotcache_bytes.read_bytes()?)?
Expand Down Expand Up @@ -375,6 +375,9 @@ async fn leaf_search_single_split(
split_filter: Arc<RwLock<CanSplitDoBetter>>,
aggregations_limits: AggregationLimitsGuard,
) -> crate::Result<LeafSearchResponse> {

let trace_id = crate::get_trace_id();

rewrite_request(
&mut search_request,
&split,
Expand All @@ -399,12 +402,15 @@ async fn leaf_search_single_split(
}

let split_id = split.split_id.to_string();
let byte_range_cache = Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
));
let index = open_index_with_caches(
searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(byte_range_cache.clone()),
)
.await?;
let split_schema = index.schema();
Expand All @@ -425,6 +431,9 @@ async fn leaf_search_single_split(
warmup_info.simplify();

warmup(&searcher, &warmup_info).await?;

info!(trace_id=%trace_id, input_data=byte_range_cache.get_num_bytes(), "split search input data memory");

let span = info_span!("tantivy_search");

let (search_request, leaf_search_response) = {
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod tests;

pub use collector::QuickwitAggregations;
use metrics::SEARCH_METRICS;
use opentelemetry::trace::TraceId;
use quickwit_common::thread_pool::ThreadPool;
use quickwit_common::tower::Pool;
use quickwit_doc_mapper::DocMapper;
Expand Down Expand Up @@ -339,3 +340,9 @@ pub fn searcher_pool_for_test(
}),
)
}

pub(crate) fn get_trace_id() -> TraceId {
use tracing_opentelemetry::OpenTelemetrySpanExt as _;
use opentelemetry::trace::TraceContextExt as _;
tracing::Span::current().context().span().span_context().trace_id()
}
13 changes: 11 additions & 2 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use quickwit_proto::search::{
SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_proto::types::IndexUid;
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::schema::{Field, FieldType};
use tantivy::{ReloadPolicy, Term};
use tracing::{debug, error, info, instrument};
Expand Down Expand Up @@ -216,7 +216,16 @@ async fn leaf_list_terms_single_split(
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
) -> crate::Result<LeafListTermsResponse> {
let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let index = open_index_with_caches(
searcher_context,
storage,
&split,
None,
Some(Arc::new(cache)),
)
.await?;
let split_schema = index.schema();
let reader = index
.reader_builder()
Expand Down
15 changes: 12 additions & 3 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::OnceLock;
use std::time::Duration;


use anyhow::Context;
use futures::future::try_join_all;
use itertools::Itertools;
Expand Down Expand Up @@ -1107,14 +1108,17 @@ async fn refine_and_list_matches(
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip_all)]
#[instrument(skip_all, fields(request_id))]
pub async fn root_search(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
mut metastore: MetastoreServiceClient,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
info!(searcher_context = ?searcher_context, search_request = ?search_request);

let trace_id = crate::get_trace_id();

info!(trace_id=%trace_id, search_request = ?search_request);
let start_instant = tokio::time::Instant::now();
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: search_request.index_id_patterns.clone(),
Expand Down Expand Up @@ -1169,9 +1173,14 @@ pub async fn root_search(
)
.await;

let elapsed = start_instant.elapsed();

info!(trace_id=%trace_id, num_docs=num_docs, num_splits=num_splits, elapsed_time_millis=%elapsed.as_millis(), "search completed");

if let Ok(search_response) = &mut search_response_result {
search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
search_response.elapsed_time_micros = elapsed.as_micros() as u64;
}

let label_values = if search_response_result.is_ok() {
["success"]
} else {
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-search/src/search_stream/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_proto::search::{
LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest,
SplitIdAndFooterOffsets,
};
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType};
use tantivy::fastfield::Column;
use tantivy::query::Query;
Expand Down Expand Up @@ -127,12 +127,15 @@ async fn leaf_search_stream_single_split(
&split,
);

let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);

let index = open_index_with_caches(
&searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(Arc::new(cache)),
)
.await?;
let split_schema = index.schema();
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-storage/src/cache/byte_range_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,11 @@ impl ByteRangeCache {
}
}

/// Overall amount of bytes stored in the cache.
pub fn get_num_bytes(&self) -> u64 {
self.inner.lock().unwrap().num_bytes
}

/// If available, returns the cached view of the slice.
pub fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
self.inner.lock().unwrap().get_slice(path, byte_range)
Expand Down

0 comments on commit 1e08a77

Please sign in to comment.