Skip to content

Commit

Permalink
Make search permit provider into an actor.
Browse files Browse the repository at this point in the history
Also attach the permit to the actual memory cache to ensure memory is freed at the right moment.
  • Loading branch information
rdettai committed Nov 26, 2024
1 parent fd1c9a1 commit 08bff06
Show file tree
Hide file tree
Showing 12 changed files with 278 additions and 184 deletions.
14 changes: 8 additions & 6 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,7 @@ pub struct SearcherConfig {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub storage_timeout_policy: Option<StorageTimeoutPolicy>,

// TODO validate that `warmup_memory_budget` is greater than `warmup_single_split_initial_allocation`
// TODO set serde default
pub warmup_memory_budget: ByteSize,
// TODO set serde default
pub warmup_single_split_initial_allocation: ByteSize,
}

Expand Down Expand Up @@ -280,9 +276,7 @@ impl Default for SearcherConfig {
split_cache: None,
request_timeout_secs: Self::default_request_timeout_secs(),
storage_timeout_policy: None,
// TODO change this to the method used for serde default.
warmup_memory_budget: ByteSize::gb(1),
// TODO change this to the method used for serde default.
warmup_single_split_initial_allocation: ByteSize::mb(50),
}
}
Expand Down Expand Up @@ -318,6 +312,14 @@ impl SearcherConfig {
split_cache_limits.max_file_descriptors
);
}
if self.warmup_single_split_initial_allocation > self.warmup_memory_budget {
anyhow::bail!(
"warmup_single_split_initial_allocation ({}) must be lower or equal to \
warmup_memory_budget ({})",
self.warmup_single_split_initial_allocation,
self.warmup_memory_budget
);
}
}
Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,9 @@ mod tests {
min_throughtput_bytes_per_secs: 100_000,
timeout_millis: 2_000,
max_num_retries: 2
})
}),
warmup_memory_budget: ByteSize::gb(1),
warmup_single_split_initial_allocation: ByteSize::mb(50),
}
);
assert_eq!(
Expand Down
33 changes: 22 additions & 11 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,19 @@ use tantivy::directory::{FileHandle, OwnedBytes};
use tantivy::{Directory, HasLen};

/// The caching directory is a simple cache that wraps another directory.
#[derive(Clone)]
pub struct CachingDirectory {
pub struct CachingDirectory<T = ()> {
underlying: Arc<dyn Directory>,
// TODO fixme: that's a pretty ugly cache we have here.
cache: ByteRangeCache,
cache: ByteRangeCache<T>,
}

impl<T> Clone for CachingDirectory<T> {
fn clone(&self) -> Self {
CachingDirectory {
underlying: self.underlying.clone(),
cache: self.cache.clone(),
}
}
}

impl CachingDirectory {
Expand All @@ -44,32 +52,35 @@ impl CachingDirectory {
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
(),
);
CachingDirectory::new(underlying, byte_range_cache)
}
}

impl<T> CachingDirectory<T> {
/// Creates a new CachingDirectory.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache) -> CachingDirectory {
pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache<T>) -> CachingDirectory<T> {
CachingDirectory { underlying, cache }
}
}

impl fmt::Debug for CachingDirectory {
impl<T> fmt::Debug for CachingDirectory<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "CachingDirectory({:?})", self.underlying)
}
}

struct CachingFileHandle {
struct CachingFileHandle<T> {
path: PathBuf,
cache: ByteRangeCache,
cache: ByteRangeCache<T>,
underlying_filehandle: Arc<dyn FileHandle>,
}

impl fmt::Debug for CachingFileHandle {
impl<T> fmt::Debug for CachingFileHandle<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
Expand All @@ -81,7 +92,7 @@ impl fmt::Debug for CachingFileHandle {
}

#[async_trait]
impl FileHandle for CachingFileHandle {
impl<T: Send + 'static> FileHandle for CachingFileHandle<T> {
fn read_bytes(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) {
return Ok(bytes);
Expand All @@ -106,13 +117,13 @@ impl FileHandle for CachingFileHandle {
}
}

impl HasLen for CachingFileHandle {
impl<T> HasLen for CachingFileHandle<T> {
fn len(&self) -> usize {
self.underlying_filehandle.len()
}
}

impl Directory for CachingDirectory {
impl<T: Send + 'static> Directory for CachingDirectory<T> {
fn exists(&self, path: &Path) -> std::result::Result<bool, OpenReadError> {
self.underlying.exists(path)
}
Expand Down
13 changes: 10 additions & 3 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1805,7 +1805,8 @@ mod tests {
failed_splits: Vec::new(),
num_attempted_splits: 3,
num_successful_splits: 3,
intermediate_aggregation_result: None
intermediate_aggregation_result: None,
resource_stats: None,
}
);

Expand Down Expand Up @@ -1844,6 +1845,7 @@ mod tests {
num_attempted_splits: 3,
num_successful_splits: 3,
intermediate_aggregation_result: None,
resource_stats: None,
},
LeafSearchResponse {
num_hits: 10,
Expand All @@ -1862,6 +1864,7 @@ mod tests {
num_attempted_splits: 2,
num_successful_splits: 1,
intermediate_aggregation_result: None,
resource_stats: None,
},
],
);
Expand Down Expand Up @@ -1893,7 +1896,8 @@ mod tests {
}],
num_attempted_splits: 5,
num_successful_splits: 4,
intermediate_aggregation_result: None
intermediate_aggregation_result: None,
resource_stats: None,
}
);

Expand Down Expand Up @@ -1933,6 +1937,7 @@ mod tests {
num_attempted_splits: 3,
num_successful_splits: 3,
intermediate_aggregation_result: None,
resource_stats: None,
},
LeafSearchResponse {
num_hits: 10,
Expand All @@ -1951,6 +1956,7 @@ mod tests {
num_attempted_splits: 2,
num_successful_splits: 1,
intermediate_aggregation_result: None,
resource_stats: None,
},
],
);
Expand Down Expand Up @@ -1982,7 +1988,8 @@ mod tests {
}],
num_attempted_splits: 5,
num_successful_splits: 4,
intermediate_aggregation_result: None
intermediate_aggregation_result: None,
resource_stats: None,
}
);
// TODO would be nice to test aggregation too.
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async fn fetch_docs_in_split(
global_doc_addrs.sort_by_key(|doc| doc.doc_addr);
// Opens the index without the ephemeral unbounded cache, this cache is indeed not useful
// when fetching docs as we will fetch them only once.
let mut index = open_index_with_caches(
let mut index = open_index_with_caches::<()>(
&searcher_context,
index_storage,
split,
Expand Down
40 changes: 22 additions & 18 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use quickwit_common::pretty::PrettySample;
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
use quickwit_proto::search::{
CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError
CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, SearchRequest, SortOrder,
SortValue, SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
use quickwit_query::tokenizers::TokenizerManager;
Expand Down Expand Up @@ -127,14 +128,18 @@ pub(crate) async fn open_split_bundle(
/// Opens a `tantivy::Index` for the given split with several cache layers:
/// - A split footer cache given by `SearcherContext.split_footer_cache`.
/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`.
/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`.
/// - An ephemeral unbounded cache directory whose lifetime is tied to the
/// returned `Index`.
///
/// TODO: generic T should be forced to SearcherPermit, but this requires the
/// search stream to also request permits.
#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))]
pub(crate) async fn open_index_with_caches(
pub(crate) async fn open_index_with_caches<T: Send + 'static>(
searcher_context: &SearcherContext,
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: Option<ByteRangeCache>,
ephemeral_unbounded_cache: Option<ByteRangeCache<T>>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
Expand Down Expand Up @@ -368,6 +373,7 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
}

/// Apply a leaf search on a single split.
#[allow(clippy::too_many_arguments)]
async fn leaf_search_single_split(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
Expand All @@ -376,7 +382,7 @@ async fn leaf_search_single_split(
doc_mapper: Arc<DocMapper>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
aggregations_limits: AggregationLimitsGuard,
search_permit: &mut SearchPermit,
search_permit: SearchPermit,
) -> crate::Result<LeafSearchResponse> {
rewrite_request(
&mut search_request,
Expand All @@ -402,8 +408,10 @@ async fn leaf_search_single_split(
}

let split_id = split.split_id.to_string();
let byte_range_cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
search_permit,
);
let index = open_index_with_caches(
searcher_context,
storage,
Expand Down Expand Up @@ -433,9 +441,8 @@ async fn leaf_search_single_split(
warmup(&searcher, &warmup_info).await?;
let warmup_end = Instant::now();
let warmup_duration: Duration = warmup_end.duration_since(warmup_start);

let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes();
search_permit.set_actual_memory_usage_and_release_permit_after(short_lived_cache_num_bytes);
let short_lived_cache_num_bytes = byte_range_cache.get_num_bytes();
byte_range_cache.track(|permit| permit.warmup_completed(short_lived_cache_num_bytes));
let split_num_docs = split.num_docs;

let span = info_span!("tantivy_search");
Expand Down Expand Up @@ -1281,15 +1288,15 @@ pub async fn leaf_search(
// do no interleave with other leaf search requests.
let permit_futures = searcher_context
.search_permit_provider
.get_permits(split_with_req.len());
.get_permits(split_with_req.len())
.await;

for ((split, mut request), permit_fut) in
split_with_req.into_iter().zip(permit_futures.into_iter())
{
let leaf_split_search_permit = permit_fut
.instrument(info_span!("waiting_for_leaf_search_split_semaphore"))
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
.await;

let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
if !can_be_better && !run_all_splits {
Expand Down Expand Up @@ -1379,7 +1386,7 @@ async fn leaf_search_single_split_wrapper(
split: SplitIdAndFooterOffsets,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
mut search_permit: SearchPermit,
search_permit: SearchPermit,
aggregations_limits: AggregationLimitsGuard,
) {
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
Expand All @@ -1394,13 +1401,10 @@ async fn leaf_search_single_split_wrapper(
doc_mapper,
split_filter.clone(),
aggregations_limits,
&mut search_permit,
search_permit,
)
.await;

// We explicitly drop it, to highlight it to the reader
std::mem::drop(search_permit);

if leaf_search_single_split_res.is_ok() {
timer.observe_duration();
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/leaf_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ mod tests {
sort_value2: None,
split_id: "split_1".to_string(),
}],
resource_stats: None,
};

assert!(cache.get(split_1.clone(), query_1.clone()).is_none());
Expand Down Expand Up @@ -342,6 +343,7 @@ mod tests {
sort_value2: None,
split_id: "split_1".to_string(),
}],
resource_stats: None,
};

// for split_1, 1 and 1bis cover different timestamp ranges
Expand Down
14 changes: 10 additions & 4 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tracing::{debug, error, info, instrument};

use crate::leaf::open_index_with_caches;
use crate::search_job_placer::group_jobs_by_index_id;
use crate::search_permit_provider::SearchPermit;
use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};

/// Performs a distributed list terms.
Expand Down Expand Up @@ -215,9 +216,12 @@ async fn leaf_list_terms_single_split(
search_request: &ListTermsRequest,
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
search_permit: SearchPermit,
) -> crate::Result<LeafListTermsResponse> {
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
search_permit,
);
let index =
open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?;
let split_schema = index.schema();
Expand Down Expand Up @@ -330,15 +334,16 @@ pub async fn leaf_list_terms(
info!(split_offsets = ?PrettySample::new(splits, 5));
let permits = searcher_context
.search_permit_provider
.get_permits(splits.len());
.get_permits(splits.len())
.await;
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
.zip(permits.into_iter())
.map(|(split, search_permit_recv)| {
let index_storage_clone = index_storage.clone();
let searcher_context_clone = searcher_context.clone();
async move {
let _leaf_split_search_permit = search_permit_recv.await;
let leaf_split_search_permit = search_permit_recv.await;
// TODO dedicated counter and timer?
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
let timer = crate::SEARCH_METRICS
Expand All @@ -349,6 +354,7 @@ pub async fn leaf_list_terms(
request,
index_storage_clone,
split.clone(),
leaf_split_search_permit,
)
.await;
timer.observe_duration();
Expand Down
Loading

0 comments on commit 08bff06

Please sign in to comment.