Skip to content

Commit

Permalink
Adds a concept of request_id for logging/correlation purposes.
Browse files Browse the repository at this point in the history
We also measure the amount of memory taken by a split search, and log
this.
  • Loading branch information
fulmicoton committed Oct 25, 2024
1 parent 02a5b6a commit 020ad53
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 21 deletions.
18 changes: 12 additions & 6 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ impl CachingDirectory {
/// Warning: The resulting CachingDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, Arc::new(byte_range_cache))
}

/// Creates a new CachingDirectory wrapping `underlying` and backed by the
/// caller-supplied `cache`.
///
/// Unlike `new_unbounded`, this constructor does not create a cache itself: the
/// given `cache` is stored as-is. Because it is passed as an `Arc`, the caller can
/// keep its own handle to the same cache.
///
/// NOTE(review): capacity/eviction behavior is presumably whatever the supplied
/// `ByteRangeCache` was built with — confirm against `ByteRangeCache`.
pub fn new(underlying: Arc<dyn Directory>, cache: Arc<ByteRangeCache>) -> CachingDirectory {
CachingDirectory { underlying, cache }
}
}

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-directories/src/hot_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ pub fn write_hotcache<D: Directory>(
// We use the caching directory here in order to defensively ensure that
// the content of the directory that will be written in the hotcache is precisely
// the same that was read on the first pass.

let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
let debug_proxy_directory = DebugProxyDirectory::wrap(caching_directory);
let index = Index::open(debug_proxy_directory.clone())?;
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ message ListFields {
// -- Search -------------------

message SearchRequest {
// ID used for logging/debugging purposes.
// If left empty, the root node will generate one.
string request_id = 18;

// Index ID patterns
repeated string index_id_patterns = 1;

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn fetch_docs_in_split(
index_storage,
split,
Some(doc_mapper.tokenizer_manager()),
false,
None,
)
.await
.context("open-index-for-split")?;
Expand Down
18 changes: 12 additions & 6 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use quickwit_proto::search::{
use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
use quickwit_query::tokenizers::TokenizerManager;
use quickwit_storage::{
wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
StorageResolver, TimeoutAndRetryStorage,
wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes,
SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage,
};
use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::aggregation::AggregationLimitsGuard;
Expand Down Expand Up @@ -133,7 +133,7 @@ pub(crate) async fn open_index_with_caches(
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: bool,
ephemeral_unbounded_cache: Option<Arc<ByteRangeCache>>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
Expand Down Expand Up @@ -165,8 +165,8 @@ pub(crate) async fn open_index_with_caches(

let directory = StorageDirectory::new(bundle_storage_with_cache);

let hot_directory = if ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
let hot_directory = if let Some(cache) = ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new(Arc::new(directory), cache);
HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)?
} else {
HotDirectory::open(directory, hotcache_bytes.read_bytes()?)?
Expand Down Expand Up @@ -399,12 +399,15 @@ async fn leaf_search_single_split(
}

let split_id = split.split_id.to_string();
let byte_range_cache = Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
));
let index = open_index_with_caches(
searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(byte_range_cache.clone()),
)
.await?;
let split_schema = index.schema();
Expand All @@ -425,6 +428,9 @@ async fn leaf_search_single_split(
warmup_info.simplify();

warmup(&searcher, &warmup_info).await?;

info!(request=%search_request.request_id, input_data=byte_range_cache.get_num_bytes(), "split search input data memory");

let span = info_span!("tantivy_search");

let (search_request, leaf_search_response) = {
Expand Down
13 changes: 11 additions & 2 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use quickwit_proto::search::{
SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_proto::types::IndexUid;
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::schema::{Field, FieldType};
use tantivy::{ReloadPolicy, Term};
use tracing::{debug, error, info, instrument};
Expand Down Expand Up @@ -216,7 +216,16 @@ async fn leaf_list_terms_single_split(
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
) -> crate::Result<LeafListTermsResponse> {
let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let index = open_index_with_caches(
searcher_context,
storage,
&split,
None,
Some(Arc::new(cache)),
)
.await?;
let split_schema = index.schema();
let reader = index
.reader_builder()
Expand Down
20 changes: 17 additions & 3 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use tantivy::collector::Collector;
use tantivy::schema::{Field, FieldEntry, FieldType, Schema};
use tantivy::TantivyError;
use tracing::{debug, info, info_span, instrument};
use ulid::Ulid;

use crate::cluster_client::ClusterClient;
use crate::collector::{make_merge_collector, QuickwitAggregations};
Expand Down Expand Up @@ -352,6 +353,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<

// We do not mutate
Ok(SearchRequest {
request_id: req.request_id.clone(),
index_id_patterns: req.index_id_patterns.clone(),
query_ast: req.query_ast.clone(),
start_timestamp: req.start_timestamp,
Expand Down Expand Up @@ -1107,14 +1109,21 @@ async fn refine_and_list_matches(
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip_all)]
#[instrument(skip_all, fields(request_id))]
pub async fn root_search(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
mut metastore: MetastoreServiceClient,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
info!(searcher_context = ?searcher_context, search_request = ?search_request);
if search_request.request_id.is_empty() {
search_request.request_id = Ulid::new().to_string();
}

let request_id = search_request.request_id.clone();
tracing::Span::current().record("request_id", request_id.as_str());

info!(search_request = ?search_request);
let start_instant = tokio::time::Instant::now();
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: search_request.index_id_patterns.clone(),
Expand Down Expand Up @@ -1169,8 +1178,13 @@ pub async fn root_search(
)
.await;

let elapsed = start_instant.elapsed();

info!(request_id=%request_id.as_str(), num_docs=num_docs, num_splits=num_splits, elapsed_time_millis=%elapsed.as_millis(), "search completed");
if let Ok(search_response) = &mut search_response_result {
search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
search_response.elapsed_time_micros = elapsed.as_micros() as u64;
}
let label_values = if search_response_result.is_ok() {
["success"]
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-search/src/search_stream/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_proto::search::{
LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest,
SplitIdAndFooterOffsets,
};
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType};
use tantivy::fastfield::Column;
use tantivy::query::Query;
Expand Down Expand Up @@ -127,12 +127,15 @@ async fn leaf_search_stream_single_split(
&split,
);

let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);

let index = open_index_with_caches(
&searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(Arc::new(cache)),
)
.await?;
let split_schema = index.schema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ fn build_request_for_es_api(

Ok((
quickwit_proto::search::SearchRequest {
request_id: String::default(),
index_id_patterns,
query_ast: serde_json::to_string(&query_ast).expect("Failed to serialize QueryAst"),
max_hits,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ pub fn search_request_from_api_request(
let query_ast = query_ast_from_user_text(&search_request.query, search_request.search_fields);
let query_ast_json = serde_json::to_string(&query_ast)?;
let search_request = quickwit_proto::search::SearchRequest {
request_id: String::default(),
index_id_patterns,
query_ast: query_ast_json,
snippet_fields: search_request.snippet_fields.unwrap_or_default(),
Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-storage/src/cache/byte_range_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
}
}

/// Returns the current value of the `num_bytes` counter.
fn get_num_bytes(&self) -> u64 {
self.num_bytes
}

fn get_slice(&mut self, tag: &T, byte_range: Range<usize>) -> Option<OwnedBytes> {
if byte_range.start == byte_range.end {
return Some(OwnedBytes::empty());
Expand Down Expand Up @@ -356,6 +360,11 @@ impl ByteRangeCache {
}
}

/// Overall amount of bytes stored in the cache.
///
/// Locks the inner cache for the duration of the call and forwards to its
/// `get_num_bytes`.
///
/// # Panics
///
/// Panics if `lock()` on the inner cache returns an error
/// (presumably a poisoned `std::sync::Mutex` — TODO confirm the lock type).
pub fn get_num_bytes(&self) -> u64 {
self.inner.lock().unwrap().get_num_bytes()
}

/// If available, returns the cached view of the slice.
pub fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
self.inner.lock().unwrap().get_slice(path, byte_range)
Expand Down

0 comments on commit 020ad53

Please sign in to comment.