Skip to content

Commit

Permalink
Adds a concept of request_id for logging/correlation purpose.
Browse files Browse the repository at this point in the history
We also measure the amount of memory taken by a split search, and log
this.
  • Loading branch information
fulmicoton committed Nov 8, 2024
1 parent d4ad40d commit 0c91c9e
Show file tree
Hide file tree
Showing 10 changed files with 567 additions and 261 deletions.
707 changes: 475 additions & 232 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

22 changes: 14 additions & 8 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tantivy::{Directory, HasLen};
pub struct CachingDirectory {
underlying: Arc<dyn Directory>,
// TODO fixme: that's a pretty ugly cache we have here.
cache: Arc<ByteRangeCache>,
cache: ByteRangeCache,
}

impl CachingDirectory {
Expand All @@ -42,12 +42,18 @@ impl CachingDirectory {
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, byte_range_cache)
}

/// Creates a new CachingDirectory.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache) -> CachingDirectory {
CachingDirectory { underlying, cache }
}
}

Expand All @@ -59,7 +65,7 @@ impl fmt::Debug for CachingDirectory {

struct CachingFileHandle {
path: PathBuf,
cache: Arc<ByteRangeCache>,
cache: ByteRangeCache,
underlying_filehandle: Arc<dyn FileHandle>,
}

Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ http = { workspace = true }
itertools = { workspace = true }
mockall = { workspace = true }
once_cell = { workspace = true }
opentelemetry = { workspace = true }
postcard = { workspace = true }
prost = { workspace = true }
rayon = { workspace = true }
Expand All @@ -33,6 +34,7 @@ tokio = { workspace = true }
tokio-stream = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
ttl_cache = { workspace = true }
ulid = { workspace = true }
utoipa = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn fetch_docs_in_split(
index_storage,
split,
Some(doc_mapper.tokenizer_manager()),
false,
None,
)
.await
.context("open-index-for-split")?;
Expand Down
18 changes: 12 additions & 6 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use quickwit_proto::search::{
use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
use quickwit_query::tokenizers::TokenizerManager;
use quickwit_storage::{
wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
StorageResolver, TimeoutAndRetryStorage,
wrap_storage_with_cache, BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes,
SplitCache, Storage, StorageResolver, TimeoutAndRetryStorage,
};
use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::aggregation::AggregationLimitsGuard;
Expand Down Expand Up @@ -134,7 +134,7 @@ pub(crate) async fn open_index_with_caches(
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: bool,
ephemeral_unbounded_cache: Option<ByteRangeCache>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
Expand Down Expand Up @@ -166,8 +166,8 @@ pub(crate) async fn open_index_with_caches(

let directory = StorageDirectory::new(bundle_storage_with_cache);

let hot_directory = if ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory));
let hot_directory = if let Some(cache) = ephemeral_unbounded_cache {
let caching_directory = CachingDirectory::new(Arc::new(directory), cache);
HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)?
} else {
HotDirectory::open(directory, hotcache_bytes.read_bytes()?)?
Expand Down Expand Up @@ -400,12 +400,14 @@ async fn leaf_search_single_split(
}

let split_id = split.split_id.to_string();
let byte_range_cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let index = open_index_with_caches(
searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(byte_range_cache.clone()),
)
.await?;
let split_schema = index.schema();
Expand All @@ -426,6 +428,10 @@ async fn leaf_search_single_split(
warmup_info.simplify();

warmup(&searcher, &warmup_info).await?;

let trace_id = crate::get_trace_id();
info!(trace_id=%trace_id, split_id=%split.split_id.as_str(), num_docs=split.num_docs, input_data=byte_range_cache.get_num_bytes(), "split search input data memory");

let span = info_span!("tantivy_search");

let (search_request, leaf_search_response) = {
Expand Down
11 changes: 11 additions & 0 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ mod tests;

pub use collector::QuickwitAggregations;
use metrics::SEARCH_METRICS;
use opentelemetry::trace::TraceId;
use quickwit_common::thread_pool::ThreadPool;
use quickwit_common::tower::Pool;
use quickwit_doc_mapper::DocMapper;
Expand Down Expand Up @@ -340,3 +341,13 @@ pub fn searcher_pool_for_test(
}),
)
}

pub(crate) fn get_trace_id() -> TraceId {
use opentelemetry::trace::TraceContextExt as _;
use tracing_opentelemetry::OpenTelemetrySpanExt as _;
tracing::Span::current()
.context()
.span()
.span_context()
.trace_id()
}
7 changes: 5 additions & 2 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use quickwit_proto::search::{
SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_proto::types::IndexUid;
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::schema::{Field, FieldType};
use tantivy::{ReloadPolicy, Term};
use tracing::{debug, error, info, instrument};
Expand Down Expand Up @@ -216,7 +216,10 @@ async fn leaf_list_terms_single_split(
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
) -> crate::Result<LeafListTermsResponse> {
let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let index =
open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?;
let split_schema = index.schema();
let reader = index
.reader_builder()
Expand Down
13 changes: 10 additions & 3 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,14 +1107,16 @@ async fn refine_and_list_matches(
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip_all)]
#[instrument(skip_all, fields(request_id))]
pub async fn root_search(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
mut metastore: MetastoreServiceClient,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
info!(searcher_context = ?searcher_context, search_request = ?search_request);
let trace_id = crate::get_trace_id();

info!(trace_id=%trace_id, search_request = ?search_request);
let start_instant = tokio::time::Instant::now();
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: search_request.index_id_patterns.clone(),
Expand Down Expand Up @@ -1169,9 +1171,14 @@ pub async fn root_search(
)
.await;

let elapsed = start_instant.elapsed();

info!(trace_id=%trace_id, num_docs=num_docs, num_splits=num_splits, elapsed_time_millis=%elapsed.as_millis(), "search completed");

if let Ok(search_response) = &mut search_response_result {
search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
search_response.elapsed_time_micros = elapsed.as_micros() as u64;
}

let label_values = if search_response_result.is_ok() {
["success"]
} else {
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-search/src/search_stream/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_proto::search::{
LeafSearchStreamResponse, OutputFormat, SearchRequest, SearchStreamRequest,
SplitIdAndFooterOffsets,
};
use quickwit_storage::Storage;
use quickwit_storage::{ByteRangeCache, Storage};
use tantivy::columnar::{DynamicColumn, HasAssociatedColumnType};
use tantivy::fastfield::Column;
use tantivy::query::Query;
Expand Down Expand Up @@ -127,12 +127,15 @@ async fn leaf_search_stream_single_split(
&split,
);

let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);

let index = open_index_with_caches(
&searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
true,
Some(cache),
)
.await?;
let split_schema = index.schema();
Expand Down
39 changes: 32 additions & 7 deletions quickwit/quickwit-storage/src/cache/byte_range_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use std::borrow::{Borrow, Cow};
use std::collections::BTreeMap;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use tantivy::directory::OwnedBytes;

Expand Down Expand Up @@ -344,30 +345,54 @@ impl<T: 'static + ToOwned + ?Sized> Drop for NeedMutByteRangeCache<T> {
/// cached data, the changes may or may not get recorded.
///
/// At the moment this is hardly a cache as it features no eviction policy.
#[derive(Clone)]
pub struct ByteRangeCache {
inner: Mutex<NeedMutByteRangeCache<Path>>,
inner_mutex: Arc<Mutex<Inner>>,
}

struct Inner {
num_stored_bytes: AtomicU64,
need_mut_byte_range_cache: NeedMutByteRangeCache<Path>,
}

impl ByteRangeCache {
/// Creates a slice cache that never removes any entry.
pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self {
let need_mut_byte_range_cache =
NeedMutByteRangeCache::with_infinite_capacity(cache_counters);
let inner = Inner {
num_stored_bytes: AtomicU64::default(),
need_mut_byte_range_cache,
};
ByteRangeCache {
inner: Mutex::new(NeedMutByteRangeCache::with_infinite_capacity(
cache_counters,
)),
inner_mutex: Arc::new(Mutex::new(inner)),
}
}

/// Overall amount of bytes stored in the cache.
pub fn get_num_bytes(&self) -> u64 {
self.inner_mutex
.lock()
.unwrap()
.num_stored_bytes
.load(Ordering::Relaxed)
}

/// If available, returns the cached view of the slice.
pub fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
self.inner.lock().unwrap().get_slice(path, byte_range)
self.inner_mutex
.lock()
.unwrap()
.need_mut_byte_range_cache
.get_slice(path, byte_range)
}

/// Put the given amount of data in the cache.
pub fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes) {
self.inner
self.inner_mutex
.lock()
.unwrap()
.need_mut_byte_range_cache
.put_slice(path, byte_range, bytes)
}
}
Expand Down

0 comments on commit 0c91c9e

Please sign in to comment.