From e02874662afd9ecac2d5aab03e5c0d7dea9c8404 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 23 Oct 2024 09:51:01 +0900 Subject: [PATCH] CR comments --- quickwit/quickwit-search/src/leaf.rs | 10 +- quickwit/quickwit-search/src/lib.rs | 2 +- quickwit/quickwit-search/src/list_terms.rs | 4 +- ...h_permits.rs => search_permit_provider.rs} | 123 +++++++++--------- quickwit/quickwit-search/src/service.rs | 8 +- 5 files changed, 77 insertions(+), 70 deletions(-) rename quickwit/quickwit-search/src/{search_permits.rs => search_permit_provider.rs} (63%) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 4104d80376e..8c5c7095422 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -50,7 +50,7 @@ use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; -use crate::search_permits::SearchPermit; +use crate::search_permit_provider::SearchPermit; use crate::service::{deserialize_doc_mapper, SearcherContext}; use crate::{QuickwitAggregations, SearchError}; @@ -1262,8 +1262,8 @@ pub async fn leaf_search( // We acquire all of the leaf search permits to make sure our single split search tasks // do no interleave with other leaf search requests. let permit_futures = searcher_context - .leaf_search_split_semaphore - .get_permits_futures(split_with_req.len()); + .search_permit_provider + .get_permits(split_with_req.len()); for ((split, mut request), permit_fut) in split_with_req.into_iter().zip(permit_futures.into_iter()) @@ -1361,7 +1361,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - leaf_split_search_permit: SearchPermit, + search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); @@ -1380,7 +1380,7 @@ async fn leaf_search_single_split_wrapper( .await; // We explicitly drop it, to highlight it to the reader - std::mem::drop(leaf_split_search_permit); + std::mem::drop(search_permit); if leaf_search_single_split_res.is_ok() { timer.observe_duration(); diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index dec493fdb7b..58c464b7463 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -44,7 +44,7 @@ mod service; pub(crate) mod top_k_collector; mod metrics; -mod search_permits; +mod search_permit_provider; #[cfg(test)] mod tests; diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index a9d0ddf204e..765203438d1 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -332,8 +332,8 @@ pub async fn leaf_list_terms( let searcher_context_clone = searcher_context.clone(); async move { let _leaf_split_search_permit = searcher_context_clone - .leaf_search_split_semaphore - .get_one_permit() + .search_permit_provider + .get_permit() .await .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); diff --git a/quickwit/quickwit-search/src/search_permits.rs b/quickwit/quickwit-search/src/search_permit_provider.rs similarity index 63% rename from quickwit/quickwit-search/src/search_permits.rs rename to quickwit/quickwit-search/src/search_permit_provider.rs index 3405e2f0df2..f6883efb34b 100644 --- a/quickwit/quickwit-search/src/search_permits.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -18,57 +18,71 @@ // along with this program. If not, see . use std::collections::VecDeque; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{Arc, Mutex}; use quickwit_common::metrics::GaugeGuard; use tokio::sync::oneshot; -/// `SearchPermits` is a distributor of permits to perform single split +/// `SearchPermitProvider` is a distributor of permits to perform single split /// search operation. /// /// Requests are served in order. #[derive(Clone)] -pub struct SearchPermits { - inner: Arc>, +pub struct SearchPermitProvider { + inner_arc: Arc>, } -impl SearchPermits { - pub fn new(num_permits: usize) -> SearchPermits { - SearchPermits { - inner: Arc::new(Mutex::new(InnerSearchPermits { +impl SearchPermitProvider { + pub fn new(num_permits: usize) -> SearchPermitProvider { + SearchPermitProvider { + inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider { num_permits_available: num_permits, permits_requests: VecDeque::new(), })), } } - /// Returns a list of future permits in the form of a Receiver channel. - pub fn get_one_permit(&self) -> oneshot::Receiver { - let mut permits_lock = self.inner.lock().unwrap(); - permits_lock.get_permit(&self.inner) + /// Returns a future permit in the form of a oneshot Receiver channel. + /// + /// At this point the permit is not acquired yet. + #[must_use] + pub fn get_permit(&self) -> oneshot::Receiver { + let mut permits_lock = self.inner_arc.lock().unwrap(); + permits_lock.get_permit(&self.inner_arc) } - /// Returns a list of future permits in the form of a Receiver channel. + /// Returns a list of future permits in the form of oneshot Receiver channels. /// /// The permits returned are guaranteed to be resolved in order. /// In addition, the permits are guaranteed to be resolved before permits returned by - /// subsequent calls to this function (or get_permit). - pub fn get_permits_futures(&self, num_permits: usize) -> Vec> { - let mut permits_lock = self.inner.lock().unwrap(); - permits_lock.get_permits(num_permits, &self.inner) + /// subsequent calls to this function (or `get_permit`). + #[must_use] + pub fn get_permits(&self, num_permits: usize) -> Vec> { + let mut permits_lock = self.inner_arc.lock().unwrap(); + permits_lock.get_permits(num_permits, &self.inner_arc) } } -struct InnerSearchPermits { +struct InnerSearchPermitProvider { num_permits_available: usize, permits_requests: VecDeque>, } -impl InnerSearchPermits { +impl InnerSearchPermitProvider { + fn get_permit( + &mut self, + inner_arc: &Arc>, + ) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back(tx); + self.assign_available_permits(inner_arc); + rx + } + fn get_permits( &mut self, num_permits: usize, - inner: &Arc>, + inner_arc: &Arc>, ) -> Vec> { let mut permits = Vec::with_capacity(num_permits); for _ in 0..num_permits { @@ -76,43 +90,35 @@ impl InnerSearchPermits { self.permits_requests.push_back(tx); permits.push(rx); } - self.assign_available_permits(inner); + self.assign_available_permits(inner_arc); permits } - fn get_permit( - &mut self, - inner: &Arc>, - ) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - self.permits_requests.push_back(tx); - self.assign_available_permits(inner); - rx - } - - fn recycle_permit(&mut self, inner: &Arc>) { + fn recycle_permit(&mut self, inner_arc: &Arc>) { self.num_permits_available += 1; - self.assign_available_permits(inner); + self.assign_available_permits(inner_arc); } - fn assign_available_permits(&mut self, inner: &Arc>) { + fn assign_available_permits(&mut self, inner_arc: &Arc>) { while self.num_permits_available > 0 { let Some(sender) = self.permits_requests.pop_front() else { break; }; + let mut ongoing_gauge_guard = GaugeGuard::from_gauge( + &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, + ); + ongoing_gauge_guard.add(1); let send_res = sender.send(SearchPermit { - _ongoing_gauge_guard: GaugeGuard::from_gauge( - &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, - ), - inner_opt: Some(Arc::downgrade(inner)), + _ongoing_gauge_guard: ongoing_gauge_guard, + inner_arc: inner_arc.clone(), + recycle_on_drop: true, }); match send_res { Ok(()) => { self.num_permits_available -= 1; } - Err(mut search_permit) => { - search_permit.disable_drop(); - drop(search_permit); + Err(search_permit) => { + search_permit.drop_without_recycling_permit(); } } } @@ -124,25 +130,24 @@ impl InnerSearchPermits { pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, - inner_opt: Option>>, + inner_arc: Arc>, + recycle_on_drop: bool, } impl SearchPermit { - fn disable_drop(&mut self) { - self.inner_opt = None; + fn drop_without_recycling_permit(mut self) { + self.recycle_on_drop = false; + drop(self); } } impl Drop for SearchPermit { fn drop(&mut self) { - let Some(inner) = self.inner_opt.take() else { - return; - }; - let Some(inner) = inner.upgrade() else { + if !self.recycle_on_drop { return; - }; - let mut inner_guard = inner.lock().unwrap(); - inner_guard.recycle_permit(&inner); + } + let mut inner_guard = self.inner_arc.lock().unwrap(); + inner_guard.recycle_permit(&self.inner_arc.clone()); } } @@ -150,12 +155,14 @@ impl Drop for SearchPermit { mod tests { use tokio::task::JoinSet; + use super::*; + #[tokio::test] async fn test_search_permits_get_permits_future() { // We test here that `get_permits_futures` does not interleave - let search_permits = super::SearchPermits::new(1); + let search_permits = SearchPermitProvider::new(1); let mut all_futures = Vec::new(); - let first_batch_of_permits = search_permits.get_permits_futures(10); + let first_batch_of_permits = search_permits.get_permits(10); assert_eq!(first_batch_of_permits.len(), 10); all_futures.extend( first_batch_of_permits @@ -164,7 +171,7 @@ mod tests { .map(move |(i, fut)| ((1, i), fut)), ); - let second_batch_of_permits = search_permits.get_permits_futures(10); + let second_batch_of_permits = search_permits.get_permits(10); assert_eq!(second_batch_of_permits.len(), 10); all_futures.extend( second_batch_of_permits @@ -203,11 +210,11 @@ mod tests { // Here we test that we don't have a problem if the Receiver is dropped. // In particular, we want to check that there is not a race condition where drop attempts to // lock the mutex. - let search_permits = super::SearchPermits::new(1); - let permit_rx = search_permits.get_one_permit(); - let permit_rx2 = search_permits.get_one_permit(); + let search_permits = SearchPermitProvider::new(1); + let permit_rx = search_permits.get_permit(); + let permit_rx2 = search_permits.get_permit(); drop(permit_rx2); drop(permit_rx); - let _permit_rx = search_permits.get_one_permit(); + let _permit_rx = search_permits.get_permit(); } } diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index b9bf596de21..7b288cefc3d 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -50,7 +50,7 @@ use crate::list_fields_cache::ListFieldsCache; use crate::list_terms::{leaf_list_terms, root_list_terms}; use crate::root::fetch_docs_phase; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; -use crate::search_permits::SearchPermits; +use crate::search_permit_provider::SearchPermitProvider; use crate::search_stream::{leaf_search_stream, root_search_stream}; use crate::{fetch_docs, root_search, search_plan, ClusterClient, SearchError}; @@ -450,7 +450,7 @@ pub struct SearcherContext { /// Fast fields cache. pub fast_fields_cache: Arc, /// Counting semaphore to limit concurrent leaf search split requests. - pub leaf_search_split_semaphore: SearchPermits, + pub search_permit_provider: SearchPermitProvider, /// Split footer cache. pub split_footer_cache: MemorySizedCache, /// Counting semaphore to limit concurrent split stream requests. @@ -489,7 +489,7 @@ impl SearcherContext { &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); let leaf_search_split_semaphore = - SearchPermits::new(searcher_config.max_num_concurrent_split_searches); + SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize; @@ -506,7 +506,7 @@ impl SearcherContext { Self { searcher_config, fast_fields_cache: storage_long_term_cache, - leaf_search_split_semaphore, + search_permit_provider: leaf_search_split_semaphore, split_footer_cache: global_split_footer_cache, split_stream_semaphore, leaf_search_cache,