Skip to content

Commit

Permalink
CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 23, 2024
1 parent 61483b9 commit e028746
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 70 deletions.
10 changes: 5 additions & 5 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tracing::*;
use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
use crate::metrics::SEARCH_METRICS;
use crate::root::is_metadata_count_request_with_ast;
use crate::search_permits::SearchPermit;
use crate::search_permit_provider::SearchPermit;
use crate::service::{deserialize_doc_mapper, SearcherContext};
use crate::{QuickwitAggregations, SearchError};

Expand Down Expand Up @@ -1262,8 +1262,8 @@ pub async fn leaf_search(
// We acquire all of the leaf search permits to make sure our single split search tasks
// do no interleave with other leaf search requests.
let permit_futures = searcher_context
.leaf_search_split_semaphore
.get_permits_futures(split_with_req.len());
.search_permit_provider
.get_permits(split_with_req.len());

for ((split, mut request), permit_fut) in
split_with_req.into_iter().zip(permit_futures.into_iter())
Expand Down Expand Up @@ -1361,7 +1361,7 @@ async fn leaf_search_single_split_wrapper(
split: SplitIdAndFooterOffsets,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
leaf_split_search_permit: SearchPermit,
search_permit: SearchPermit,
aggregations_limits: AggregationLimitsGuard,
) {
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
Expand All @@ -1380,7 +1380,7 @@ async fn leaf_search_single_split_wrapper(
.await;

// We explicitly drop it, to highlight it to the reader
std::mem::drop(leaf_split_search_permit);
std::mem::drop(search_permit);

if leaf_search_single_split_res.is_ok() {
timer.observe_duration();
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod service;
pub(crate) mod top_k_collector;

mod metrics;
mod search_permits;
mod search_permit_provider;

#[cfg(test)]
mod tests;
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ pub async fn leaf_list_terms(
let searcher_context_clone = searcher_context.clone();
async move {
let _leaf_split_search_permit = searcher_context_clone
.leaf_search_split_semaphore
.get_one_permit()
.search_permit_provider
.get_permit()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,101 +18,107 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Mutex};

use quickwit_common::metrics::GaugeGuard;
use tokio::sync::oneshot;

/// `SearchPermits` is a distributor of permits to perform single split
/// `SearchPermitProvider` is a distributor of permits to perform single split
/// search operation.
///
/// Requests are served in order.
#[derive(Clone)]
pub struct SearchPermits {
inner: Arc<Mutex<InnerSearchPermits>>,
pub struct SearchPermitProvider {
inner_arc: Arc<Mutex<InnerSearchPermitProvider>>,
}

impl SearchPermits {
pub fn new(num_permits: usize) -> SearchPermits {
SearchPermits {
inner: Arc::new(Mutex::new(InnerSearchPermits {
impl SearchPermitProvider {
pub fn new(num_permits: usize) -> SearchPermitProvider {
SearchPermitProvider {
inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider {
num_permits_available: num_permits,
permits_requests: VecDeque::new(),
})),
}
}

/// Returns a list of future permits in the form of a Receiver channel.
pub fn get_one_permit(&self) -> oneshot::Receiver<SearchPermit> {
let mut permits_lock = self.inner.lock().unwrap();
permits_lock.get_permit(&self.inner)
/// Returns a future permit in the form of a oneshot Receiver channel.
///
/// At this point the permit is not acquired yet.
#[must_use]
pub fn get_permit(&self) -> oneshot::Receiver<SearchPermit> {
let mut permits_lock = self.inner_arc.lock().unwrap();
permits_lock.get_permit(&self.inner_arc)
}

/// Returns a list of future permits in the form of a Receiver channel.
/// Returns a list of future permits in the form of oneshot Receiver channels.
///
/// The permits returned are guaranteed to be resolved in order.
/// In addition, the permits are guaranteed to be resolved before permits returned by
/// subsequent calls to this function (or get_permit).
pub fn get_permits_futures(&self, num_permits: usize) -> Vec<oneshot::Receiver<SearchPermit>> {
let mut permits_lock = self.inner.lock().unwrap();
permits_lock.get_permits(num_permits, &self.inner)
/// subsequent calls to this function (or `get_permit`).
#[must_use]
pub fn get_permits(&self, num_permits: usize) -> Vec<oneshot::Receiver<SearchPermit>> {
let mut permits_lock = self.inner_arc.lock().unwrap();
permits_lock.get_permits(num_permits, &self.inner_arc)
}
}

struct InnerSearchPermits {
struct InnerSearchPermitProvider {
num_permits_available: usize,
permits_requests: VecDeque<oneshot::Sender<SearchPermit>>,
}

impl InnerSearchPermits {
impl InnerSearchPermitProvider {
fn get_permit(
&mut self,
inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>,
) -> oneshot::Receiver<SearchPermit> {
let (tx, rx) = oneshot::channel();
self.permits_requests.push_back(tx);
self.assign_available_permits(inner_arc);
rx
}

fn get_permits(
&mut self,
num_permits: usize,
inner: &Arc<Mutex<InnerSearchPermits>>,
inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>,
) -> Vec<oneshot::Receiver<SearchPermit>> {
let mut permits = Vec::with_capacity(num_permits);
for _ in 0..num_permits {
let (tx, rx) = oneshot::channel();
self.permits_requests.push_back(tx);
permits.push(rx);
}
self.assign_available_permits(inner);
self.assign_available_permits(inner_arc);
permits
}

fn get_permit(
&mut self,
inner: &Arc<Mutex<InnerSearchPermits>>,
) -> oneshot::Receiver<SearchPermit> {
let (tx, rx) = oneshot::channel();
self.permits_requests.push_back(tx);
self.assign_available_permits(inner);
rx
}

fn recycle_permit(&mut self, inner: &Arc<Mutex<InnerSearchPermits>>) {
fn recycle_permit(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
self.num_permits_available += 1;
self.assign_available_permits(inner);
self.assign_available_permits(inner_arc);
}

fn assign_available_permits(&mut self, inner: &Arc<Mutex<InnerSearchPermits>>) {
fn assign_available_permits(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
while self.num_permits_available > 0 {
let Some(sender) = self.permits_requests.pop_front() else {
break;
};
let mut ongoing_gauge_guard = GaugeGuard::from_gauge(
&crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
);
ongoing_gauge_guard.add(1);
let send_res = sender.send(SearchPermit {
_ongoing_gauge_guard: GaugeGuard::from_gauge(
&crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
),
inner_opt: Some(Arc::downgrade(inner)),
_ongoing_gauge_guard: ongoing_gauge_guard,
inner_arc: inner_arc.clone(),
recycle_on_drop: true,
});
match send_res {
Ok(()) => {
self.num_permits_available -= 1;
}
Err(mut search_permit) => {
search_permit.disable_drop();
drop(search_permit);
Err(search_permit) => {
search_permit.drop_without_recycling_permit();
}
}
}
Expand All @@ -124,38 +130,39 @@ impl InnerSearchPermits {

pub struct SearchPermit {
_ongoing_gauge_guard: GaugeGuard<'static>,
inner_opt: Option<Weak<Mutex<InnerSearchPermits>>>,
inner_arc: Arc<Mutex<InnerSearchPermitProvider>>,
recycle_on_drop: bool,
}

impl SearchPermit {
fn disable_drop(&mut self) {
self.inner_opt = None;
fn drop_without_recycling_permit(mut self) {
self.recycle_on_drop = false;
drop(self);
}
}

impl Drop for SearchPermit {
fn drop(&mut self) {
let Some(inner) = self.inner_opt.take() else {
return;
};
let Some(inner) = inner.upgrade() else {
if !self.recycle_on_drop {
return;
};
let mut inner_guard = inner.lock().unwrap();
inner_guard.recycle_permit(&inner);
}
let mut inner_guard = self.inner_arc.lock().unwrap();
inner_guard.recycle_permit(&self.inner_arc.clone());
}
}

#[cfg(test)]
mod tests {
use tokio::task::JoinSet;

use super::*;

#[tokio::test]
async fn test_search_permits_get_permits_future() {
// We test here that `get_permits_futures` does not interleave
let search_permits = super::SearchPermits::new(1);
let search_permits = SearchPermitProvider::new(1);
let mut all_futures = Vec::new();
let first_batch_of_permits = search_permits.get_permits_futures(10);
let first_batch_of_permits = search_permits.get_permits(10);
assert_eq!(first_batch_of_permits.len(), 10);
all_futures.extend(
first_batch_of_permits
Expand All @@ -164,7 +171,7 @@ mod tests {
.map(move |(i, fut)| ((1, i), fut)),
);

let second_batch_of_permits = search_permits.get_permits_futures(10);
let second_batch_of_permits = search_permits.get_permits(10);
assert_eq!(second_batch_of_permits.len(), 10);
all_futures.extend(
second_batch_of_permits
Expand Down Expand Up @@ -203,11 +210,11 @@ mod tests {
// Here we test that we don't have a problem if the Receiver is dropped.
// In particular, we want to check that there is not a race condition where drop attempts to
// lock the mutex.
let search_permits = super::SearchPermits::new(1);
let permit_rx = search_permits.get_one_permit();
let permit_rx2 = search_permits.get_one_permit();
let search_permits = SearchPermitProvider::new(1);
let permit_rx = search_permits.get_permit();
let permit_rx2 = search_permits.get_permit();
drop(permit_rx2);
drop(permit_rx);
let _permit_rx = search_permits.get_one_permit();
let _permit_rx = search_permits.get_permit();
}
}
8 changes: 4 additions & 4 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::list_fields_cache::ListFieldsCache;
use crate::list_terms::{leaf_list_terms, root_list_terms};
use crate::root::fetch_docs_phase;
use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset};
use crate::search_permits::SearchPermits;
use crate::search_permit_provider::SearchPermitProvider;
use crate::search_stream::{leaf_search_stream, root_search_stream};
use crate::{fetch_docs, root_search, search_plan, ClusterClient, SearchError};

Expand Down Expand Up @@ -450,7 +450,7 @@ pub struct SearcherContext {
/// Fast fields cache.
pub fast_fields_cache: Arc<dyn StorageCache>,
/// Counting semaphore to limit concurrent leaf search split requests.
pub leaf_search_split_semaphore: SearchPermits,
pub search_permit_provider: SearchPermitProvider,
/// Split footer cache.
pub split_footer_cache: MemorySizedCache<String>,
/// Counting semaphore to limit concurrent split stream requests.
Expand Down Expand Up @@ -489,7 +489,7 @@ impl SearcherContext {
&quickwit_storage::STORAGE_METRICS.split_footer_cache,
);
let leaf_search_split_semaphore =
SearchPermits::new(searcher_config.max_num_concurrent_split_searches);
SearchPermitProvider::new(searcher_config.max_num_concurrent_split_searches);
let split_stream_semaphore =
Semaphore::new(searcher_config.max_num_concurrent_split_streams);
let fast_field_cache_capacity = searcher_config.fast_field_cache_capacity.as_u64() as usize;
Expand All @@ -506,7 +506,7 @@ impl SearcherContext {
Self {
searcher_config,
fast_fields_cache: storage_long_term_cache,
leaf_search_split_semaphore,
search_permit_provider: leaf_search_split_semaphore,
split_footer_cache: global_split_footer_cache,
split_stream_semaphore,
leaf_search_cache,
Expand Down

0 comments on commit e028746

Please sign in to comment.