Limit search memory usage associated with warmup.
Due to tantivy limitations, searching a split requires downloading all
of the required data and keeping it in memory. We call this phase
warmup.

Before this PR, the only thing that curbed memory usage was the search
permits: only N split searches may run concurrently.
Unfortunately, the amount of data a split search requires varies vastly.

We need a mechanism to measure memory usage and avoid starting more
split searches when memory is tight.

Just using a semaphore is, however, not an option. We do not know
beforehand how much memory a split search will require, so it could easily
lead to a deadlock.
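
To make the deadlock concern concrete, here is a minimal sketch of a byte-counting semaphore. It is not code from this commit: `ByteSemaphore` and `two_searches_stall` are hypothetical names, and `try_acquire` fails instead of blocking so that the stall can be observed deterministically. Two searches each grab memory for a first download, then each discovers it needs more while holding what it already has.

```rust
/// Toy byte-counting semaphore (hypothetical, for illustration only).
/// `try_acquire` returns false where a real semaphore would block.
struct ByteSemaphore {
    available: u64,
}

impl ByteSemaphore {
    fn try_acquire(&mut self, num_bytes: u64) -> bool {
        if num_bytes > self.available {
            return false;
        }
        self.available -= num_bytes;
        true
    }
}

/// Returns true if two searches both started and then both got stuck
/// waiting for more memory. Neither releases anything before completing,
/// so with a blocking semaphore they would wait on each other forever.
fn two_searches_stall(budget: u64, first_chunk: u64, second_chunk: u64) -> bool {
    let mut semaphore = ByteSemaphore { available: budget };
    // Both searches start and acquire memory for their first download.
    let a_started = semaphore.try_acquire(first_chunk);
    let b_started = semaphore.try_acquire(first_chunk);
    // Each search now learns that it needs a second chunk.
    let a_can_continue = semaphore.try_acquire(second_chunk);
    let b_can_continue = semaphore.try_acquire(second_chunk);
    a_started && b_started && !a_can_continue && !b_can_continue
}
```

With a budget of 100 bytes and first chunks of 50 bytes, any further request stalls both searches; with first chunks of 30 bytes there is enough slack for both to continue.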

Instead, this commit builds upon the search permit provider.

The search permit provider is now in charge of managing a configurable memory budget for warmup.

We introduce here a configurable "warmup_single_split_initial_allocation".
A new leaf split search cannot be started if this memory is not
available. This initial allocation is meant to be greater than what will
actually be needed most of the time.
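
The admission rule described above can be sketched as follows. This is a simplified model, not the provider from this commit: `WarmupBudget`, `try_start_split_search` and `max_concurrent_warmups` are hypothetical names, sizes are plain byte counts, and the numeric limit on concurrent permits is ignored.

```rust
/// Simplified model of the warmup memory budget (hypothetical type).
struct WarmupBudget {
    memory_budget: u64,
    memory_allocated: u64,
    initial_allocation: u64,
}

impl WarmupBudget {
    /// A new split search may start only if its initial allocation
    /// still fits within the budget.
    fn try_start_split_search(&mut self) -> bool {
        if self.memory_allocated + self.initial_allocation > self.memory_budget {
            return false;
        }
        self.memory_allocated += self.initial_allocation;
        true
    }
}

/// Counts how many split searches can be in warmup at the same time
/// when none of them has reported its actual usage yet.
fn max_concurrent_warmups(memory_budget: u64, initial_allocation: u64) -> u64 {
    // A zero allocation would never exhaust the budget.
    assert!(initial_allocation > 0);
    let mut budget = WarmupBudget {
        memory_budget,
        memory_allocated: 0,
        initial_allocation,
    };
    let mut num_started = 0;
    while budget.try_start_split_search() {
        num_started += 1;
    }
    num_started
}
```

Assuming decimal units, a budget of 1 GB with an initial allocation of 50 MB admits 20 split searches before the next one has to wait.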

The split search then holds this allocation until the end of warmup.
After warmup, we can get the actual memory usage by interrogating the
warmup cache. We can then update the amount of memory held.
Most of the time, this should mean releasing some memory.
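
As a rough illustration of this update step, the sketch below swaps the allocation a split search currently holds for its measured usage. The function names are hypothetical and this is not the code from the diff. It deliberately avoids going through a signed delta, since casting a negative `i64` to `u64` and adding it would trip Rust's overflow check in debug builds.

```rust
/// Returns the provider-wide allocated memory after one split search
/// replaces the allocation it currently holds with its measured usage.
fn resize_allocation(memory_allocated: u64, currently_held: u64, measured_usage: u64) -> u64 {
    memory_allocated
        .saturating_sub(currently_held)
        .saturating_add(measured_usage)
}

/// The budget can be exceeded temporarily when a search used more than
/// its initial allocation. New searches then wait until memory is freed.
fn is_over_budget(memory_allocated: u64, memory_budget: u64) -> bool {
    memory_allocated > memory_budget
}
```

For example, with 150 bytes allocated overall, a search that held 50 bytes and measured 8 bytes brings the total down to 108 bytes.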

In addition, this PR also releases the warmup search permit at this point:

We still have to perform the actual search, but the thread
pool takes care of limiting the number of concurrent tasks.

Closes #5355
fulmicoton committed Nov 15, 2024
1 parent 3cf9f38 commit f971b9d
Showing 4 changed files with 65 additions and 18 deletions.
9 changes: 8 additions & 1 deletion quickwit/quickwit-config/src/node_config/mod.rs
@@ -226,6 +226,10 @@ pub struct SearcherConfig {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub storage_timeout_policy: Option<StorageTimeoutPolicy>,

// TODO validate that `warmup_memory_budget` is greater than `warmup_single_split_initial_allocation`
pub warmup_memory_budget: ByteSize,
pub warmup_single_split_initial_allocation: ByteSize,
}

/// Configuration controlling how fast a searcher should timeout a `get_slice`
@@ -263,7 +267,7 @@ impl StorageTimeoutPolicy {

impl Default for SearcherConfig {
fn default() -> Self {
Self {
SearcherConfig {
fast_field_cache_capacity: ByteSize::gb(1),
split_footer_cache_capacity: ByteSize::mb(500),
partial_request_cache_capacity: ByteSize::mb(64),
@@ -274,6 +278,9 @@ impl Default for SearcherConfig {
split_cache: None,
request_timeout_secs: Self::default_request_timeout_secs(),
storage_timeout_policy: None,

warmup_memory_budget: ByteSize::gb(1),
warmup_single_split_initial_allocation: ByteSize::mb(50),
}
}
}
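
The TODO on `warmup_memory_budget` could be addressed with a check along these lines. This is only a sketch: `validate_warmup_config` is a hypothetical helper that takes plain byte counts and is not part of quickwit-config. It accepts a budget equal to the initial allocation, because the admission check in this diff uses `<=`. Switching to a strict comparison, as the TODO wording suggests, is a one-character change.

```rust
/// Hypothetical validation helper for the two new searcher settings.
/// Both arguments are sizes in bytes.
fn validate_warmup_config(
    warmup_memory_budget: u64,
    warmup_single_split_initial_allocation: u64,
) -> Result<(), String> {
    if warmup_single_split_initial_allocation > warmup_memory_budget {
        // With such a configuration no split search could ever start.
        return Err(format!(
            "warmup_single_split_initial_allocation ({} bytes) exceeds warmup_memory_budget ({} bytes)",
            warmup_single_split_initial_allocation, warmup_memory_budget
        ));
    }
    Ok(())
}
```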
6 changes: 4 additions & 2 deletions quickwit/quickwit-search/src/leaf.rs
@@ -376,6 +376,7 @@ async fn leaf_search_single_split(
doc_mapper: Arc<DocMapper>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
aggregations_limits: AggregationLimitsGuard,
search_permit: &mut SearchPermit,
) -> crate::Result<LeafSearchResponse> {
rewrite_request(
&mut search_request,
@@ -434,9 +435,9 @@ async fn leaf_search_single_split(
let warmup_duration: Duration = warmup_end.duration_since(warmup_start);

let short_lived_cache_num_bytes: u64 = byte_range_cache.get_num_bytes();
search_permit.set_actual_memory_usage_and_release_permit_after(short_lived_cache_num_bytes);
let split_num_docs = split.num_docs;


let span = info_span!("tantivy_search");

let (search_request, leaf_search_response) = {
@@ -1378,7 +1379,7 @@ async fn leaf_search_single_split_wrapper(
split: SplitIdAndFooterOffsets,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
search_permit: SearchPermit,
mut search_permit: SearchPermit,
aggregations_limits: AggregationLimitsGuard,
) {
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
@@ -1393,6 +1394,7 @@ async fn leaf_search_single_split_wrapper(
doc_mapper,
split_filter.clone(),
aggregations_limits,
&mut search_permit,
)
.await;

66 changes: 52 additions & 14 deletions quickwit/quickwit-search/src/search_permit_provider.rs
@@ -20,8 +20,10 @@
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use bytesize::ByteSize;
use quickwit_common::metrics::GaugeGuard;
use tokio::sync::oneshot;
use tracing::warn;

/// `SearchPermitProvider` is a distributor of permits to perform single split
/// search operation.
@@ -33,11 +35,14 @@ pub struct SearchPermitProvider {
}

impl SearchPermitProvider {
pub fn new(num_permits: usize) -> SearchPermitProvider {
pub fn new(num_permits: usize, memory_budget: ByteSize, initial_allocation: ByteSize) -> SearchPermitProvider {
SearchPermitProvider {
inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider {
num_permits_available: num_permits,
memory_budget: memory_budget.as_u64(),
permits_requests: VecDeque::new(),
memory_allocated: 0u64,
initial_allocation: initial_allocation.as_u64(),
})),
}
}
@@ -65,6 +70,14 @@ impl SearchPermitProvider {

struct InnerSearchPermitProvider {
num_permits_available: usize,

// Note it is possible for memory_allocated to exceed memory_budget temporarily,
// if and only if a split leaf search task ended up using more than `initial_allocation`.
//
// When it happens, new permits will not be assigned until the memory is freed.
memory_budget: u64,
memory_allocated: u64,
initial_allocation: u64,
permits_requests: VecDeque<oneshot::Sender<SearchPermit>>,
}

@@ -94,30 +107,29 @@ impl InnerSearchPermitProvider {
permits
}

fn recycle_permit(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
self.num_permits_available += 1;
self.assign_available_permits(inner_arc);
}

fn assign_available_permits(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
while self.num_permits_available > 0 {
let Some(sender) = self.permits_requests.pop_front() else {
while self.num_permits_available > 0 && self.memory_allocated + self.initial_allocation <= self.memory_budget {
let Some(permit_requester_tx) = self.permits_requests.pop_front() else {
break;
};
let mut ongoing_gauge_guard = GaugeGuard::from_gauge(
&crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
);
ongoing_gauge_guard.add(1);
let send_res = sender.send(SearchPermit {
let send_res = permit_requester_tx.send(SearchPermit {
_ongoing_gauge_guard: ongoing_gauge_guard,
inner_arc: inner_arc.clone(),
recycle_on_drop: true,
warmup_permit_held: true,
memory_allocation: self.initial_allocation,
});
match send_res {
Ok(()) => {
self.num_permits_available -= 1;
self.memory_allocated += self.initial_allocation;
}
Err(search_permit) => {
// We cannot just decrease the num_permits_available in all cases and rely on
// the drop logic here: it would cause a deadlock on the inner_arc Mutex.
search_permit.drop_without_recycling_permit();
}
}
@@ -131,23 +143,49 @@ impl InnerSearchPermitProvider {
pub struct SearchPermit {
_ongoing_gauge_guard: GaugeGuard<'static>,
inner_arc: Arc<Mutex<InnerSearchPermitProvider>>,
recycle_on_drop: bool,
warmup_permit_held: bool,
memory_allocation: u64,
}

impl SearchPermit {
/// After warm up, we have a proper estimate of the memory usage of a single split leaf search.
///
/// We can then set the actual memory usage.
pub fn set_actual_memory_usage_and_release_permit_after(&mut self, new_memory_usage: u64) {
if new_memory_usage > self.memory_allocation {
warn!(memory_usage=new_memory_usage, memory_allocation=self.memory_allocation, "current leaf search is consuming more memory than the initial allocation");
}
let mut inner_guard = self.inner_arc.lock().unwrap();
let delta = new_memory_usage as i64 - inner_guard.initial_allocation as i64;
inner_guard.memory_allocated += delta as u64;
inner_guard.num_permits_available += 1;
if inner_guard.memory_allocated > inner_guard.memory_budget {
warn!(memory_allocated=inner_guard.memory_allocated, memory_budget=inner_guard.memory_budget, "memory allocated exceeds memory budget");
}
self.memory_allocation = new_memory_usage;
inner_guard.assign_available_permits(&self.inner_arc);
}

fn drop_without_recycling_permit(mut self) {
self.recycle_on_drop = false;
self.warmup_permit_held = false;
self.memory_allocation = 0u64;
drop(self);
}
}

impl Drop for SearchPermit {
fn drop(&mut self) {
if !self.recycle_on_drop {
// This is not just an optimization. This is necessary to avoid a deadlock when the
// permit requester dropped its receiver channel.
if !self.warmup_permit_held && self.memory_allocation == 0 {
return;
}
let mut inner_guard = self.inner_arc.lock().unwrap();
inner_guard.recycle_permit(&self.inner_arc.clone());
if self.warmup_permit_held {
inner_guard.num_permits_available += 1;
}
inner_guard.memory_allocated -= self.memory_allocation;
inner_guard.assign_available_permits(&self.inner_arc);
}
}
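
The permit lifecycle touched by this file (acquire, resize after warmup, drop) can be modelled in a few lines. The sketch below is a simplified, standalone model rather than the code above: `Inner`, `Permit`, `acquire`, `finish_warmup` and `snapshot` are hypothetical names, and there is no request queue or metrics. As an assumption about the intended behavior, `finish_warmup` clears `warmup_permit_held` once the concurrency slot is returned, so that `Drop` cannot return the same slot a second time, and it subtracts the permit's own current allocation rather than the provider-wide initial allocation.

```rust
use std::sync::{Arc, Mutex};

/// Shared counters (simplified stand-in for the inner provider state).
struct Inner {
    num_permits_available: usize,
    memory_allocated: u64,
}

struct Permit {
    inner: Arc<Mutex<Inner>>,
    warmup_permit_held: bool,
    memory_allocation: u64,
}

impl Permit {
    /// Takes one concurrency slot and reserves the initial allocation.
    fn acquire(inner: &Arc<Mutex<Inner>>, initial_allocation: u64) -> Option<Permit> {
        let mut guard = inner.lock().unwrap();
        if guard.num_permits_available == 0 {
            return None;
        }
        guard.num_permits_available -= 1;
        guard.memory_allocated += initial_allocation;
        Some(Permit {
            inner: inner.clone(),
            warmup_permit_held: true,
            memory_allocation: initial_allocation,
        })
    }

    /// Replaces the reserved memory with the measured usage and returns
    /// the concurrency slot exactly once.
    fn finish_warmup(&mut self, actual_memory_usage: u64) {
        let mut guard = self.inner.lock().unwrap();
        guard.memory_allocated =
            guard.memory_allocated.saturating_sub(self.memory_allocation) + actual_memory_usage;
        self.memory_allocation = actual_memory_usage;
        if self.warmup_permit_held {
            guard.num_permits_available += 1;
            self.warmup_permit_held = false;
        }
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        let mut guard = self.inner.lock().unwrap();
        // Only return the slot if warmup never completed.
        if self.warmup_permit_held {
            guard.num_permits_available += 1;
        }
        guard.memory_allocated = guard.memory_allocated.saturating_sub(self.memory_allocation);
    }
}

/// Reads (num_permits_available, memory_allocated) for inspection.
fn snapshot(inner: &Arc<Mutex<Inner>>) -> (usize, u64) {
    let guard = inner.lock().unwrap();
    (guard.num_permits_available, guard.memory_allocated)
}
```

In this model, a permit acquired with a 50-byte reservation that measures 8 bytes after warmup frees its slot immediately, keeps 8 bytes accounted for while the search runs, and leaves both counters at their starting values once dropped.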

2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/service.rs
@@ -489,7 +489,7 @@ impl SearcherContext {
&quickwit_storage::STORAGE_METRICS.split_footer_cache,
);
let leaf_search_split_semaphore =
SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches);
SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches, searcher_config.warmup_memory_budget, searcher_config.warmup_single_split_initial_allocation);
let split_stream_semaphore =
Semaphore::new(searcher_config.max_num_concurrent_split_streams);
let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;