Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit warmup memory usage #5524

Closed
wants to merge 12 commits into from
14 changes: 13 additions & 1 deletion quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ pub struct SearcherConfig {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
pub warmup_memory_budget: ByteSize,
pub warmup_single_split_initial_allocation: ByteSize,
}

/// Configuration controlling how fast a searcher should timeout a `get_slice`
Expand Down Expand Up @@ -263,7 +265,7 @@ impl StorageTimeoutPolicy {

impl Default for SearcherConfig {
fn default() -> Self {
Self {
SearcherConfig {
fast_field_cache_capacity: ByteSize::gb(1),
split_footer_cache_capacity: ByteSize::mb(500),
partial_request_cache_capacity: ByteSize::mb(64),
Expand All @@ -274,6 +276,8 @@ impl Default for SearcherConfig {
split_cache: None,
request_timeout_secs: Self::default_request_timeout_secs(),
storage_timeout_policy: None,
warmup_memory_budget: ByteSize::gb(1),
warmup_single_split_initial_allocation: ByteSize::mb(50),
}
}
}
Expand Down Expand Up @@ -308,6 +312,14 @@ impl SearcherConfig {
split_cache_limits.max_file_descriptors
);
}
if self.warmup_single_split_initial_allocation > self.warmup_memory_budget {
anyhow::bail!(
"warmup_single_split_initial_allocation ({}) must be lower or equal to \
warmup_memory_budget ({})",
self.warmup_single_split_initial_allocation,
self.warmup_memory_budget
);
}
}
Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,9 @@ mod tests {
min_throughtput_bytes_per_secs: 100_000,
timeout_millis: 2_000,
max_num_retries: 2
})
}),
warmup_memory_budget: ByteSize::gb(1),
warmup_single_split_initial_allocation: ByteSize::mb(50),
}
);
assert_eq!(
Expand Down
61 changes: 42 additions & 19 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,44 @@ use tantivy::{Directory, HasLen};

/// The caching directory is a simple cache that wraps another directory.
#[derive(Clone)]
pub struct CachingDirectory {
pub struct CachingDirectory<C> {
underlying: Arc<dyn Directory>,
// TODO fixme: that's a pretty ugly cache we have here.
cache: Arc<ByteRangeCache>,
cache: C,
}

impl CachingDirectory {
/// Creates a new CachingDirectory.
impl CachingDirectory<Arc<ByteRangeCache>> {
/// Creates a new CachingDirectory with a default cache.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
CachingDirectory {
underlying,
cache: Arc::new(ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
)),
}
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory<Arc<ByteRangeCache>> {
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, Arc::new(byte_range_cache))
}
}

impl<C: DirectoryCache> CachingDirectory<C> {
/// Creates a new CachingDirectory with an existing cache.
pub fn new(underlying: Arc<dyn Directory>, cache: C) -> CachingDirectory<C> {
CachingDirectory { underlying, cache }
}
}

impl fmt::Debug for CachingDirectory {
impl<T> fmt::Debug for CachingDirectory<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "CachingDirectory({:?})", self.underlying)
}
}

struct CachingFileHandle {
struct CachingFileHandle<C> {
path: PathBuf,
cache: Arc<ByteRangeCache>,
cache: C,
underlying_filehandle: Arc<dyn FileHandle>,
}

impl fmt::Debug for CachingFileHandle {
impl<T> fmt::Debug for CachingFileHandle<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
Expand All @@ -75,7 +79,7 @@ impl fmt::Debug for CachingFileHandle {
}

#[async_trait]
impl FileHandle for CachingFileHandle {
impl<C: DirectoryCache> FileHandle for CachingFileHandle<C> {
fn read_bytes(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) {
return Ok(bytes);
Expand All @@ -100,13 +104,13 @@ impl FileHandle for CachingFileHandle {
}
}

impl HasLen for CachingFileHandle {
impl<C> HasLen for CachingFileHandle<C> {
fn len(&self) -> usize {
self.underlying_filehandle.len()
}
}

impl Directory for CachingDirectory {
impl<C: DirectoryCache> Directory for CachingDirectory<C> {
fn exists(&self, path: &Path) -> std::result::Result<bool, OpenReadError> {
self.underlying.exists(path)
}
Expand Down Expand Up @@ -136,6 +140,25 @@ impl Directory for CachingDirectory {
crate::read_only_directory!();
}

/// A byte range cache that to be used in front of the directory.
pub trait DirectoryCache: Clone + Send + Sync + 'static {
/// If available, returns the cached view of the slice.
fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes>;

/// Put the given amount of data in the cache.
fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes);
}

impl DirectoryCache for Arc<ByteRangeCache> {
fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
ByteRangeCache::get_slice(self, path, byte_range)
}

fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes) {
ByteRangeCache::put_slice(self, path, byte_range, bytes)
}
}

#[cfg(test)]
mod tests {

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-directories/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod storage_directory;
mod union_directory;

pub use self::bundle_directory::{get_hotcache_from_split, read_split_footer, BundleDirectory};
pub use self::caching_directory::CachingDirectory;
pub use self::caching_directory::{CachingDirectory, DirectoryCache};
pub use self::debug_proxy_directory::{DebugProxyDirectory, ReadOperation};
pub use self::hot_directory::{write_hotcache, HotDirectory};
pub use self::storage_directory::StorageDirectory;
Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,14 @@ message LeafSearchRequest {
repeated string index_uris = 9;
}

message ResourceStats {
uint64 short_lived_cache_num_bytes = 1;
uint64 split_num_docs = 2;
uint64 warmup_microsecs = 3;
uint64 cpu_thread_pool_wait_microsecs = 4;
uint64 cpu_microsecs = 5;
}

/// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
message LeafRequestRef {
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
Expand Down Expand Up @@ -479,6 +487,8 @@ message LeafSearchResponse {

// postcard serialized intermediate aggregation_result.
optional bytes intermediate_aggregation_result = 6;

ResourceStats resource_stats = 8;
}

message SnippetRequest {
Expand Down
17 changes: 17 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn};
use crate::retry::search::LeafSearchRetryPolicy;
use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds};
use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy};
use crate::{SearchError, SearchJobPlacer, SearchServiceClient};
use crate::{merge_resource_stats_it, SearchError, SearchJobPlacer, SearchServiceClient};

/// Maximum number of put requests emitted to perform a replicated given PUT KV.
const MAX_PUT_KV_ATTEMPTS: usize = 6;
Expand Down Expand Up @@ -317,6 +317,10 @@ fn merge_original_with_retry_leaf_search_response(
(Some(left), None) => Some(left),
(None, None) => None,
};
let resource_stats = merge_resource_stats_it([
&original_response.resource_stats,
&retry_response.resource_stats,
]);
Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: original_response.num_hits + retry_response.num_hits,
Expand All @@ -326,6 +330,7 @@ fn merge_original_with_retry_leaf_search_response(
partial_hits: original_response.partial_hits,
num_successful_splits: original_response.num_successful_splits
+ retry_response.num_successful_splits,
resource_stats,
})
}

Expand Down
Loading
Loading