diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index b26b4272d0a..e18bf82144f 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6473,6 +6473,7 @@ dependencies = [ "quickwit-proto", "quickwit-query", "quickwit-storage", + "rand 0.8.5", "rayon", "serde", "serde_json", diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml index fa815f47b56..f1c1f8dbdad 100644 --- a/quickwit/quickwit-search/Cargo.toml +++ b/quickwit/quickwit-search/Cargo.toml @@ -50,6 +50,7 @@ quickwit-storage = { workspace = true } [dev-dependencies] assert-json-diff = { workspace = true } proptest = { workspace = true } +rand = { workspace = true } serde_json = { workspace = true } typetag = { workspace = true } diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 819015b6d26..4104d80376e 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -50,6 +50,7 @@ use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; +use crate::search_permits::SearchPermit; use crate::service::{deserialize_doc_mapper, SearcherContext}; use crate::{QuickwitAggregations, SearchError}; @@ -1183,7 +1184,6 @@ async fn resolve_storage_and_leaf_search( aggregations_limits: AggregationLimitsGuard, ) -> crate::Result { let storage = storage_resolver.resolve(&index_uri).await?; - leaf_search( searcher_context.clone(), search_request.clone(), @@ -1259,10 +1259,16 @@ pub async fn leaf_search( let incremental_merge_collector = IncrementalCollector::new(merge_collector); let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector)); - for (split, mut request) in split_with_req { - let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore - .clone() - .acquire_owned() + // We acquire all of the leaf search permits to make 
sure our single split search tasks + // do not interleave with other leaf search requests. + let permit_futures = searcher_context + .leaf_search_split_semaphore + .get_permits_futures(split_with_req.len()); + + for ((split, mut request), permit_fut) in + split_with_req.into_iter().zip(permit_futures.into_iter()) + { + let leaf_split_search_permit = permit_fut .instrument(info_span!("waiting_for_leaf_search_split_semaphore")) .await .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); @@ -1355,7 +1361,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit, + leaf_split_search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index a09545d92b8..dec493fdb7b 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -44,6 +44,7 @@ mod service; pub(crate) mod top_k_collector; mod metrics; +mod search_permits; #[cfg(test)] mod tests; diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 0a781355162..a9d0ddf204e 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -331,10 +331,12 @@ pub async fn leaf_list_terms( let index_storage_clone = index_storage.clone(); let searcher_context_clone = searcher_context.clone(); async move { - let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone() - .acquire_owned() + let _leaf_split_search_permit = searcher_context_clone + .leaf_search_split_semaphore + .get_one_permit() .await .expect("Failed to acquire permit. This should never happen! 
Please, report on https://github.com/quickwit-oss/quickwit/issues."); + // TODO dedicated counter and timer? crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); let timer = crate::SEARCH_METRICS diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index ccdcc6a519c..1cb202a4d6f 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -21,8 +21,8 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_histogram, - new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, + exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge_vec, + new_histogram, new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, }; pub struct SearchMetrics { @@ -35,6 +35,8 @@ pub struct SearchMetrics { pub leaf_searches_splits_total: IntCounter, pub leaf_search_split_duration_secs: Histogram, pub job_assigned_total: IntCounterVec<1>, + pub leaf_search_single_split_tasks_pending: IntGauge, + pub leaf_search_single_split_tasks_active: IntGauge, } impl Default for SearchMetrics { @@ -50,6 +52,14 @@ impl Default for SearchMetrics { .copied() .collect(); + let leaf_search_single_split_tasks = new_gauge_vec::<1>( + "leaf_search_single_split_tasks", + "Number of leaf single split search awaiting or running", + "search", + &[], + ["status"], // takes values "active" or "pending" + ); + SearchMetrics { root_search_requests_total: new_counter_vec( "root_search_requests_total", @@ -110,6 +120,10 @@ impl Default for SearchMetrics { "search", exponential_buckets(0.001, 2.0, 15).unwrap(), ), + leaf_search_single_split_tasks_active: leaf_search_single_split_tasks + .with_label_values(["active"]), + leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks + .with_label_values(["pending"]), job_assigned_total: new_counter_vec( "job_assigned_total", "Number of job 
assigned to searchers, per affinity rank.", diff --git a/quickwit/quickwit-search/src/search_permits.rs b/quickwit/quickwit-search/src/search_permits.rs new file mode 100644 index 00000000000..02bb24eca8a --- /dev/null +++ b/quickwit/quickwit-search/src/search_permits.rs @@ -0,0 +1,218 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::VecDeque; +use std::sync::{Arc, Mutex, Weak}; + +use quickwit_common::metrics::GaugeGuard; +use tokio::sync::oneshot; + +/// Search permits serve as a small wrapper above a semaphore to enhance it: +/// - it provides a way to really make sure that the permits from two +/// different search request are never interleaved. Before it was a +/// race condition. +/// - it keeps a gauge of the number of tasks waiting for a permit, +/// or running. 
+#[derive(Clone)] +pub struct SearchPermits { + inner: Arc>, +} + +struct InnerSearchPermits { + num_permits_available: usize, + permits_requests: VecDeque>, +} + +impl InnerSearchPermits { + fn get_permits( + &mut self, + num_permits: usize, + inner: &Arc>, + ) -> Vec> { + let mut permits = Vec::with_capacity(num_permits); + for _ in 0..num_permits { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back(tx); + permits.push(rx); + } + self.assign_available_permits(inner); + permits + } + + fn get_permit( + &mut self, + inner: &Arc>, + ) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.permits_requests.push_back(tx); + self.assign_available_permits(inner); + rx + } + + fn recycle_permit(&mut self, inner: &Arc>) { + self.num_permits_available += 1; + self.assign_available_permits(inner); + } + + fn assign_available_permits(&mut self, inner: &Arc>) { + while self.num_permits_available > 0 { + let Some(sender) = self.permits_requests.pop_front() else { + break; + }; + let send_res = sender.send(SearchPermit { + _active_gauge_guard: GaugeGuard::from_gauge( + &crate::SEARCH_METRICS.leaf_search_single_split_tasks_active, + ), + inner_opt: Some(Arc::downgrade(inner)), + }); + match send_res { + Ok(()) => { + self.num_permits_available -= 1; + } + Err(mut search_permit) => { + search_permit.disable_drop(); + // The receiver was dropped. + // We need to recycle the permit. 
+ self.recycle_permit(inner); + } + } + } + crate::SEARCH_METRICS + .leaf_search_single_split_tasks_pending + .set(self.permits_requests.len() as i64); + } +} + +pub struct SearchPermit { + _active_gauge_guard: GaugeGuard<'static>, + inner_opt: Option>>, +} + +impl SearchPermit { + fn disable_drop(&mut self) { + self.inner_opt = None; + } +} + +impl Drop for SearchPermit { + fn drop(&mut self) { + let Some(inner) = self.inner_opt.take() else { + return; + }; + let Some(inner) = inner.upgrade() else { + return; + }; + let mut inner_guard = inner.lock().unwrap(); + inner_guard.recycle_permit(&inner); + } +} + +impl SearchPermits { + pub fn new(num_permits: usize) -> SearchPermits { + SearchPermits { + inner: Arc::new(Mutex::new(InnerSearchPermits { + num_permits_available: num_permits, + permits_requests: VecDeque::new(), + })), + } + } + + /// Returns a single future permit. + /// + /// The returned receiver resolves once a permit becomes available. + pub fn get_one_permit(&self) -> oneshot::Receiver { + let mut permits_lock = self.inner.lock().unwrap(); + permits_lock.get_permit(&self.inner) + } + + /// Returns a list of future permits. + /// + /// Each permit gets its own future, and their order of resolution is the same as in the Vec. 
+ pub fn get_permits_futures( + &self, + num_permits: usize, + ) -> Vec> { + let mut permits_lock = self.inner.lock().unwrap(); + permits_lock.get_permits(num_permits, &self.inner) + } +} + +#[cfg(test)] +mod tests { + use tokio::task::JoinSet; + + #[tokio::test] + async fn test_search_permits_get_permits_future() { + // We test here that `get_permits_futures` does not interleave + let search_permits = super::SearchPermits::new(1); + let mut all_futures = Vec::new(); + let first_batch_of_permits = search_permits.get_permits_futures(10); + assert_eq!(first_batch_of_permits.len(), 10); + all_futures.extend( + first_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((1, i), fut)), + ); + + let second_batch_of_permits = search_permits.get_permits_futures(10); + assert_eq!(second_batch_of_permits.len(), 10); + all_futures.extend( + second_batch_of_permits + .into_iter() + .enumerate() + .map(move |(i, fut)| ((2, i), fut)), + ); + + use rand::seq::SliceRandom; + // not super useful, considering what join set does, but still a tiny bit more sound. + all_futures.shuffle(&mut rand::thread_rng()); + + let mut join_set = JoinSet::new(); + for (res, fut) in all_futures { + join_set.spawn(async move { + let permit = fut.await; + (res, permit) + }); + } + let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20); + while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await { + ordered_result.push((batch_id, order)); + } + + assert_eq!(ordered_result.len(), 20); + for (i, res) in ordered_result[0..10].iter().enumerate() { + assert_eq!(res, &(1, i)); + } + for (i, res) in ordered_result[10..20].iter().enumerate() { + assert_eq!(res, &(2, i)); + } + } + + #[tokio::test] + async fn test_search_permits_receiver_race_condition() { + // Here we test that we don't have a problem if the Receiver is dropped. + // In particular, we want to check that there is not a race condition where drop attempts to lock the mutex. 
+ let search_permits = super::SearchPermits::new(1); + let permit_rx = search_permits.get_one_permit(); + let permit_rx2 = search_permits.get_one_permit(); + drop(permit_rx2); + drop(permit_rx); + } +} diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 65516a99e76..b9bf596de21 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -50,6 +50,7 @@ use crate::list_fields_cache::ListFieldsCache; use crate::list_terms::{leaf_list_terms, root_list_terms}; use crate::root::fetch_docs_phase; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; +use crate::search_permits::SearchPermits; use crate::search_stream::{leaf_search_stream, root_search_stream}; use crate::{fetch_docs, root_search, search_plan, ClusterClient, SearchError}; @@ -449,7 +450,7 @@ pub struct SearcherContext { /// Fast fields cache. pub fast_fields_cache: Arc, /// Counting semaphore to limit concurrent leaf search split requests. - pub leaf_search_split_semaphore: Arc, + pub leaf_search_split_semaphore: SearchPermits, /// Split footer cache. pub split_footer_cache: MemorySizedCache, /// Counting semaphore to limit concurrent split stream requests. 
@@ -468,10 +469,6 @@ impl std::fmt::Debug for SearcherContext { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("SearcherContext") .field("searcher_config", &self.searcher_config) - .field( - "leaf_search_split_semaphore", - &self.leaf_search_split_semaphore, - ) .field("split_stream_semaphore", &self.split_stream_semaphore) .finish() } @@ -491,9 +488,8 @@ impl SearcherContext { capacity_in_bytes, &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); - let leaf_search_split_semaphore = Arc::new(Semaphore::new( - searcher_config.max_num_concurrent_split_searches, - )); + let leaf_search_split_semaphore = + SearchPermits::new(searcher_config.max_num_concurrent_split_searches); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;