diff --git a/quickwit/quickwit-directories/src/caching_directory.rs b/quickwit/quickwit-directories/src/caching_directory.rs index 5ee8b63777a..e55c49f7fad 100644 --- a/quickwit/quickwit-directories/src/caching_directory.rs +++ b/quickwit/quickwit-directories/src/caching_directory.rs @@ -30,44 +30,46 @@ use tantivy::{Directory, HasLen}; /// The caching directory is a simple cache that wraps another directory. #[derive(Clone)] -pub struct CachingDirectory { +pub struct CachingDirectory { underlying: Arc, - cache: C, + // TODO fixme: that's a pretty ugly cache we have here. + cache: ByteRangeCache, } -impl CachingDirectory> { - /// Creates a new CachingDirectory with a default cache. +impl CachingDirectory { + /// Creates a new CachingDirectory. /// /// Warming: The resulting CacheDirectory will cache all information without ever /// removing any item from the cache. - pub fn new_unbounded(underlying: Arc) -> CachingDirectory> { + pub fn new_unbounded(underlying: Arc) -> CachingDirectory { let byte_range_cache = ByteRangeCache::with_infinite_capacity( &quickwit_storage::STORAGE_METRICS.shortlived_cache, ); - CachingDirectory::new(underlying, Arc::new(byte_range_cache)) + CachingDirectory::new(underlying, byte_range_cache) } -} -impl CachingDirectory { - /// Creates a new CachingDirectory with an existing cache. - pub fn new(underlying: Arc, cache: C) -> CachingDirectory { + /// Creates a new CachingDirectory. + /// + /// Warming: The resulting CacheDirectory will cache all information without ever + /// removing any item from the cache. + pub fn new(underlying: Arc, cache: ByteRangeCache) -> CachingDirectory { CachingDirectory { underlying, cache } } } -impl fmt::Debug for CachingDirectory { +impl fmt::Debug for CachingDirectory { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "CachingDirectory({:?})", self.underlying) } } -struct CachingFileHandle { +struct CachingFileHandle { path: PathBuf, - cache: C, + cache: ByteRangeCache, underlying_filehandle: Arc, } -impl fmt::Debug for CachingFileHandle { +impl fmt::Debug for CachingFileHandle { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, @@ -79,7 +81,7 @@ impl fmt::Debug for CachingFileHandle { } #[async_trait] -impl FileHandle for CachingFileHandle { +impl FileHandle for CachingFileHandle { fn read_bytes(&self, byte_range: Range) -> io::Result { if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) { return Ok(bytes); @@ -104,13 +106,13 @@ impl FileHandle for CachingFileHandle { } } -impl HasLen for CachingFileHandle { +impl HasLen for CachingFileHandle { fn len(&self) -> usize { self.underlying_filehandle.len() } } -impl Directory for CachingDirectory { +impl Directory for CachingDirectory { fn exists(&self, path: &Path) -> std::result::Result { self.underlying.exists(path) } @@ -140,25 +142,6 @@ impl Directory for CachingDirectory { crate::read_only_directory!(); } -/// A byte range cache that to be used in front of the directory. -pub trait DirectoryCache: Clone + Send + Sync + 'static { - /// If available, returns the cached view of the slice. - fn get_slice(&self, path: &Path, byte_range: Range) -> Option; - - /// Put the given amount of data in the cache. - fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes); -} - -impl DirectoryCache for Arc { - fn get_slice(&self, path: &Path, byte_range: Range) -> Option { - ByteRangeCache::get_slice(self, path, byte_range) - } - - fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes) { - ByteRangeCache::put_slice(self, path, byte_range, bytes) - } -} - #[cfg(test)] mod tests { diff --git a/quickwit/quickwit-directories/src/lib.rs b/quickwit/quickwit-directories/src/lib.rs index d8cd9e02fa3..4df4f2799ec 100644 --- a/quickwit/quickwit-directories/src/lib.rs +++ b/quickwit/quickwit-directories/src/lib.rs @@ -37,7 +37,7 @@ mod storage_directory; mod union_directory; pub use self::bundle_directory::{get_hotcache_from_split, read_split_footer, BundleDirectory}; -pub use self::caching_directory::{CachingDirectory, DirectoryCache}; +pub use self::caching_directory::CachingDirectory; pub use self::debug_proxy_directory::{DebugProxyDirectory, ReadOperation}; pub use self::hot_directory::{write_hotcache, HotDirectory}; pub use self::storage_directory::StorageDirectory; diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 9240d5399ae..e8cf5c3a1fb 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -27,7 +27,7 @@ use quickwit_doc_mapper::DocMapper; use quickwit_proto::search::{ FetchDocsResponse, PartialHit, SnippetRequest, SplitIdAndFooterOffsets, }; -use quickwit_storage::{ByteRangeCache, Storage}; +use quickwit_storage::Storage; use tantivy::query::Query; use tantivy::schema::document::CompactDocValue; use tantivy::schema::{Document as DocumentTrait, Field, TantivyDocument, Value}; @@ -179,7 +179,7 @@ async fn fetch_docs_in_split( index_storage, split, Some(doc_mapper.tokenizer_manager()), - Option::>::None, + None, ) .await .context("open-index-for-split")?; diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 9d67445f174..dd11743c6cb 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -25,9 +25,10 @@ use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; use anyhow::Context; +use bytesize::ByteSize; use futures::future::try_join_all; use quickwit_common::pretty::PrettySample; -use quickwit_directories::{CachingDirectory, DirectoryCache, HotDirectory, StorageDirectory}; +use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory}; use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo}; use quickwit_proto::search::{ CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, ResourceStats, SearchRequest, @@ -53,7 +54,6 @@ use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; use crate::search_permit_provider::SearchPermit; use crate::service::{deserialize_doc_mapper, SearcherContext}; -use crate::tracked_cache::TrackedByteRangeCache; use crate::{QuickwitAggregations, SearchError}; #[instrument(skip_all)] @@ -131,16 +131,13 @@ pub(crate) async fn open_split_bundle( /// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. /// - An ephemeral unbounded cache directory whose lifetime is tied to the /// returned `Index`. -/// -/// TODO: generic T should be forced to SearcherPermit, but this requires the -/// search stream to also request permits. #[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] -pub(crate) async fn open_index_with_caches( +pub(crate) async fn open_index_with_caches( searcher_context: &SearcherContext, index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: Option, + ephemeral_unbounded_cache: Option, ) -> anyhow::Result { // Let's add a storage proxy to retry `get_slice` requests if they are taking too long, // if configured in the searcher config. @@ -383,7 +380,7 @@ async fn leaf_search_single_split( doc_mapper: Arc, split_filter: Arc>, aggregations_limits: AggregationLimitsGuard, - search_permit: SearchPermit, + search_permit: &mut SearchPermit, ) -> crate::Result { rewrite_request( &mut search_request, @@ -411,13 +408,12 @@ async fn leaf_search_single_split( let split_id = split.split_id.to_string(); let byte_range_cache = ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); - let tracked_cache = TrackedByteRangeCache::new(byte_range_cache, search_permit); let index = open_index_with_caches( searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - Some(tracked_cache.clone()), + Some(byte_range_cache.clone()), ) .await?; let split_schema = index.schema(); @@ -441,8 +437,9 @@ async fn leaf_search_single_split( warmup(&searcher, &warmup_info).await?; let warmup_end = Instant::now(); let warmup_duration: Duration = warmup_end.duration_since(warmup_start); - tracked_cache.warmup_completed(); - let short_lived_cache_num_bytes = tracked_cache.get_num_bytes(); + let warmup_size = byte_range_cache.get_num_bytes(); + search_permit.warmup_completed(ByteSize(warmup_size)); + let short_lived_cache_num_bytes = byte_range_cache.get_num_bytes(); let split_num_docs = split.num_docs; let span = info_span!("tantivy_search"); @@ -1386,7 +1383,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - search_permit: SearchPermit, + mut search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); @@ -1401,10 +1398,13 @@ async fn leaf_search_single_split_wrapper( doc_mapper, split_filter.clone(), aggregations_limits, - search_permit, + &mut search_permit, ) .await; + // We explicitly drop it, to highlight it to the reader + std::mem::drop(search_permit); + if leaf_search_single_split_res.is_ok() { timer.observe_duration(); } diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index c9031865c52..b7c03a0c5ea 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -42,7 +42,6 @@ mod search_response_rest; mod search_stream; mod service; pub(crate) mod top_k_collector; -mod tracked_cache; mod metrics; mod search_permit_provider; diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 06136b0fefb..cfd2b724689 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -41,7 +41,6 @@ use tracing::{debug, error, info, instrument}; use crate::leaf::open_index_with_caches; use crate::search_job_placer::group_jobs_by_index_id; use crate::search_permit_provider::SearchPermit; -use crate::tracked_cache::TrackedByteRangeCache; use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext}; /// Performs a distributed list terms. @@ -217,14 +216,12 @@ async fn leaf_list_terms_single_split( search_request: &ListTermsRequest, storage: Arc, split: SplitIdAndFooterOffsets, - search_permit: SearchPermit, + _search_permit: SearchPermit, ) -> crate::Result { let cache = ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); - let tracked_cache = TrackedByteRangeCache::new(cache, search_permit); let index = - open_index_with_caches(searcher_context, storage, &split, None, Some(tracked_cache)) - .await?; + open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?; let split_schema = index.schema(); let reader = index .reader_builder() diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 80a3509ea0e..d8e4e3b6536 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -116,6 +116,7 @@ async fn leaf_search_stream_single_split( mut stream_request: SearchStreamRequest, storage: Arc, ) -> crate::Result { + // TODO: Should we track the memory here using the SearchPermitProvider? let _leaf_split_stream_permit = searcher_context .split_stream_semaphore .acquire() @@ -129,13 +130,12 @@ async fn leaf_search_stream_single_split( let cache = ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); - // TODO should create a SearchPermit and wrap ByteRangeCache with TrackedByteRangeCache here? let index = open_index_with_caches( &searcher_context, storage, &split, Some(doc_mapper.tokenizer_manager()), - Some(Arc::new(cache)), + Some(cache), ) .await?; let split_schema = index.schema(); diff --git a/quickwit/quickwit-search/src/tracked_cache.rs b/quickwit/quickwit-search/src/tracked_cache.rs deleted file mode 100644 index 6529f38332b..00000000000 --- a/quickwit/quickwit-search/src/tracked_cache.rs +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::sync::{Arc, Mutex}; - -use bytesize::ByteSize; -use quickwit_directories::DirectoryCache; -use quickwit_storage::ByteRangeCache; - -use crate::search_permit_provider::SearchPermit; - -/// A [`ByteRangeCache`] tied to a [`SearchPermit`]. -#[derive(Clone)] -pub struct TrackedByteRangeCache { - inner: Arc, -} - -struct Inner { - cache: ByteRangeCache, - search_permit: Mutex, -} - -impl TrackedByteRangeCache { - pub fn new(cache: ByteRangeCache, search_permit: SearchPermit) -> TrackedByteRangeCache { - TrackedByteRangeCache { - inner: Arc::new(Inner { - cache, - search_permit: Mutex::new(search_permit), - }), - } - } - - pub fn warmup_completed(&self) { - self.inner - .search_permit - .lock() - .unwrap() - .warmup_completed(ByteSize(self.get_num_bytes())); - } - - pub fn get_num_bytes(&self) -> u64 { - self.inner.cache.get_num_bytes() - } -} - -impl DirectoryCache for TrackedByteRangeCache { - fn get_slice( - &self, - path: &std::path::Path, - byte_range: std::ops::Range, - ) -> Option { - self.inner.cache.get_slice(path, byte_range) - } - fn put_slice( - &self, - path: std::path::PathBuf, - byte_range: std::ops::Range, - bytes: quickwit_storage::OwnedBytes, - ) { - self.inner.cache.put_slice(path, byte_range, bytes) - } -} diff --git a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs index fbe0031f291..425e4f9a043 100644 --- a/quickwit/quickwit-storage/src/cache/byte_range_cache.rs +++ b/quickwit/quickwit-storage/src/cache/byte_range_cache.rs @@ -22,7 +22,7 @@ use std::collections::BTreeMap; use std::ops::Range; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use tantivy::directory::OwnedBytes; @@ -345,9 +345,14 @@ impl Drop for NeedMutByteRangeCache { /// cached data, the changes may or may not get recorded. /// /// At the moment this is hardly a cache as it features no eviction policy. +#[derive(Clone)] pub struct ByteRangeCache { + inner_arc: Arc, +} + +struct Inner { num_stored_bytes: AtomicU64, - inner: Mutex>, + need_mut_byte_range_cache: Mutex>, } impl ByteRangeCache { @@ -355,29 +360,39 @@ impl ByteRangeCache { pub fn with_infinite_capacity(cache_counters: &'static CacheMetrics) -> Self { let need_mut_byte_range_cache = NeedMutByteRangeCache::with_infinite_capacity(cache_counters); - ByteRangeCache { + let inner = Inner { num_stored_bytes: AtomicU64::default(), - inner: Mutex::new(need_mut_byte_range_cache), + need_mut_byte_range_cache: Mutex::new(need_mut_byte_range_cache), + }; + ByteRangeCache { + inner_arc: Arc::new(inner), } } /// Overall amount of bytes stored in the cache. pub fn get_num_bytes(&self) -> u64 { - self.num_stored_bytes.load(Ordering::Relaxed) + self.inner_arc.num_stored_bytes.load(Ordering::Relaxed) } /// If available, returns the cached view of the slice. pub fn get_slice(&self, path: &Path, byte_range: Range) -> Option { - self.inner.lock().unwrap().get_slice(path, byte_range) + self.inner_arc + .need_mut_byte_range_cache + .lock() + .unwrap() + .get_slice(path, byte_range) } /// Put the given amount of data in the cache. pub fn put_slice(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes) { - let mut need_mut_byte_range_cache_locked = self.inner.lock().unwrap(); + let mut need_mut_byte_range_cache_locked = + self.inner_arc.need_mut_byte_range_cache.lock().unwrap(); need_mut_byte_range_cache_locked.put_slice(path, byte_range, bytes); let num_bytes = need_mut_byte_range_cache_locked.num_bytes; drop(need_mut_byte_range_cache_locked); - self.num_stored_bytes.store(num_bytes, Ordering::Relaxed); + self.inner_arc + .num_stored_bytes + .store(num_bytes, Ordering::Relaxed); } } @@ -455,13 +470,13 @@ mod tests { .sum(); // in some case we have ranges touching each other, count_items count them // as only one, but cache count them as 2. - assert!(cache.inner.lock().unwrap().num_items >= expected_item_count as u64); + assert!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_items >= expected_item_count as u64); let expected_byte_count = state.values() .flatten() .filter(|stored| **stored) .count(); - assert_eq!(cache.inner.lock().unwrap().num_bytes, expected_byte_count as u64); + assert_eq!(cache.inner_arc.need_mut_byte_range_cache.lock().unwrap().num_bytes, expected_byte_count as u64); } Operation::Get { range, @@ -528,7 +543,7 @@ mod tests { ); { - let mutable_cache = &cache.inner.lock().unwrap(); + let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); assert_eq!(mutable_cache.cache.len(), 4); assert_eq!(mutable_cache.num_items, 4); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4); @@ -540,7 +555,7 @@ mod tests { { // now they should've been merged, except the last one - let mutable_cache = &cache.inner.lock().unwrap(); + let mutable_cache = cache.inner_arc.need_mut_byte_range_cache.lock().unwrap(); assert_eq!(mutable_cache.cache.len(), 2); assert_eq!(mutable_cache.num_items, 2); assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2);