Skip to content

Commit

Permalink
Revert permit tied to range cache
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Dec 2, 2024
1 parent edfc0b1 commit c34ce22
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 151 deletions.
55 changes: 19 additions & 36 deletions quickwit/quickwit-directories/src/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,44 +30,46 @@ use tantivy::{Directory, HasLen};

/// The caching directory is a simple cache that wraps another directory.
#[derive(Clone)]
pub struct CachingDirectory<C> {
pub struct CachingDirectory {
underlying: Arc<dyn Directory>,
cache: C,
// TODO fixme: that's a pretty ugly cache we have here.
cache: ByteRangeCache,
}

impl CachingDirectory<Arc<ByteRangeCache>> {
/// Creates a new CachingDirectory with a default cache.
impl CachingDirectory {
/// Creates a new CachingDirectory.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory<Arc<ByteRangeCache>> {
pub fn new_unbounded(underlying: Arc<dyn Directory>) -> CachingDirectory {
let byte_range_cache = ByteRangeCache::with_infinite_capacity(
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
);
CachingDirectory::new(underlying, Arc::new(byte_range_cache))
CachingDirectory::new(underlying, byte_range_cache)
}
}

impl<C: DirectoryCache> CachingDirectory<C> {
/// Creates a new CachingDirectory with an existing cache.
pub fn new(underlying: Arc<dyn Directory>, cache: C) -> CachingDirectory<C> {
/// Creates a new CachingDirectory.
///
/// Warming: The resulting CacheDirectory will cache all information without ever
/// removing any item from the cache.
pub fn new(underlying: Arc<dyn Directory>, cache: ByteRangeCache) -> CachingDirectory {
CachingDirectory { underlying, cache }
}
}

impl<T> fmt::Debug for CachingDirectory<T> {
impl fmt::Debug for CachingDirectory {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "CachingDirectory({:?})", self.underlying)
}
}

struct CachingFileHandle<C> {
struct CachingFileHandle {
path: PathBuf,
cache: C,
cache: ByteRangeCache,
underlying_filehandle: Arc<dyn FileHandle>,
}

impl<T> fmt::Debug for CachingFileHandle<T> {
impl fmt::Debug for CachingFileHandle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
Expand All @@ -79,7 +81,7 @@ impl<T> fmt::Debug for CachingFileHandle<T> {
}

#[async_trait]
impl<C: DirectoryCache> FileHandle for CachingFileHandle<C> {
impl FileHandle for CachingFileHandle {
fn read_bytes(&self, byte_range: Range<usize>) -> io::Result<OwnedBytes> {
if let Some(bytes) = self.cache.get_slice(&self.path, byte_range.clone()) {
return Ok(bytes);
Expand All @@ -104,13 +106,13 @@ impl<C: DirectoryCache> FileHandle for CachingFileHandle<C> {
}
}

impl<C> HasLen for CachingFileHandle<C> {
impl HasLen for CachingFileHandle {
fn len(&self) -> usize {
self.underlying_filehandle.len()
}
}

impl<C: DirectoryCache> Directory for CachingDirectory<C> {
impl Directory for CachingDirectory {
fn exists(&self, path: &Path) -> std::result::Result<bool, OpenReadError> {
self.underlying.exists(path)
}
Expand Down Expand Up @@ -140,25 +142,6 @@ impl<C: DirectoryCache> Directory for CachingDirectory<C> {
crate::read_only_directory!();
}

/// A byte range cache that to be used in front of the directory.
pub trait DirectoryCache: Clone + Send + Sync + 'static {
/// If available, returns the cached view of the slice.
fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes>;

/// Put the given amount of data in the cache.
fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes);
}

impl DirectoryCache for Arc<ByteRangeCache> {
fn get_slice(&self, path: &Path, byte_range: Range<usize>) -> Option<OwnedBytes> {
ByteRangeCache::get_slice(self, path, byte_range)
}

fn put_slice(&self, path: PathBuf, byte_range: Range<usize>, bytes: OwnedBytes) {
ByteRangeCache::put_slice(self, path, byte_range, bytes)
}
}

#[cfg(test)]
mod tests {

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-directories/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod storage_directory;
mod union_directory;

pub use self::bundle_directory::{get_hotcache_from_split, read_split_footer, BundleDirectory};
pub use self::caching_directory::{CachingDirectory, DirectoryCache};
pub use self::caching_directory::CachingDirectory;
pub use self::debug_proxy_directory::{DebugProxyDirectory, ReadOperation};
pub use self::hot_directory::{write_hotcache, HotDirectory};
pub use self::storage_directory::StorageDirectory;
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_proto::search::{
FetchDocsResponse, PartialHit, SnippetRequest, SplitIdAndFooterOffsets,
};
use quickwit_storage::{ByteRangeCache, Storage};
use quickwit_storage::Storage;
use tantivy::query::Query;
use tantivy::schema::document::CompactDocValue;
use tantivy::schema::{Document as DocumentTrait, Field, TantivyDocument, Value};
Expand Down Expand Up @@ -179,7 +179,7 @@ async fn fetch_docs_in_split(
index_storage,
split,
Some(doc_mapper.tokenizer_manager()),
Option::<Arc<ByteRangeCache>>::None,
None,
)
.await
.context("open-index-for-split")?;
Expand Down
28 changes: 14 additions & 14 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

use anyhow::Context;
use bytesize::ByteSize;
use futures::future::try_join_all;
use quickwit_common::pretty::PrettySample;
use quickwit_directories::{CachingDirectory, DirectoryCache, HotDirectory, StorageDirectory};
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
use quickwit_proto::search::{
CountHits, LeafSearchRequest, LeafSearchResponse, PartialHit, ResourceStats, SearchRequest,
Expand All @@ -53,7 +54,6 @@ use crate::metrics::SEARCH_METRICS;
use crate::root::is_metadata_count_request_with_ast;
use crate::search_permit_provider::SearchPermit;
use crate::service::{deserialize_doc_mapper, SearcherContext};
use crate::tracked_cache::TrackedByteRangeCache;
use crate::{QuickwitAggregations, SearchError};

#[instrument(skip_all)]
Expand Down Expand Up @@ -131,16 +131,13 @@ pub(crate) async fn open_split_bundle(
/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`.
/// - An ephemeral unbounded cache directory whose lifetime is tied to the
/// returned `Index`.
///
/// TODO: generic T should be forced to SearcherPermit, but this requires the
/// search stream to also request permits.
#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))]
pub(crate) async fn open_index_with_caches<C: DirectoryCache>(
pub(crate) async fn open_index_with_caches(
searcher_context: &SearcherContext,
index_storage: Arc<dyn Storage>,
split_and_footer_offsets: &SplitIdAndFooterOffsets,
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: Option<C>,
ephemeral_unbounded_cache: Option<ByteRangeCache>,
) -> anyhow::Result<Index> {
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
Expand Down Expand Up @@ -383,7 +380,7 @@ async fn leaf_search_single_split(
doc_mapper: Arc<DocMapper>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
aggregations_limits: AggregationLimitsGuard,
search_permit: SearchPermit,
search_permit: &mut SearchPermit,
) -> crate::Result<LeafSearchResponse> {
rewrite_request(
&mut search_request,
Expand Down Expand Up @@ -411,13 +408,12 @@ async fn leaf_search_single_split(
let split_id = split.split_id.to_string();
let byte_range_cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let tracked_cache = TrackedByteRangeCache::new(byte_range_cache, search_permit);
let index = open_index_with_caches(
searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
Some(tracked_cache.clone()),
Some(byte_range_cache.clone()),
)
.await?;
let split_schema = index.schema();
Expand All @@ -441,8 +437,9 @@ async fn leaf_search_single_split(
warmup(&searcher, &warmup_info).await?;
let warmup_end = Instant::now();
let warmup_duration: Duration = warmup_end.duration_since(warmup_start);
tracked_cache.warmup_completed();
let short_lived_cache_num_bytes = tracked_cache.get_num_bytes();
let warmup_size = byte_range_cache.get_num_bytes();
search_permit.warmup_completed(ByteSize(warmup_size));
let short_lived_cache_num_bytes = byte_range_cache.get_num_bytes();
let split_num_docs = split.num_docs;

let span = info_span!("tantivy_search");
Expand Down Expand Up @@ -1386,7 +1383,7 @@ async fn leaf_search_single_split_wrapper(
split: SplitIdAndFooterOffsets,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
search_permit: SearchPermit,
mut search_permit: SearchPermit,
aggregations_limits: AggregationLimitsGuard,
) {
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
Expand All @@ -1401,10 +1398,13 @@ async fn leaf_search_single_split_wrapper(
doc_mapper,
split_filter.clone(),
aggregations_limits,
search_permit,
&mut search_permit,
)
.await;

// We explicitly drop it, to highlight it to the reader
std::mem::drop(search_permit);

if leaf_search_single_split_res.is_ok() {
timer.observe_duration();
}
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ mod search_response_rest;
mod search_stream;
mod service;
pub(crate) mod top_k_collector;
mod tracked_cache;

mod metrics;
mod search_permit_provider;
Expand Down
7 changes: 2 additions & 5 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use tracing::{debug, error, info, instrument};
use crate::leaf::open_index_with_caches;
use crate::search_job_placer::group_jobs_by_index_id;
use crate::search_permit_provider::SearchPermit;
use crate::tracked_cache::TrackedByteRangeCache;
use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};

/// Performs a distributed list terms.
Expand Down Expand Up @@ -217,14 +216,12 @@ async fn leaf_list_terms_single_split(
search_request: &ListTermsRequest,
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
search_permit: SearchPermit,
_search_permit: SearchPermit,
) -> crate::Result<LeafListTermsResponse> {
let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
let tracked_cache = TrackedByteRangeCache::new(cache, search_permit);
let index =
open_index_with_caches(searcher_context, storage, &split, None, Some(tracked_cache))
.await?;
open_index_with_caches(searcher_context, storage, &split, None, Some(cache)).await?;
let split_schema = index.schema();
let reader = index
.reader_builder()
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/search_stream/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ async fn leaf_search_stream_single_split(
mut stream_request: SearchStreamRequest,
storage: Arc<dyn Storage>,
) -> crate::Result<LeafSearchStreamResponse> {
// TODO: Should we track the memory here using the SearchPermitProvider?
let _leaf_split_stream_permit = searcher_context
.split_stream_semaphore
.acquire()
Expand All @@ -129,13 +130,12 @@ async fn leaf_search_stream_single_split(

let cache =
ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache);
// TODO should create a SearchPermit and wrap ByteRangeCache with TrackedByteRangeCache here?
let index = open_index_with_caches(
&searcher_context,
storage,
&split,
Some(doc_mapper.tokenizer_manager()),
Some(Arc::new(cache)),
Some(cache),
)
.await?;
let split_schema = index.schema();
Expand Down
78 changes: 0 additions & 78 deletions quickwit/quickwit-search/src/tracked_cache.rs

This file was deleted.

Loading

0 comments on commit c34ce22

Please sign in to comment.