Skip to content

Commit

Permalink
Also use num_docs to estimate init cache size
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Dec 4, 2024
1 parent fd13536 commit 56362da
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 35 deletions.
16 changes: 10 additions & 6 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use tracing::*;
use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
use crate::metrics::SEARCH_METRICS;
use crate::root::is_metadata_count_request_with_ast;
use crate::search_permit_provider::SearchPermit;
use crate::search_permit_provider::{compute_initial_memory_allocation, SearchPermit};
use crate::service::{deserialize_doc_mapper, SearcherContext};
use crate::{QuickwitAggregations, SearchError};

Expand Down Expand Up @@ -1315,13 +1315,17 @@ pub async fn leaf_search(

// We acquire all of the leaf search permits to make sure our single split search tasks
// do not interleave with other leaf search requests.
let permit_sizes = split_with_req.iter().map(|(split, _)| {
compute_initial_memory_allocation(
split,
searcher_context
.searcher_config
.warmup_single_split_initial_allocation,
)
});
let permit_futures = searcher_context
.search_permit_provider
.get_permits(
split_with_req
.iter()
.map(|(split, _)| ByteSize(split.split_footer_start)),
)
.get_permits(permit_sizes)
.await;

for ((split, mut request), permit_fut) in
Expand Down
16 changes: 10 additions & 6 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::ops::Bound;
use std::sync::Arc;

use anyhow::Context;
use bytesize::ByteSize;
use futures::future::try_join_all;
use itertools::{Either, Itertools};
use quickwit_common::pretty::PrettySample;
Expand All @@ -41,6 +40,7 @@ use tracing::{debug, error, info, instrument};

use crate::leaf::open_index_with_caches;
use crate::search_job_placer::group_jobs_by_index_id;
use crate::search_permit_provider::compute_initial_memory_allocation;
use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};

/// Performs a distributed list terms.
Expand Down Expand Up @@ -329,13 +329,17 @@ pub async fn leaf_list_terms(
splits: &[SplitIdAndFooterOffsets],
) -> Result<LeafListTermsResponse, SearchError> {
info!(split_offsets = ?PrettySample::new(splits, 5));
let permit_sizes = splits.iter().map(|split| {
compute_initial_memory_allocation(
split,
searcher_context
.searcher_config
.warmup_single_split_initial_allocation,
)
});
let permits = searcher_context
.search_permit_provider
.get_permits(
splits
.iter()
.map(|split| ByteSize(split.split_footer_start)),
)
.get_permits(permit_sizes)
.await;
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
Expand Down
55 changes: 33 additions & 22 deletions quickwit/quickwit-search/src/search_permit_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::task::{Context, Poll};

use bytesize::ByteSize;
use quickwit_common::metrics::GaugeGuard;
use quickwit_proto::search::SplitIdAndFooterOffsets;
#[cfg(test)]
use tokio::sync::watch;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -40,7 +41,6 @@ pub struct SearchPermitProvider {
message_sender: mpsc::UnboundedSender<SearchPermitMessage>,
#[cfg(test)]
actor_stopped: watch::Receiver<bool>,
per_permit_initial_memory_allocation: u64,
}

#[derive(Debug)]
Expand All @@ -59,12 +59,29 @@ pub enum SearchPermitMessage {
},
}

/// Makes a very pessimistic estimate of the memory allocation required for a split search.
///
/// The estimate is the smallest of:
/// - the split size, read from `split.split_footer_start`,
/// - `warmup_single_split_initial_allocation` scaled by `split.num_docs / 10_000_000`,
/// - `warmup_single_split_initial_allocation` itself, which therefore acts as an upper bound.
///
/// This is refined later on when more data is available about the split.
pub fn compute_initial_memory_allocation(
    split: &SplitIdAndFooterOffsets,
    warmup_single_split_initial_allocation: ByteSize,
) -> ByteSize {
    // Number of docs at which the proportional estimate equals the configured allocation.
    const REFERENCE_NUM_DOCS: u128 = 10_000_000;

    let max_allocation = warmup_single_split_initial_allocation.as_u64();
    // `max_allocation * num_docs` may exceed `u64::MAX` (panic in debug builds, silent
    // wrap-around in release), so the product is computed in `u128`. The result is capped
    // by `max_allocation` before narrowing, which makes the `as u64` cast lossless.
    let proportional_allocation = (u128::from(max_allocation) * u128::from(split.num_docs)
        / REFERENCE_NUM_DOCS)
        .min(u128::from(max_allocation)) as u64;
    // `proportional_allocation` is already bounded by `max_allocation`, so this is the
    // minimum of the three candidates listed in the doc comment.
    let size_bytes = split.split_footer_start.min(proportional_allocation);
    ByteSize(size_bytes)
}

impl SearchPermitProvider {
pub fn new(
num_download_slots: usize,
memory_budget: ByteSize,
initial_allocation: ByteSize,
) -> Self {
pub fn new(num_download_slots: usize, memory_budget: ByteSize) -> Self {
let (message_sender, message_receiver) = mpsc::unbounded_channel();
#[cfg(test)]
let (state_sender, state_receiver) = watch::channel(false);
Expand All @@ -83,11 +100,10 @@ impl SearchPermitProvider {
message_sender,
#[cfg(test)]
actor_stopped: state_receiver,
per_permit_initial_memory_allocation: initial_allocation.as_u64(),
}
}

/// Returns one permit future for each provided split size.
/// Returns one permit future for each provided split metadata.
///
/// The permits returned are guaranteed to be resolved in order. In
/// addition, the permits are guaranteed to be resolved before permits
Expand All @@ -96,18 +112,14 @@ impl SearchPermitProvider {
/// The permit memory sizes are used exactly as provided by the caller; no cap is applied here.
pub async fn get_permits(
&self,
split_sizes: impl IntoIterator<Item = ByteSize>,
splits: impl IntoIterator<Item = ByteSize>,
) -> Vec<SearchPermitFuture> {
let (permit_sender, permit_receiver) = oneshot::channel();
let permit_sizes = splits.into_iter().map(|size| size.as_u64()).collect();
self.message_sender
.send(SearchPermitMessage::Request {
permit_sender,
permit_sizes: split_sizes
.into_iter()
.map(|size| {
std::cmp::min(size.as_u64(), self.per_permit_initial_memory_allocation)
})
.collect(),
permit_sizes,
})
.expect("Receiver lives longer than sender");
permit_receiver
Expand Down Expand Up @@ -191,7 +203,7 @@ impl SearchPermitActor {
return None;
}
if let Some((_, next_permit_size)) = self.permits_requests.front() {
if self.total_memory_allocated + next_permit_size < self.total_memory_budget {
if self.total_memory_allocated + next_permit_size <= self.total_memory_budget {
return self.permits_requests.pop_front();
}
}
Expand Down Expand Up @@ -307,7 +319,7 @@ mod tests {

#[tokio::test]
async fn test_search_permit_order() {
let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), ByteSize::mb(10));
let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100));
let mut all_futures = Vec::new();
let first_batch_of_permits = permit_provider
.get_permits(repeat(ByteSize::mb(10)).take(10))
Expand Down Expand Up @@ -357,7 +369,7 @@ mod tests {

#[tokio::test]
async fn test_search_permit_early_drops() {
let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100), ByteSize::mb(10));
let permit_provider = SearchPermitProvider::new(1, ByteSize::mb(100));
let permit_fut1 = permit_provider
.get_permits(vec![ByteSize::mb(10)])
.await
Expand Down Expand Up @@ -398,7 +410,7 @@ mod tests {

#[tokio::test]
async fn test_memory_budget() {
let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100), ByteSize::mb(10));
let permit_provider = SearchPermitProvider::new(100, ByteSize::mb(100));
let mut permit_futs = permit_provider
.get_permits(repeat(ByteSize::mb(10)).take(14))
.await;
Expand Down Expand Up @@ -428,10 +440,9 @@ mod tests {

#[tokio::test]
async fn test_warmup_slot() {
let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100), ByteSize::mb(1));
let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100));
let mut permit_futs = permit_provider
// permit sizes are capped by per_permit_initial_memory_allocation
.get_permits(repeat(ByteSize::mb(100)).take(16))
.get_permits(repeat(ByteSize::mb(1)).take(16))
.await;
let mut remaining_permit_futs = permit_futs.split_off(10).into_iter();
assert_eq!(remaining_permit_futs.len(), 6);
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,6 @@ impl SearcherContext {
let leaf_search_split_semaphore = SearchPermitProvider::new(
searcher_config.max_num_concurrent_split_searches,
searcher_config.warmup_memory_budget,
searcher_config.warmup_single_split_initial_allocation,
);
let split_stream_semaphore =
Semaphore::new(searcher_config.max_num_concurrent_split_streams);
Expand Down

0 comments on commit 56362da

Please sign in to comment.