Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Nov 25, 2024
1 parent fd1c9a1 commit 0d7341d
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 182 deletions.
14 changes: 8 additions & 6 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,7 @@ pub struct SearcherConfig {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub storage_timeout_policy: Option<StorageTimeoutPolicy>,

// TODO validate that `warmup_memory_budget` is greater than `warmup_single_split_initial_allocation`
// TODO set serde default
pub warmup_memory_budget: ByteSize,
// TODO set serde default
pub warmup_single_split_initial_allocation: ByteSize,
}

Expand Down Expand Up @@ -280,9 +276,7 @@ impl Default for SearcherConfig {
split_cache: None,
request_timeout_secs: Self::default_request_timeout_secs(),
storage_timeout_policy: None,
// TODO change this to the method used for serde default.
warmup_memory_budget: ByteSize::gb(1),
// TODO change this to the method used for serde default.
warmup_single_split_initial_allocation: ByteSize::mb(50),
}
}
Expand Down Expand Up @@ -318,6 +312,14 @@ impl SearcherConfig {
split_cache_limits.max_file_descriptors
);
}
if self.warmup_single_split_initial_allocation > self.warmup_memory_budget {
anyhow::bail!(
"warmup_single_split_initial_allocation ({}) must be lower or equal to \
warmup_memory_budget ({})",
self.warmup_single_split_initial_allocation,
self.warmup_memory_budget
);
}
}
Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,9 @@ mod tests {
min_throughtput_bytes_per_secs: 100_000,
timeout_millis: 2_000,
max_num_retries: 2
})
}),
warmup_memory_budget: ByteSize::gb(1),
warmup_single_split_initial_allocation: ByteSize::mb(50),
}
);
assert_eq!(
Expand Down
33 changes: 22 additions & 11 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,19 @@ use tantivy::directory::{FileHandle, OwnedBytes};
use tantivy::{Directory, HasLen};

/// The caching directory is a simple cache that wraps another directory.
#[derive(Clone)]
pub struct CachingDirectory {
pub struct CachingDirectory<T = ()> {
underlying: Arc<dyn Directory>,
// TODO fixme: that's a pretty ugly cache we have here.
cache: ByteRangeCache,
cache: ByteRangeCache<T>,
}

impl<T> Clone for CachingDirectory<T> {
fn clone(&self) -> Self {
CachingDirectory {
underlying: self.underlying.clone(),
cache: self.cache.clone(),
}
}
}

impl CachingDirectory {
Expand All @@ -44,32 +52,35 @@ impl CachingDirectory {
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
(),
);
CachingDirectory::new(underlying, byte_range_cache)
}
}

impl<T> CachingDirectory<T> {
/// Creates a new CachingDirectory.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache) -> CachingDirectory {
pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache<T>) -> CachingDirectory<T> {
CachingDirectory { underlying, cache }
}
}

impl fmt::Debug for CachingDirectory {
impl<T> fmt::Debug for CachingDirectory<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "CachingDirectory({:?})", self.underlying)
}
}

struct CachingFileHandle {
struct CachingFileHandle<T> {
path: PathBuf,
cache: ByteRangeCache,
cache: ByteRangeCache<T>,
underlying_filehandle: Arc<dyn FileHandle>,
}

impl fmt::Debug for CachingFileHandle {
impl<T> fmt::Debug for CachingFileHandle<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
Expand All @@ -81,7 +92,7 @@ impl fmt::Debug for CachingFileHandle {
}

#[async_trait]
impl FileHandle for CachingFileHandle {
impl<T: Send + 'static> FileHandle for CachingFileHandle<T> {
fn read_bytes(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) {
return Ok(bytes);
Expand All @@ -106,13 +117,13 @@ impl FileHandle for CachingFileHandle {
}
}

impl HasLen for CachingFileHandle {
impl<T> HasLen for CachingFileHandle<T> {
fn len(&self) -> usize {
self.underlying_filehandle.len()
}
}

impl Directory for CachingDirectory {
impl<T: Send + 'static> Directory for CachingDirectory<T> {
fn exists(&self, path: &Path) -> std::result::Result<bool, OpenReadError> {
self.underlying.exists(path)
}
Expand Down
13 changes: 10 additions & 3 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1805,7 +1805,8 @@ mod tests {
failed_splits: Vec::new(),
num_attempted_splits: 3,
num_successful_splits: 3,
intermediate_aggregation_result: None
intermediate_aggregation_result: None,
resource_stats: None,
}
);

Expand Down Expand Up @@ -1844,6 +1845,7 @@ mod tests {
num_attempted_splits: 3,
num_successful_splits: 3,
intermediate_aggregation_result: None,
resource_stats: None,
},
LeafSearchResponse {
num_hits: 10,
Expand All @@ -1862,6 +1864,7 @@ mod tests {
num_attempted_splits: 2,
num_successful_splits: 1,
intermediate_aggregation_result: None,
resource_stats: None,
},
],
);
Expand Down Expand Up @@ -1893,7 +1896,8 @@ mod tests {
}],
num_attempted_splits: 5,
num_successful_splits: 4,
intermediate_aggregation_result: None
intermediate_aggregation_result: None,
resource_stats: None,
}
);

Expand Down Expand Up @@ -1933,6 +1937,7 @@ mod tests {
num_attempted_splits: 3,
num_successful_splits: 3,
intermediate_aggregation_result: None,
resource_stats: None,
},
LeafSearchResponse {
num_hits: 10,
Expand All @@ -1951,6 +1956,7 @@ mod tests {
num_attempted_splits: 2,
num_successful_splits: 1,
intermediate_aggregation_result: None,
resource_stats: None,
},
],
);
Expand Down Expand Up @@ -1982,7 +1988,8 @@ mod tests {
}],
num_attempted_splits: 5,
num_successful_splits: 4,
intermediate_aggregation_result: None
intermediate_aggregation_result: None,
resource_stats: None,
}
);
// TODO would be nice to test aggregation too.
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async fn fetch_docs_in_split(
global_doc_addrs.sort_by_key(|doc| doc.doc_addr);
// Opens the index without the ephemeral unbounded cache, this cache is indeed not useful
// when fetching docs as we will fetch them only once.
let mut index = open_index_with_caches(
let mut index = open_index_with_caches::<()>(
&searcher_context,
index_storage,
split,
Expand Down
39 changes: 21 additions & 18 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use quickwit_common::pretty::PrettySample;
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
use quickwit_proto::search::{
CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError
CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder,
SortValue, SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
use quickwit_query::tokenizers::TokenizerManager;
Expand Down Expand Up @@ -127,14 +128,18 @@ pub(crate) async fn open_split_bundle(
/// Opens a `tantivy::Index` for the given split with several cache layers:
/// - A split footer cache given by `SearcherContext.split_footer_cache`.
/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`.
/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`.
/// - An ephemeral unbounded cache directory whose lifetime is tied to the
/// returned `Index`.
///
/// TODO: generic T should be forced to SearcherPermit, but this requires the
/// search stream to also request permits.
#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))]
pub(crate) async fn open_index_with_caches(
pub(crate) async fn open_index_with_caches<T: Send + 'static>(
searcher_context: &SearcherContext,
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: Option<ByteRangeCache>,
ephemeral_unbounded_cache: Option<ByteRangeCache<T>>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
Expand Down Expand Up @@ -376,7 +381,7 @@ async fn leaf_search_single_split(
doc_mapper: Arc<DocMapper>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
aggregations_limits: AggregationLimitsGuard,
search_permit: &mut SearchPermit,
search_permit: SearchPermit,
) -> crate::Result<LeafSearchResponse> {
rewrite_request(
&mut search_request,
Expand All @@ -402,8 +407,10 @@ async fn leaf_search_single_split(
}

let split_id = split.split_id.to_string();
let byte_range_cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
search_permit,
);
let index = open_index_with_caches(
searcher_context,
storage,
Expand Down Expand Up @@ -433,9 +440,8 @@ async fn leaf_search_single_split(
warmup(&searcher, &warmup_info).await?;
let warmup_end = Instant::now();
let warmup_duration: Duration = warmup_end.duration_since(warmup_start);

let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes();
search_permit.set_actual_memory_usage_and_release_permit_after(short_lived_cache_num_bytes);
let short_lived_cache_num_bytes = byte_range_cache.get_num_bytes();
byte_range_cache.track(|permit| permit.warmup_completed(short_lived_cache_num_bytes));
let split_num_docs = split.num_docs;

let span = info_span!("tantivy_search");
Expand Down Expand Up @@ -1281,15 +1287,15 @@ pub async fn leaf_search(
// do no interleave with other leaf search requests.
let permit_futures = searcher_context
.search_permit_provider
.get_permits(split_with_req.len());
.get_permits(split_with_req.len())
.await;

for ((split, mut request), permit_fut) in
split_with_req.into_iter().zip(permit_futures.into_iter())
{
let leaf_split_search_permit = permit_fut
.instrument(info_span!("waiting_for_leaf_search_split_semaphore"))
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
.await;

let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
if !can_be_better && !run_all_splits {
Expand Down Expand Up @@ -1379,7 +1385,7 @@ async fn leaf_search_single_split_wrapper(
split: SplitIdAndFooterOffsets,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
mut search_permit: SearchPermit,
search_permit: SearchPermit,
aggregations_limits: AggregationLimitsGuard,
) {
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
Expand All @@ -1394,13 +1400,10 @@ async fn leaf_search_single_split_wrapper(
doc_mapper,
split_filter.clone(),
aggregations_limits,
&mut search_permit,
search_permit,
)
.await;

// We explicitly drop it, to highlight it to the reader
std::mem::drop(search_permit);

if leaf_search_single_split_res.is_ok() {
timer.observe_duration();
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/leaf_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ mod tests {
sort_value2: None,
split_id: "split_1".to_string(),
}],
resource_stats: None,
};

assert!(cache.get(split_1.clone(), query_1.clone()).is_none());
Expand Down Expand Up @@ -342,6 +343,7 @@ mod tests {
sort_value2: None,
split_id: "split_1".to_string(),
}],
resource_stats: None,
};

// for split_1, 1 and 1bis cover different timestamp ranges
Expand Down
14 changes: 10 additions & 4 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tracing::{debug, error, info, instrument};

use crate::leaf::open_index_with_caches;
use crate::search_job_placer::group_jobs_by_index_id;
use crate::search_permit_provider::SearchPermit;
use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};

/// Performs a distributed list terms.
Expand Down Expand Up @@ -215,9 +216,12 @@ async fn leaf_list_terms_single_split(
search_request: &ListTermsRequest,
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
search_permit: SearchPermit,
) -> crate::Result<LeafListTermsResponse> {
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
search_permit,
);
let index =
open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?;
let split_schema = index.schema();
Expand Down Expand Up @@ -330,15 +334,16 @@ pub async fn leaf_list_terms(
info!(split_offsets = ?PrettySample::new(splits, 5));
let permits = searcher_context
.search_permit_provider
.get_permits(splits.len());
.get_permits(splits.len())
.await;
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
.zip(permits.into_iter())
.map(|(split, search_permit_recv)| {
let index_storage_clone = index_storage.clone();
let searcher_context_clone = searcher_context.clone();
async move {
let _leaf_split_search_permit = search_permit_recv.await;
let leaf_split_search_permit = search_permit_recv.await;
// TODO dedicated counter and timer?
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
let timer = crate::SEARCH_METRICS
Expand All @@ -349,6 +354,7 @@ pub async fn leaf_list_terms(
request,
index_storage_clone,
split.clone(),
leaf_split_search_permit,
)
.await;
timer.observe_duration();
Expand Down
Loading

0 comments on commit 0d7341d

Please sign in to comment.